@SecretX33
Last active November 24, 2025 15:28

VETRO Design Pattern

VETRO is a data processing pipeline pattern commonly used in enterprise integration architectures. It provides a structured approach for handling messages and data transformations in a predictable, maintainable way.

What is VETRO?

VETRO is an acronym representing five sequential processing stages:

Stage       Purpose
Validate    Ensure incoming data is well-formed and meets requirements
Enrich      Add supplementary data from external sources
Transform   Convert data to the required format or structure
Route       Determine the destination based on content or rules
Operate     Execute the final action or deliver output

When to Use VETRO

VETRO is ideal for:

  • API Gateway patterns - Processing incoming requests before routing to services
  • Message broker consumers - Handling messages from Kafka, RabbitMQ, etc.
  • ETL pipelines - Data ingestion and transformation workflows
  • Notification systems - Processing and routing notifications
  • Order processing - Validating, enriching, and routing orders

Stage Details

These are the stage responsibilities in detail, but don't get too hung up on them.

Important: Not every stage is required in every implementation, and stages can be combined or skipped as needed. Think of them as gradients rather than hard boundaries.

1. Validate

Check that incoming data meets structural and business requirements.

Responsibilities:

  • Schema validation
  • Required field checks
  • Data type validation
  • Business rule validation
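
As a rough illustration of these responsibilities, the sketch below runs required-field checks first and a business rule second, collecting every problem instead of stopping at the first. `SignupRequest`, `validate`, and the age rule are hypothetical names invented for this example.

```kotlin
// Hypothetical request type, used only for illustration.
data class SignupRequest(val email: String?, val age: Int?)

// Collects every problem so the caller can report them all at once.
fun validate(request: SignupRequest): List<String> {
    val errors = mutableListOf<String>()

    // Required field checks
    if (request.email.isNullOrBlank()) errors.add("email is required")

    val age = request.age
    if (age == null) {
        errors.add("age is required")
    } else if (age < 18) {
        // Business rule validation, only meaningful once the field is present
        errors.add("age must be at least 18")
    }
    return errors
}

fun main() {
    println(validate(SignupRequest(email = "a@example.com", age = 30))) // prints []
    println(validate(SignupRequest(email = "", age = 16)))
    // prints [email is required, age must be at least 18]
}
```

Returning a list of errors fits the error-as-value style; a throwing variant would raise on the first failed check instead.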

2. Enrich

Augment the data with information from other sources.

Responsibilities:

  • Database lookups
  • External API calls
  • Cache retrievals
  • Computed fields
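
A minimal sketch of an enrichment lookup that consults a cache before falling back to a slower source. `CustomerProfile`, `CustomerEnricher`, and the `lookup` function are assumptions made for this example; the in-memory map is not thread-safe and stands in for whatever cache a real system would use.

```kotlin
// Hypothetical type for illustration only.
data class CustomerProfile(val id: String, val name: String)

class CustomerEnricher(private val lookup: (String) -> CustomerProfile?) {
    private val cache = HashMap<String, CustomerProfile>()

    // Counts how many times the slow lookup was actually invoked.
    var lookups = 0
        private set

    fun customerFor(id: String): CustomerProfile? {
        cache[id]?.let { return it }          // cache retrieval
        lookups++
        val found = lookup(id)                // database lookup or external API call
        if (found != null) cache[id] = found
        return found
    }
}

fun main() {
    val database = mapOf("c1" to CustomerProfile("c1", "Ada"))
    val enricher = CustomerEnricher { database[it] }

    println(enricher.customerFor("c1")?.name) // prints Ada
    println(enricher.customerFor("c1")?.name) // prints Ada (served from the cache)
    println(enricher.lookups)                 // prints 1
}
```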

3. Transform

Convert data from one format or structure to another.

Responsibilities:

  • Format conversion
  • Field mapping
  • Data aggregation
  • Unit conversions
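
The following sketch combines field mapping with a unit conversion. The `InboundItem` and `OutboundItem` shapes are hypothetical; the conversion factor is the exact definition of the pound in kilograms.

```kotlin
import java.math.BigDecimal
import java.math.RoundingMode

// Hypothetical inbound and outbound shapes.
data class InboundItem(val sku: String, val weightLb: BigDecimal)
data class OutboundItem(val productId: String, val weightKg: BigDecimal)

private val KG_PER_LB = BigDecimal("0.45359237")

fun transform(item: InboundItem): OutboundItem = OutboundItem(
    productId = item.sku, // field mapping: sku -> productId
    // unit conversion: pounds -> kilograms, rounded to three decimals
    weightKg = (item.weightLb * KG_PER_LB).setScale(3, RoundingMode.HALF_UP),
)

fun main() {
    println(transform(InboundItem("A-1", BigDecimal("10"))))
    // prints OutboundItem(productId=A-1, weightKg=4.536)
}
```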

4. Route

Determine where the data should go next.

Responsibilities:

  • Content-based routing
  • Rule-based destination selection
  • Priority queue assignment
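
One way to express content-based routing is an ordered list of predicate and destination pairs in which the first match wins. The `Notification` type and the queue names below are made up for this sketch.

```kotlin
// Hypothetical message type.
data class Notification(val channel: String, val urgent: Boolean)

// Rules are evaluated in order; the first matching predicate wins.
val rules: List<Pair<(Notification) -> Boolean, String>> = listOf(
    { n: Notification -> n.urgent } to "queue.priority",
    { n: Notification -> n.channel == "email" } to "queue.email",
)

fun route(notification: Notification, default: String = "queue.default"): String =
    rules.firstOrNull { (matches, _) -> matches(notification) }?.second ?: default

fun main() {
    println(route(Notification("sms", urgent = true)))    // prints queue.priority
    println(route(Notification("email", urgent = false))) // prints queue.email
    println(route(Notification("push", urgent = false)))  // prints queue.default
}
```

Keeping the rules in a list makes the precedence explicit: an urgent email goes to the priority queue because that rule is listed first.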

5. Operate

Execute the final action with the processed data.

Responsibilities:

  • Persist to database
  • Send to external service
  • Publish to message queue
  • Generate response

Implementation in Spring Boot with Kotlin Coroutines

Core Components

Message Container

You can use either a generic key-value container or a typed data class:

Option A: Generic InternalMessage (flexible, reusable)

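import kotlin.properties.ReadOnlyProperty

// Type-safe key definition
data class Key<T>(val id: String, val type: Class<T>)

// Property delegate that names each key after the property it is assigned to.
// javaObjectType keeps the boxed class (e.g. java.lang.Boolean), which Class.cast needs.
inline fun <reified T : Any> Key() = ReadOnlyProperty<Any?, Key<T>> { _, property ->
    Key(property.name, T::class.javaObjectType)
}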

// Generic message container
class InternalMessage {
    private val values = mutableMapOf<Key<*>, Any?>()

    operator fun <T> get(key: Key<T>): T = getOrNull(key)
        ?: throw IllegalArgumentException("Key '${key.id}' is missing")

    fun <T> getOrNull(key: Key<T>): T? = values[key]?.let { key.type.cast(it) }
    
    operator fun <T> set(key: Key<T>, value: T) {
        values[key] = value
    }
}

// Define keys for your domain
val ORDER_REQUEST by Key<OrderRequest>()
val CUSTOMER by Key<Customer>()
val SHOULD_PROCESS by Key<Boolean>()
val IS_CANCELLED by Key<Boolean>()

Advantages of Option A:

  • Executor reusability - The same executor can be used in multiple flows without duplicating code. An executor that enriches customer data can work in OrderFlow, InvoiceFlow, and ShippingFlow.
  • Flexible composition - Flows can share executors and compose them differently
  • Decoupled design - Executors don't depend on a specific message type
  • Easy to extend - Add new keys without modifying the message class
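
To make the container concrete, here is a small usage sketch. It repeats the `Key` and `InternalMessage` definitions so that it runs on its own; `ORDER_ID`, `RETRY_COUNT`, and `CUSTOMER_NOTE` are hypothetical keys. It uses `javaObjectType` as a defensive choice so that keys for types such as `Int` always hold the boxed class.

```kotlin
import kotlin.properties.ReadOnlyProperty

data class Key<T>(val id: String, val type: Class<T>)

// Delegate that names the key after the property it is assigned to.
inline fun <reified T : Any> Key() = ReadOnlyProperty<Any?, Key<T>> { _, property ->
    Key(property.name, T::class.javaObjectType)
}

class InternalMessage {
    private val values = mutableMapOf<Key<*>, Any?>()

    operator fun <T> get(key: Key<T>): T = getOrNull(key)
        ?: throw IllegalArgumentException("Key '${key.id}' is missing")

    fun <T> getOrNull(key: Key<T>): T? = values[key]?.let { key.type.cast(it) }

    operator fun <T> set(key: Key<T>, value: T) {
        values[key] = value
    }
}

// Hypothetical keys for this example.
val ORDER_ID by Key<String>()
val RETRY_COUNT by Key<Int>()
val CUSTOMER_NOTE by Key<String>()

fun main() {
    val message = InternalMessage()
    message[ORDER_ID] = "o-42"
    message[RETRY_COUNT] = 3

    println(message[ORDER_ID])                // prints o-42
    println(message[RETRY_COUNT] + 1)         // prints 4
    println(message.getOrNull(CUSTOMER_NOTE)) // prints null
}
```

Because `Key` is a data class, two keys with the same name and type are equal, so the key instance produced on each property access still finds the stored value.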

Option B: Typed Data Class (simpler, but coupled)

data class OrderMessage(
    val request: OrderRequest,
    var customer: Customer? = null,
    var enrichedItems: List<EnrichedItem> = emptyList(),
    var route: Route? = null,
    var shouldProcess: Boolean = true,
    var isCancelled: Boolean = false,
)

This approach is simpler and provides compile-time safety, but executors become tightly coupled to the specific message type and cannot be reused across different flows.

Executor Base Class

abstract class Executor<T> {
    open suspend fun isInterested(message: T): Boolean = true

    abstract suspend fun T.execute()
}

Flow Orchestrator

abstract class AbstractFlow<T> {
    protected abstract val executors: List<Executor<T>>

    protected open suspend fun execute(message: T) {
        for (executor in executors) {
            if (!executor.isInterested(message)) continue

            executor.run { message.execute() }

            if (shouldStop(message)) break
        }
    }

    // Flows run every executor by default; override to define a stopping condition
    protected open fun shouldStop(message: T): Boolean = false
}
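
The sketch below shows how skipping and early stopping play out at run time. To stay runnable with the standard library alone, it uses simplified copies of `Executor` and `AbstractFlow` without the `suspend` modifiers; `Trace`, `Step`, and `DemoFlow` are hypothetical names.

```kotlin
// Simplified, non-suspending copies of the base classes.
abstract class Executor<T> {
    open fun isInterested(message: T): Boolean = true
    abstract fun T.execute()
}

abstract class AbstractFlow<T> {
    protected abstract val executors: List<Executor<T>>

    fun execute(message: T) {
        for (executor in executors) {
            if (!executor.isInterested(message)) continue
            executor.run { message.execute() }
            if (shouldStop(message)) break
        }
    }

    protected abstract fun shouldStop(message: T): Boolean
}

// Hypothetical message: a trace of executed steps plus a stop flag.
class Trace(val steps: MutableList<String> = mutableListOf(), var cancelled: Boolean = false)

class Step(
    private val name: String,
    private val cancels: Boolean = false,
    private val interested: Boolean = true,
) : Executor<Trace>() {
    override fun isInterested(message: Trace) = interested

    override fun Trace.execute() {
        steps.add(name)
        if (cancels) cancelled = true
    }
}

class DemoFlow : AbstractFlow<Trace>() {
    override val executors = listOf(
        Step("validate"),
        Step("enrich", interested = false), // skipped via isInterested
        Step("transform", cancels = true),  // sets the stop flag
        Step("operate"),                    // never reached
    )

    override fun shouldStop(message: Trace) = message.cancelled
}

fun main() {
    val trace = Trace()
    DemoFlow().execute(trace)
    println(trace.steps) // prints [validate, transform]
}
```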

XML-Based Flow Configuration (Optional)

For maximum flexibility and testability, executors can be instantiated by a Factory class that reads flow configuration from XML. This allows people to easily see and change the executor order:

<?xml version="1.0" encoding="UTF-8"?>
<flow xmlns="FlowSchema" id="orderProcessingFlow">
    <steps>
        <className>com.example.order.executor.ValidateOrder</className>
        <className>com.example.order.executor.EnrichOrder</className>
        <className>com.example.order.executor.TransformOrder</className>
        <className>com.example.order.executor.RouteOrder</className>
        <className>com.example.order.executor.ProcessOrder</className>
    </steps>
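</flow>

class FlowFactory(private val scope: Scope) {

    // parseFlowXml is assumed to return an object exposing the configured class names.
    // The cast is unchecked: a class that is not an Executor<T> only fails when it is used.
    @Suppress("UNCHECKED_CAST")
    fun <T> getExecutors(xmlPath: String): List<Executor<T>> {
        val config = parseFlowXml(xmlPath)
        return config.classNames.map { className ->
            scope.getInstance(Class.forName(className)) as Executor<T>
        }
    }
}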

Benefits of XML configuration:

  • Very clean code - Flow classes contain no hardcoded executor lists
  • Highly testable - Swap executor implementations for testing by changing XML
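  • Runtime configurability - Change flow behavior by editing XML instead of code (without redeploying, provided the file is loaded from outside the packaged application)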
  • Clear visualization - The XML file documents the exact processing order
  • Easy A/B testing - Create multiple flow configurations for experimentation
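
The `parseFlowXml` helper is not shown above. A minimal sketch of what it might look like, using the JDK's DOM parser, follows. `FlowConfig` is a hypothetical holder, and this version takes the XML text directly rather than a path so that the example is self-contained.

```kotlin
import java.io.ByteArrayInputStream
import javax.xml.parsers.DocumentBuilderFactory

// Hypothetical config holder for the parsed flow definition.
data class FlowConfig(val id: String, val classNames: List<String>)

fun parseFlowXml(xml: String): FlowConfig {
    val document = DocumentBuilderFactory.newInstance()
        .newDocumentBuilder()
        .parse(ByteArrayInputStream(xml.toByteArray(Charsets.UTF_8)))
    val root = document.documentElement

    // Document order is preserved, so the list reflects the executor order.
    val nodes = root.getElementsByTagName("className")
    val classNames = (0 until nodes.length).map { nodes.item(it).textContent.trim() }

    return FlowConfig(id = root.getAttribute("id"), classNames = classNames)
}

fun main() {
    val xml = """
        <flow xmlns="FlowSchema" id="orderProcessingFlow">
            <steps>
                <className>com.example.order.executor.ValidateOrder</className>
                <className>com.example.order.executor.EnrichOrder</className>
            </steps>
        </flow>
    """.trimIndent()

    val config = parseFlowXml(xml)
    println(config.id)         // prints orderProcessingFlow
    println(config.classNames)
}
```

If the XML can come from untrusted sources, the parser should also be hardened against external entity expansion before use.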

Example: Order Processing Pipeline

Domain Models

data class OrderRequest(
    val customerId: String,
    val items: List<OrderItem>,
)

data class OrderItem(
    val productId: String,
    val quantity: Int,
)

data class Customer(
    val id: String,
    val name: String,
    val tier: CustomerTier,
)

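enum class CustomerTier { STANDARD, PREMIUM, VIP }
enum class Route { STANDARD, PRIORITY, APPROVAL_REQUIRED }

// Shapes inferred from how the executors below use these types (needs java.math.BigDecimal)
data class EnrichedItem(
    val productId: String,
    val productName: String,
    val quantity: Int,
    val unitPrice: BigDecimal,
)

data class Order(
    val id: String,
    val customerId: String,
    val items: List<EnrichedItem>,
    val total: BigDecimal,
    val status: OrderStatus,
)

enum class OrderStatus { PENDING } // only PENDING is used in this example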

Keys Definition

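// Keys.kt
val ORDER_REQUEST by Key<OrderRequest>()
val CUSTOMER by Key<Customer>()
val ENRICHED_ITEMS by Key<List<EnrichedItem>>()
val TOTAL_AMOUNT by Key<BigDecimal>()
val ROUTE by Key<Route>()
val SHOULD_PROCESS by Key<Boolean>()
val IS_CANCELLED by Key<Boolean>()
val VALIDATION_ERROR by Key<ValidationError>() // used by the error-as-value pattern
val ORDER_RESULT by Key<OrderResult>()         // final outcome read by the flow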

Executors

1. Validate Stage

@Singleton
@InjectConstructor
class ValidateOrder : Executor<InternalMessage>() {

    override suspend fun InternalMessage.execute() {
        val request = get(ORDER_REQUEST)

        if (request.customerId.isBlank()) {
            // There are two ways of handling validation errors. Pick one; the
            // alternatives are commented out because code after `throw` is unreachable.

            // 1. Throw an exception
            throw ValidationException("Customer ID is required")

            // 2. Use the error-as-value pattern, then return so the flow can stop
            // set(VALIDATION_ERROR, ValidationError("Customer ID is required"))
            // set(IS_CANCELLED, true)
            // return

            // Or leverage an extension function to simplify option 2
            // setError(ValidationError("Customer ID is required"))
            // return
        }

        if (request.items.isEmpty()) {
            throw ValidationException("Order must contain at least one item")
        }

        if (request.items.any { it.quantity <= 0 }) {
            throw ValidationException("Item quantities must be greater than zero")
        }
    }
}
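
The `setError` helper mentioned in the validation code is not defined in this document. One plausible shape is an extension function that records the error and sets the cancellation flag in a single call. The sketch below assumes `ValidationError` is a simple data class with a `message` field, and repeats trimmed-down copies of `Key` and `InternalMessage` so that it runs on its own.

```kotlin
import kotlin.properties.ReadOnlyProperty

// Trimmed-down copies of Key and InternalMessage.
data class Key<T>(val id: String, val type: Class<T>)

inline fun <reified T : Any> Key() = ReadOnlyProperty<Any?, Key<T>> { _, property ->
    Key(property.name, T::class.javaObjectType)
}

class InternalMessage {
    private val values = mutableMapOf<Key<*>, Any?>()

    fun <T> getOrNull(key: Key<T>): T? = values[key]?.let { key.type.cast(it) }

    operator fun <T> set(key: Key<T>, value: T) {
        values[key] = value
    }
}

// Assumed shape of the error value.
data class ValidationError(val message: String)

val VALIDATION_ERROR by Key<ValidationError>()
val IS_CANCELLED by Key<Boolean>()

// Hypothetical helper: stores the error and cancels the flow in one step.
fun InternalMessage.setError(error: ValidationError) {
    this[VALIDATION_ERROR] = error
    this[IS_CANCELLED] = true
}

fun main() {
    val message = InternalMessage()
    message.setError(ValidationError("Customer ID is required"))

    println(message.getOrNull(VALIDATION_ERROR)?.message) // prints Customer ID is required
    println(message.getOrNull(IS_CANCELLED))              // prints true
}
```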

2. Enrich Stage

@Singleton
@InjectConstructor
class EnrichOrder(
    private val customerRepository: CustomerRepository,
    private val productRepository: ProductRepository,
) : Executor<InternalMessage>() {

    override suspend fun InternalMessage.execute() {
        val request = get(ORDER_REQUEST)

        // Fetch customer
        val customer = customerRepository.findById(request.customerId)
            ?: throw NotFoundException("Customer not found: ${request.customerId}")
        
        // Save customer to message, making this data available for future executors as well
        set(CUSTOMER, customer)

        // Fetch product details
        val enrichedItems = request.items.map { item ->
            val product = productRepository.findById(item.productId)
            EnrichedItem(
                productId = item.productId,
                productName = product?.name ?: "Unknown",
                quantity = item.quantity,
                unitPrice = product?.price ?: BigDecimal.ZERO,
            )
        }
        set(ENRICHED_ITEMS, enrichedItems)

        // Calculate total
        val total = enrichedItems.sumOf { it.unitPrice * it.quantity.toBigDecimal() }
        set(TOTAL_AMOUNT, total)
    }
}

3. Transform Stage

@Singleton
@InjectConstructor
class TransformOrder : Executor<InternalMessage>() {

    override suspend fun InternalMessage.execute() {
        val customer = get(CUSTOMER)
        val total = get(TOTAL_AMOUNT)

        // Apply tier discount
        val discountedTotal = when (customer.tier) {
            CustomerTier.VIP -> total * BigDecimal("0.85")
            CustomerTier.PREMIUM -> total * BigDecimal("0.90")
            CustomerTier.STANDARD -> total
        }
        set(TOTAL_AMOUNT, discountedTotal)
    }
}

4. Route Stage

@Singleton
@InjectConstructor
class RouteOrder : Executor<InternalMessage>() {

    override suspend fun InternalMessage.execute() {
        val customer = get(CUSTOMER)
        val total = get(TOTAL_AMOUNT)

        val route = when {
            total > BigDecimal("1000") -> Route.APPROVAL_REQUIRED
            customer.tier == CustomerTier.VIP -> Route.PRIORITY
            else -> Route.STANDARD
        }
        set(ROUTE, route)
    }
}

5. Operate Stage

@Singleton
@InjectConstructor
class ProcessOrder(
    private val orderRepository: OrderRepository,
    private val messagingService: MessagingService,
) : Executor<InternalMessage>() {

    override suspend fun InternalMessage.execute() {
        val request = get(ORDER_REQUEST)
        val customer = get(CUSTOMER)
        val enrichedItems = get(ENRICHED_ITEMS)
        val total = get(TOTAL_AMOUNT)
        val route = get(ROUTE)

        // Create and save order
        val order = Order(
            id = UUID.randomUUID().toString(),
            customerId = customer.id,
            items = enrichedItems,
            total = total,
            status = OrderStatus.PENDING,
        )
        orderRepository.save(order)

        // Send to appropriate queue
        val queue = when (route) {
            Route.STANDARD -> "orders.standard"
            Route.PRIORITY -> "orders.priority"
            Route.APPROVAL_REQUIRED -> "orders.approval"
        }
        messagingService.send(queue, order)

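        set(SHOULD_PROCESS, true)

        // Publish the outcome: process() returns ORDER_RESULT and shouldStop() checks for it
        set(ORDER_RESULT, OrderResult.Success(route))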
    }
}

Flow Definition

@Singleton
@InjectConstructor
class OrderProcessingFlow(private val scope: Scope) : AbstractFlow<InternalMessage>() {

    // Define executors in order, can also be loaded from XML via FlowFactory
    override val executors = listOf(
        ValidateOrder::class,
        EnrichOrder::class,
        TransformOrder::class,
        RouteOrder::class,
        ProcessOrder::class,
    ).map { scope.getInstance(it.java) }

    suspend fun process(request: OrderRequest): OrderResult {
        // InternalMessage has no builder-style constructor, so populate it with apply
        val message = InternalMessage().apply {
            set(ORDER_REQUEST, request)
            set(SHOULD_PROCESS, false)
            set(IS_CANCELLED, false)
        }
        execute(message)
        
        // If using the error-as-value pattern
//        message.getOrNull(VALIDATION_ERROR)?.let { validationError ->
//            return OrderResult.Failure(validationError.message)
//        }
        
        // Returns the final result of the operation
        return message[ORDER_RESULT]
    }

    // This method is optional, can be overridden to define custom stopping conditions.
    // getOrNull avoids an exception when IS_CANCELLED was never set on the message.
    override fun shouldStop(message: InternalMessage): Boolean =
        message.getOrNull(ORDER_RESULT) != null // Finished successfully
            || message.getOrNull(IS_CANCELLED) == true // Cancelled (maybe due to validation error)
}

sealed class OrderResult {
    data class Success(val route: Route?) : OrderResult()
    data class Failure(val reason: String) : OrderResult()
}

Controller Usage

@RestController
@RequestMapping("/api/orders")
class OrderController(
    private val orderFlow: OrderProcessingFlow,
) {

    @PostMapping
    // One possible mapping; the status codes and bodies are illustrative choices
    suspend fun createOrder(@RequestBody request: OrderRequest): ResponseEntity<out Any> =
        when (val result = orderFlow.process(request)) {
            is OrderResult.Success -> ResponseEntity.ok(result)
            is OrderResult.Failure -> ResponseEntity.badRequest().body(result.reason)
        }
}

Benefits of VETRO

  1. Separation of Concerns - Each executor has a single responsibility
  2. Testability - Executors can be unit tested independently
  3. Reusability - Executors can be reused across different flows (especially with generic InternalMessage)
  4. Observability - Easy to add logging/metrics around executor execution
  5. Flexibility - Executors can skip processing via isInterested()
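
As an illustration of the observability point, an executor can be wrapped in a decorator that times it without touching the executor itself. The sketch uses a simplified, non-suspending `Executor` so that it runs with the standard library alone; `TimedExecutor`, `Upper`, and the `record` callback are hypothetical names.

```kotlin
// Simplified, non-suspending copy of the base class.
abstract class Executor<T> {
    open fun isInterested(message: T): Boolean = true
    abstract fun T.execute()
}

// Decorator that reports how long the wrapped executor took, even if it throws.
class TimedExecutor<T>(
    private val name: String,
    private val delegate: Executor<T>,
    private val record: (name: String, nanos: Long) -> Unit,
) : Executor<T>() {
    override fun isInterested(message: T) = delegate.isInterested(message)

    override fun T.execute() {
        val message = this
        val start = System.nanoTime()
        try {
            // Inside `with`, execute() resolves against the wrapped executor
            with(delegate) { message.execute() }
        } finally {
            record(name, System.nanoTime() - start)
        }
    }
}

// Hypothetical executor that upper-cases the message in place.
class Upper : Executor<StringBuilder>() {
    override fun StringBuilder.execute() {
        val upper = toString().uppercase()
        setLength(0)
        append(upper)
    }
}

fun main() {
    val timings = mutableListOf<String>()
    val timed = TimedExecutor("upper", Upper()) { name, _ -> timings.add(name) }

    val message = StringBuilder("order")
    timed.run { message.execute() }

    println(message) // prints ORDER
    println(timings) // prints [upper]
}
```

The same wrapping could be applied once inside the flow's loop instead, which keeps the executor list free of decorators.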

Best Practices

  • Keep executors stateless - Pass all data through the message
  • Fail fast in Validate - Set IS_CANCELLED early to avoid wasted processing
  • Use isInterested() for conditional execution - Skip executors that don't apply
  • Override shouldStop() in flows - Define custom stopping conditions
  • Use XML configuration for complex flows - Enables runtime changes and easier testing

Common Variations

  • VETO - Skip the Route stage for simple linear flows
  • VET - Minimal pipeline for basic transformations
  • Partial VETRO - Use only the stages you need (e.g., Validate + Enrich + Operate)

References

  • Enterprise Integration Patterns (Hohpe & Woolf)
  • MuleSoft VETRO Pattern Documentation
  • Apache Camel Enterprise Integration Patterns