Choreography

Overview

The Choreography pattern is a decentralized coordination mechanism in enterprise integration architectures: services collaborate to achieve business goals through direct peer-to-peer interactions, with no central coordinator or orchestrator. Like a well-rehearsed dance troupe in which each dancer knows their role and responds to their partners' movements without a director, services take part in complex business processes by reacting to events and messages from other services according to predefined protocols and agreements. The pattern suits loosely coupled, scalable, and resilient distributed systems in which services participate autonomously in business workflows, stay independent of one another, and do not depend on a central coordinator that could become a single point of failure.

Theoretical Foundation

The Choreography pattern is grounded in distributed systems theory, event-driven architecture principles, and autonomous agent coordination. It incorporates concepts from reactive systems, event sourcing, publish-subscribe messaging, and distributed consensus protocols to provide a framework for building self-organizing, resilient business process execution without centralized control. The pattern addresses the challenges of maintaining consistency, handling failures, and ensuring reliable process completion in loosely coupled environments where no single component has complete knowledge or control of the entire business process.

Core Principles

1. Decentralized Coordination

Enabling services to coordinate without centralized control:
- Autonomous participation - each service decides independently how to participate in workflows
- Event-driven collaboration - services communicate through asynchronous event publishing and consumption
- Distributed decision making - business logic distributed across participating services
- Self-organizing behavior - emergent workflow behavior through local service interactions
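
As a rough illustration of decentralized coordination, the sketch below wires three participants to a hypothetical in-memory `EventBus`. The class names, event names, and the bus itself are assumptions made for this example; a real system would likely put a message broker in that role. Each participant knows only the event it reacts to and the event it emits, and publishing the first event is enough to drive the whole flow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a message broker (assumption for illustration).
class ChoreographySketch {

    /** Minimal publish-subscribe bus keyed by event name. */
    static class EventBus {
        private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

        void subscribe(String eventName, Consumer<String> handler) {
            subscribers.computeIfAbsent(eventName, k -> new ArrayList<>()).add(handler);
        }

        void publish(String eventName, String orderId) {
            // Copy the handler list so that handlers may subscribe while we iterate.
            List<Consumer<String>> handlers =
                    new ArrayList<>(subscribers.getOrDefault(eventName, List.of()));
            for (Consumer<String> handler : handlers) {
                handler.accept(orderId);
            }
        }
    }

    /** Wires three independent participants and returns the order in which they acted. */
    static List<String> run(String orderId) {
        EventBus bus = new EventBus();
        List<String> trace = new ArrayList<>();

        // Each participant reacts to one event and emits the next; none sees the whole flow.
        bus.subscribe("OrderPlaced", id -> {
            trace.add("inventory reserved " + id);
            bus.publish("InventoryReserved", id);
        });
        bus.subscribe("InventoryReserved", id -> {
            trace.add("payment processed " + id);
            bus.publish("PaymentProcessed", id);
        });
        bus.subscribe("PaymentProcessed", id -> trace.add("shipping initiated " + id));

        // No coordinator: the first event alone sets the chain in motion.
        bus.publish("OrderPlaced", orderId);
        return trace;
    }

    public static void main(String[] args) {
        run("order-1").forEach(System.out::println);
    }
}
```

Calling `ChoreographySketch.run("order-1")` returns the three steps in sequence, even though no component ever invokes them in order.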

2. Event-Based Communication

Coordinating through publish-subscribe event mechanisms:
- Event publishing - services publish domain events when significant state changes occur
- Event subscription - services subscribe to relevant events from other services
- Event correlation - tracking related events across the distributed workflow
- Event ordering - handling event sequences and maintaining causal relationships
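
Event correlation and ordering can be sketched as follows. The example assumes that every event carries a correlation ID identifying its workflow instance and a per-workflow sequence number; the `Envelope` and `CorrelatingConsumer` types are hypothetical, and a production consumer might buffer early events rather than reject them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CorrelationSketch {

    /** Hypothetical event envelope: correlationId ties an event to one workflow instance. */
    static final class Envelope {
        final String correlationId;
        final long sequence;
        final String type;

        Envelope(String correlationId, long sequence, String type) {
            this.correlationId = correlationId;
            this.sequence = sequence;
            this.type = type;
        }
    }

    /** Tracks, per workflow, the next expected sequence number and the events accepted so far. */
    static class CorrelatingConsumer {
        private final Map<String, Long> nextSequence = new HashMap<>();
        private final Map<String, List<String>> accepted = new HashMap<>();

        /** Returns true if the event was applied, false if it was a duplicate or out of order. */
        boolean accept(Envelope event) {
            long expected = nextSequence.getOrDefault(event.correlationId, 1L);
            if (event.sequence != expected) {
                return false; // a real consumer might buffer the event or ask for redelivery
            }
            accepted.computeIfAbsent(event.correlationId, k -> new ArrayList<>()).add(event.type);
            nextSequence.put(event.correlationId, expected + 1);
            return true;
        }

        /** Events accepted so far for one workflow instance, in sequence order. */
        List<String> history(String correlationId) {
            return accepted.getOrDefault(correlationId, List.of());
        }
    }

    public static void main(String[] args) {
        CorrelatingConsumer consumer = new CorrelatingConsumer();
        consumer.accept(new Envelope("order-1", 1, "OrderPlaced"));
        consumer.accept(new Envelope("order-1", 3, "PaymentProcessed")); // rejected: gap at 2
        consumer.accept(new Envelope("order-1", 2, "InventoryReserved"));
        System.out.println(consumer.history("order-1"));
    }
}
```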

3. Protocol-Based Interaction

Defining interaction contracts and agreements:
- Interaction protocols - well-defined rules for service communication and collaboration
- Message schemas - standardized event and message formats across services
- Behavioral contracts - agreements on expected service behaviors and responses
- Compensating actions - predefined recovery mechanisms for handling failures
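
To make the idea of a compensating action concrete, here is a minimal sketch of a participant that can undo its own forward step when a later step reports failure. The `Inventory` class and the `onPaymentFailed` handler name are assumptions for illustration; the point is only that each service owns both its action and the matching reversal, and that the reversal is safe to run more than once.

```java
import java.util.HashMap;
import java.util.Map;

class CompensationSketch {

    /** Hypothetical inventory participant that can undo its own reservation. */
    static class Inventory {
        private int available;
        private final Map<String, Integer> reservedByOrder = new HashMap<>();

        Inventory(int available) {
            this.available = available;
        }

        /** Forward action: reserve stock for an order; returns false if stock is insufficient. */
        boolean reserve(String orderId, int quantity) {
            if (quantity > available) {
                return false;
            }
            available -= quantity;
            reservedByOrder.merge(orderId, quantity, Integer::sum);
            return true;
        }

        /** Compensating action for a failure event from a later step; repeated calls are harmless. */
        void onPaymentFailed(String orderId) {
            Integer reserved = reservedByOrder.remove(orderId);
            if (reserved != null) {
                available += reserved;
            }
        }

        int available() {
            return available;
        }
    }

    public static void main(String[] args) {
        Inventory inventory = new Inventory(10);
        inventory.reserve("order-1", 4);
        inventory.onPaymentFailed("order-1"); // stock returns to its original level
        System.out.println(inventory.available());
    }
}
```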

4. Loose Coupling and Autonomy

Maintaining service independence and flexibility:
- Service autonomy - each service maintains its own data and business logic
- Technology independence - services can use different technologies and platforms
- Deployment independence - services can be deployed and updated independently
- Failure isolation - failures in one service don't directly impact others

Why Choreography is Essential in Integration Architecture

1. Microservices Orchestration

In microservices architectures, the Choreography pattern provides:
- Distributed workflow coordination - enabling complex business processes across multiple services
- Service independence - avoiding the tight coupling that a centralized orchestrator introduces between services
- Scalability enablement - supporting horizontal scaling without a centralized bottleneck
- Resilience enhancement - removing the orchestrator as a single point of failure in business process execution

2. Event-Driven System Design

For reactive and event-driven architectures:
- Reactive system behavior - enabling systems to respond dynamically to changing conditions
- Event stream processing - coordinating workflows through continuous event streams
- Real-time collaboration - enabling real-time coordination between distributed services
- Stream-based integration - integrating systems through event streaming platforms

3. Cross-System Integration

Enabling coordination across organizational and system boundaries:
- Inter-organization workflows - coordinating processes across multiple organizations
- Legacy system integration - enabling legacy systems to participate through event adapters
- Cloud and hybrid integration - coordinating processes across different cloud platforms
- Partner ecosystem integration - enabling flexible integration with business partners

4. Business Process Flexibility

Supporting dynamic and flexible business processes:
- Process evolution - enabling processes to evolve through changing service behaviors
- Dynamic participation - services can join or leave workflows dynamically
- Conditional workflows - processes can adapt based on runtime conditions and events
- Multi-variant processes - supporting different process variations simultaneously
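
Dynamic participation can be pictured with a small sketch in which subscribers join and leave a topic at runtime. The `Topic` class is hypothetical; with a real broker the equivalent would be creating or removing a subscription or consumer group, without touching the publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class DynamicParticipationSketch {

    /** Hypothetical single-topic bus whose subscribers can come and go at runtime. */
    static class Topic {
        private final List<Consumer<String>> handlers = new CopyOnWriteArrayList<>();

        /** Registers a handler and returns a Runnable that removes it again. */
        Runnable subscribe(Consumer<String> handler) {
            handlers.add(handler);
            return () -> handlers.remove(handler);
        }

        /** Delivers the event to whoever is subscribed right now; returns the number of receivers. */
        int publish(String event) {
            int delivered = 0;
            for (Consumer<String> handler : handlers) {
                handler.accept(event);
                delivered++;
            }
            return delivered;
        }
    }

    public static void main(String[] args) {
        Topic orders = new Topic();
        List<String> seenByAnalytics = new ArrayList<>();

        Runnable leave = orders.subscribe(seenByAnalytics::add); // a new participant joins
        orders.publish("OrderPlaced");
        leave.run();                                             // and later leaves
        orders.publish("OrderShipped");                          // the publisher is unchanged

        System.out.println(seenByAnalytics);
    }
}
```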

Benefits in Integration Contexts

1. Technical Advantages

2. Business Value

3. Integration Enablement

4. Operational Excellence

Integration Architecture Applications

1. E-commerce Order Processing

Choreographed order fulfillment across multiple services:

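// Imports shared by the service classes in this listing
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

// Order Service - Publishes order events
@Component
public class OrderService {

    private static final Logger log = LoggerFactory.getLogger(OrderService.class);

    // ApplicationEventPublisher delivers events within a single Spring application context.
    // For separately deployed services, the same handlers would sit behind a message broker.
    @Autowired
    private ApplicationEventPublisher eventPublisher;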

    @Autowired
    private OrderRepository orderRepository;

    public Order placeOrder(CreateOrderRequest request) {
        Order order = new Order();
        order.setCustomerId(request.getCustomerId());
        order.setItems(request.getItems());
        order.setShippingAddress(request.getShippingAddress());
        order.setStatus(OrderStatus.PLACED);
        order.setTotalAmount(calculateTotalAmount(request.getItems()));

        order = orderRepository.save(order);

        // Publish order placed event
        OrderPlacedEvent event = new OrderPlacedEvent();
        event.setOrderId(order.getId());
        event.setCustomerId(order.getCustomerId());
        event.setItems(order.getItems());
        event.setTotalAmount(order.getTotalAmount());
        event.setTimestamp(Instant.now());

        eventPublisher.publishEvent(event);

        log.info("Order placed: {}", order.getId());
        return order;
    }

    @EventListener
    public void handleInventoryReserved(InventoryReservedEvent event) {
        Order order = orderRepository.findById(event.getOrderId())
            .orElseThrow(() -> new OrderNotFoundException(event.getOrderId()));

        order.setStatus(OrderStatus.INVENTORY_RESERVED);
        orderRepository.save(order);

        log.info("Inventory reserved for order: {}", event.getOrderId());
    }

    @EventListener
    public void handlePaymentProcessed(PaymentProcessedEvent event) {
        Order order = orderRepository.findById(event.getOrderId())
            .orElseThrow(() -> new OrderNotFoundException(event.getOrderId()));

        if (event.getStatus() == PaymentStatus.SUCCESSFUL) {
            order.setStatus(OrderStatus.PAYMENT_CONFIRMED);
            order.setPaymentId(event.getPaymentId());
            orderRepository.save(order);
        } else {
            order.setStatus(OrderStatus.PAYMENT_FAILED);
            // Persist the new status before publishing, so listeners never observe stale order state
            orderRepository.save(order);

            // Publish compensation event
            OrderPaymentFailedEvent compensationEvent = new OrderPaymentFailedEvent();
            compensationEvent.setOrderId(event.getOrderId());
            compensationEvent.setReason(event.getFailureReason());
            compensationEvent.setTimestamp(Instant.now());

            eventPublisher.publishEvent(compensationEvent);
        }

        log.info("Payment processed for order: {} with status: {}",
                event.getOrderId(), event.getStatus());
    }

    @EventListener
    public void handleShippingInitiated(ShippingInitiatedEvent event) {
        Order order = orderRepository.findById(event.getOrderId())
            .orElseThrow(() -> new OrderNotFoundException(event.getOrderId()));

        order.setStatus(OrderStatus.SHIPPED);
        order.setTrackingNumber(event.getTrackingNumber());
        order.setEstimatedDelivery(event.getEstimatedDelivery());

        orderRepository.save(order);

        log.info("Shipping initiated for order: {} with tracking: {}", 
                event.getOrderId(), event.getTrackingNumber());
    }

    @EventListener
    public void handleInventoryReservationFailed(InventoryReservationFailedEvent event) {
        Order order = orderRepository.findById(event.getOrderId())
            .orElseThrow(() -> new OrderNotFoundException(event.getOrderId()));

        order.setStatus(OrderStatus.CANCELLED);
        order.setCancellationReason("Inventory not available: " + event.getReason());

        orderRepository.save(order);

        log.info("Order cancelled due to inventory failure: {}", event.getOrderId());
    }
}

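// Inventory Service - Reacts to order events and manages inventory
@Component
public class InventoryService {

    private static final Logger log = LoggerFactory.getLogger(InventoryService.class);

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Autowired
    private InventoryRepository inventoryRepository;

    // Used by reserveInventory below to persist individual reservations
    @Autowired
    private InventoryReservationRepository inventoryReservationRepository;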

    @EventListener
    @Async
    public void handleOrderPlaced(OrderPlacedEvent event) {
        try {
            // Check inventory availability for all items
            boolean allItemsAvailable = event.getItems().stream()
                .allMatch(item -> checkInventoryAvailability(item.getProductId(), item.getQuantity()));

            if (allItemsAvailable) {
                // Reserve inventory for all items
                Map<String, String> reservations = new HashMap<>();

                for (OrderItem item : event.getItems()) {
                    String reservationId = reserveInventory(item.getProductId(), item.getQuantity());
                    reservations.put(item.getProductId(), reservationId);
                }

                // Publish inventory reserved event
                InventoryReservedEvent reservedEvent = new InventoryReservedEvent();
                reservedEvent.setOrderId(event.getOrderId());
                reservedEvent.setReservations(reservations);
                reservedEvent.setTimestamp(Instant.now());

                eventPublisher.publishEvent(reservedEvent);

                log.info("Inventory reserved for order: {}", event.getOrderId());

            } else {
                // Publish inventory reservation failed event
                InventoryReservationFailedEvent failedEvent = new InventoryReservationFailedEvent();
                failedEvent.setOrderId(event.getOrderId());
                failedEvent.setReason("Insufficient inventory for requested items");
                failedEvent.setTimestamp(Instant.now());

                eventPublisher.publishEvent(failedEvent);

                log.warn("Insufficient inventory for order: {}", event.getOrderId());
            }

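        } catch (Exception e) {
            log.error("Error processing inventory for order: {}", event.getOrderId(), e);

            // A failure part-way through the loop can leave earlier items reserved; release them
            try {
                releaseInventoryForOrder(event.getOrderId());
            } catch (Exception releaseError) {
                log.error("Error releasing partial reservations for order: {}",
                        event.getOrderId(), releaseError);
            }

            InventoryReservationFailedEvent failedEvent = new InventoryReservationFailedEvent();
            failedEvent.setOrderId(event.getOrderId());
            failedEvent.setReason("Inventory system error: " + e.getMessage());
            failedEvent.setTimestamp(Instant.now());

            eventPublisher.publishEvent(failedEvent);
        }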
    }

    @EventListener
    public void handleOrderPaymentFailed(OrderPaymentFailedEvent event) {
        try {
            // Release inventory reservations
            releaseInventoryForOrder(event.getOrderId());

            log.info("Inventory released for failed order: {}", event.getOrderId());

        } catch (Exception e) {
            log.error("Error releasing inventory for order: {}", event.getOrderId(), e);
        }
    }

    @EventListener
    public void handleShippingInitiated(ShippingInitiatedEvent event) {
        try {
            // Convert reservations to actual inventory deductions
            confirmInventoryReservations(event.getOrderId());

            log.info("Inventory confirmed for shipped order: {}", event.getOrderId());

        } catch (Exception e) {
            log.error("Error confirming inventory for order: {}", event.getOrderId(), e);
        }
    }

    private boolean checkInventoryAvailability(String productId, int quantity) {
        InventoryItem item = inventoryRepository.findByProductId(productId);
        return item != null && item.getAvailableQuantity() >= quantity;
    }

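    // Note: releaseInventoryForOrder and confirmInventoryReservations are not shown in this listing.
    // To look reservations up by order, each InventoryReservation would also need to record the order ID.
    private String reserveInventory(String productId, int quantity) {
        InventoryItem item = inventoryRepository.findByProductId(productId);
        // Re-check availability here: stock may have changed since checkInventoryAvailability ran
        if (item != null && item.getAvailableQuantity() >= quantity) {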
            item.setAvailableQuantity(item.getAvailableQuantity() - quantity);
            item.setReservedQuantity(item.getReservedQuantity() + quantity);
            inventoryRepository.save(item);

            String reservationId = UUID.randomUUID().toString();

            InventoryReservation reservation = new InventoryReservation();
            reservation.setReservationId(reservationId);
            reservation.setProductId(productId);
            reservation.setQuantity(quantity);
            reservation.setCreatedAt(Instant.now());

            inventoryReservationRepository.save(reservation);

            return reservationId;
        }

        throw new InsufficientInventoryException(productId);
    }
}

// Payment Service - Processes payments based on order events
@Component
public class PaymentService {

    private static final Logger log = LoggerFactory.getLogger(PaymentService.class);

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Autowired
    private PaymentProcessor paymentProcessor;

    @EventListener
    @Async
    public void handleInventoryReserved(InventoryReservedEvent event) {
        try {
            // Retrieve order details for payment processing
            PaymentRequest request = buildPaymentRequest(event.getOrderId());

            PaymentResult result = paymentProcessor.processPayment(request);

            PaymentProcessedEvent processedEvent = new PaymentProcessedEvent();
            processedEvent.setOrderId(event.getOrderId());
            processedEvent.setPaymentId(result.getPaymentId());
            processedEvent.setStatus(result.getStatus());
            processedEvent.setAmount(result.getAmount());
            processedEvent.setTimestamp(Instant.now());

            if (result.getStatus() == PaymentStatus.FAILED) {
                processedEvent.setFailureReason(result.getFailureReason());
            }

            eventPublisher.publishEvent(processedEvent);

            log.info("Payment processed for order: {} with status: {}", 
                    event.getOrderId(), result.getStatus());

        } catch (Exception e) {
            log.error("Error processing payment for order: {}", event.getOrderId(), e);

            PaymentProcessedEvent failedEvent = new PaymentProcessedEvent();
            failedEvent.setOrderId(event.getOrderId());
            failedEvent.setStatus(PaymentStatus.FAILED);
            failedEvent.setFailureReason("Payment system error: " + e.getMessage());
            failedEvent.setTimestamp(Instant.now());

            eventPublisher.publishEvent(failedEvent);
        }
    }

    private PaymentRequest buildPaymentRequest(String orderId) {
        // Retrieve order details and build payment request
        // Implementation depends on order data access strategy
        return new PaymentRequest();
    }
}

// Shipping Service - Handles shipping based on payment confirmation
@Component
public class ShippingService {

    private static final Logger log = LoggerFactory.getLogger(ShippingService.class);

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Autowired
    private ShippingProvider shippingProvider;

    @EventListener
    @Async
    public void handlePaymentProcessed(PaymentProcessedEvent event) {
        if (event.getStatus() == PaymentStatus.SUCCESSFUL) {
            try {
                ShippingRequest request = buildShippingRequest(event.getOrderId());

                ShippingResult result = shippingProvider.createShipment(request);

                ShippingInitiatedEvent initiatedEvent = new ShippingInitiatedEvent();
                initiatedEvent.setOrderId(event.getOrderId());
                initiatedEvent.setShipmentId(result.getShipmentId());
                initiatedEvent.setTrackingNumber(result.getTrackingNumber());
                initiatedEvent.setCarrier(result.getCarrier());
                initiatedEvent.setEstimatedDelivery(result.getEstimatedDelivery());
                initiatedEvent.setTimestamp(Instant.now());

                eventPublisher.publishEvent(initiatedEvent);

                log.info("Shipping initiated for order: {} with tracking: {}", 
                        event.getOrderId(), result.getTrackingNumber());

            } catch (Exception e) {
                log.error("Error initiating shipping for order: {}", event.getOrderId(), e);

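                // Note: no service in this listing subscribes to ShippingFailedEvent. A complete flow
                // would react to it by refunding the payment and releasing the reserved inventory.
                ShippingFailedEvent failedEvent = new ShippingFailedEvent();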
                failedEvent.setOrderId(event.getOrderId());
                failedEvent.setReason("Shipping system error: " + e.getMessage());
                failedEvent.setTimestamp(Instant.now());

                eventPublisher.publishEvent(failedEvent);
            }
        }
    }

    private ShippingRequest buildShippingRequest(String orderId) {
        // Build shipping request from order data
        return new ShippingRequest();
    }
}
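
The handlers in `OrderService` above set the order status unconditionally, so a late or duplicated event could, for example, move a cancelled order to shipped. One way to guard against that, sketched here under the assumption of a fixed transition table (the table is illustrative and not part of the original listing), is to check each transition before applying it:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

class OrderTransitionSketch {

    // Mirrors the statuses used in the listing; the transition table itself is an assumption.
    enum OrderStatus { PLACED, INVENTORY_RESERVED, PAYMENT_CONFIRMED, PAYMENT_FAILED, SHIPPED, CANCELLED }

    private static final Map<OrderStatus, Set<OrderStatus>> ALLOWED = new EnumMap<>(OrderStatus.class);

    static {
        ALLOWED.put(OrderStatus.PLACED,
                EnumSet.of(OrderStatus.INVENTORY_RESERVED, OrderStatus.CANCELLED));
        ALLOWED.put(OrderStatus.INVENTORY_RESERVED,
                EnumSet.of(OrderStatus.PAYMENT_CONFIRMED, OrderStatus.PAYMENT_FAILED));
        ALLOWED.put(OrderStatus.PAYMENT_CONFIRMED, EnumSet.of(OrderStatus.SHIPPED));
        // PAYMENT_FAILED, SHIPPED and CANCELLED are terminal in this sketch.
    }

    /** True if the table permits moving from one status to the other. */
    static boolean canTransition(OrderStatus from, OrderStatus to) {
        return ALLOWED.getOrDefault(from, EnumSet.noneOf(OrderStatus.class)).contains(to);
    }

    /** Applies the requested status if allowed; otherwise keeps the current one. */
    static OrderStatus apply(OrderStatus current, OrderStatus requested) {
        return canTransition(current, requested) ? requested : current;
    }

    public static void main(String[] args) {
        // A ShippingInitiatedEvent arriving for an already cancelled order is ignored.
        System.out.println(apply(OrderStatus.CANCELLED, OrderStatus.SHIPPED));
    }
}
```

An event handler would call `apply` with the order's current status and the status implied by the event, and save the order only if the status actually changed.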

2. Financial Services Transaction Processing

Choreographed transaction processing across financial services:

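// Imports shared by the service classes in this listing
import java.time.Instant;
import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

// Account Service - Manages account state and publishes account events
@Component
public class AccountService {

    private static final Logger log = LoggerFactory.getLogger(AccountService.class);

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Autowired
    private AccountRepository accountRepository;

    // Stores transfers while fraud and compliance checks are pending
    @Autowired
    private PendingTransferRepository pendingTransferRepository;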

    public void initiateTransfer(TransferRequest request) {
        try {
            Account fromAccount = accountRepository.findByAccountNumber(request.getFromAccount());

            if (fromAccount.getBalance().compareTo(request.getAmount()) >= 0) {
                // Create pending transfer
                PendingTransfer transfer = new PendingTransfer();
                transfer.setTransferId(UUID.randomUUID().toString());
                transfer.setFromAccount(request.getFromAccount());
                transfer.setToAccount(request.getToAccount());
                transfer.setAmount(request.getAmount());
                transfer.setStatus(TransferStatus.INITIATED);

                pendingTransferRepository.save(transfer);

                // Publish transfer initiated event
                TransferInitiatedEvent event = new TransferInitiatedEvent();
                event.setTransferId(transfer.getTransferId());
                event.setFromAccount(request.getFromAccount());
                event.setToAccount(request.getToAccount());
                event.setAmount(request.getAmount());
                event.setTimestamp(Instant.now());

                eventPublisher.publishEvent(event);

                log.info("Transfer initiated: {}", transfer.getTransferId());

            } else {
                throw new InsufficientFundsException(request.getFromAccount());
            }

        } catch (Exception e) {
            log.error("Error initiating transfer from account: {}", request.getFromAccount(), e);

            TransferFailedEvent failedEvent = new TransferFailedEvent();
            failedEvent.setFromAccount(request.getFromAccount());
            failedEvent.setToAccount(request.getToAccount());
            failedEvent.setAmount(request.getAmount());
            failedEvent.setReason(e.getMessage());
            failedEvent.setTimestamp(Instant.now());

            eventPublisher.publishEvent(failedEvent);
        }
    }

    @EventListener
    public void handleFraudCheckCompleted(FraudCheckCompletedEvent event) {
        PendingTransfer transfer = pendingTransferRepository.findById(event.getTransferId())
            .orElseThrow(() -> new TransferNotFoundException(event.getTransferId()));

        if (event.getResult() == FraudCheckResult.APPROVED) {
            transfer.setStatus(TransferStatus.FRAUD_CHECK_PASSED);
            pendingTransferRepository.save(transfer);

            // Publish compliance check required event
            ComplianceCheckRequiredEvent complianceEvent = new ComplianceCheckRequiredEvent();
            complianceEvent.setTransferId(event.getTransferId());
            complianceEvent.setFromAccount(transfer.getFromAccount());
            complianceEvent.setToAccount(transfer.getToAccount());
            complianceEvent.setAmount(transfer.getAmount());
            complianceEvent.setTimestamp(Instant.now());

            eventPublisher.publishEvent(complianceEvent);

        } else {
            transfer.setStatus(TransferStatus.REJECTED);
            transfer.setRejectionReason("Fraud check failed: " + event.getReason());
            pendingTransferRepository.save(transfer);

            TransferRejectedEvent rejectedEvent = new TransferRejectedEvent();
            rejectedEvent.setTransferId(event.getTransferId());
            rejectedEvent.setReason(event.getReason());
            rejectedEvent.setTimestamp(Instant.now());

            eventPublisher.publishEvent(rejectedEvent);
        }
    }

    @EventListener
    public void handleComplianceCheckCompleted(ComplianceCheckCompletedEvent event) {
        PendingTransfer transfer = pendingTransferRepository.findById(event.getTransferId())
            .orElseThrow(() -> new TransferNotFoundException(event.getTransferId()));

        if (event.getResult() == ComplianceCheckResult.APPROVED) {
            // Execute the actual transfer
            executeTransfer(transfer);

        } else {
            transfer.setStatus(TransferStatus.REJECTED);
            transfer.setRejectionReason("Compliance check failed: " + event.getReason());
            pendingTransferRepository.save(transfer);

            TransferRejectedEvent rejectedEvent = new TransferRejectedEvent();
            rejectedEvent.setTransferId(event.getTransferId());
            rejectedEvent.setReason(event.getReason());
            rejectedEvent.setTimestamp(Instant.now());

            eventPublisher.publishEvent(rejectedEvent);
        }
    }

    private void executeTransfer(PendingTransfer transfer) {
        try {
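            Account fromAccount = accountRepository.findByAccountNumber(transfer.getFromAccount());
            Account toAccount = accountRepository.findByAccountNumber(transfer.getToAccount());

            // Re-check funds: the balance may have changed while fraud and compliance checks ran
            if (fromAccount.getBalance().compareTo(transfer.getAmount()) < 0) {
                throw new InsufficientFundsException(transfer.getFromAccount());
            }

            // Perform the transfer. Both saves should run in a single database transaction
            // so that a failure between them cannot debit one account without crediting the other.
            fromAccount.setBalance(fromAccount.getBalance().subtract(transfer.getAmount()));
            toAccount.setBalance(toAccount.getBalance().add(transfer.getAmount()));

            accountRepository.save(fromAccount);
            accountRepository.save(toAccount);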

            transfer.setStatus(TransferStatus.COMPLETED);
            pendingTransferRepository.save(transfer);

            // Publish transfer completed event
            TransferCompletedEvent completedEvent = new TransferCompletedEvent();
            completedEvent.setTransferId(transfer.getTransferId());
            completedEvent.setFromAccount(transfer.getFromAccount());
            completedEvent.setToAccount(transfer.getToAccount());
            completedEvent.setAmount(transfer.getAmount());
            completedEvent.setTimestamp(Instant.now());

            eventPublisher.publishEvent(completedEvent);

            log.info("Transfer completed: {}", transfer.getTransferId());

        } catch (Exception e) {
            log.error("Error executing transfer: {}", transfer.getTransferId(), e);

            transfer.setStatus(TransferStatus.FAILED);
            transfer.setRejectionReason("Transfer execution failed: " + e.getMessage());
            pendingTransferRepository.save(transfer);

            TransferFailedEvent failedEvent = new TransferFailedEvent();
            failedEvent.setTransferId(transfer.getTransferId());
            failedEvent.setReason(e.getMessage());
            failedEvent.setTimestamp(Instant.now());

            eventPublisher.publishEvent(failedEvent);
        }
    }
}

// Fraud Detection Service - Analyzes transactions for fraud
@Component
public class FraudDetectionService {

    private static final Logger log = LoggerFactory.getLogger(FraudDetectionService.class);

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Autowired
    private FraudDetectionEngine fraudEngine;

    @EventListener
    @Async
    public void handleTransferInitiated(TransferInitiatedEvent event) {
        try {
            FraudAnalysisRequest request = new FraudAnalysisRequest();
            request.setTransferId(event.getTransferId());
            request.setFromAccount(event.getFromAccount());
            request.setToAccount(event.getToAccount());
            request.setAmount(event.getAmount());
            request.setTimestamp(event.getTimestamp());

            FraudAnalysisResult result = fraudEngine.analyzeTransaction(request);

            FraudCheckCompletedEvent completedEvent = new FraudCheckCompletedEvent();
            completedEvent.setTransferId(event.getTransferId());
            completedEvent.setResult(result.getResult());
            completedEvent.setRiskScore(result.getRiskScore());
            completedEvent.setReason(result.getReason());
            completedEvent.setTimestamp(Instant.now());

            eventPublisher.publishEvent(completedEvent);

            log.info("Fraud check completed for transfer: {} with result: {}", 
                    event.getTransferId(), result.getResult());

        } catch (Exception e) {
            log.error("Error in fraud detection for transfer: {}", event.getTransferId(), e);

            FraudCheckCompletedEvent failedEvent = new FraudCheckCompletedEvent();
            failedEvent.setTransferId(event.getTransferId());
            failedEvent.setResult(FraudCheckResult.ERROR);
            failedEvent.setReason("Fraud detection system error: " + e.getMessage());
            failedEvent.setTimestamp(Instant.now());

            eventPublisher.publishEvent(failedEvent);
        }
    }
}

// Compliance Service - Performs regulatory compliance checks
@Component
public class ComplianceService {

    private static final Logger log = LoggerFactory.getLogger(ComplianceService.class);

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Autowired
    private ComplianceEngine complianceEngine;

    @EventListener
    @Async
    public void handleComplianceCheckRequired(ComplianceCheckRequiredEvent event) {
        try {
            ComplianceCheckRequest request = new ComplianceCheckRequest();
            request.setTransferId(event.getTransferId());
            request.setFromAccount(event.getFromAccount());
            request.setToAccount(event.getToAccount());
            request.setAmount(event.getAmount());

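            // ComplianceCheckResult is compared as an enum elsewhere (APPROVED, ERROR), so the engine's
            // return value needs its own type. ComplianceAnalysisResult is an assumed name that mirrors
            // FraudAnalysisResult in the fraud service.
            ComplianceAnalysisResult result = complianceEngine.performComplianceCheck(request);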

            ComplianceCheckCompletedEvent completedEvent = new ComplianceCheckCompletedEvent();
            completedEvent.setTransferId(event.getTransferId());
            completedEvent.setResult(result.getResult());
            completedEvent.setReason(result.getReason());
            completedEvent.setRequiredActions(result.getRequiredActions());
            completedEvent.setTimestamp(Instant.now());

            eventPublisher.publishEvent(completedEvent);

            log.info("Compliance check completed for transfer: {} with result: {}", 
                    event.getTransferId(), result.getResult());

        } catch (Exception e) {
            log.error("Error in compliance check for transfer: {}", event.getTransferId(), e);

            ComplianceCheckCompletedEvent failedEvent = new ComplianceCheckCompletedEvent();
            failedEvent.setTransferId(event.getTransferId());
            failedEvent.setResult(ComplianceCheckResult.ERROR);
            failedEvent.setReason("Compliance system error: " + e.getMessage());
            failedEvent.setTimestamp(Instant.now());

            eventPublisher.publishEvent(failedEvent);
        }
    }
}
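
Because the fraud and compliance checks run asynchronously between initiation and execution, the source balance may change before funds actually move. The following sketch, which uses a hypothetical in-memory `Ledger` in place of the account repository, shows one way to re-check funds at execution time and apply the debit and credit as a single all-or-nothing step. In a database-backed service the same effect would typically come from a transaction rather than a `synchronized` method.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

class TransferSketch {

    /** Hypothetical in-memory ledger standing in for the account repository. */
    static class Ledger {
        private final Map<String, BigDecimal> balances = new HashMap<>();

        synchronized void open(String account, String balance) {
            balances.put(account, new BigDecimal(balance));
        }

        synchronized BigDecimal balance(String account) {
            return balances.get(account);
        }

        /**
         * Debits and credits in one step. Returns false, changing nothing, if either
         * account is unknown, the amount is not positive, or funds are insufficient.
         */
        synchronized boolean transfer(String from, String to, BigDecimal amount) {
            BigDecimal source = balances.get(from);
            BigDecimal target = balances.get(to);
            if (source == null || target == null || amount.signum() <= 0) {
                return false;
            }
            // compareTo, not equals: BigDecimal.equals also compares scale (70.0 vs 70.00)
            if (source.compareTo(amount) < 0) {
                return false;
            }
            balances.put(from, source.subtract(amount));
            balances.put(to, target.add(amount));
            return true;
        }
    }

    public static void main(String[] args) {
        Ledger ledger = new Ledger();
        ledger.open("A", "100.00");
        ledger.open("B", "5.00");
        System.out.println(ledger.transfer("A", "B", new BigDecimal("30.00")));
        System.out.println(ledger.balance("A") + " / " + ledger.balance("B"));
    }
}
```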

3. Apache Camel Choreography Implementation

Event-driven choreography using Apache Camel:

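import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.springframework.stereotype.Component;

@Component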
public class CamelChoreographyRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        // Order processing choreography
        from("kafka:order-events?groupId=order-choreography")
            .routeId("order-choreography")
            .unmarshal().json(JsonLibrary.Jackson, OrderEvent.class)
            .choice()
                .when(simple("${body.eventType} == 'ORDER_PLACED'"))
                    .multicast().parallelProcessing()
                        .to("direct:processInventoryCheck")
                        .to("direct:validatePaymentMethod")
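                    // endChoice() closes the multicast and returns to the enclosing choice();
                    // a plain end() returns a type on which when() is not available
                    .endChoice()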
                .when(simple("${body.eventType} == 'INVENTORY_RESERVED'"))
                    .to("direct:processPayment")
                .when(simple("${body.eventType} == 'PAYMENT_CONFIRMED'"))
                    .to("direct:initiateShipping")
                .when(simple("${body.eventType} == 'INVENTORY_FAILED'"))
                    .to("direct:handleOrderFailure")
                .when(simple("${body.eventType} == 'PAYMENT_FAILED'"))
                    .to("direct:releaseInventory")
                .otherwise()
                    .log("Unknown order event: ${body.eventType}")
            .end();

        from("direct:processInventoryCheck")
            .routeId("process-inventory-check")
            .process(exchange -> {
                OrderEvent orderEvent = exchange.getIn().getBody(OrderEvent.class);

                InventoryCheckRequest request = new InventoryCheckRequest();
                request.setOrderId(orderEvent.getOrderId());
                request.setItems(orderEvent.getItems());

                exchange.getIn().setBody(request);
            })
            .to("kafka:inventory-check-requests")
            .log("Inventory check requested for order: ${body.orderId}");

        from("direct:validatePaymentMethod")
            .routeId("validate-payment-method")
            .process(exchange -> {
                OrderEvent orderEvent = exchange.getIn().getBody(OrderEvent.class);

                PaymentValidationRequest request = new PaymentValidationRequest();
                request.setOrderId(orderEvent.getOrderId());
                request.setPaymentMethod(orderEvent.getPaymentMethod());
                request.setCustomerId(orderEvent.getCustomerId());

                exchange.getIn().setBody(request);
            })
            .to("kafka:payment-validation-requests")
            .log("Payment validation requested for order: ${body.orderId}");

        from("direct:processPayment")
            .routeId("process-payment")
            .process(exchange -> {
                OrderEvent orderEvent = exchange.getIn().getBody(OrderEvent.class);

                PaymentProcessingRequest request = new PaymentProcessingRequest();
                request.setOrderId(orderEvent.getOrderId());
                request.setAmount(orderEvent.getTotalAmount());
                request.setPaymentMethod(orderEvent.getPaymentMethod());

                exchange.getIn().setBody(request);
            })
            .to("kafka:payment-processing-requests")
            .log("Payment processing requested for order: ${body.orderId}");

        from("direct:initiateShipping")
            .routeId("initiate-shipping")
            .process(exchange -> {
                OrderEvent orderEvent = exchange.getIn().getBody(OrderEvent.class);

                ShippingRequest request = new ShippingRequest();
                request.setOrderId(orderEvent.getOrderId());
                request.setShippingAddress(orderEvent.getShippingAddress());
                request.setItems(orderEvent.getItems());

                exchange.getIn().setBody(request);
            })
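            .log("Requesting shipping for order: ${body.orderId}")
            // Marshal to JSON so the shipping route's unmarshal().json(...) can parse the message
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:shipping-requests");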

        from("direct:releaseInventory")
            .routeId("release-inventory")
            .process(exchange -> {
                OrderEvent orderEvent = exchange.getIn().getBody(OrderEvent.class);

                InventoryReleaseRequest request = new InventoryReleaseRequest();
                request.setOrderId(orderEvent.getOrderId());
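                // Carry the upstream failure reason when present; otherwise keep the original default
                String reason = orderEvent.getFailureReason();
                request.setReason(reason != null ? reason : "Payment failed");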

                exchange.getIn().setBody(request);
            })
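            .log("Requesting inventory release for order: ${body.orderId}")
            // Marshal to JSON so consumers of this topic receive a parseable payload
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:inventory-release-requests");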

        from("direct:handleOrderFailure")
            .routeId("handle-order-failure")
            .process(exchange -> {
                OrderEvent orderEvent = exchange.getIn().getBody(OrderEvent.class);

                OrderFailureNotification notification = new OrderFailureNotification();
                notification.setOrderId(orderEvent.getOrderId());
                notification.setCustomerId(orderEvent.getCustomerId());
                notification.setReason(orderEvent.getFailureReason());

                exchange.getIn().setBody(notification);
            })
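            .log("Sending order failure notification for order: ${body.orderId}")
            // Marshal to JSON so consumers of this topic receive a parseable payload
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:customer-notifications");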

        // Inventory service choreography
        from("kafka:inventory-check-requests?groupId=inventory-choreography")
            .routeId("inventory-choreography")
            .unmarshal().json(JsonLibrary.Jackson, InventoryCheckRequest.class)
            .process(exchange -> {
                InventoryCheckRequest request = exchange.getIn().getBody(InventoryCheckRequest.class);

                // Check inventory availability
                boolean available = checkInventoryAvailability(request.getItems());

                OrderEvent resultEvent = new OrderEvent();
                resultEvent.setOrderId(request.getOrderId());
                resultEvent.setEventType(available ? "INVENTORY_RESERVED" : "INVENTORY_FAILED");
                resultEvent.setTimestamp(Instant.now());

                if (!available) {
                    resultEvent.setFailureReason("Insufficient inventory");
                }

                exchange.getIn().setBody(resultEvent);
            })
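            .log("Sending inventory check result for order: ${body.orderId}")
            // Marshal to JSON so the order-events consumers' unmarshal().json(...) can parse the event
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:order-events");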

        // Payment service choreography
        from("kafka:payment-processing-requests?groupId=payment-choreography")
            .routeId("payment-choreography")
            .unmarshal().json(JsonLibrary.Jackson, PaymentProcessingRequest.class)
            .process(exchange -> {
                PaymentProcessingRequest request = exchange.getIn().getBody(PaymentProcessingRequest.class);

                // Process payment
                boolean success = processPayment(request);

                OrderEvent resultEvent = new OrderEvent();
                resultEvent.setOrderId(request.getOrderId());
                resultEvent.setEventType(success ? "PAYMENT_CONFIRMED" : "PAYMENT_FAILED");
                resultEvent.setTimestamp(Instant.now());

                if (!success) {
                    resultEvent.setFailureReason("Payment processing failed");
                }

                exchange.getIn().setBody(resultEvent);
            })
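            .log("Sending payment processing result for order: ${body.orderId}")
            // Marshal to JSON so the order-events consumers' unmarshal().json(...) can parse the event
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:order-events");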

        // Shipping service choreography
        from("kafka:shipping-requests?groupId=shipping-choreography")
            .routeId("shipping-choreography")
            .unmarshal().json(JsonLibrary.Jackson, ShippingRequest.class)
            .process(exchange -> {
                ShippingRequest request = exchange.getIn().getBody(ShippingRequest.class);

                // Initiate shipping
                String trackingNumber = initiateShipping(request);

                OrderEvent resultEvent = new OrderEvent();
                resultEvent.setOrderId(request.getOrderId());
                resultEvent.setEventType("SHIPPING_INITIATED");
                resultEvent.setTrackingNumber(trackingNumber);
                resultEvent.setTimestamp(Instant.now());

                exchange.getIn().setBody(resultEvent);
            })
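            .log("Sending shipping initiation result for order: ${body.orderId}")
            // Marshal to JSON so the order-events consumers' unmarshal().json(...) can parse the event
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:order-events");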

        // Event correlation and monitoring
        from("kafka:order-events?groupId=event-correlation")
            .routeId("event-correlation")
            .unmarshal().json(JsonLibrary.Jackson, OrderEvent.class)
            .process(exchange -> {
                OrderEvent event = exchange.getIn().getBody(OrderEvent.class);

                // Track event correlation and process state
                ProcessCorrelation correlation = new ProcessCorrelation();
                correlation.setOrderId(event.getOrderId());
                correlation.setEventType(event.getEventType());
                correlation.setTimestamp(event.getTimestamp());

                // Store correlation data for monitoring and debugging
                correlationRepository.save(correlation);

                exchange.getIn().setBody(correlation);
            })
            .choice()
                .when(simple("${body.eventType} == 'ORDER_PLACED'"))
                    .to("direct:startProcessMonitoring")
                .when(simple("${body.eventType} in 'SHIPPING_INITIATED,ORDER_FAILED,ORDER_CANCELLED'"))
                    .to("direct:endProcessMonitoring")
            .end();

        from("direct:startProcessMonitoring")
            .routeId("start-process-monitoring")
            .process(exchange -> {
                ProcessCorrelation correlation = exchange.getIn().getBody(ProcessCorrelation.class);

                ProcessMonitor monitor = new ProcessMonitor();
                monitor.setOrderId(correlation.getOrderId());
                monitor.setStartTime(correlation.getTimestamp());
                monitor.setStatus(ProcessStatus.IN_PROGRESS);

                processMonitorRepository.save(monitor);
            })
            .log("Process monitoring started for order: ${body.orderId}");

        from("direct:endProcessMonitoring")
            .routeId("end-process-monitoring")
            .process(exchange -> {
                ProcessCorrelation correlation = exchange.getIn().getBody(ProcessCorrelation.class);

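                ProcessMonitor monitor = processMonitorRepository.findByOrderId(correlation.getOrderId());
                // The monitor can be missing if a terminal event is consumed before ORDER_PLACED was recorded
                if (monitor != null) {
                    monitor.setEndTime(correlation.getTimestamp());
                    // Note: ORDER_FAILED and ORDER_CANCELLED also end here and are recorded as COMPLETED;
                    // use a distinct status for them if ProcessStatus defines one
                    monitor.setStatus(ProcessStatus.COMPLETED);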

                    Duration duration = Duration.between(monitor.getStartTime(), monitor.getEndTime());
                    monitor.setProcessingDuration(duration);

                    processMonitorRepository.save(monitor);
                }
            })
            .log("Process monitoring completed for order: ${body.orderId}");
    }
}
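
The `end-process-monitoring` route above records `ProcessStatus.COMPLETED` for every terminal event, including `ORDER_FAILED` and `ORDER_CANCELLED`, and computes the duration from timestamps produced by different services. The following is a minimal sketch, not part of the original routes, of how terminal events could be mapped to distinct outcomes and how the duration could be guarded against clock skew. The names `ProcessOutcomeSketch`, `Outcome`, `outcomeFor`, and `elapsed` are hypothetical; only the event type strings come from the routes.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical helper; the original code uses ProcessStatus, whose values are not shown.
final class ProcessOutcomeSketch {

    enum Outcome { COMPLETED, FAILED, CANCELLED }

    /** Maps a terminal event type to an outcome; empty for non-terminal or unknown events. */
    static Optional<Outcome> outcomeFor(String eventType) {
        if (eventType == null) {
            return Optional.empty();
        }
        switch (eventType) {
            case "SHIPPING_INITIATED":
                return Optional.of(Outcome.COMPLETED);
            case "ORDER_FAILED":
                return Optional.of(Outcome.FAILED);
            case "ORDER_CANCELLED":
                return Optional.of(Outcome.CANCELLED);
            default:
                return Optional.empty();
        }
    }

    /** Elapsed time between two event timestamps, clamped to zero if the clocks disagree. */
    static Duration elapsed(Instant start, Instant end) {
        Duration d = Duration.between(start, end);
        return d.isNegative() ? Duration.ZERO : d;
    }
}
```

Inside the route's processor, `outcomeFor(correlation.getEventType())` could select the status to store, assuming the status type offers matching values.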

Best Practices

1. Event Design and Schema Management

2. Service Autonomy and Boundaries

3. Error Handling and Compensation
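
The routes above contain one compensating action: `direct:releaseInventory` publishes to `inventory-release-requests` when an order cannot proceed. As a hedged sketch of the general idea, the helper below derives which compensations to request from the events already seen for an order, undoing the most recent step first. It assumes the caller tracks the event types per order; `CompensationPlanSketch` and the topic `refund-payment-requests` are hypothetical and do not appear in the original routes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating compensation in reverse order of completion.
final class CompensationPlanSketch {

    /** Returns the topics to notify, newest completed step first. */
    static List<String> compensationTopics(List<String> completedEventTypes) {
        List<String> topics = new ArrayList<>();
        for (int i = completedEventTypes.size() - 1; i >= 0; i--) {
            switch (completedEventTypes.get(i)) {
                case "PAYMENT_CONFIRMED":
                    topics.add("refund-payment-requests"); // hypothetical topic
                    break;
                case "INVENTORY_RESERVED":
                    topics.add("inventory-release-requests"); // topic used by the routes above
                    break;
                default:
                    break; // steps with no side effect to undo
            }
        }
        return topics;
    }
}
```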

4. Event Ordering and Consistency
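
Kafka consumers such as the `order-events` routes above can receive the same message again after a retry or consumer rebalance, so handlers benefit from being idempotent. The sketch below is one minimal way to detect repeats, keyed by order ID and event type. `SeenEventsSketch` and `firstDelivery` are hypothetical names, and the in-memory set is an assumption made for brevity; a real service would need a store that survives restarts.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory duplicate filter; not part of the original routes.
final class SeenEventsSketch {

    // Thread-safe set of "orderId|eventType" keys already processed
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /** Returns true the first time a given (orderId, eventType) pair is offered, false for repeats. */
    boolean firstDelivery(String orderId, String eventType) {
        return seen.add(orderId + "|" + eventType);
    }
}
```

A processor could call `firstDelivery(event.getOrderId(), event.getEventType())` and skip the event when it returns false. Camel's Idempotent Consumer EIP addresses the same concern inside a route.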

5. Monitoring and Observability

6. Testing and Validation

The Choreography pattern fits distributed systems that need loose coupling, scalability, and resilience. Each service participates autonomously in a business process by reacting to events, so there is no central coordinator to become a bottleneck or single point of failure. The cost is that no single component holds the full process state, which is why the example dedicates routes to event correlation and monitoring.
