Choreography
Overview
The Choreography pattern is a decentralized coordination mechanism in enterprise integration architectures where services collaborate to achieve business goals through direct peer-to-peer interactions without the need for a central coordinator or orchestrator. Like a well-rehearsed dance troupe where each dancer knows their role and responds to the movements of their partners without a director, the Choreography pattern enables services to participate in complex business processes by reacting to events and messages from other services according to predefined protocols and agreements. This pattern is essential for building loosely coupled, scalable, and resilient distributed systems where services can autonomously participate in business workflows while maintaining independence and avoiding single points of failure.
Theoretical Foundation
The Choreography pattern is grounded in distributed systems theory, event-driven architecture principles, and autonomous agent coordination. It incorporates concepts from reactive systems, event sourcing, publish-subscribe messaging, and distributed consensus protocols to provide a framework for building self-organizing, resilient business process execution without centralized control. The pattern addresses the challenges of maintaining consistency, handling failures, and ensuring reliable process completion in loosely coupled environments where no single component has complete knowledge or control of the entire business process.
Core Principles
1. Decentralized Coordination
Enabling services to coordinate without centralized control: - Autonomous participation - each service decides independently how to participate in workflows - Event-driven collaboration - services communicate through asynchronous event publishing and consumption - Distributed decision making - business logic distributed across participating services - Self-organizing behavior - emergent workflow behavior through local service interactions
2. Event-Based Communication
Coordinating through publish-subscribe event mechanisms: - Event publishing - services publish domain events when significant state changes occur - Event subscription - services subscribe to relevant events from other services - Event correlation - tracking related events across the distributed workflow - Event ordering - handling event sequences and maintaining causal relationships
3. Protocol-Based Interaction
Defining interaction contracts and agreements: - Interaction protocols - well-defined rules for service communication and collaboration - Message schemas - standardized event and message formats across services - Behavioral contracts - agreements on expected service behaviors and responses - Compensating actions - predefined recovery mechanisms for handling failures
4. Loose Coupling and Autonomy
Maintaining service independence and flexibility: - Service autonomy - each service maintains its own data and business logic - Technology independence - services can use different technologies and platforms - Deployment independence - services can be deployed and updated independently - Failure isolation - failures in one service don't directly impact others
Why Choreography is Essential in Integration Architecture
1. Microservices Orchestration
In microservices architectures, the Choreography pattern provides: - Distributed workflow coordination - enabling complex business processes across multiple services - Service independence - avoiding tight coupling between services through centralized orchestrators - Scalability enablement - supporting horizontal scaling without centralized bottlenecks - Resilience enhancement - eliminating single points of failure in business process execution
2. Event-Driven System Design
For reactive and event-driven architectures: - Reactive system behavior - enabling systems to respond dynamically to changing conditions - Event stream processing - coordinating workflows through continuous event streams - Real-time collaboration - enabling real-time coordination between distributed services - Stream-based integration - integrating systems through event streaming platforms
3. Cross-System Integration
Enabling coordination across organizational and system boundaries: - Inter-organization workflows - coordinating processes across multiple organizations - Legacy system integration - enabling legacy systems to participate through event adapters - Cloud and hybrid integration - coordinating processes across different cloud platforms - Partner ecosystem integration - enabling flexible integration with business partners
4. Business Process Flexibility
Supporting dynamic and flexible business processes: - Process evolution - enabling processes to evolve through changing service behaviors - Dynamic participation - services can join or leave workflows dynamically - Conditional workflows - processes can adapt based on runtime conditions and events - Multi-variant processes - supporting different process variations simultaneously
Benefits in Integration Contexts
1. Technical Advantages
- No single point of failure - eliminating centralized orchestrators that can become bottlenecks
- High scalability - supporting massive scale through distributed event processing
- Loose coupling - minimizing dependencies between services and systems
- Technology diversity - enabling heterogeneous technology stacks across services
2. Business Value
- Business agility - rapid adaptation to changing business requirements through service evolution
- Process flexibility - supporting dynamic and evolving business processes
- Operational efficiency - reducing operational overhead through autonomous service behavior
- Innovation enablement - facilitating rapid experimentation and new service development
3. Integration Enablement
- Event-driven integration - enabling real-time integration through event streaming
- Asynchronous processing - supporting high-throughput asynchronous workflows
- Protocol mediation - handling communication between services using different protocols
- Data consistency - achieving eventual consistency through event-driven synchronization
4. Operational Excellence
- Fault tolerance - continuing operation despite individual service failures
- Self-healing capabilities - automatic recovery through compensating actions and retries
- Monitoring and observability - comprehensive visibility through distributed event tracking
- Performance optimization - optimizing performance through parallel and asynchronous processing
Integration Architecture Applications
1. E-commerce Order Processing
Choreographed order fulfillment across multiple services:
// Order Service - Publishes order events
@Component
public class OrderService {
@Autowired
private ApplicationEventPublisher eventPublisher;
@Autowired
private OrderRepository orderRepository;
public Order placeOrder(CreateOrderRequest request) {
Order order = new Order();
order.setCustomerId(request.getCustomerId());
order.setItems(request.getItems());
order.setShippingAddress(request.getShippingAddress());
order.setStatus(OrderStatus.PLACED);
order.setTotalAmount(calculateTotalAmount(request.getItems()));
order = orderRepository.save(order);
// Publish order placed event
OrderPlacedEvent event = new OrderPlacedEvent();
event.setOrderId(order.getId());
event.setCustomerId(order.getCustomerId());
event.setItems(order.getItems());
event.setTotalAmount(order.getTotalAmount());
event.setTimestamp(Instant.now());
eventPublisher.publishEvent(event);
log.info("Order placed: {}", order.getId());
return order;
}
@EventListener
public void handleInventoryReserved(InventoryReservedEvent event) {
Order order = orderRepository.findById(event.getOrderId())
.orElseThrow(() -> new OrderNotFoundException(event.getOrderId()));
order.setStatus(OrderStatus.INVENTORY_RESERVED);
orderRepository.save(order);
log.info("Inventory reserved for order: {}", event.getOrderId());
}
@EventListener
public void handlePaymentProcessed(PaymentProcessedEvent event) {
Order order = orderRepository.findById(event.getOrderId())
.orElseThrow(() -> new OrderNotFoundException(event.getOrderId()));
if (event.getStatus() == PaymentStatus.SUCCESSFUL) {
order.setStatus(OrderStatus.PAYMENT_CONFIRMED);
order.setPaymentId(event.getPaymentId());
} else {
order.setStatus(OrderStatus.PAYMENT_FAILED);
// Publish compensation event
OrderPaymentFailedEvent compensationEvent = new OrderPaymentFailedEvent();
compensationEvent.setOrderId(event.getOrderId());
compensationEvent.setReason(event.getFailureReason());
compensationEvent.setTimestamp(Instant.now());
eventPublisher.publishEvent(compensationEvent);
}
orderRepository.save(order);
log.info("Payment processed for order: {} with status: {}",
event.getOrderId(), event.getStatus());
}
@EventListener
public void handleShippingInitiated(ShippingInitiatedEvent event) {
Order order = orderRepository.findById(event.getOrderId())
.orElseThrow(() -> new OrderNotFoundException(event.getOrderId()));
order.setStatus(OrderStatus.SHIPPED);
order.setTrackingNumber(event.getTrackingNumber());
order.setEstimatedDelivery(event.getEstimatedDelivery());
orderRepository.save(order);
log.info("Shipping initiated for order: {} with tracking: {}",
event.getOrderId(), event.getTrackingNumber());
}
@EventListener
public void handleInventoryReservationFailed(InventoryReservationFailedEvent event) {
Order order = orderRepository.findById(event.getOrderId())
.orElseThrow(() -> new OrderNotFoundException(event.getOrderId()));
order.setStatus(OrderStatus.CANCELLED);
order.setCancellationReason("Inventory not available: " + event.getReason());
orderRepository.save(order);
log.info("Order cancelled due to inventory failure: {}", event.getOrderId());
}
}
// Inventory Service - Reacts to order events and manages inventory
@Component
public class InventoryService {
@Autowired
private ApplicationEventPublisher eventPublisher;
@Autowired
private InventoryRepository inventoryRepository;
@EventListener
@Async
public void handleOrderPlaced(OrderPlacedEvent event) {
try {
// Check inventory availability for all items
boolean allItemsAvailable = event.getItems().stream()
.allMatch(item -> checkInventoryAvailability(item.getProductId(), item.getQuantity()));
if (allItemsAvailable) {
// Reserve inventory for all items
Map<String, String> reservations = new HashMap<>();
for (OrderItem item : event.getItems()) {
String reservationId = reserveInventory(item.getProductId(), item.getQuantity());
reservations.put(item.getProductId(), reservationId);
}
// Publish inventory reserved event
InventoryReservedEvent reservedEvent = new InventoryReservedEvent();
reservedEvent.setOrderId(event.getOrderId());
reservedEvent.setReservations(reservations);
reservedEvent.setTimestamp(Instant.now());
eventPublisher.publishEvent(reservedEvent);
log.info("Inventory reserved for order: {}", event.getOrderId());
} else {
// Publish inventory reservation failed event
InventoryReservationFailedEvent failedEvent = new InventoryReservationFailedEvent();
failedEvent.setOrderId(event.getOrderId());
failedEvent.setReason("Insufficient inventory for requested items");
failedEvent.setTimestamp(Instant.now());
eventPublisher.publishEvent(failedEvent);
log.warn("Insufficient inventory for order: {}", event.getOrderId());
}
} catch (Exception e) {
log.error("Error processing inventory for order: {}", event.getOrderId(), e);
InventoryReservationFailedEvent failedEvent = new InventoryReservationFailedEvent();
failedEvent.setOrderId(event.getOrderId());
failedEvent.setReason("Inventory system error: " + e.getMessage());
failedEvent.setTimestamp(Instant.now());
eventPublisher.publishEvent(failedEvent);
}
}
@EventListener
public void handleOrderPaymentFailed(OrderPaymentFailedEvent event) {
try {
// Release inventory reservations
releaseInventoryForOrder(event.getOrderId());
log.info("Inventory released for failed order: {}", event.getOrderId());
} catch (Exception e) {
log.error("Error releasing inventory for order: {}", event.getOrderId(), e);
}
}
@EventListener
public void handleShippingInitiated(ShippingInitiatedEvent event) {
try {
// Convert reservations to actual inventory deductions
confirmInventoryReservations(event.getOrderId());
log.info("Inventory confirmed for shipped order: {}", event.getOrderId());
} catch (Exception e) {
log.error("Error confirming inventory for order: {}", event.getOrderId(), e);
}
}
private boolean checkInventoryAvailability(String productId, int quantity) {
InventoryItem item = inventoryRepository.findByProductId(productId);
return item != null && item.getAvailableQuantity() >= quantity;
}
private String reserveInventory(String productId, int quantity) {
InventoryItem item = inventoryRepository.findByProductId(productId);
if (item.getAvailableQuantity() >= quantity) {
item.setAvailableQuantity(item.getAvailableQuantity() - quantity);
item.setReservedQuantity(item.getReservedQuantity() + quantity);
inventoryRepository.save(item);
String reservationId = UUID.randomUUID().toString();
InventoryReservation reservation = new InventoryReservation();
reservation.setReservationId(reservationId);
reservation.setProductId(productId);
reservation.setQuantity(quantity);
reservation.setCreatedAt(Instant.now());
inventoryReservationRepository.save(reservation);
return reservationId;
}
throw new InsufficientInventoryException(productId);
}
}
// Payment Service - Processes payments based on order events
@Component
public class PaymentService {
@Autowired
private ApplicationEventPublisher eventPublisher;
@Autowired
private PaymentProcessor paymentProcessor;
@EventListener
@Async
public void handleInventoryReserved(InventoryReservedEvent event) {
try {
// Retrieve order details for payment processing
PaymentRequest request = buildPaymentRequest(event.getOrderId());
PaymentResult result = paymentProcessor.processPayment(request);
PaymentProcessedEvent processedEvent = new PaymentProcessedEvent();
processedEvent.setOrderId(event.getOrderId());
processedEvent.setPaymentId(result.getPaymentId());
processedEvent.setStatus(result.getStatus());
processedEvent.setAmount(result.getAmount());
processedEvent.setTimestamp(Instant.now());
if (result.getStatus() == PaymentStatus.FAILED) {
processedEvent.setFailureReason(result.getFailureReason());
}
eventPublisher.publishEvent(processedEvent);
log.info("Payment processed for order: {} with status: {}",
event.getOrderId(), result.getStatus());
} catch (Exception e) {
log.error("Error processing payment for order: {}", event.getOrderId(), e);
PaymentProcessedEvent failedEvent = new PaymentProcessedEvent();
failedEvent.setOrderId(event.getOrderId());
failedEvent.setStatus(PaymentStatus.FAILED);
failedEvent.setFailureReason("Payment system error: " + e.getMessage());
failedEvent.setTimestamp(Instant.now());
eventPublisher.publishEvent(failedEvent);
}
}
private PaymentRequest buildPaymentRequest(String orderId) {
// Retrieve order details and build payment request
// Implementation depends on order data access strategy
return new PaymentRequest();
}
}
// Shipping Service - Handles shipping based on payment confirmation
@Component
public class ShippingService {
@Autowired
private ApplicationEventPublisher eventPublisher;
@Autowired
private ShippingProvider shippingProvider;
@EventListener
@Async
public void handlePaymentProcessed(PaymentProcessedEvent event) {
if (event.getStatus() == PaymentStatus.SUCCESSFUL) {
try {
ShippingRequest request = buildShippingRequest(event.getOrderId());
ShippingResult result = shippingProvider.createShipment(request);
ShippingInitiatedEvent initiatedEvent = new ShippingInitiatedEvent();
initiatedEvent.setOrderId(event.getOrderId());
initiatedEvent.setShipmentId(result.getShipmentId());
initiatedEvent.setTrackingNumber(result.getTrackingNumber());
initiatedEvent.setCarrier(result.getCarrier());
initiatedEvent.setEstimatedDelivery(result.getEstimatedDelivery());
initiatedEvent.setTimestamp(Instant.now());
eventPublisher.publishEvent(initiatedEvent);
log.info("Shipping initiated for order: {} with tracking: {}",
event.getOrderId(), result.getTrackingNumber());
} catch (Exception e) {
log.error("Error initiating shipping for order: {}", event.getOrderId(), e);
ShippingFailedEvent failedEvent = new ShippingFailedEvent();
failedEvent.setOrderId(event.getOrderId());
failedEvent.setReason("Shipping system error: " + e.getMessage());
failedEvent.setTimestamp(Instant.now());
eventPublisher.publishEvent(failedEvent);
}
}
}
private ShippingRequest buildShippingRequest(String orderId) {
// Build shipping request from order data
return new ShippingRequest();
}
}
2. Financial Services Transaction Processing
Choreographed transaction processing across financial services:
// Account Service - Manages account state and publishes account events
@Component
public class AccountService {
@Autowired
private ApplicationEventPublisher eventPublisher;
@Autowired
private AccountRepository accountRepository;
public void initiateTransfer(TransferRequest request) {
try {
Account fromAccount = accountRepository.findByAccountNumber(request.getFromAccount());
if (fromAccount.getBalance().compareTo(request.getAmount()) >= 0) {
// Create pending transfer
PendingTransfer transfer = new PendingTransfer();
transfer.setTransferId(UUID.randomUUID().toString());
transfer.setFromAccount(request.getFromAccount());
transfer.setToAccount(request.getToAccount());
transfer.setAmount(request.getAmount());
transfer.setStatus(TransferStatus.INITIATED);
pendingTransferRepository.save(transfer);
// Publish transfer initiated event
TransferInitiatedEvent event = new TransferInitiatedEvent();
event.setTransferId(transfer.getTransferId());
event.setFromAccount(request.getFromAccount());
event.setToAccount(request.getToAccount());
event.setAmount(request.getAmount());
event.setTimestamp(Instant.now());
eventPublisher.publishEvent(event);
log.info("Transfer initiated: {}", transfer.getTransferId());
} else {
throw new InsufficientFundsException(request.getFromAccount());
}
} catch (Exception e) {
TransferFailedEvent failedEvent = new TransferFailedEvent();
failedEvent.setFromAccount(request.getFromAccount());
failedEvent.setToAccount(request.getToAccount());
failedEvent.setAmount(request.getAmount());
failedEvent.setReason(e.getMessage());
failedEvent.setTimestamp(Instant.now());
eventPublisher.publishEvent(failedEvent);
}
}
@EventListener
public void handleFraudCheckCompleted(FraudCheckCompletedEvent event) {
PendingTransfer transfer = pendingTransferRepository.findById(event.getTransferId())
.orElseThrow(() -> new TransferNotFoundException(event.getTransferId()));
if (event.getResult() == FraudCheckResult.APPROVED) {
transfer.setStatus(TransferStatus.FRAUD_CHECK_PASSED);
pendingTransferRepository.save(transfer);
// Publish compliance check required event
ComplianceCheckRequiredEvent complianceEvent = new ComplianceCheckRequiredEvent();
complianceEvent.setTransferId(event.getTransferId());
complianceEvent.setFromAccount(transfer.getFromAccount());
complianceEvent.setToAccount(transfer.getToAccount());
complianceEvent.setAmount(transfer.getAmount());
complianceEvent.setTimestamp(Instant.now());
eventPublisher.publishEvent(complianceEvent);
} else {
transfer.setStatus(TransferStatus.REJECTED);
transfer.setRejectionReason("Fraud check failed: " + event.getReason());
pendingTransferRepository.save(transfer);
TransferRejectedEvent rejectedEvent = new TransferRejectedEvent();
rejectedEvent.setTransferId(event.getTransferId());
rejectedEvent.setReason(event.getReason());
rejectedEvent.setTimestamp(Instant.now());
eventPublisher.publishEvent(rejectedEvent);
}
}
@EventListener
public void handleComplianceCheckCompleted(ComplianceCheckCompletedEvent event) {
PendingTransfer transfer = pendingTransferRepository.findById(event.getTransferId())
.orElseThrow(() -> new TransferNotFoundException(event.getTransferId()));
if (event.getResult() == ComplianceCheckResult.APPROVED) {
// Execute the actual transfer
executeTransfer(transfer);
} else {
transfer.setStatus(TransferStatus.REJECTED);
transfer.setRejectionReason("Compliance check failed: " + event.getReason());
pendingTransferRepository.save(transfer);
TransferRejectedEvent rejectedEvent = new TransferRejectedEvent();
rejectedEvent.setTransferId(event.getTransferId());
rejectedEvent.setReason(event.getReason());
rejectedEvent.setTimestamp(Instant.now());
eventPublisher.publishEvent(rejectedEvent);
}
}
private void executeTransfer(PendingTransfer transfer) {
try {
Account fromAccount = accountRepository.findByAccountNumber(transfer.getFromAccount());
Account toAccount = accountRepository.findByAccountNumber(transfer.getToAccount());
// Perform the transfer
fromAccount.setBalance(fromAccount.getBalance().subtract(transfer.getAmount()));
toAccount.setBalance(toAccount.getBalance().add(transfer.getAmount()));
accountRepository.save(fromAccount);
accountRepository.save(toAccount);
transfer.setStatus(TransferStatus.COMPLETED);
pendingTransferRepository.save(transfer);
// Publish transfer completed event
TransferCompletedEvent completedEvent = new TransferCompletedEvent();
completedEvent.setTransferId(transfer.getTransferId());
completedEvent.setFromAccount(transfer.getFromAccount());
completedEvent.setToAccount(transfer.getToAccount());
completedEvent.setAmount(transfer.getAmount());
completedEvent.setTimestamp(Instant.now());
eventPublisher.publishEvent(completedEvent);
log.info("Transfer completed: {}", transfer.getTransferId());
} catch (Exception e) {
log.error("Error executing transfer: {}", transfer.getTransferId(), e);
transfer.setStatus(TransferStatus.FAILED);
transfer.setRejectionReason("Transfer execution failed: " + e.getMessage());
pendingTransferRepository.save(transfer);
TransferFailedEvent failedEvent = new TransferFailedEvent();
failedEvent.setTransferId(transfer.getTransferId());
failedEvent.setReason(e.getMessage());
failedEvent.setTimestamp(Instant.now());
eventPublisher.publishEvent(failedEvent);
}
}
}
// Fraud Detection Service - Analyzes transactions for fraud
@Component
public class FraudDetectionService {
@Autowired
private ApplicationEventPublisher eventPublisher;
@Autowired
private FraudDetectionEngine fraudEngine;
@EventListener
@Async
public void handleTransferInitiated(TransferInitiatedEvent event) {
try {
FraudAnalysisRequest request = new FraudAnalysisRequest();
request.setTransferId(event.getTransferId());
request.setFromAccount(event.getFromAccount());
request.setToAccount(event.getToAccount());
request.setAmount(event.getAmount());
request.setTimestamp(event.getTimestamp());
FraudAnalysisResult result = fraudEngine.analyzeTransaction(request);
FraudCheckCompletedEvent completedEvent = new FraudCheckCompletedEvent();
completedEvent.setTransferId(event.getTransferId());
completedEvent.setResult(result.getResult());
completedEvent.setRiskScore(result.getRiskScore());
completedEvent.setReason(result.getReason());
completedEvent.setTimestamp(Instant.now());
eventPublisher.publishEvent(completedEvent);
log.info("Fraud check completed for transfer: {} with result: {}",
event.getTransferId(), result.getResult());
} catch (Exception e) {
log.error("Error in fraud detection for transfer: {}", event.getTransferId(), e);
FraudCheckCompletedEvent failedEvent = new FraudCheckCompletedEvent();
failedEvent.setTransferId(event.getTransferId());
failedEvent.setResult(FraudCheckResult.ERROR);
failedEvent.setReason("Fraud detection system error: " + e.getMessage());
failedEvent.setTimestamp(Instant.now());
eventPublisher.publishEvent(failedEvent);
}
}
}
// Compliance Service - Performs regulatory compliance checks
@Component
public class ComplianceService {
@Autowired
private ApplicationEventPublisher eventPublisher;
@Autowired
private ComplianceEngine complianceEngine;
@EventListener
@Async
public void handleComplianceCheckRequired(ComplianceCheckRequiredEvent event) {
try {
ComplianceCheckRequest request = new ComplianceCheckRequest();
request.setTransferId(event.getTransferId());
request.setFromAccount(event.getFromAccount());
request.setToAccount(event.getToAccount());
request.setAmount(event.getAmount());
ComplianceCheckResult result = complianceEngine.performComplianceCheck(request);
ComplianceCheckCompletedEvent completedEvent = new ComplianceCheckCompletedEvent();
completedEvent.setTransferId(event.getTransferId());
completedEvent.setResult(result.getResult());
completedEvent.setReason(result.getReason());
completedEvent.setRequiredActions(result.getRequiredActions());
completedEvent.setTimestamp(Instant.now());
eventPublisher.publishEvent(completedEvent);
log.info("Compliance check completed for transfer: {} with result: {}",
event.getTransferId(), result.getResult());
} catch (Exception e) {
log.error("Error in compliance check for transfer: {}", event.getTransferId(), e);
ComplianceCheckCompletedEvent failedEvent = new ComplianceCheckCompletedEvent();
failedEvent.setTransferId(event.getTransferId());
failedEvent.setResult(ComplianceCheckResult.ERROR);
failedEvent.setReason("Compliance system error: " + e.getMessage());
failedEvent.setTimestamp(Instant.now());
eventPublisher.publishEvent(failedEvent);
}
}
}
3. Apache Camel Choreography Implementation
Event-driven choreography using Apache Camel:
@Component
public class CamelChoreographyRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
// Order processing choreography
from("kafka:order-events?groupId=order-choreography")
.routeId("order-choreography")
.unmarshal().json(JsonLibrary.Jackson, OrderEvent.class)
.choice()
.when(simple("${body.eventType} == 'ORDER_PLACED'"))
.multicast().parallelProcessing()
.to("direct:processInventoryCheck")
.to("direct:validatePaymentMethod")
.end()
.when(simple("${body.eventType} == 'INVENTORY_RESERVED'"))
.to("direct:processPayment")
.when(simple("${body.eventType} == 'PAYMENT_CONFIRMED'"))
.to("direct:initiateShipping")
.when(simple("${body.eventType} == 'INVENTORY_FAILED'"))
.to("direct:handleOrderFailure")
.when(simple("${body.eventType} == 'PAYMENT_FAILED'"))
.to("direct:releaseInventory")
.otherwise()
.log("Unknown order event: ${body.eventType}")
.end();
from("direct:processInventoryCheck")
.routeId("process-inventory-check")
.process(exchange -> {
OrderEvent orderEvent = exchange.getIn().getBody(OrderEvent.class);
InventoryCheckRequest request = new InventoryCheckRequest();
request.setOrderId(orderEvent.getOrderId());
request.setItems(orderEvent.getItems());
exchange.getIn().setBody(request);
})
.to("kafka:inventory-check-requests")
.log("Inventory check requested for order: ${body.orderId}");
from("direct:validatePaymentMethod")
.routeId("validate-payment-method")
.process(exchange -> {
OrderEvent orderEvent = exchange.getIn().getBody(OrderEvent.class);
PaymentValidationRequest request = new PaymentValidationRequest();
request.setOrderId(orderEvent.getOrderId());
request.setPaymentMethod(orderEvent.getPaymentMethod());
request.setCustomerId(orderEvent.getCustomerId());
exchange.getIn().setBody(request);
})
.to("kafka:payment-validation-requests")
.log("Payment validation requested for order: ${body.orderId}");
from("direct:processPayment")
.routeId("process-payment")
.process(exchange -> {
OrderEvent orderEvent = exchange.getIn().getBody(OrderEvent.class);
PaymentProcessingRequest request = new PaymentProcessingRequest();
request.setOrderId(orderEvent.getOrderId());
request.setAmount(orderEvent.getTotalAmount());
request.setPaymentMethod(orderEvent.getPaymentMethod());
exchange.getIn().setBody(request);
})
.to("kafka:payment-processing-requests")
.log("Payment processing requested for order: ${body.orderId}");
from("direct:initiateShipping")
.routeId("initiate-shipping")
.process(exchange -> {
OrderEvent orderEvent = exchange.getIn().getBody(OrderEvent.class);
ShippingRequest request = new ShippingRequest();
request.setOrderId(orderEvent.getOrderId());
request.setShippingAddress(orderEvent.getShippingAddress());
request.setItems(orderEvent.getItems());
exchange.getIn().setBody(request);
})
.to("kafka:shipping-requests")
.log("Shipping requested for order: ${body.orderId}");
from("direct:releaseInventory")
.routeId("release-inventory")
.process(exchange -> {
OrderEvent orderEvent = exchange.getIn().getBody(OrderEvent.class);
InventoryReleaseRequest request = new InventoryReleaseRequest();
request.setOrderId(orderEvent.getOrderId());
request.setReason("Payment failed");
exchange.getIn().setBody(request);
})
.to("kafka:inventory-release-requests")
.log("Inventory release requested for order: ${body.orderId}");
from("direct:handleOrderFailure")
.routeId("handle-order-failure")
.process(exchange -> {
OrderEvent orderEvent = exchange.getIn().getBody(OrderEvent.class);
OrderFailureNotification notification = new OrderFailureNotification();
notification.setOrderId(orderEvent.getOrderId());
notification.setCustomerId(orderEvent.getCustomerId());
notification.setReason(orderEvent.getFailureReason());
exchange.getIn().setBody(notification);
})
.to("kafka:customer-notifications")
.log("Order failure notification sent for order: ${body.orderId}");
// Inventory service choreography
from("kafka:inventory-check-requests?groupId=inventory-choreography")
.routeId("inventory-choreography")
.unmarshal().json(JsonLibrary.Jackson, InventoryCheckRequest.class)
.process(exchange -> {
InventoryCheckRequest request = exchange.getIn().getBody(InventoryCheckRequest.class);
// Check inventory availability
boolean available = checkInventoryAvailability(request.getItems());
OrderEvent resultEvent = new OrderEvent();
resultEvent.setOrderId(request.getOrderId());
resultEvent.setEventType(available ? "INVENTORY_RESERVED" : "INVENTORY_FAILED");
resultEvent.setTimestamp(Instant.now());
if (!available) {
resultEvent.setFailureReason("Insufficient inventory");
}
exchange.getIn().setBody(resultEvent);
})
.to("kafka:order-events")
.log("Inventory check result sent for order: ${body.orderId}");
// Payment service choreography
from("kafka:payment-processing-requests?groupId=payment-choreography")
.routeId("payment-choreography")
.unmarshal().json(JsonLibrary.Jackson, PaymentProcessingRequest.class)
.process(exchange -> {
PaymentProcessingRequest request = exchange.getIn().getBody(PaymentProcessingRequest.class);
// Process payment
boolean success = processPayment(request);
OrderEvent resultEvent = new OrderEvent();
resultEvent.setOrderId(request.getOrderId());
resultEvent.setEventType(success ? "PAYMENT_CONFIRMED" : "PAYMENT_FAILED");
resultEvent.setTimestamp(Instant.now());
if (!success) {
resultEvent.setFailureReason("Payment processing failed");
}
exchange.getIn().setBody(resultEvent);
})
.to("kafka:order-events")
.log("Payment processing result sent for order: ${body.orderId}");
// Shipping service choreography
from("kafka:shipping-requests?groupId=shipping-choreography")
.routeId("shipping-choreography")
.unmarshal().json(JsonLibrary.Jackson, ShippingRequest.class)
.process(exchange -> {
ShippingRequest request = exchange.getIn().getBody(ShippingRequest.class);
// Initiate shipping
String trackingNumber = initiateShipping(request);
OrderEvent resultEvent = new OrderEvent();
resultEvent.setOrderId(request.getOrderId());
resultEvent.setEventType("SHIPPING_INITIATED");
resultEvent.setTrackingNumber(trackingNumber);
resultEvent.setTimestamp(Instant.now());
exchange.getIn().setBody(resultEvent);
})
.to("kafka:order-events")
.log("Shipping initiation result sent for order: ${body.orderId}");
// Event correlation and monitoring
from("kafka:order-events?groupId=event-correlation")
.routeId("event-correlation")
.unmarshal().json(JsonLibrary.Jackson, OrderEvent.class)
.process(exchange -> {
OrderEvent event = exchange.getIn().getBody(OrderEvent.class);
// Track event correlation and process state
ProcessCorrelation correlation = new ProcessCorrelation();
correlation.setOrderId(event.getOrderId());
correlation.setEventType(event.getEventType());
correlation.setTimestamp(event.getTimestamp());
// Store correlation data for monitoring and debugging
correlationRepository.save(correlation);
exchange.getIn().setBody(correlation);
})
.choice()
.when(simple("${body.eventType} == 'ORDER_PLACED'"))
.to("direct:startProcessMonitoring")
.when(simple("${body.eventType} in 'SHIPPING_INITIATED,ORDER_FAILED,ORDER_CANCELLED'"))
.to("direct:endProcessMonitoring")
.end();
from("direct:startProcessMonitoring")
.routeId("start-process-monitoring")
.process(exchange -> {
ProcessCorrelation correlation = exchange.getIn().getBody(ProcessCorrelation.class);
ProcessMonitor monitor = new ProcessMonitor();
monitor.setOrderId(correlation.getOrderId());
monitor.setStartTime(correlation.getTimestamp());
monitor.setStatus(ProcessStatus.IN_PROGRESS);
processMonitorRepository.save(monitor);
})
.log("Process monitoring started for order: ${body.orderId}");
from("direct:endProcessMonitoring")
.routeId("end-process-monitoring")
.process(exchange -> {
ProcessCorrelation correlation = exchange.getIn().getBody(ProcessCorrelation.class);
ProcessMonitor monitor = processMonitorRepository.findByOrderId(correlation.getOrderId());
if (monitor != null) {
monitor.setEndTime(correlation.getTimestamp());
monitor.setStatus(ProcessStatus.COMPLETED);
Duration duration = Duration.between(monitor.getStartTime(), monitor.getEndTime());
monitor.setProcessingDuration(duration);
processMonitorRepository.save(monitor);
}
})
.log("Process monitoring completed for order: ${body.orderId}");
}
}
Best Practices
1. Event Design and Schema Management
- Design events with clear semantic meaning and stable schemas
- Use event versioning strategies to handle schema evolution
- Include correlation identifiers in all related events
- Design events to be immutable and idempotent
- Use meaningful event names that describe business outcomes
2. Service Autonomy and Boundaries
- Ensure each service owns its data and business logic
- Avoid shared databases between services in the choreography
- Design services to be independently deployable and scalable
- Implement proper service boundaries based on business domains
- Minimize dependencies between services
3. Error Handling and Compensation
- Design comprehensive error handling for each service
- Implement compensating actions for reversible operations
- Use dead letter queues for unprocessable events
- Design timeout handling for all asynchronous operations
- Plan for partial failure scenarios and recovery
4. Event Ordering and Consistency
- Design for eventual consistency across services
- Use event ordering strategies when sequence matters
- Implement event deduplication to handle duplicate events
- Use event sourcing for complete event history
- Design services to handle out-of-order events gracefully
5. Monitoring and Observability
- Implement comprehensive event tracking and correlation
- Use distributed tracing to follow events across services
- Create dashboards for choreography health monitoring
- Implement alerting for failed choreographies and timeouts
- Track business metrics and process completion rates
6. Testing and Validation
- Create comprehensive end-to-end testing for choreographed processes
- Use contract testing to validate service interactions
- Implement chaos engineering to test failure scenarios
- Test event ordering and timing scenarios
- Validate compensating action effectiveness
The Choreography pattern is essential for building resilient, scalable, and loosely coupled distributed systems where services can autonomously participate in complex business processes without centralized coordination, enabling high performance and fault tolerance in enterprise integration architectures.
← Back to All Patterns