Saga Pattern
Overview
The Saga pattern is a distributed transaction management mechanism in enterprise integration architectures. It coordinates long-running business processes across multiple services by breaking them into a sequence of local transactions, each committed by a single service. Like a choreographed performance in which each act must finish before the next can begin, a saga moves forward one step at a time; if a later step fails, compensating actions undo the effects of the steps that already completed. In this way the pattern keeps data consistent across services (eventually rather than immediately) without a traditional ACID transaction spanning all of them. It is essential for managing complex business workflows that span multiple services, databases, and external systems while maintaining system reliability and business process integrity.
Theoretical Foundation
The Saga pattern is grounded in distributed systems theory and transaction processing principles; the term was introduced by Hector Garcia-Molina and Kenneth Salem in 1987 for long-lived database transactions. It addresses the challenges of maintaining consistency and managing failures in loosely coupled, distributed environments, drawing on compensating transactions, eventual consistency, workflow management, and fault tolerance. Sagas target long-running business processes for which two-phase commit is impractical: 2PC requires every participant to support the protocol and keeps resources locked on all participants until the coordinator decides, which does not suit processes that span many independent services or last minutes to weeks.
Core Principles
1. Transaction Decomposition
Breaking complex transactions into manageable, atomic units:
- Local transactions - each service manages its own data using local ACID transactions
- Sequential execution - transactions execute in a predetermined order with clear dependencies
- Compensating actions - each transaction has a corresponding compensation operation that semantically undoes it
- Idempotency - transactions and compensations can be safely retried, because applying the same request twice has the same effect as applying it once
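A minimal sketch of one saga participant, assuming an in-memory stand-in for the service's database (the class `InventoryLedger` and its methods are hypothetical). Each method represents one local transaction; both the forward action and its compensation are idempotent per reservation id, and a cancellation that arrives before its reservation blocks the late reservation instead of being lost.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical inventory service. Each method stands in for one local ACID transaction and
// is idempotent per reservation id, so a retried or redelivered request has no extra effect.
class InventoryLedger {
    private int reservedUnits = 0;
    private final Map<String, Integer> reservations = new HashMap<>(); // id -> quantity
    private final Set<String> cancelled = new HashSet<>();

    // Forward local transaction.
    synchronized void reserve(String reservationId, int quantity) {
        if (cancelled.contains(reservationId) || reservations.containsKey(reservationId)) {
            return; // already applied, or already compensated (the cancel arrived first)
        }
        reservations.put(reservationId, quantity);
        reservedUnits += quantity;
    }

    // Compensating transaction: semantically undoes reserve().
    synchronized void cancel(String reservationId) {
        if (!cancelled.add(reservationId)) {
            return; // compensation already applied
        }
        Integer quantity = reservations.get(reservationId);
        if (quantity != null) {
            reservedUnits -= quantity;
        }
    }

    synchronized int reservedUnits() {
        return reservedUnits;
    }
}
```

A real service would store the processed ids in the same database transaction as the stock update, so that the idempotency check and the change commit together.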
2. Coordination Strategies
Two primary approaches for managing saga execution, plus related design choices:
- Choreography-based sagas - services coordinate directly with each other by publishing and reacting to events, with no central controller
- Orchestration-based sagas - a central coordinator tells each service which local transaction to run next
- Hybrid approaches - combining choreography and orchestration within one workflow
- State management - tracking saga progress and handling intermediate states
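To make the choreography approach concrete, here is a minimal in-memory sketch in which no component knows the whole workflow: each service only subscribes to the events it cares about, and compensation is triggered by a failure event rather than by a coordinator. The `EventBus` and `ChoreographyDemo` classes and the event names are hypothetical; a real system would use a message broker with asynchronous delivery.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical synchronous in-memory bus standing in for a real message broker.
class EventBus {
    private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();

    void subscribe(String eventType, Consumer<String> handler) {
        handlers.computeIfAbsent(eventType, k -> new ArrayList<>()).add(handler);
    }

    // Delivers the orderId payload to every subscriber of the event type.
    void publish(String eventType, String orderId) {
        for (Consumer<String> handler : handlers.getOrDefault(eventType, List.of())) {
            handler.accept(orderId);
        }
    }
}

class ChoreographyDemo {
    final EventBus bus = new EventBus();
    final List<String> reservedOrders = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    ChoreographyDemo(boolean paymentSucceeds) {
        // Inventory service: reserves on OrderCreated and compensates on PaymentFailed.
        bus.subscribe("OrderCreated", id -> {
            reservedOrders.add(id);
            log.add("InventoryReserved");
            bus.publish("InventoryReserved", id);
        });
        bus.subscribe("PaymentFailed", id -> {
            reservedOrders.remove(id);
            log.add("InventoryReleased");
        });
        // Payment service: reacts to InventoryReserved and knows nothing about inventory.
        bus.subscribe("InventoryReserved", id -> {
            String outcome = paymentSucceeds ? "PaymentCompleted" : "PaymentFailed";
            log.add(outcome);
            bus.publish(outcome, id);
        });
    }
}
```

Publishing `OrderCreated` with a failing payment service leaves no reservation behind, although no component ever called the inventory service directly.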
3. Failure Handling
Comprehensive strategies for dealing with failures and inconsistencies:
- Forward recovery - retrying failed steps so that the saga can still complete
- Backward recovery - compensating completed transactions in reverse order to restore consistency
- Semantic rollback - using business logic to undo the effects of completed transactions (for example, issuing a refund rather than deleting a payment record)
- Partial compensation - handling scenarios where some compensations also fail
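These recovery strategies can be combined in a small runner, sketched here under simplifying assumptions: steps run synchronously in one process, forward recovery is a fixed number of immediate retries, and compensations that still fail are only recorded. `Step`, `SagaRunner`, and `CountingStep` are hypothetical names for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// One saga step: a local transaction plus its semantic compensation.
interface Step {
    String name();
    void execute() throws Exception;
    void compensate() throws Exception;
}

class SagaRunner {
    private final int maxAttempts;
    final List<String> unrecoveredCompensations = new ArrayList<>();

    SagaRunner(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Returns true if every step committed, false if the saga was rolled back.
    boolean run(List<Step> steps) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            if (!tryWithRetries(step::execute)) { // forward recovery exhausted
                compensate(completed);            // switch to backward recovery
                return false;
            }
            completed.push(step);
        }
        return true;
    }

    // Undo completed steps in reverse order; record compensations that keep failing.
    private void compensate(Deque<Step> completed) {
        while (!completed.isEmpty()) {
            Step step = completed.pop();
            if (!tryWithRetries(step::compensate)) {
                unrecoveredCompensations.add(step.name()); // needs manual intervention
            }
        }
    }

    private boolean tryWithRetries(ThrowingAction action) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.run();
                return true;
            } catch (Exception e) {
                // swallow and retry; a real runner would log and back off
            }
        }
        return false;
    }

    @FunctionalInterface
    interface ThrowingAction {
        void run() throws Exception;
    }
}

// Test helper: execute() fails the first `executeFailures` times, then succeeds.
class CountingStep implements Step {
    private final String name;
    private int executeFailuresLeft;
    private final List<String> log;

    CountingStep(String name, int executeFailures, List<String> log) {
        this.name = name;
        this.executeFailuresLeft = executeFailures;
        this.log = log;
    }

    public String name() { return name; }

    public void execute() throws Exception {
        if (executeFailuresLeft-- > 0) throw new Exception(name + " failed");
        log.add("do:" + name);
    }

    public void compensate() { log.add("undo:" + name); }
}
```

With three attempts per step, a step that fails twice still completes (forward recovery), while a step that keeps failing rolls back the earlier steps in reverse order (backward recovery).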
4. Consistency Models
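Understanding the consistency guarantees provided by sagas:
- Eventual consistency - the system reaches a consistent state once all steps, or all required compensations, have completed
- Compensating consistency - using compensation to maintain business invariants
- Semantic atomicity - either all steps complete or the completed ones are compensated, so the business operation is atomic even though no single technical transaction spans it
- Isolation relaxation - sagas lack ACID isolation, so other transactions can observe (and act on) intermediate states; countermeasures such as semantic locks limit the impact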
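Isolation relaxation is commonly mitigated with a semantic lock: the saga's first step marks the record as pending, and other transactions treat that flag as "in flight" instead of acting on an intermediate state. A minimal sketch follows; the `Order` class, its `OrderStatus` values, and `changeShippingAddress` are hypothetical.

```java
// Hypothetical order aggregate whose status field acts as a semantic lock.
enum OrderStatus { APPROVAL_PENDING, APPROVED, REJECTED }

class Order {
    // The saga's first local transaction creates the order in APPROVAL_PENDING.
    private OrderStatus status = OrderStatus.APPROVAL_PENDING;
    private String shippingAddress;

    OrderStatus status() { return status; }

    String shippingAddress() { return shippingAddress; }

    // The saga's last step releases the lock on success...
    void approve() { finishSaga(OrderStatus.APPROVED); }

    // ...and its compensation releases it on failure.
    void reject() { finishSaga(OrderStatus.REJECTED); }

    // A concurrent request is refused while the saga is in flight, instead of acting on
    // an intermediate state that a later compensation might undo.
    void changeShippingAddress(String newAddress) {
        if (status == OrderStatus.APPROVAL_PENDING) {
            throw new IllegalStateException("order is locked by an in-flight saga");
        }
        if (status == OrderStatus.REJECTED) {
            throw new IllegalStateException("order was rejected");
        }
        shippingAddress = newAddress;
    }

    private void finishSaga(OrderStatus outcome) {
        if (status != OrderStatus.APPROVAL_PENDING) {
            throw new IllegalStateException("saga already finished with " + status);
        }
        status = outcome;
    }
}
```

Rejecting the concurrent request is only one option; a service could also queue it until the lock is released.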
Why the Saga Pattern Is Essential in Integration Architecture
1. Distributed Transaction Management
In microservices and distributed architectures, the Saga pattern provides:
- Cross-service transactions - coordinating transactions across multiple independent services
- Long-running processes - managing business workflows that may take hours, days, or weeks
- System decoupling - avoiding tight coupling between services through asynchronous coordination
- Scalability support - enabling horizontal scaling without distributed locking mechanisms
2. Business Process Automation
For complex business workflows:
- Order processing - managing multi-step order fulfillment across inventory, payment, and shipping
- Account opening - coordinating identity verification, credit checks, and account creation
- Insurance claims - handling complex approval workflows with multiple stakeholders
- Supply chain management - orchestrating procurement, manufacturing, and distribution processes
3. System Resilience
Enhancing system reliability and fault tolerance:
- Failure isolation - limiting the impact of failures to specific transaction steps
- Recovery mechanisms - providing clear paths for both forward and backward recovery
- Partial success handling - managing scenarios where some operations succeed while others fail
- Graceful degradation - maintaining system functionality despite component failures
4. Integration Flexibility
Supporting various integration scenarios:
- Legacy system integration - coordinating with systems that don't support modern transaction protocols
- External service coordination - managing interactions with third-party services and APIs
- Multi-cloud orchestration - coordinating processes across multiple cloud providers
- Hybrid environment support - managing workflows that span on-premises and cloud systems
Benefits in Integration Contexts
1. Technical Advantages
- No distributed locking - avoiding performance bottlenecks and deadlock scenarios
- Loose coupling - services remain independent and can evolve separately
- Fault tolerance - system continues operating despite individual service failures
- Scalability - supporting high-throughput scenarios through asynchronous processing
2. Business Value
- Process visibility - providing clear tracking and monitoring of business workflows
- Compliance support - maintaining audit trails and supporting regulatory requirements
- Business agility - enabling rapid changes to business processes without system redesign
- Customer experience - ensuring reliable completion of customer-initiated processes
3. Operational Excellence
- Monitoring and observability - clear visibility into transaction progress and health
- Error handling - systematic approaches to handling and recovering from failures
- Testing capabilities - ability to test complex scenarios including failure conditions
- Maintenance support - clear separation of concerns for easier system maintenance
4. Integration Enablement
- Service orchestration - coordinating complex multi-service workflows
- Event-driven architecture - supporting reactive, event-based system designs
- API composition - combining multiple API calls into cohesive business operations
- Data consistency - maintaining eventual consistency across distributed data stores
Integration Architecture Applications
1. E-commerce and Retail
Saga applications in retail scenarios:
- Order processing sagas - coordinating inventory reservation, payment processing, and fulfillment
- Return and refund sagas - managing complex return workflows with inventory, payment, and customer service
- Promotion and pricing sagas - updating prices and promotions across multiple channels and systems
- Customer onboarding sagas - coordinating account creation, verification, and initial setup
2. Financial Services
In banking and financial applications:
- Loan origination sagas - managing application processing, credit checks, approval, and account setup
- Payment processing sagas - coordinating complex payment workflows with multiple parties
- Account transfer sagas - handling money transfers with fraud detection and compliance checks
- Trading settlement sagas - managing multi-step securities trading and settlement processes
3. Healthcare and Insurance
For healthcare workflow management:
- Patient admission sagas - coordinating registration, insurance verification, and care planning
- Claims processing sagas - managing insurance claim workflows with multiple approval stages
- Prescription fulfillment sagas - coordinating doctor orders, pharmacy fulfillment, and insurance processing
- Treatment plan sagas - orchestrating complex, multi-provider treatment workflows
4. Supply Chain and Manufacturing
In operational workflows:
- Procurement sagas - managing purchase orders, supplier coordination, and delivery tracking
- Manufacturing sagas - coordinating production planning, resource allocation, and quality control
- Shipping and logistics sagas - managing complex shipping workflows with multiple carriers and tracking
- Vendor onboarding sagas - coordinating supplier qualification, contract negotiation, and system setup
Implementation Patterns
1. Orchestration-Based Saga Implementation
// Central saga coordinator that manages the entire workflow
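@Slf4j // Lombok-generated logger used by the log.* calls below
@Service
public class OrderProcessingSagaOrchestrator {
@Autowired
private InventoryService inventoryService;
@Autowired
private PaymentService paymentService;
@Autowired
private ShippingService shippingService;
@Autowired
private SagaStateRepository sagaStateRepository;
@Autowired
private OrderEventPublisher orderEventPublisher; // publishes the order completed/failed events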
public void processOrder(OrderSagaRequest request) {
String sagaId = UUID.randomUUID().toString();
SagaState sagaState = SagaState.builder()
.sagaId(sagaId)
.orderId(request.getOrderId())
.currentStep(SagaStep.INVENTORY_RESERVATION)
.status(SagaStatus.IN_PROGRESS)
.startTime(Instant.now())
.context(request.toMap())
.build();
sagaStateRepository.save(sagaState);
try {
executeOrderSaga(sagaState);
} catch (Exception e) {
log.error("Saga execution failed: {}", sagaId, e);
compensateOrderSaga(sagaState);
}
}
private void executeOrderSaga(SagaState sagaState) {
Map<String, Object> context = sagaState.getContext();
switch (sagaState.getCurrentStep()) {
case INVENTORY_RESERVATION:
executeInventoryReservation(sagaState, context);
break;
case PAYMENT_PROCESSING:
executePaymentProcessing(sagaState, context);
break;
case SHIPPING_ARRANGEMENT:
executeShippingArrangement(sagaState, context);
break;
case ORDER_COMPLETION:
completeOrderSaga(sagaState);
break;
}
}
private void executeInventoryReservation(SagaState sagaState, Map<String, Object> context) {
try {
String orderId = (String) context.get("orderId");
List<OrderItem> items = (List<OrderItem>) context.get("items");
InventoryReservationResult result = inventoryService.reserveInventory(orderId, items);
if (result.isSuccess()) {
// Store reservation details for potential compensation
context.put("inventoryReservationId", result.getReservationId());
context.put("reservedItems", result.getReservedItems());
// Move to next step
sagaState.setCurrentStep(SagaStep.PAYMENT_PROCESSING);
sagaStateRepository.save(sagaState);
// Continue with payment processing
executePaymentProcessing(sagaState, context);
} else {
throw new SagaExecutionException("Inventory reservation failed: " + result.getErrorMessage());
}
} catch (Exception e) {
sagaState.setStatus(SagaStatus.COMPENSATION_REQUIRED);
sagaStateRepository.save(sagaState);
throw new SagaExecutionException("Inventory reservation step failed", e);
}
}
private void executePaymentProcessing(SagaState sagaState, Map<String, Object> context) {
try {
String orderId = (String) context.get("orderId");
PaymentInfo paymentInfo = (PaymentInfo) context.get("paymentInfo");
BigDecimal amount = (BigDecimal) context.get("totalAmount");
PaymentResult result = paymentService.processPayment(orderId, paymentInfo, amount);
if (result.isSuccess()) {
// Store payment details for potential compensation
context.put("paymentTransactionId", result.getTransactionId());
context.put("paymentAmount", amount);
// Move to next step
sagaState.setCurrentStep(SagaStep.SHIPPING_ARRANGEMENT);
sagaStateRepository.save(sagaState);
// Continue with shipping
executeShippingArrangement(sagaState, context);
} else {
throw new SagaExecutionException("Payment processing failed: " + result.getErrorMessage());
}
} catch (Exception e) {
sagaState.setStatus(SagaStatus.COMPENSATION_REQUIRED);
sagaStateRepository.save(sagaState);
throw new SagaExecutionException("Payment processing step failed", e);
}
}
private void executeShippingArrangement(SagaState sagaState, Map<String, Object> context) {
try {
String orderId = (String) context.get("orderId");
Address shippingAddress = (Address) context.get("shippingAddress");
List<OrderItem> items = (List<OrderItem>) context.get("reservedItems");
ShippingResult result = shippingService.arrangeShipping(orderId, shippingAddress, items);
if (result.isSuccess()) {
// Store shipping details
context.put("shippingTrackingNumber", result.getTrackingNumber());
context.put("estimatedDelivery", result.getEstimatedDelivery());
// Move to completion
sagaState.setCurrentStep(SagaStep.ORDER_COMPLETION);
sagaStateRepository.save(sagaState);
completeOrderSaga(sagaState);
} else {
throw new SagaExecutionException("Shipping arrangement failed: " + result.getErrorMessage());
}
} catch (Exception e) {
sagaState.setStatus(SagaStatus.COMPENSATION_REQUIRED);
sagaStateRepository.save(sagaState);
throw new SagaExecutionException("Shipping arrangement step failed", e);
}
}
private void completeOrderSaga(SagaState sagaState) {
sagaState.setStatus(SagaStatus.COMPLETED);
sagaState.setEndTime(Instant.now());
sagaStateRepository.save(sagaState);
// Publish order completion event
orderEventPublisher.publishOrderCompleted(sagaState.getOrderId());
log.info("Order saga completed successfully: {}", sagaState.getSagaId());
}
private void compensateOrderSaga(SagaState sagaState) {
log.warn("Starting compensation for saga: {}", sagaState.getSagaId());
Map<String, Object> context = sagaState.getContext();
boolean allCompensated = true;
// Compensate in reverse order; each case deliberately falls through to the earlier steps.
// A step that never completed has no stored id, so its compensation is skipped.
switch (sagaState.getCurrentStep()) {
case SHIPPING_ARRANGEMENT:
allCompensated &= compensateShipping(context);
// Fall through to compensate payment
case PAYMENT_PROCESSING:
allCompensated &= compensatePayment(context);
// Fall through to compensate inventory
case INVENTORY_RESERVATION:
allCompensated &= compensateInventory(context);
break;
default:
break;
}
// Report COMPENSATED only if every compensation succeeded; FAILED flags the saga for follow-up
sagaState.setStatus(allCompensated ? SagaStatus.COMPENSATED : SagaStatus.FAILED);
sagaState.setEndTime(Instant.now());
sagaStateRepository.save(sagaState);
// Publish order failed event
orderEventPublisher.publishOrderFailed(sagaState.getOrderId());
log.info("Order saga compensation finished with status {}: {}", sagaState.getStatus(), sagaState.getSagaId());
}
// Each compensation returns true if it succeeded or had nothing to undo
private boolean compensateInventory(Map<String, Object> context) {
try {
String reservationId = (String) context.get("inventoryReservationId");
if (reservationId != null) {
inventoryService.cancelReservation(reservationId);
log.info("Inventory reservation compensated: {}", reservationId);
}
return true;
} catch (Exception e) {
log.error("Failed to compensate inventory reservation", e);
// In a real implementation, this would trigger a retry or manual intervention
return false;
}
}
private boolean compensatePayment(Map<String, Object> context) {
try {
String transactionId = (String) context.get("paymentTransactionId");
BigDecimal amount = (BigDecimal) context.get("paymentAmount");
if (transactionId != null) {
paymentService.refundPayment(transactionId, amount);
log.info("Payment compensated: {}", transactionId);
}
return true;
} catch (Exception e) {
log.error("Failed to compensate payment", e);
// This might require manual intervention or alternative compensation
return false;
}
}
private boolean compensateShipping(Map<String, Object> context) {
try {
String trackingNumber = (String) context.get("shippingTrackingNumber");
if (trackingNumber != null) {
shippingService.cancelShipment(trackingNumber);
log.info("Shipping compensated: {}", trackingNumber);
}
return true;
} catch (Exception e) {
log.error("Failed to compensate shipping", e);
return false;
}
}
}
}
@Entity
public class SagaState {
@Id
private String sagaId;
private String orderId;
@Enumerated(EnumType.STRING)
private SagaStep currentStep;
@Enumerated(EnumType.STRING)
private SagaStatus status;
private Instant startTime;
private Instant endTime;
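// A Map<String, Object> has no default JPA mapping (@ElementCollection needs basic or embeddable
// values), so the context is stored as one serialized column. JsonMapConverter is a hypothetical
// AttributeConverter<Map<String, Object>, String>, e.g. backed by Jackson; typed values such as
// List<OrderItem> must be converted back explicitly after loading.
@Convert(converter = JsonMapConverter.class)
@Column(name = "saga_context", columnDefinition = "TEXT")
private Map<String, Object> context = new HashMap<>();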
// Getters, setters, and builder
}
public enum SagaStep {
INVENTORY_RESERVATION,
PAYMENT_PROCESSING,
SHIPPING_ARRANGEMENT,
ORDER_COMPLETION
}
public enum SagaStatus {
IN_PROGRESS,
COMPLETED,
COMPENSATION_REQUIRED,
COMPENSATED,
FAILED
}
2. Choreography-Based Saga Implementation
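// Event-driven saga: services report outcomes as events, and this class reacts to them and
// publishes the next request. Because one class decides the next step, it acts as a saga tracker;
// in pure choreography each service would subscribe directly to the previous service's events.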
@Service
public class OrderProcessingChoreographySaga {
@Autowired
private DomainEventPublisher eventPublisher;
@Autowired
private SagaStateRepository sagaStateRepository;
@EventHandler
public void handleOrderCreated(OrderCreatedEvent event) {
String sagaId = UUID.randomUUID().toString();
SagaState sagaState = SagaState.builder()
.sagaId(sagaId)
.orderId(event.getOrderId())
.status(SagaStatus.IN_PROGRESS)
.startTime(Instant.now())
.build();
sagaStateRepository.save(sagaState);
// Publish event to start inventory reservation
eventPublisher.publish(InventoryReservationRequested.builder()
.sagaId(sagaId)
.orderId(event.getOrderId())
.items(event.getItems())
.build());
}
@EventHandler
public void handleInventoryReserved(InventoryReservedEvent event) {
SagaState sagaState = sagaStateRepository.findBySagaId(event.getSagaId());
if (sagaState != null && sagaState.getStatus() == SagaStatus.IN_PROGRESS) {
// Store reservation details
sagaState.getContext().put("inventoryReservationId", event.getReservationId());
sagaStateRepository.save(sagaState);
// Publish payment processing request
eventPublisher.publish(PaymentProcessingRequested.builder()
.sagaId(event.getSagaId())
.orderId(sagaState.getOrderId())
.amount(event.getTotalAmount())
.paymentInfo(event.getPaymentInfo())
.build());
}
}
@EventHandler
public void handlePaymentProcessed(PaymentProcessedEvent event) {
SagaState sagaState = sagaStateRepository.findBySagaId(event.getSagaId());
if (sagaState != null && sagaState.getStatus() == SagaStatus.IN_PROGRESS) {
// Store payment details
sagaState.getContext().put("paymentTransactionId", event.getTransactionId());
sagaStateRepository.save(sagaState);
// Publish shipping arrangement request
eventPublisher.publish(ShippingArrangementRequested.builder()
.sagaId(event.getSagaId())
.orderId(sagaState.getOrderId())
.shippingAddress(event.getShippingAddress())
.items(event.getItems())
.build());
}
}
@EventHandler
public void handleShippingArranged(ShippingArrangedEvent event) {
SagaState sagaState = sagaStateRepository.findBySagaId(event.getSagaId());
if (sagaState != null && sagaState.getStatus() == SagaStatus.IN_PROGRESS) {
// Complete the saga
sagaState.setStatus(SagaStatus.COMPLETED);
sagaState.setEndTime(Instant.now());
sagaState.getContext().put("trackingNumber", event.getTrackingNumber());
sagaStateRepository.save(sagaState);
// Publish order completion event
eventPublisher.publish(OrderCompletedEvent.builder()
.sagaId(event.getSagaId())
.orderId(sagaState.getOrderId())
.trackingNumber(event.getTrackingNumber())
.build());
}
}
// Compensation event handlers
@EventHandler
public void handleInventoryReservationFailed(InventoryReservationFailedEvent event) {
compensateOrderSaga(event.getSagaId(), "Inventory reservation failed");
}
@EventHandler
public void handlePaymentProcessingFailed(PaymentProcessingFailedEvent event) {
SagaState sagaState = sagaStateRepository.findBySagaId(event.getSagaId());
// Ignore unknown sagas and duplicate failure events for sagas that have already finished
if (sagaState == null || sagaState.getStatus() != SagaStatus.IN_PROGRESS) {
return;
}
// Compensate inventory reservation
String reservationId = (String) sagaState.getContext().get("inventoryReservationId");
if (reservationId != null) {
eventPublisher.publish(InventoryReservationCancellationRequested.builder()
.reservationId(reservationId)
.reason("Payment processing failed")
.build());
}
compensateOrderSaga(event.getSagaId(), "Payment processing failed");
}
@EventHandler
public void handleShippingArrangementFailed(ShippingArrangementFailedEvent event) {
SagaState sagaState = sagaStateRepository.findBySagaId(event.getSagaId());
// Ignore unknown sagas and duplicate failure events for sagas that have already finished
if (sagaState == null || sagaState.getStatus() != SagaStatus.IN_PROGRESS) {
return;
}
// Compensate payment
String transactionId = (String) sagaState.getContext().get("paymentTransactionId");
if (transactionId != null) {
eventPublisher.publish(PaymentRefundRequested.builder()
.transactionId(transactionId)
.reason("Shipping arrangement failed")
.build());
}
// Compensate inventory
String reservationId = (String) sagaState.getContext().get("inventoryReservationId");
if (reservationId != null) {
eventPublisher.publish(InventoryReservationCancellationRequested.builder()
.reservationId(reservationId)
.reason("Shipping arrangement failed")
.build());
}
compensateOrderSaga(event.getSagaId(), "Shipping arrangement failed");
}
private void compensateOrderSaga(String sagaId, String reason) {
SagaState sagaState = sagaStateRepository.findBySagaId(sagaId);
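if (sagaState != null) {
// Marked COMPENSATED as soon as the cancellation/refund requests are published; a stricter
// version would wait for their confirmation events before setting this status
sagaState.setStatus(SagaStatus.COMPENSATED);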
sagaState.setEndTime(Instant.now());
sagaState.getContext().put("compensationReason", reason);
sagaStateRepository.save(sagaState);
eventPublisher.publish(OrderFailedEvent.builder()
.sagaId(sagaId)
.orderId(sagaState.getOrderId())
.reason(reason)
.build());
}
}
}
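Message brokers typically deliver events at least once, so the handlers above can receive the same event twice while the saga is still IN_PROGRESS; the status check alone would then publish a second request (for example a second PaymentProcessingRequested). One common remedy is an idempotent consumer that records processed event ids. A minimal sketch, assuming each event carries a unique id; the `IdempotentHandler` class is hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Idempotent-consumer guard: runs the wrapped handler at most once per event id.
// The in-memory set is for illustration only; a real service would record processed ids
// in the same local transaction as the saga state change (an "inbox" table).
class IdempotentHandler<E> {
    private final Set<String> processedEventIds = ConcurrentHashMap.newKeySet();
    private final Consumer<E> delegate;

    IdempotentHandler(Consumer<E> delegate) {
        this.delegate = delegate;
    }

    // Returns true if the event was handled now, false if it was a duplicate.
    boolean handle(String eventId, E event) {
        if (!processedEventIds.add(eventId)) {
            return false;
        }
        try {
            delegate.accept(event);
            return true;
        } catch (RuntimeException e) {
            processedEventIds.remove(eventId); // let a later redelivery retry the event
            throw e;
        }
    }
}
```

Removing the id on failure keeps a crashed handler from silently swallowing the event; the broker's redelivery then gets a fresh attempt.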
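3. Saga State Machine Implementation
// State machine-based saga implementation for complex workflows (Spring Statemachine).
// The machine states use their own enum, OrderSagaStates, so they do not clash with the
// SagaState JPA entity from the orchestration example.
@Component
public class OrderSagaStateMachine {
@Autowired
private StateMachineFactory<OrderSagaStates, SagaEvent> stateMachineFactory;
@Autowired
private SagaStateRepository sagaStateRepository;
public void startSaga(OrderCreatedEvent orderEvent) {
// Each saga instance gets its own machine from the factory
StateMachine<OrderSagaStates, SagaEvent> stateMachine = stateMachineFactory.getStateMachine();
// Configure saga context
SagaContext context = SagaContext.builder()
.orderId(orderEvent.getOrderId())
.items(orderEvent.getItems())
.paymentInfo(orderEvent.getPaymentInfo())
.shippingAddress(orderEvent.getShippingAddress())
.totalAmount(orderEvent.getTotalAmount()) // read by processPaymentAction()
.build();
stateMachine.getExtendedState().getVariables().put("sagaContext", context);
// Start the state machine
stateMachine.start();
stateMachine.sendEvent(SagaEvent.START_SAGA);
}
@Slf4j
@Configuration
// @EnableStateMachineFactory (not @EnableStateMachine) exposes the StateMachineFactory injected above
@EnableStateMachineFactory
public static class SagaStateMachineConfig extends StateMachineConfigurerAdapter<OrderSagaStates, SagaEvent> {
@Autowired
private InventoryService inventoryService;
@Autowired
private PaymentService paymentService;
@Autowired
private ShippingService shippingService;
@Override
public void configure(StateMachineStateConfigurer<OrderSagaStates, SagaEvent> states) throws Exception {
states
.withStates()
.initial(OrderSagaStates.INITIAL)
.state(OrderSagaStates.RESERVING_INVENTORY)
.state(OrderSagaStates.PROCESSING_PAYMENT)
.state(OrderSagaStates.ARRANGING_SHIPPING)
.state(OrderSagaStates.COMPENSATING)
// Success and full compensation are both terminal states
.end(OrderSagaStates.COMPLETED)
.end(OrderSagaStates.COMPENSATED);
}
@Override
public void configure(StateMachineTransitionConfigurer<OrderSagaStates, SagaEvent> transitions) throws Exception {
transitions
.withExternal()
.source(OrderSagaStates.INITIAL).target(OrderSagaStates.RESERVING_INVENTORY)
.event(SagaEvent.START_SAGA)
.action(reserveInventoryAction())
.and()
.withExternal()
.source(OrderSagaStates.RESERVING_INVENTORY).target(OrderSagaStates.PROCESSING_PAYMENT)
.event(SagaEvent.INVENTORY_RESERVED)
.action(processPaymentAction())
.and()
.withExternal()
.source(OrderSagaStates.PROCESSING_PAYMENT).target(OrderSagaStates.ARRANGING_SHIPPING)
.event(SagaEvent.PAYMENT_PROCESSED)
.action(arrangeShippingAction())
.and()
.withExternal()
.source(OrderSagaStates.ARRANGING_SHIPPING).target(OrderSagaStates.COMPLETED)
.event(SagaEvent.SHIPPING_ARRANGED)
.action(completeSagaAction())
.and()
// Compensation transitions
.withExternal()
.source(OrderSagaStates.RESERVING_INVENTORY).target(OrderSagaStates.COMPENSATED)
.event(SagaEvent.INVENTORY_RESERVATION_FAILED)
.and()
.withExternal()
.source(OrderSagaStates.PROCESSING_PAYMENT).target(OrderSagaStates.COMPENSATING)
.event(SagaEvent.PAYMENT_PROCESSING_FAILED)
.action(compensateInventoryAction())
.and()
.withExternal()
.source(OrderSagaStates.ARRANGING_SHIPPING).target(OrderSagaStates.COMPENSATING)
.event(SagaEvent.SHIPPING_ARRANGEMENT_FAILED)
.action(compensatePaymentAndInventoryAction())
.and()
// Leave COMPENSATING once the compensation actions report completion
.withExternal()
.source(OrderSagaStates.COMPENSATING).target(OrderSagaStates.COMPENSATED)
.event(SagaEvent.COMPENSATION_COMPLETED);
}
// For brevity each action reports its outcome by sending the next event to the machine;
// with remote services the outcome usually arrives asynchronously and is sent from a reply handler.
@Bean
public Action<OrderSagaStates, SagaEvent> reserveInventoryAction() {
return context -> {
SagaContext sagaContext = context.getExtendedState().get("sagaContext", SagaContext.class);
try {
InventoryReservationResult result = inventoryService.reserveInventory(
sagaContext.getOrderId(),
sagaContext.getItems()
);
if (result.isSuccess()) {
sagaContext.setInventoryReservationId(result.getReservationId());
context.getStateMachine().sendEvent(SagaEvent.INVENTORY_RESERVED);
} else {
context.getStateMachine().sendEvent(SagaEvent.INVENTORY_RESERVATION_FAILED);
}
} catch (Exception e) {
log.error("Inventory reservation failed", e);
context.getStateMachine().sendEvent(SagaEvent.INVENTORY_RESERVATION_FAILED);
}
};
}
@Bean
public Action<OrderSagaStates, SagaEvent> processPaymentAction() {
return context -> {
SagaContext sagaContext = context.getExtendedState().get("sagaContext", SagaContext.class);
try {
PaymentResult result = paymentService.processPayment(
sagaContext.getOrderId(),
sagaContext.getPaymentInfo(),
sagaContext.getTotalAmount()
);
if (result.isSuccess()) {
sagaContext.setPaymentTransactionId(result.getTransactionId());
context.getStateMachine().sendEvent(SagaEvent.PAYMENT_PROCESSED);
} else {
context.getStateMachine().sendEvent(SagaEvent.PAYMENT_PROCESSING_FAILED);
}
} catch (Exception e) {
log.error("Payment processing failed", e);
context.getStateMachine().sendEvent(SagaEvent.PAYMENT_PROCESSING_FAILED);
}
};
}
@Bean
public Action<OrderSagaStates, SagaEvent> arrangeShippingAction() {
return context -> {
SagaContext sagaContext = context.getExtendedState().get("sagaContext", SagaContext.class);
try {
ShippingResult result = shippingService.arrangeShipping(
sagaContext.getOrderId(),
sagaContext.getShippingAddress(),
sagaContext.getItems()
);
if (result.isSuccess()) {
sagaContext.setShippingTrackingNumber(result.getTrackingNumber());
context.getStateMachine().sendEvent(SagaEvent.SHIPPING_ARRANGED);
} else {
context.getStateMachine().sendEvent(SagaEvent.SHIPPING_ARRANGEMENT_FAILED);
}
} catch (Exception e) {
log.error("Shipping arrangement failed", e);
context.getStateMachine().sendEvent(SagaEvent.SHIPPING_ARRANGEMENT_FAILED);
}
};
}
@Bean
public Action<OrderSagaStates, SagaEvent> completeSagaAction() {
return context -> log.info("Order saga completed: {}",
context.getExtendedState().get("sagaContext", SagaContext.class).getOrderId());
}
@Bean
public Action<OrderSagaStates, SagaEvent> compensateInventoryAction() {
return context -> {
SagaContext sagaContext = context.getExtendedState().get("sagaContext", SagaContext.class);
// Compensation failures need retry or alerting (not shown)
inventoryService.cancelReservation(sagaContext.getInventoryReservationId());
context.getStateMachine().sendEvent(SagaEvent.COMPENSATION_COMPLETED);
};
}
@Bean
public Action<OrderSagaStates, SagaEvent> compensatePaymentAndInventoryAction() {
return context -> {
SagaContext sagaContext = context.getExtendedState().get("sagaContext", SagaContext.class);
// Undo in reverse order: refund the payment first, then release the inventory
paymentService.refundPayment(sagaContext.getPaymentTransactionId(), sagaContext.getTotalAmount());
inventoryService.cancelReservation(sagaContext.getInventoryReservationId());
context.getStateMachine().sendEvent(SagaEvent.COMPENSATION_COMPLETED);
};
}
}
}
public enum OrderSagaStates {
INITIAL,
RESERVING_INVENTORY,
PROCESSING_PAYMENT,
ARRANGING_SHIPPING,
COMPLETED,
COMPENSATING,
COMPENSATED
}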
public enum SagaEvent {
START_SAGA,
INVENTORY_RESERVED,
INVENTORY_RESERVATION_FAILED,
PAYMENT_PROCESSED,
PAYMENT_PROCESSING_FAILED,
SHIPPING_ARRANGED,
SHIPPING_ARRANGEMENT_FAILED,
COMPENSATION_COMPLETED
}
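Because the Spring configuration can only be exercised by starting the framework, it can help to keep a plain-Java mirror of the transition table that unit tests can check directly. The sketch below is an assumption-laden illustration: the enums `Phase` and `Signal` and the class `OrderSagaTransitions` are hypothetical names that mirror the states and `SagaEvent` values above, and `COMPENSATION_COMPLETED` is used to leave the compensating state.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, framework-free mirror of the order saga's states and events.
enum Phase { INITIAL, RESERVING_INVENTORY, PROCESSING_PAYMENT, ARRANGING_SHIPPING,
             COMPLETED, COMPENSATING, COMPENSATED }

enum Signal { START_SAGA, INVENTORY_RESERVED, INVENTORY_RESERVATION_FAILED, PAYMENT_PROCESSED,
              PAYMENT_PROCESSING_FAILED, SHIPPING_ARRANGED, SHIPPING_ARRANGEMENT_FAILED,
              COMPENSATION_COMPLETED }

class OrderSagaTransitions {
    private final Map<Phase, Map<Signal, Phase>> table = new EnumMap<>(Phase.class);

    OrderSagaTransitions() {
        add(Phase.INITIAL, Signal.START_SAGA, Phase.RESERVING_INVENTORY);
        add(Phase.RESERVING_INVENTORY, Signal.INVENTORY_RESERVED, Phase.PROCESSING_PAYMENT);
        add(Phase.PROCESSING_PAYMENT, Signal.PAYMENT_PROCESSED, Phase.ARRANGING_SHIPPING);
        add(Phase.ARRANGING_SHIPPING, Signal.SHIPPING_ARRANGED, Phase.COMPLETED);
        // Nothing has been committed yet when the first step fails
        add(Phase.RESERVING_INVENTORY, Signal.INVENTORY_RESERVATION_FAILED, Phase.COMPENSATED);
        add(Phase.PROCESSING_PAYMENT, Signal.PAYMENT_PROCESSING_FAILED, Phase.COMPENSATING);
        add(Phase.ARRANGING_SHIPPING, Signal.SHIPPING_ARRANGEMENT_FAILED, Phase.COMPENSATING);
        add(Phase.COMPENSATING, Signal.COMPENSATION_COMPLETED, Phase.COMPENSATED);
    }

    private void add(Phase from, Signal on, Phase to) {
        table.computeIfAbsent(from, k -> new EnumMap<>(Signal.class)).put(on, to);
    }

    // Returns the next phase, or throws if the signal is not legal in the current phase.
    Phase next(Phase current, Signal signal) {
        Phase target = table.getOrDefault(current, Map.of()).get(signal);
        if (target == null) {
            throw new IllegalStateException(signal + " is not allowed in " + current);
        }
        return target;
    }

    // A phase with no outgoing transitions is terminal.
    boolean isTerminal(Phase phase) {
        return !table.containsKey(phase);
    }
}
```

Keeping the table as data makes it easy to assert that every non-terminal phase has a failure path and that terminal phases reject further events.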
4. Distributed Saga Coordination
// Distributed saga implementation using message queues for coordination
@Slf4j // Lombok-generated logger used by the log.* calls below
@Service
public class DistributedSagaCoordinator {
@Autowired
private MessageQueueService messageQueueService;
@Autowired
private SagaDefinitionRepository sagaDefinitionRepository;
@Autowired
private SagaInstanceRepository sagaInstanceRepository;
public String startSaga(String sagaDefinitionId, Map<String, Object> initialContext) {
SagaDefinition definition = sagaDefinitionRepository.findById(sagaDefinitionId)
.orElseThrow(() -> new IllegalArgumentException("Saga definition not found: " + sagaDefinitionId));
String sagaInstanceId = UUID.randomUUID().toString();
SagaInstance instance = SagaInstance.builder()
.instanceId(sagaInstanceId)
.definitionId(sagaDefinitionId)
.status(SagaInstanceStatus.ACTIVE)
.currentStepIndex(0)
.context(new HashMap<>(initialContext))
.startTime(Instant.now())
.build();
sagaInstanceRepository.save(instance);
// Start execution with first step
executeSagaStep(instance, definition);
return sagaInstanceId;
}
private void executeSagaStep(SagaInstance instance, SagaDefinition definition) {
if (instance.getCurrentStepIndex() >= definition.getSteps().size()) {
// Saga completed successfully
completeSaga(instance);
return;
}
SagaStep step = definition.getSteps().get(instance.getCurrentStepIndex());
try {
// Send message to execute step
SagaStepExecutionMessage message = SagaStepExecutionMessage.builder()
.sagaInstanceId(instance.getInstanceId())
.stepId(step.getStepId())
.serviceEndpoint(step.getServiceEndpoint())
.operation(step.getOperation())
.inputData(prepareStepInput(step, instance.getContext()))
.compensationData(prepareCompensationData(step, instance.getContext()))
.build();
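// Mark the instance as waiting before sending, so a fast response is not rejected by the
// status check in handleSagaStepResponse
instance.setStatus(SagaInstanceStatus.WAITING_FOR_RESPONSE);
sagaInstanceRepository.save(instance);
messageQueueService.sendMessage(step.getServiceQueue(), message);
} catch (Exception e) {
log.error("Failed to execute saga step: {}", step.getStepId(), e);
startCompensation(instance, definition);
}
}
// @MessageListener stands in for a broker-specific listener annotation
// (for example @RabbitListener in Spring AMQP or @JmsListener in Spring JMS)
@MessageListener(queues = "saga-step-response")
public void handleSagaStepResponse(SagaStepResponseMessage response) {
SagaInstance instance = sagaInstanceRepository.findByInstanceId(response.getSagaInstanceId());
// Brokers usually deliver at least once: ignore unknown sagas and responses that arrive
// when no step is outstanding, so a duplicate cannot advance the saga twice
if (instance == null || instance.getStatus() != SagaInstanceStatus.WAITING_FOR_RESPONSE) {
log.warn("Ignoring unexpected step response for saga: {}", response.getSagaInstanceId());
return;
}
SagaDefinition definition = sagaDefinitionRepository.findById(instance.getDefinitionId()).orElseThrow();
SagaStep expectedStep = definition.getSteps().get(instance.getCurrentStepIndex());
if (!expectedStep.getStepId().equals(response.getStepId())) {
log.warn("Ignoring response for step {} while waiting for {}", response.getStepId(), expectedStep.getStepId());
return;
}
if (response.isSuccess()) {
// Update context with step results
instance.getContext().putAll(response.getOutputData());
// Move to next step
instance.setCurrentStepIndex(instance.getCurrentStepIndex() + 1);
instance.setStatus(SagaInstanceStatus.ACTIVE);
sagaInstanceRepository.save(instance);
// Continue execution
executeSagaStep(instance, definition);
} else {
// Step failed, start compensation
log.warn("Saga step failed: {} - {}", response.getStepId(), response.getErrorMessage());
startCompensation(instance, definition);
}
}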
private void startCompensation(SagaInstance instance, SagaDefinition definition) {
instance.setStatus(SagaInstanceStatus.COMPENSATING);
sagaInstanceRepository.save(instance);
// Compensate completed steps in reverse order
int currentStep = instance.getCurrentStepIndex() - 1; // Last completed step
compensateSagaStep(instance, definition, currentStep);
}
private void compensateSagaStep(SagaInstance instance, SagaDefinition definition, int stepIndex) {
if (stepIndex < 0) {
// All compensations completed
instance.setStatus(SagaInstanceStatus.COMPENSATED);
instance.setEndTime(Instant.now());
sagaInstanceRepository.save(instance);
publishSagaCompensated(instance);
return;
}
SagaStep step = definition.getSteps().get(stepIndex);
if (step.getCompensationOperation() != null) {
try {
SagaStepCompensationMessage message = SagaStepCompensationMessage.builder()
.sagaInstanceId(instance.getInstanceId())
.stepId(step.getStepId())
.compensationOperation(step.getCompensationOperation())
.compensationData(instance.getContext())
.build();
messageQueueService.sendMessage(step.getServiceQueue(), message);
} catch (Exception e) {
log.error("Failed to send compensation message for step: {}", step.getStepId(), e);
// Continue with next compensation
compensateSagaStep(instance, definition, stepIndex - 1);
}
} else {
// No compensation needed, move to previous step
compensateSagaStep(instance, definition, stepIndex - 1);
}
}
@MessageListener(queues = "saga-compensation-response")
public void handleSagaCompensationResponse(SagaCompensationResponseMessage response) {
SagaInstance instance = sagaInstanceRepository.findByInstanceId(response.getSagaInstanceId());
if (instance == null || instance.getStatus() != SagaInstanceStatus.COMPENSATING) {
// Unknown saga, or a late/duplicate response after compensation already finished
return;
}
SagaDefinition definition = sagaDefinitionRepository.findById(instance.getDefinitionId()).orElseThrow();
if (!response.isSuccess()) {
log.error("Compensation failed for step: {} - {}",
response.getStepId(), response.getErrorMessage());
// In real implementation, this might trigger manual intervention
}
// Find the index of the step whose compensation response just arrived
int currentStepIndex = IntStream.range(0, definition.getSteps().size())
.filter(i -> definition.getSteps().get(i).getStepId().equals(response.getStepId()))
.findFirst()
.orElse(-1);
if (currentStepIndex < 0) {
log.warn("Compensation response for unknown step: {}", response.getStepId());
return;
}
// Continue with previous step compensation
compensateSagaStep(instance, definition, currentStepIndex - 1);
}
}
@Entity
public class SagaDefinition {
@Id
private String definitionId;
private String name;
private String description;
@OneToMany(cascade = CascadeType.ALL, fetch = FetchType.EAGER)
@OrderBy("stepOrder")
private List<SagaStep> steps = new ArrayList<>();
// Getters and setters
}
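// Step-definition entity for this example; it is distinct from the SagaStep enum used in the
// orchestration example, so the two belong in different packages (or one should be renamed)
@Entity
public class SagaStep {
@Id
private String stepId;
private String serviceEndpoint;
private String serviceQueue;
private String operation;
private String compensationOperation;
private int stepOrder;
// A Map<String, Object> has no default JPA mapping; JsonMapConverter is a hypothetical
// AttributeConverter that serializes the map into one TEXT column
@Convert(converter = JsonMapConverter.class)
@Column(columnDefinition = "TEXT")
private Map<String, Object> stepConfiguration = new HashMap<>();
// Getters and setters
}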
@Entity
public class SagaInstance {
@Id
private String instanceId;
private String definitionId;
@Enumerated(EnumType.STRING)
private SagaInstanceStatus status;
private int currentStepIndex;
private Instant startTime;
private Instant endTime;
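// A Map<String, Object> has no default JPA mapping; JsonMapConverter is a hypothetical
// AttributeConverter that serializes the map into one TEXT column
@Convert(converter = JsonMapConverter.class)
@Column(columnDefinition = "TEXT")
private Map<String, Object> context = new HashMap<>();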
// Getters and setters
}
public enum SagaInstanceStatus {
ACTIVE,
WAITING_FOR_RESPONSE,
COMPLETED,
COMPENSATING,
COMPENSATED,
FAILED
}
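The coordinator spreads its compensation order across startCompensation, compensateSagaStep, and the compensation-response handler, which makes the index arithmetic hard to test. A pure function that computes the same plan up front can be unit-tested on its own. The sketch assumes the semantics above (the failed step is the one at currentStepIndex, only earlier steps completed, and steps without a compensation operation are skipped); `StepDef` and `CompensationPlanner` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, minimal step definition: only what the compensation plan needs.
class StepDef {
    final String stepId;
    final String compensationOperation; // null when the step needs no compensation

    StepDef(String stepId, String compensationOperation) {
        this.stepId = stepId;
        this.compensationOperation = compensationOperation;
    }
}

class CompensationPlanner {
    // failedStepIndex is the step that failed (the coordinator's currentStepIndex); only the
    // steps before it completed, so they are undone from failedStepIndex - 1 down to 0.
    static List<String> plan(List<StepDef> steps, int failedStepIndex) {
        List<String> operations = new ArrayList<>();
        for (int i = failedStepIndex - 1; i >= 0; i--) {
            StepDef step = steps.get(i);
            if (step.compensationOperation != null) {
                operations.add(step.stepId + ":" + step.compensationOperation);
            }
        }
        return operations;
    }
}
```

The coordinator could then send the planned compensation messages one at a time, advancing through the list on each compensation response.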
Apache Camel Implementation
1. Orchestration-Based Saga Route
@Component
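public class SagaOrchestrationRoute extends RouteBuilder {
// Services invoked by the processors below
@Autowired
private InventoryService inventoryService;
@Autowired
private PaymentService paymentService;
@Autowired
private ShippingService shippingService;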
@Override
public void configure() throws Exception {
from("direct:startOrderSaga")
.routeId("order-saga-orchestration")
// A route-scoped onException must be declared before the route's other processors.
// (Camel also offers a dedicated Saga EIP with compensation/completion endpoints as an alternative.)
.onException(Exception.class)
.handled(true)
.to("direct:compensateSaga")
.end()
.log("Starting order saga for order: ${body.orderId}")
.process(exchange -> {
OrderSagaRequest request = exchange.getIn().getBody(OrderSagaRequest.class);
String sagaId = UUID.randomUUID().toString();
exchange.setProperty("sagaId", sagaId);
exchange.setProperty("orderId", request.getOrderId());
exchange.setProperty("orderItems", request.getItems());
exchange.setProperty("paymentInfo", request.getPaymentInfo());
exchange.setProperty("shippingAddress", request.getShippingAddress());
})
.to("direct:reserveInventory")
.to("direct:processPayment")
.to("direct:arrangeShipping")
.to("direct:completeSaga");
from("direct:reserveInventory")
.log("Reserving inventory for saga: ${exchangeProperty.sagaId}")
.process(exchange -> {
String orderId = exchange.getProperty("orderId", String.class);
@SuppressWarnings("unchecked")
List<OrderItem> items = exchange.getProperty("orderItems", List.class);
InventoryReservationResult result = inventoryService.reserveInventory(orderId, items);
if (!result.isSuccess()) {
throw new SagaStepException("Inventory reservation failed: " + result.getErrorMessage());
}
exchange.setProperty("inventoryReservationId", result.getReservationId());
})
.log("Inventory reserved successfully: ${exchangeProperty.inventoryReservationId}");
from("direct:processPayment")
.log("Processing payment for saga: ${exchangeProperty.sagaId}")
.process(exchange -> {
String orderId = exchange.getProperty("orderId", String.class);
PaymentInfo paymentInfo = exchange.getProperty("paymentInfo", PaymentInfo.class);
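@SuppressWarnings("unchecked")
List<OrderItem> items = exchange.getProperty("orderItems", List.class);
// calculateTotalAmount is assumed to be a helper in this class that sums the item prices
BigDecimal totalAmount = calculateTotalAmount(items);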
PaymentResult result = paymentService.processPayment(orderId, paymentInfo, totalAmount);
if (!result.isSuccess()) {
throw new SagaStepException("Payment processing failed: " + result.getErrorMessage());
}
exchange.setProperty("paymentTransactionId", result.getTransactionId());
})
.log("Payment processed successfully: ${exchangeProperty.paymentTransactionId}");
from("direct:arrangeShipping")
.log("Arranging shipping for saga: ${exchangeProperty.sagaId}")
.process(exchange -> {
String orderId = exchange.getProperty("orderId", String.class);
Address shippingAddress = exchange.getProperty("shippingAddress", Address.class);
@SuppressWarnings("unchecked")
List<OrderItem> items = exchange.getProperty("orderItems", List.class);
ShippingResult result = shippingService.arrangeShipping(orderId, shippingAddress, items);
if (!result.isSuccess()) {
throw new SagaStepException("Shipping arrangement failed: " + result.getErrorMessage());
}
exchange.setProperty("trackingNumber", result.getTrackingNumber());
})
.log("Shipping arranged successfully: ${exchangeProperty.trackingNumber}");
from("direct:completeSaga")
.log("Completing saga: ${exchangeProperty.sagaId}")
.process(exchange -> {
String sagaId = exchange.getProperty("sagaId", String.class);
String orderId = exchange.getProperty("orderId", String.class);
String trackingNumber = exchange.getProperty("trackingNumber", String.class);
// Publish order completed event
OrderCompletedEvent event = OrderCompletedEvent.builder()
.sagaId(sagaId)
.orderId(orderId)
.trackingNumber(trackingNumber)
.completedAt(Instant.now())
.build();
eventPublisher.publish(event);
exchange.getIn().setBody(event);
})
.log("Saga completed successfully: ${exchangeProperty.sagaId}");
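// Compensation route: undo only the steps that completed, newest first.
// Each compensation runs in its own doTry so that one failure does not skip the others.
from("direct:compensateSaga")
.log("Starting compensation for saga: ${exchangeProperty.sagaId}")
.filter(exchangeProperty("trackingNumber").isNotNull())
.doTry()
.to("direct:compensateShipping")
.doCatch(Exception.class)
.log("Shipping compensation failed: ${exception.message}")
.end()
.end()
.filter(exchangeProperty("paymentTransactionId").isNotNull())
.doTry()
.to("direct:compensatePayment")
.doCatch(Exception.class)
.log("Payment compensation failed: ${exception.message}")
.end()
.end()
.filter(exchangeProperty("inventoryReservationId").isNotNull())
.doTry()
.to("direct:compensateInventory")
.doCatch(Exception.class)
.log("Inventory compensation failed: ${exception.message}")
.end()
.end()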
.process(exchange -> {
String sagaId = exchange.getProperty("sagaId", String.class);
String orderId = exchange.getProperty("orderId", String.class);
OrderFailedEvent event = OrderFailedEvent.builder()
.sagaId(sagaId)
.orderId(orderId)
.reason("Saga compensation completed")
.failedAt(Instant.now())
.build();
eventPublisher.publish(event);
})
.log("Saga compensation completed: ${exchangeProperty.sagaId}");
}
}
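Without the Camel plumbing, the orchestration logic above comes down to three rules: run steps in order, remember which ones completed, and on failure compensate only those, newest first. The plain-Java sketch below shows that control flow. `SagaOrchestrator` and its nested `Step` and `Outcome` records are hypothetical names. A failed compensation is recorded rather than thrown, so the remaining compensations still run. Records require Java 16 or later.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical minimal orchestrator: each step pairs an action with its compensation.
final class SagaOrchestrator {
    record Step(String name, Runnable action, Runnable compensation) {}

    record Outcome(boolean completed, List<String> compensated, List<String> compensationFailures) {}

    private final List<Step> steps = new ArrayList<>();

    SagaOrchestrator step(String name, Runnable action, Runnable compensation) {
        steps.add(new Step(name, action, compensation));
        return this;
    }

    Outcome run() {
        // Completed steps are pushed, so popping yields reverse execution order.
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.action().run();
                completed.push(step);
            } catch (RuntimeException e) {
                return compensate(completed);
            }
        }
        return new Outcome(true, List.of(), List.of());
    }

    private Outcome compensate(Deque<Step> completed) {
        List<String> compensated = new ArrayList<>();
        List<String> failures = new ArrayList<>();
        while (!completed.isEmpty()) {
            Step step = completed.pop();
            try {
                step.compensation().run();
                compensated.add(step.name());
            } catch (RuntimeException e) {
                // Keep going so one failed compensation does not block the others.
                failures.add(step.name());
            }
        }
        return new Outcome(false, compensated, failures);
    }
}
```

Recording compensation failures instead of throwing keeps the saga moving toward a terminal state. The recorded step names are what a failed saga would hand off for manual follow-up.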
2. Choreography-Based Saga Route
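import java.time.Instant;
import java.util.UUID;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SagaChoreographyRoute extends RouteBuilder {
// Repository for SagaState, assumed to be a Spring bean defined elsewhere
@Autowired
private SagaStateRepository sagaStateRepository;
@Override
public void configure() throws Exception {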
// Order created event handler
from("kafka:order-events?groupId=saga-choreography")
.routeId("saga-choreography-order-created")
.unmarshal().json(JsonLibrary.Jackson, OrderCreatedEvent.class)
.filter(simple("${body.eventType} == 'ORDER_CREATED'"))
.process(exchange -> {
OrderCreatedEvent event = exchange.getIn().getBody(OrderCreatedEvent.class);
String sagaId = UUID.randomUUID().toString();
// Store saga state
SagaState sagaState = SagaState.builder()
.sagaId(sagaId)
.orderId(event.getOrderId())
.status(SagaStatus.IN_PROGRESS)
.startTime(Instant.now())
.build();
sagaStateRepository.save(sagaState);
// Create inventory reservation request
InventoryReservationRequested reservationRequest = InventoryReservationRequested.builder()
.sagaId(sagaId)
.orderId(event.getOrderId())
.items(event.getItems())
.build();
exchange.getIn().setBody(reservationRequest);
})
.marshal().json(JsonLibrary.Jackson)
.to("kafka:inventory-commands")
.log("Inventory reservation requested for saga: ${body.sagaId}");
// Inventory reserved event handler
from("kafka:inventory-events?groupId=saga-choreography")
.routeId("saga-choreography-inventory-reserved")
.unmarshal().json(JsonLibrary.Jackson, InventoryReservedEvent.class)
.filter(simple("${body.eventType} == 'INVENTORY_RESERVED'"))
.process(exchange -> {
InventoryReservedEvent event = exchange.getIn().getBody(InventoryReservedEvent.class);
// Update saga state
SagaState sagaState = sagaStateRepository.findBySagaId(event.getSagaId());
if (sagaState != null && sagaState.getStatus() == SagaStatus.IN_PROGRESS) {
sagaState.getContext().put("inventoryReservationId", event.getReservationId());
sagaStateRepository.save(sagaState);
// Create payment processing request
PaymentProcessingRequested paymentRequest = PaymentProcessingRequested.builder()
.sagaId(event.getSagaId())
.orderId(sagaState.getOrderId())
.amount(event.getTotalAmount())
.paymentInfo(event.getPaymentInfo())
.build();
exchange.getIn().setBody(paymentRequest);
} else {
exchange.setProperty(Exchange.ROUTE_STOP, true);
}
})
.marshal().json(JsonLibrary.Jackson)
.to("kafka:payment-commands")
.log("Payment processing requested for saga: ${body.sagaId}");
// Payment processed event handler
from("kafka:payment-events?groupId=saga-choreography")
.routeId("saga-choreography-payment-processed")
.unmarshal().json(JsonLibrary.Jackson, PaymentProcessedEvent.class)
.filter(simple("${body.eventType} == 'PAYMENT_PROCESSED'"))
.process(exchange -> {
PaymentProcessedEvent event = exchange.getIn().getBody(PaymentProcessedEvent.class);
SagaState sagaState = sagaStateRepository.findBySagaId(event.getSagaId());
if (sagaState != null && sagaState.getStatus() == SagaStatus.IN_PROGRESS) {
sagaState.getContext().put("paymentTransactionId", event.getTransactionId());
sagaStateRepository.save(sagaState);
ShippingArrangementRequested shippingRequest = ShippingArrangementRequested.builder()
.sagaId(event.getSagaId())
.orderId(sagaState.getOrderId())
.shippingAddress(event.getShippingAddress())
.items(event.getItems())
.build();
exchange.getIn().setBody(shippingRequest);
} else {
exchange.setProperty(Exchange.ROUTE_STOP, true);
}
})
.marshal().json(JsonLibrary.Jackson)
.to("kafka:shipping-commands")
.log("Shipping arrangement requested for saga: ${body.sagaId}");
// Shipping arranged event handler
from("kafka:shipping-events?groupId=saga-choreography")
.routeId("saga-choreography-shipping-arranged")
.unmarshal().json(JsonLibrary.Jackson, ShippingArrangedEvent.class)
.filter(simple("${body.eventType} == 'SHIPPING_ARRANGED'"))
.process(exchange -> {
ShippingArrangedEvent event = exchange.getIn().getBody(ShippingArrangedEvent.class);
SagaState sagaState = sagaStateRepository.findBySagaId(event.getSagaId());
if (sagaState != null && sagaState.getStatus() == SagaStatus.IN_PROGRESS) {
// Complete saga
sagaState.setStatus(SagaStatus.COMPLETED);
sagaState.setEndTime(Instant.now());
sagaState.getContext().put("trackingNumber", event.getTrackingNumber());
sagaStateRepository.save(sagaState);
OrderCompletedEvent completedEvent = OrderCompletedEvent.builder()
.sagaId(event.getSagaId())
.orderId(sagaState.getOrderId())
.trackingNumber(event.getTrackingNumber())
.completedAt(Instant.now())
.build();
exchange.getIn().setBody(completedEvent);
} else {
exchange.setProperty(Exchange.ROUTE_STOP, true);
}
})
.marshal().json(JsonLibrary.Jackson)
.to("kafka:order-events")
.log("Order saga completed: ${body.sagaId}");
// Failure handling routes
from("kafka:inventory-events?groupId=saga-choreography-failures")
.routeId("saga-choreography-inventory-failed")
.unmarshal().json(JsonLibrary.Jackson, InventoryReservationFailedEvent.class)
.filter(simple("${body.eventType} == 'INVENTORY_RESERVATION_FAILED'"))
.to("direct:handleSagaFailure")
.log("Saga failed at inventory reservation: ${body.sagaId}");
from("kafka:payment-events?groupId=saga-choreography-failures")
.routeId("saga-choreography-payment-failed")
.unmarshal().json(JsonLibrary.Jackson, PaymentProcessingFailedEvent.class)
.filter(simple("${body.eventType} == 'PAYMENT_PROCESSING_FAILED'"))
.to("direct:compensateInventoryInChoreography")
.to("direct:handleSagaFailure")
.log("Saga failed at payment processing: ${body.sagaId}");
}
}
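Kafka consumers deliver messages at least once, so a redelivered `INVENTORY_RESERVED` or `PAYMENT_PROCESSED` event could send a second downstream command. The handlers for those two events leave the status at `IN_PROGRESS`, so their status checks let a duplicate through. A more direct guard is to record which (sagaId, eventType) pairs have already been handled.

The sketch below uses an in-memory set for illustration only. `ProcessedEventRegistry` is hypothetical, and a real deployment would need a durable store updated together with the saga state. Within a route, Camel's Idempotent Consumer EIP with a persistent idempotent repository addresses the same need.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical idempotency guard keyed by saga ID and event type.
final class ProcessedEventRegistry {
    // In-memory only: entries are lost on restart, so this is illustrative, not durable.
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    private static String key(String sagaId, String eventType) {
        return sagaId + ":" + eventType;
    }

    // Runs the handler only the first time a (sagaId, eventType) pair is seen.
    // Returns true if the handler ran, false if the event was a duplicate.
    boolean handleOnce(String sagaId, String eventType, Consumer<String> handler) {
        // add() is atomic, so concurrent duplicates cannot both pass this check.
        if (!processed.add(key(sagaId, eventType))) {
            return false;
        }
        try {
            handler.accept(sagaId);
            return true;
        } catch (RuntimeException e) {
            // Forget the key so a redelivery can retry the handler.
            processed.remove(key(sagaId, eventType));
            throw e;
        }
    }
}
```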
Best Practices
1. Saga Design and Architecture
- Design sagas to be idempotent to handle duplicate execution safely
- Keep saga steps as small and focused as possible to minimize failure impact
- Use correlation IDs to track messages and events across saga execution
- Implement proper timeout handling for long-running saga steps
- Design compensating actions to be semantically correct, not just technically correct; for example, undo a captured payment by issuing a refund rather than deleting the payment record
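For the timeout guideline, one way to bound a step call in Java 9 or later is `CompletableFuture.orTimeout`. It turns a hung downstream call into a `TimeoutException` that can trigger compensation. This is a minimal sketch: `TimedStep` is a hypothetical helper, and the pattern-matching `instanceof` requires Java 16 or later.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper that bounds how long a saga step may take.
final class TimedStep {
    private TimedStep() {}

    // Runs the step asynchronously and throws TimeoutException if it exceeds the limit.
    static <T> T runWithTimeout(Supplier<T> step, Duration limit) throws TimeoutException {
        try {
            return CompletableFuture.supplyAsync(step)
                    .orTimeout(limit.toMillis(), TimeUnit.MILLISECONDS)
                    .join();
        } catch (CompletionException e) {
            // join() wraps the TimeoutException; unwrap it so callers can react to it.
            if (e.getCause() instanceof TimeoutException timeout) {
                throw timeout;
            }
            throw e;
        }
    }
}
```

Note that `orTimeout` does not cancel the underlying task, and a timed-out remote call may still have succeeded. That is one more reason compensations must tolerate undoing work whose outcome is unknown.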
2. State Management and Persistence
- Persist saga state consistently to handle coordinator failures
- Use event sourcing for saga state to provide complete audit trails
- Implement saga state snapshots for performance in long-running sagas
- Design for saga recovery and resumption after system failures
- Maintain saga execution history for debugging and monitoring
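With event sourcing, the saga's history is an append-only list of events, and its current state is derived by replaying them. The audit trail and the state therefore cannot disagree. The sketch below is a minimal in-memory illustration:

- `EventSourcedSaga`, `SagaEvent`, and the event types are hypothetical and only loosely follow the order saga in this section.
- A production version would persist the log and would likely add the snapshots mentioned above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical event types for the order saga; a real system would persist these.
enum SagaEventType { STEP_COMPLETED, STEP_FAILED, STEP_COMPENSATED, SAGA_COMPLETED }

record SagaEvent(SagaEventType type, String step) {}

// Current state is never stored directly; it is rebuilt from the event log.
final class EventSourcedSaga {
    private final List<SagaEvent> log = new ArrayList<>();

    void append(SagaEvent event) {
        log.add(event);
    }

    // Read-only view of the full history, usable as an audit trail.
    List<SagaEvent> history() {
        return Collections.unmodifiableList(log);
    }

    // Replays the log and returns steps that completed and are not yet compensated.
    List<String> outstandingSteps() {
        List<String> outstanding = new ArrayList<>();
        for (SagaEvent e : log) {
            switch (e.type()) {
                case STEP_COMPLETED -> outstanding.add(e.step());
                case STEP_COMPENSATED -> outstanding.remove(e.step());
                default -> { }
            }
        }
        return outstanding;
    }

    // Replays the log to derive a coarse status.
    String status() {
        String status = "ACTIVE";
        for (SagaEvent e : log) {
            switch (e.type()) {
                case STEP_FAILED -> status = "COMPENSATING";
                case SAGA_COMPLETED -> status = "COMPLETED";
                default -> { }
            }
        }
        if (status.equals("COMPENSATING") && outstandingSteps().isEmpty()) {
            status = "COMPENSATED";
        }
        return status;
    }
}
```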
3. Error Handling and Compensation
- Design compensations to handle partial failures and retries
- Implement compensation timeouts and failure handling
- Use semantic locks to prevent conflicting operations during compensation
- Plan for scenarios where compensation actions may also fail
- Implement human intervention workflows for unrecoverable failures
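Compensations can fail too. A common approach is to retry each compensation a bounded number of times with exponential backoff. If it still fails, park it for human intervention instead of dropping it. This sketch assumes compensations are idempotent, so retries are safe. `CompensationRunner` is hypothetical, and its in-memory list stands in for a durable queue or ticketing system.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical runner: retries a compensation, then escalates it for human review.
final class CompensationRunner {
    private final int maxAttempts;
    private final Duration initialBackoff;
    // Stand-in for a durable "needs manual intervention" queue.
    private final List<String> manualInterventionQueue = new ArrayList<>();

    CompensationRunner(int maxAttempts, Duration initialBackoff) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
        this.initialBackoff = initialBackoff;
    }

    // Returns true if the compensation eventually succeeded, false if it was escalated.
    boolean compensate(String sagaId, String step, Runnable compensation) throws InterruptedException {
        long backoffMillis = initialBackoff.toMillis();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                compensation.run();
                return true;
            } catch (RuntimeException e) {
                if (attempt == maxAttempts) {
                    break;
                }
                Thread.sleep(backoffMillis);
                backoffMillis *= 2; // exponential backoff between attempts
            }
        }
        manualInterventionQueue.add(sagaId + ":" + step);
        return false;
    }

    List<String> pendingManualInterventions() {
        return List.copyOf(manualInterventionQueue);
    }
}
```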
4. Performance and Scalability
- Use asynchronous messaging for saga coordination to improve scalability
- Implement saga step parallelization where business logic allows
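- Run the saga coordinator on multiple nodes backed by shared, persisted saga state in high-availability scenarios
- Monitor saga execution times and optimize slow steps
- Cache read-only data used by saga steps where staleness is acceptable; do not cache the results of state-changing operations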
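Parallelizing steps is only safe when they do not depend on each other's results. In the minimal sketch below, built on `CompletableFuture`, independent steps start together. If any step fails, the ones that succeeded are compensated before the failure is rethrown. `ParallelStage` and its `Step` record are hypothetical names, and the default common fork-join pool is used for simplicity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical parallel saga stage: independent steps run concurrently.
final class ParallelStage {
    record Step(String name, Runnable action, Runnable compensation) {}

    private ParallelStage() {}

    // Runs all steps; if any fails, compensates the ones that succeeded and rethrows.
    static void runAll(List<Step> steps) {
        List<CompletableFuture<Step>> futures = new ArrayList<>();
        for (Step step : steps) {
            futures.add(CompletableFuture.supplyAsync(() -> {
                step.action().run();
                return step;
            }));
        }
        try {
            // allOf completes only after every future has finished, normally or exceptionally.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
        } catch (CompletionException e) {
            for (CompletableFuture<Step> future : futures) {
                if (!future.isCompletedExceptionally()) {
                    future.join().compensation().run();
                }
            }
            throw e;
        }
    }
}
```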
5. Monitoring and Observability
- Implement comprehensive logging for all saga execution steps
- Provide real-time dashboards for saga execution monitoring
- Create alerts for saga failures and timeout scenarios
- Track saga completion rates and performance metrics
- Implement saga execution tracing for complex debugging scenarios
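Completion rates, in-flight counts, and durations can be derived from a few counters per saga type. The sketch below keeps them in memory with `LongAdder`. In a real deployment these would typically be exported through a metrics library such as Micrometer. `SagaMetrics` is a hypothetical name.

```java
import java.time.Duration;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical per-saga-type metrics; LongAdder keeps the counters thread-safe.
final class SagaMetrics {
    private final LongAdder started = new LongAdder();
    private final LongAdder completed = new LongAdder();
    private final LongAdder compensated = new LongAdder();
    private final LongAdder totalCompletedMillis = new LongAdder();

    void recordStarted() {
        started.increment();
    }

    void recordCompleted(Duration elapsed) {
        completed.increment();
        totalCompletedMillis.add(elapsed.toMillis());
    }

    void recordCompensated() {
        compensated.increment();
    }

    // Fraction of finished sagas that completed without compensation.
    double completionRate() {
        long finished = completed.sum() + compensated.sum();
        return finished == 0 ? 0.0 : (double) completed.sum() / finished;
    }

    // Sagas started but neither completed nor compensated yet (possibly stuck).
    long inFlight() {
        return started.sum() - completed.sum() - compensated.sum();
    }

    Duration averageCompletionTime() {
        long n = completed.sum();
        return n == 0 ? Duration.ZERO : Duration.ofMillis(totalCompletedMillis.sum() / n);
    }
}
```

A steadily growing in-flight count is often the first visible symptom of a stuck saga, which makes it a natural target for the alerts mentioned above.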
6. Testing and Validation
- Create comprehensive test suites including failure scenario testing
- Implement saga simulation and testing environments
- Test compensation logic thoroughly including cascade compensation scenarios
- Use chaos engineering to validate saga resilience
- Implement automated saga regression testing for critical business processes
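Failure-scenario testing can be made systematic by injecting a failure at each step position in turn. Each run then checks that exactly the earlier steps were compensated, newest first. The harness below is a sketch under that assumption. The `SagaUnderTest` interface, the `do:`/`undo:` log format, and the `LinearSaga` reference implementation are hypothetical stand-ins for a real system under test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical contract for the saga being tested; the harness decides which step fails.
interface SagaUnderTest {
    List<String> execute(Predicate<String> shouldFail);
}

// Hypothetical harness: injects a failure at each step in turn and checks the log.
final class FailureInjectionHarness {
    private FailureInjectionHarness() {}

    // Expected log: steps before the failure run, then are undone newest-first.
    static List<String> expectedLog(List<String> steps, int failAt) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < failAt; i++) log.add("do:" + steps.get(i));
        for (int i = failAt - 1; i >= 0; i--) log.add("undo:" + steps.get(i));
        return log;
    }

    // Returns the first step whose injected failure produced a wrong log, or null if all pass.
    static String firstMismatch(SagaUnderTest saga, List<String> steps) {
        for (int failAt = 0; failAt < steps.size(); failAt++) {
            String failing = steps.get(failAt);
            List<String> actual = saga.execute(step -> step.equals(failing));
            if (!actual.equals(expectedLog(steps, failAt))) {
                return failing;
            }
        }
        return null;
    }
}

// Simple reference saga used to exercise the harness.
final class LinearSaga implements SagaUnderTest {
    private final List<String> steps;

    LinearSaga(List<String> steps) {
        this.steps = List.copyOf(steps);
    }

    @Override
    public List<String> execute(Predicate<String> shouldFail) {
        List<String> log = new ArrayList<>();
        List<String> done = new ArrayList<>();
        for (String step : steps) {
            if (shouldFail.test(step)) {
                for (int i = done.size() - 1; i >= 0; i--) log.add("undo:" + done.get(i));
                return log;
            }
            log.add("do:" + step);
            done.add(step);
        }
        return log;
    }
}
```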
The Saga pattern is essential for managing complex, long-running business processes in distributed enterprise integration architectures. It provides reliable, scalable, and maintainable transaction coordination without the blocking and tight coupling of traditional distributed transaction mechanisms such as two-phase commit.