VETRO is a data processing pipeline pattern commonly used in enterprise integration architectures. It provides a structured approach for handling messages and data transformations in a predictable, maintainable way.
VETRO is an acronym representing five sequential processing stages:
| Stage | Purpose |
|---|---|
| Validate | Ensure incoming data is well-formed and meets requirements |
| Enrich | Add supplementary data from external sources |
| Transform | Convert data to the required format or structure |
| Route | Determine the destination based on content or rules |
| Operate | Execute the final action or deliver output |
VETRO is ideal for:
- API Gateway patterns - Processing incoming requests before routing to services
- Message broker consumers - Handling messages from Kafka, RabbitMQ, etc.
- ETL pipelines - Data ingestion and transformation workflows
- Notification systems - Processing and routing notifications
- Order processing - Validating, enriching, and routing orders
These are the stage responsibilities in detail, but don't get too hung up on them.
Important: Not every stage is required in every implementation, and stages can be combined or skipped as needed, think of them more like gradients than hard boundaries.
Check that incoming data meets structural and business requirements.
Responsibilities:
- Schema validation
- Required field checks
- Data type validation
- Business rule validation
Augment the data with information from other sources.
Responsibilities:
- Database lookups
- External API calls
- Cache retrievals
- Computed fields
Convert data from one format or structure to another.
Responsibilities:
- Format conversion
- Field mapping
- Data aggregation
- Unit conversions
Determine where the data should go next.
Responsibilities:
- Content-based routing
- Rule-based destination selection
- Priority queue assignment
Execute the final action with the processed data.
Responsibilities:
- Persist to database
- Send to external service
- Publish to message queue
- Generate response
You can use either a generic key-value container or a typed data class:
Option A: Generic InternalMessage (flexible, reusable)
// Type-safe key definition
data class Key<T>(val id: String, val type: Class<T>)
inline fun <reified T : Any> Key() = ReadOnlyProperty<Any?, Key<T>> { _, property ->
Key(property.name, T::class.java)
}
// Generic message container
class InternalMessage {
private val values = mutableMapOf<Key<*>, Any?>()
operator fun <T> get(key: Key<T>): T = getOrNull(key)
?: throw IllegalArgumentException("Key '${key.id}' is missing")
fun <T> getOrNull(key: Key<T>): T? = values[key]?.let { key.type.cast(it) }
operator fun <T> set(key: Key<T>, value: T) {
values[key] = value
}
}
// Define keys for your domain
val ORDER_REQUEST by Key<OrderRequest>()
val CUSTOMER by Key<Customer>()
val SHOULD_PROCESS by Key<Boolean>()
val IS_CANCELLED by Key<Boolean>()Advantages of Option A:
- Executor reusability - The same executor can be used in multiple flows without duplicating code. An executor that enriches customer data can work in OrderFlow, InvoiceFlow, and ShippingFlow.
- Flexible composition - Flows can share executors and compose them differently
- Decoupled design - Executors don't depend on a specific message type
- Easy to extend - Add new keys without modifying the message class
Option B: Typed Data Class (simpler, but coupled)
data class OrderMessage(
val request: OrderRequest,
var customer: Customer? = null,
var enrichedItems: List<EnrichedItem> = emptyList(),
var route: Route? = null,
var shouldProcess: Boolean = true,
var isCancelled: Boolean = false,
)This approach is simpler and provides compile-time safety, but executors become tightly coupled to the specific message type and cannot be reused across different flows.
abstract class Executor<T> {
open suspend fun isInterested(message: T): Boolean = true
abstract suspend fun T.execute()
}abstract class AbstractFlow<T> {
protected abstract val executors: List<Executor<T>>
protected open suspend fun execute(message: T) {
for (executor in executors) {
if (!executor.isInterested(message)) continue
executor.run { message.execute() }
if (shouldStop(message)) break
}
}
protected abstract fun shouldStop(message: T): Boolean
}For maximum flexibility and testability, executors can be instantiated by a Factory class that reads flow configuration from XML. This allows people to easily see and change the executor order:
<?xml version="1.0" encoding="UTF-8"?>
<flow xmlns="FlowSchema" id="orderProcessingFlow">
<steps>
<className>com.example.order.executor.ValidateOrder</className>
<className>com.example.order.executor.EnrichOrder</className>
<className>com.example.order.executor.TransformOrder</className>
<className>com.example.order.executor.RouteOrder</className>
<className>com.example.order.executor.ProcessOrder</className>
</steps>
</flow>class FlowFactory(private val scope: Scope) {
fun <T> getExecutors(xmlPath: String): List<Executor<T>> {
val config = parseFlowXml(xmlPath)
return config.classNames.map { className ->
scope.getInstance(Class.forName(className)) as Executor<T>
}
}
}Benefits of XML configuration:
- Very clean code - Flow classes contain no hardcoded executor lists
- Highly testable - Swap executor implementations for testing by changing XML
- Runtime configurability - Change flow behavior without redeploying
- Clear visualization - The XML file documents the exact processing order
- Easy A/B testing - Create multiple flow configurations for experimentation
data class OrderRequest(
val customerId: String,
val items: List<OrderItem>,
)
data class OrderItem(
val productId: String,
val quantity: Int,
)
data class Customer(
val id: String,
val name: String,
val tier: CustomerTier,
)
enum class CustomerTier { STANDARD, PREMIUM, VIP }
enum class Route { STANDARD, PRIORITY, APPROVAL_REQUIRED }// Keys.kt
val ORDER_REQUEST by Key<OrderRequest>()
val CUSTOMER by Key<Customer>()
val ENRICHED_ITEMS by Key<List<EnrichedItem>>()
val TOTAL_AMOUNT by Key<BigDecimal>()
val ROUTE by Key<Route>()
val SHOULD_PROCESS by Key<Boolean>()
val IS_CANCELLED by Key<Boolean>()1. Validate Stage
@Singleton
@InjectConstructor
class ValidateOrder : Executor<InternalMessage>() {
override suspend fun InternalMessage.execute() {
val request = get(ORDER_REQUEST)
if (request.customerId.isBlank()) {
// There are two ways of handling validation errors
// 1. Can throw exceptions
throw ValidationException("Customer ID is required")
// 2. Use error-as-value pattern
set(VALIDATION_ERROR, ValidationError("Customer ID is required"))
set(IS_CANCELLED, true)
// Or leverage extension functions to simplify
setError(ValidationError("Customer ID is required"))
return
}
if (request.items.isEmpty()) {
throw ValidationException("Order must contain at least one item")
}
if (request.items.any { it.quantity <= 0 }) {
throw ValidationException("Item quantities must be greater than zero")
}
}
}2. Enrich Stage
@Singleton
@InjectConstructor
class EnrichOrder(
private val customerRepository: CustomerRepository,
private val productRepository: ProductRepository,
) : Executor<InternalMessage>() {
override suspend fun InternalMessage.execute() {
val request = get(ORDER_REQUEST)
// Fetch customer
val customer = customerRepository.findById(request.customerId)
?: throw NotFoundException("Customer not found: ${request.customerId}")
// Save customer to message, making this data available for future executors as well
set(CUSTOMER, customer)
// Fetch product details
val enrichedItems = request.items.map { item ->
val product = productRepository.findById(item.productId)
EnrichedItem(
productId = item.productId,
productName = product?.name ?: "Unknown",
quantity = item.quantity,
unitPrice = product?.price ?: BigDecimal.ZERO,
)
}
set(ENRICHED_ITEMS, enrichedItems)
// Calculate total
val total = enrichedItems.sumOf { it.unitPrice * it.quantity.toBigDecimal() }
set(TOTAL_AMOUNT, total)
}
}3. Transform Stage
@Singleton
@InjectConstructor
class TransformOrder : Executor<InternalMessage>() {
override suspend fun InternalMessage.execute() {
val customer = get(CUSTOMER)
val total = get(TOTAL_AMOUNT)
// Apply tier discount
val discountedTotal = when (customer.tier) {
CustomerTier.VIP -> total * BigDecimal("0.85")
CustomerTier.PREMIUM -> total * BigDecimal("0.90")
CustomerTier.STANDARD -> total
}
set(TOTAL_AMOUNT, discountedTotal)
}
}4. Route Stage
@Singleton
@InjectConstructor
class RouteOrder : Executor<InternalMessage>() {
override suspend fun InternalMessage.execute() {
val customer = get(CUSTOMER)
val total = get(TOTAL_AMOUNT)
val route = when {
total > BigDecimal("1000") -> Route.APPROVAL_REQUIRED
customer.tier == CustomerTier.VIP -> Route.PRIORITY
else -> Route.STANDARD
}
set(ROUTE, route)
}
}5. Operate Stage
@Singleton
@InjectConstructor
class ProcessOrder(
private val orderRepository: OrderRepository,
private val messagingService: MessagingService,
) : Executor<InternalMessage>() {
override suspend fun InternalMessage.execute() {
val request = get(ORDER_REQUEST)
val customer = get(CUSTOMER)
val enrichedItems = get(ENRICHED_ITEMS)
val total = get(TOTAL_AMOUNT)
val route = get(ROUTE)
// Create and save order
val order = Order(
id = UUID.randomUUID().toString(),
customerId = customer.id,
items = enrichedItems,
total = total,
status = OrderStatus.PENDING,
)
orderRepository.save(order)
// Send to appropriate queue
val queue = when (route) {
Route.STANDARD -> "orders.standard"
Route.PRIORITY -> "orders.priority"
Route.APPROVAL_REQUIRED -> "orders.approval"
}
messagingService.send(queue, order)
set(SHOULD_PROCESS, true)
}
}@Singleton
@InjectConstructor
class OrderProcessingFlow(private val scope: Scope) : AbstractFlow<InternalMessage>() {
// Define executors in order, can also be loaded from XML via FlowFactory
override val executors = listOf(
ValidateOrder::class,
EnrichOrder::class,
TransformOrder::class,
RouteOrder::class,
ProcessOrder::class,
).map { scope.getInstance(it.java) }
suspend fun process(request: OrderRequest): OrderResult {
val message = InternalMessage {
set(ORDER_REQUEST, request)
set(SHOULD_PROCESS, false)
}
execute(message)
// If using the error-as-value pattern
// message.getOrNull(VALIDATION_ERROR)?.let { validationError ->
// return@withContext OrderResult.Failure(validationError.message)
// }
// Returns the final result of the operation
return message[ORDER_RESULT]
}
// This method is optional, can be overridden to define custom stopping conditions
override fun shouldStop(message: InternalMessage): Boolean =
message.getOrNull(ORDER_RESULT) != null // Finished successfully
|| message[IS_CANCELLED] // Cancelled (maybe due to validation error)
}
sealed class OrderResult {
data class Success(val route: Route?) : OrderResult()
data class Failure(val reason: String) : OrderResult()
}@RestController
@RequestMapping("/api/orders")
class OrderController(
private val orderFlow: OrderProcessingFlow,
) {
@PostMapping
suspend fun createOrder(@RequestBody request: OrderRequest) {
return when (val result = orderFlow.process(request)) {
is OrderResult.Success -> /* Return success response */
is OrderResult.Failure -> /* Return proper error response or throw */
}
}
}- Separation of Concerns - Each executor has a single responsibility
- Testability - Executors can be unit tested independently
- Reusability - Executors can be reused across different flows (especially with generic InternalMessage)
- Observability - Easy to add logging/metrics around executor execution
- Flexibility - Executors can skip processing via
isInterested()
- Keep executors stateless - Pass all data through the message
- Fail fast in Validate - Set
IS_CANCELLEDearly to avoid wasted processing - Use
isInterested()for conditional execution - Skip executors that don't apply - Override
shouldStop()in flows - Define custom stopping conditions - Use XML configuration for complex flows - Enables runtime changes and easier testing
- VETO - Skip the Route stage for simple linear flows
- VET - Minimal pipeline for basic transformations
- Partial VETRO - Use only the stages you need (e.g., Validate + Enrich + Operate)
- Enterprise Integration Patterns (Hohpe & Woolf)
- MuleSoft VETRO Pattern Documentation
- Apache Camel Enterprise Integration Patterns