Saga Pattern

Overview

The Saga pattern manages distributed transactions in enterprise integration architectures. It coordinates a long-running business process across multiple services by breaking it into a sequence of local transactions. Like a choreographed performance in which each act must finish before the story can progress, each step must complete before the next begins; if a step fails, compensating actions undo the steps that already completed. The pattern keeps data consistent across services without a single ACID transaction spanning all of them, which makes it well suited to business workflows that span multiple services, databases, and external systems.

Theoretical Foundation

The Saga pattern draws on distributed systems theory and transaction processing, and addresses the problem of maintaining consistency and handling failures in loosely coupled, distributed environments. It combines compensating transactions, eventual consistency, workflow management, and fault tolerance to handle long-running business processes for which two-phase commit is a poor fit: two-phase commit holds locks on every participant until the whole transaction finishes, which is impractical for processes of extended duration, and it requires every participant to support the protocol.

Core Principles

1. Transaction Decomposition

Breaking complex transactions into manageable, atomic units:
- Local transactions: each service manages its own data using local ACID transactions
- Sequential execution: transactions execute in a predetermined order with clear dependencies
- Compensating actions: each transaction has a corresponding compensation operation for rollback
- Idempotency: transactions and compensations can be safely retried without duplicating their effects
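The principles above can be sketched in plain Java. This is a minimal illustration, not the implementation used later in this document: `SagaStepSketch` and `SagaRunnerSketch` are hypothetical names, each local transaction is modeled as a `Runnable` that throws on failure, and compensations are assumed to succeed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** One saga step: a local transaction paired with the compensation that undoes it. */
final class SagaStepSketch {
    final String name;
    final Runnable action;        // local transaction; throws on failure
    final Runnable compensation;  // semantic undo for this step

    SagaStepSketch(String name, Runnable action, Runnable compensation) {
        this.name = name;
        this.action = action;
        this.compensation = compensation;
    }
}

final class SagaRunnerSketch {
    /**
     * Runs the steps in order. If a step throws, compensates the steps that
     * already completed, newest first, and returns false.
     */
    static boolean run(List<SagaStepSketch> steps) {
        Deque<SagaStepSketch> completed = new ArrayDeque<>();
        for (SagaStepSketch step : steps) {
            try {
                step.action.run();
                completed.push(step); // most recent step ends up on top
            } catch (RuntimeException e) {
                while (!completed.isEmpty()) {
                    completed.pop().compensation.run();
                }
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<SagaStepSketch> steps = List.of(
            new SagaStepSketch("inventory", () -> log.add("reserve"), () -> log.add("release")),
            new SagaStepSketch("payment", () -> log.add("charge"), () -> log.add("refund")),
            new SagaStepSketch("shipping",
                () -> { throw new IllegalStateException("no carrier"); },
                () -> log.add("cancel")));
        boolean ok = run(steps);
        System.out.println(ok + " " + log); // false [reserve, charge, refund, release]
    }
}
```

The failed shipping step is never compensated because it never completed; only the payment and inventory steps are undone, in reverse order.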

2. Coordination Strategies

The two primary approaches to managing saga execution, along with two related concerns:
- Choreography-based sagas: services coordinate directly with each other through events
- Orchestration-based sagas: a central coordinator manages the saga execution flow
- Hybrid approaches: combining choreography and orchestration where each fits best
- State management: tracking saga progress and handling intermediate states

3. Failure Handling

Comprehensive strategies for dealing with failures and inconsistencies:
- Forward recovery: attempting to complete the saga despite failures
- Backward recovery: compensating completed transactions to restore consistency
- Semantic rollback: using business logic to undo the effects of completed transactions
- Partial compensation: handling scenarios where some compensations may also fail
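A rough sketch of how forward recovery and partial compensation might fit together is shown below. The class `RecoverySketch` and its methods are hypothetical; the sketch assumes that failures surface as runtime exceptions and that retrying is safe because the operations are idempotent.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RecoverySketch {
    /** Forward recovery: retry the operation up to maxAttempts times; true once it succeeds. */
    static boolean retry(Runnable operation, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                operation.run();
                return true;
            } catch (RuntimeException e) {
                // A real system would back off here and retry only transient errors.
            }
        }
        return false;
    }

    /**
     * Backward recovery with partial-compensation handling: every compensation is
     * attempted (with retries) in the given order, and the names of those that
     * still fail are returned so they can be escalated instead of silently dropped.
     */
    static List<String> compensateAll(Map<String, Runnable> compensationsNewestFirst, int maxAttempts) {
        List<String> unresolved = new ArrayList<>();
        for (Map.Entry<String, Runnable> entry : compensationsNewestFirst.entrySet()) {
            if (!retry(entry.getValue(), maxAttempts)) {
                unresolved.add(entry.getKey());
            }
        }
        return unresolved;
    }

    public static void main(String[] args) {
        int[] refundCalls = {0};
        Map<String, Runnable> compensations = new LinkedHashMap<>();
        compensations.put("refundPayment", () -> {
            // Fails once, then succeeds: a transient fault.
            if (++refundCalls[0] < 2) throw new IllegalStateException("gateway timeout");
        });
        compensations.put("releaseInventory", () -> {
            throw new IllegalStateException("inventory service down");
        });
        System.out.println(compensateAll(compensations, 3)); // [releaseInventory]
    }
}
```

Returning the unresolved compensations, rather than only logging them, leaves room for a caller to route them to a retry queue or to manual intervention.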

4. Consistency Models

Understanding the consistency guarantees provided by sagas:
- Eventual consistency: the system will reach a consistent state eventually
- Compensating consistency: using compensation to maintain business invariants
- Semantic atomicity: maintaining business-level atomicity without technical atomicity
- Isolation relaxation: accepting intermediate states visible to other transactions

Why Saga Pattern is Essential in Integration Architecture

1. Distributed Transaction Management

In microservices and distributed architectures, the Saga pattern provides:
- Cross-service transactions: coordinating transactions across multiple independent services
- Long-running processes: managing business workflows that may take hours, days, or weeks
- System decoupling: avoiding tight coupling between services through asynchronous coordination
- Scalability support: enabling horizontal scaling without distributed locking mechanisms

2. Business Process Automation

For complex business workflows:
- Order processing: managing multi-step order fulfillment across inventory, payment, and shipping
- Account opening: coordinating identity verification, credit checks, and account creation
- Insurance claims: handling complex approval workflows with multiple stakeholders
- Supply chain management: orchestrating procurement, manufacturing, and distribution processes

3. System Resilience

Enhancing system reliability and fault tolerance:
- Failure isolation: limiting the impact of failures to specific transaction steps
- Recovery mechanisms: providing clear paths for both forward and backward recovery
- Partial success handling: managing scenarios where some operations succeed while others fail
- Graceful degradation: maintaining system functionality despite component failures

4. Integration Flexibility

Supporting various integration scenarios:
- Legacy system integration: coordinating with systems that don't support modern transaction protocols
- External service coordination: managing interactions with third-party services and APIs
- Multi-cloud orchestration: coordinating processes across multiple cloud providers
- Hybrid environment support: managing workflows that span on-premises and cloud systems

Benefits in Integration Contexts

1. Technical Advantages

2. Business Value

3. Operational Excellence

4. Integration Enablement

Integration Architecture Applications

1. E-commerce and Retail

Saga applications in retail scenarios:
- Order processing sagas: coordinating inventory reservation, payment processing, and fulfillment
- Return and refund sagas: managing complex return workflows with inventory, payment, and customer service
- Promotion and pricing sagas: updating prices and promotions across multiple channels and systems
- Customer onboarding sagas: coordinating account creation, verification, and initial setup

2. Financial Services

In banking and financial applications:
- Loan origination sagas: managing application processing, credit checks, approval, and account setup
- Payment processing sagas: coordinating complex payment workflows with multiple parties
- Account transfer sagas: handling money transfers with fraud detection and compliance checks
- Trading settlement sagas: managing multi-step securities trading and settlement processes

3. Healthcare and Insurance

For healthcare workflow management:
- Patient admission sagas: coordinating registration, insurance verification, and care planning
- Claims processing sagas: managing insurance claim workflows with multiple approval stages
- Prescription fulfillment sagas: coordinating doctor orders, pharmacy fulfillment, and insurance processing
- Treatment plan sagas: orchestrating complex, multi-provider treatment workflows

4. Supply Chain and Manufacturing

In operational workflows:
- Procurement sagas: managing purchase orders, supplier coordination, and delivery tracking
- Manufacturing sagas: coordinating production planning, resource allocation, and quality control
- Shipping and logistics sagas: managing complex shipping workflows with multiple carriers and tracking
- Vendor onboarding sagas: coordinating supplier qualification, contract negotiation, and system setup

Implementation Patterns

1. Orchestration-Based Saga Implementation

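// Central saga coordinator that manages the entire workflow
import java.math.BigDecimal;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j // provides the `log` field used below (assumes Lombok, which the builder() calls also suggest)
@Service
public class OrderProcessingSagaOrchestrator {

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private PaymentService paymentService;

    @Autowired
    private ShippingService shippingService;

    @Autowired
    private SagaStateRepository sagaStateRepository;

    // Used by completeOrderSaga and compensateOrderSaga; the type name is an assumption
    @Autowired
    private OrderEventPublisher orderEventPublisher;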

    public void processOrder(OrderSagaRequest request) {
        String sagaId = UUID.randomUUID().toString();

        SagaState sagaState = SagaState.builder()
            .sagaId(sagaId)
            .orderId(request.getOrderId())
            .currentStep(SagaStep.INVENTORY_RESERVATION)
            .status(SagaStatus.IN_PROGRESS)
            .startTime(Instant.now())
            .context(request.toMap())
            .build();

        sagaStateRepository.save(sagaState);

        try {
            executeOrderSaga(sagaState);
        } catch (Exception e) {
            log.error("Saga execution failed: {}", sagaId, e);
            compensateOrderSaga(sagaState);
        }
    }

    private void executeOrderSaga(SagaState sagaState) {
        Map<String, Object> context = sagaState.getContext();

        switch (sagaState.getCurrentStep()) {
            case INVENTORY_RESERVATION:
                executeInventoryReservation(sagaState, context);
                break;
            case PAYMENT_PROCESSING:
                executePaymentProcessing(sagaState, context);
                break;
            case SHIPPING_ARRANGEMENT:
                executeShippingArrangement(sagaState, context);
                break;
            case ORDER_COMPLETION:
                completeOrderSaga(sagaState);
                break;
        }
    }

    private void executeInventoryReservation(SagaState sagaState, Map<String, Object> context) {
        try {
            String orderId = (String) context.get("orderId");
            List<OrderItem> items = (List<OrderItem>) context.get("items");

            InventoryReservationResult result = inventoryService.reserveInventory(orderId, items);

            if (!result.isSuccess()) {
                throw new SagaExecutionException("Inventory reservation failed: " + result.getErrorMessage());
            }

            // Store reservation details for potential compensation
            context.put("inventoryReservationId", result.getReservationId());
            context.put("reservedItems", result.getReservedItems());

            // Move to next step
            sagaState.setCurrentStep(SagaStep.PAYMENT_PROCESSING);
            sagaStateRepository.save(sagaState);

        } catch (Exception e) {
            sagaState.setStatus(SagaStatus.COMPENSATION_REQUIRED);
            sagaStateRepository.save(sagaState);
            throw new SagaExecutionException("Inventory reservation step failed", e);
        }

        // Continue with payment processing. The call sits outside the try block so that
        // a failure in a later step is not caught here and reported as an inventory failure.
        executePaymentProcessing(sagaState, context);
    }

    private void executePaymentProcessing(SagaState sagaState, Map<String, Object> context) {
        try {
            String orderId = (String) context.get("orderId");
            PaymentInfo paymentInfo = (PaymentInfo) context.get("paymentInfo");
            BigDecimal amount = (BigDecimal) context.get("totalAmount");

            PaymentResult result = paymentService.processPayment(orderId, paymentInfo, amount);

            if (!result.isSuccess()) {
                throw new SagaExecutionException("Payment processing failed: " + result.getErrorMessage());
            }

            // Store payment details for potential compensation
            context.put("paymentTransactionId", result.getTransactionId());
            context.put("paymentAmount", amount);

            // Move to next step
            sagaState.setCurrentStep(SagaStep.SHIPPING_ARRANGEMENT);
            sagaStateRepository.save(sagaState);

        } catch (Exception e) {
            sagaState.setStatus(SagaStatus.COMPENSATION_REQUIRED);
            sagaStateRepository.save(sagaState);
            throw new SagaExecutionException("Payment processing step failed", e);
        }

        // Continue with shipping. The call sits outside the try block so that
        // a shipping failure is not caught here and reported as a payment failure.
        executeShippingArrangement(sagaState, context);
    }

    private void executeShippingArrangement(SagaState sagaState, Map<String, Object> context) {
        try {
            String orderId = (String) context.get("orderId");
            Address shippingAddress = (Address) context.get("shippingAddress");
            List<OrderItem> items = (List<OrderItem>) context.get("reservedItems");

            ShippingResult result = shippingService.arrangeShipping(orderId, shippingAddress, items);

            if (!result.isSuccess()) {
                throw new SagaExecutionException("Shipping arrangement failed: " + result.getErrorMessage());
            }

            // Store shipping details
            context.put("shippingTrackingNumber", result.getTrackingNumber());
            context.put("estimatedDelivery", result.getEstimatedDelivery());

            // Move to completion
            sagaState.setCurrentStep(SagaStep.ORDER_COMPLETION);
            sagaStateRepository.save(sagaState);

        } catch (Exception e) {
            sagaState.setStatus(SagaStatus.COMPENSATION_REQUIRED);
            sagaStateRepository.save(sagaState);
            throw new SagaExecutionException("Shipping arrangement step failed", e);
        }

        // Complete the saga outside the try block so that a completion failure
        // is not caught here and reported as a shipping failure.
        completeOrderSaga(sagaState);
    }

    private void completeOrderSaga(SagaState sagaState) {
        sagaState.setStatus(SagaStatus.COMPLETED);
        sagaState.setEndTime(Instant.now());
        sagaStateRepository.save(sagaState);

        // Publish order completion event
        orderEventPublisher.publishOrderCompleted(sagaState.getOrderId());

        log.info("Order saga completed successfully: {}", sagaState.getSagaId());
    }

    private void compensateOrderSaga(SagaState sagaState) {
        log.warn("Starting compensation for saga: {}", sagaState.getSagaId());

        Map<String, Object> context = sagaState.getContext();

        // Compensate in reverse order
        switch (sagaState.getCurrentStep()) {
            case ORDER_COMPLETION:
                // Every earlier step completed before the failure; fall through to undo them all
            case SHIPPING_ARRANGEMENT:
                compensateShipping(context);
                // Fall through to compensate payment
            case PAYMENT_PROCESSING:
                compensatePayment(context);
                // Fall through to compensate inventory
            case INVENTORY_RESERVATION:
                compensateInventory(context);
                break;
        }

        sagaState.setStatus(SagaStatus.COMPENSATED);
        sagaState.setEndTime(Instant.now());
        sagaStateRepository.save(sagaState);

        // Publish order failed event
        orderEventPublisher.publishOrderFailed(sagaState.getOrderId());

        log.info("Order saga compensated: {}", sagaState.getSagaId());
    }

    private void compensateInventory(Map<String, Object> context) {
        try {
            String reservationId = (String) context.get("inventoryReservationId");
            if (reservationId != null) {
                inventoryService.cancelReservation(reservationId);
                log.info("Inventory reservation compensated: {}", reservationId);
            }
        } catch (Exception e) {
            log.error("Failed to compensate inventory reservation", e);
            // In real implementation, this would trigger manual intervention or retry
        }
    }

    private void compensatePayment(Map<String, Object> context) {
        try {
            String transactionId = (String) context.get("paymentTransactionId");
            BigDecimal amount = (BigDecimal) context.get("paymentAmount");

            if (transactionId != null) {
                paymentService.refundPayment(transactionId, amount);
                log.info("Payment compensated: {}", transactionId);
            }
        } catch (Exception e) {
            log.error("Failed to compensate payment", e);
            // This might require manual intervention or alternative compensation
        }
    }

    private void compensateShipping(Map<String, Object> context) {
        try {
            String trackingNumber = (String) context.get("shippingTrackingNumber");
            if (trackingNumber != null) {
                shippingService.cancelShipment(trackingNumber);
                log.info("Shipping compensated: {}", trackingNumber);
            }
        } catch (Exception e) {
            log.error("Failed to compensate shipping", e);
        }
    }
}

@Entity
public class SagaState {
    @Id
    private String sagaId;
    private String orderId;

    @Enumerated(EnumType.STRING)
    private SagaStep currentStep;

    @Enumerated(EnumType.STRING)
    private SagaStatus status;

    private Instant startTime;
    private Instant endTime;

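    // JPA cannot map Map<String, Object> with @ElementCollection, because Object is
    // neither a basic nor an embeddable type. Store the context as one serialized
    // column instead. SagaContextJsonConverter is a hypothetical
    // AttributeConverter<Map<String, Object>, String>; it must restore values as the
    // types the orchestrator casts them to (for example BigDecimal, List<OrderItem>).
    @Convert(converter = SagaContextJsonConverter.class)
    @Column(name = "context", columnDefinition = "TEXT")
    private Map<String, Object> context = new HashMap<>();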

    // Getters, setters, and builder
}

public enum SagaStep {
    INVENTORY_RESERVATION,
    PAYMENT_PROCESSING,
    SHIPPING_ARRANGEMENT,
    ORDER_COMPLETION
}

public enum SagaStatus {
    IN_PROGRESS,
    COMPLETED,
    COMPENSATION_REQUIRED,
    COMPENSATED,
    FAILED
}

2. Choreography-Based Saga Implementation

// Event-driven saga where services coordinate through events
@Service
public class OrderProcessingChoreographySaga {

    @Autowired
    private DomainEventPublisher eventPublisher;

    @Autowired
    private SagaStateRepository sagaStateRepository;

    @EventHandler
    public void handleOrderCreated(OrderCreatedEvent event) {
        String sagaId = UUID.randomUUID().toString();

        SagaState sagaState = SagaState.builder()
            .sagaId(sagaId)
            .orderId(event.getOrderId())
            .status(SagaStatus.IN_PROGRESS)
            .startTime(Instant.now())
            .build();

        sagaStateRepository.save(sagaState);

        // Publish event to start inventory reservation
        eventPublisher.publish(InventoryReservationRequested.builder()
            .sagaId(sagaId)
            .orderId(event.getOrderId())
            .items(event.getItems())
            .build());
    }

    @EventHandler
    public void handleInventoryReserved(InventoryReservedEvent event) {
        SagaState sagaState = sagaStateRepository.findBySagaId(event.getSagaId());

        if (sagaState != null && sagaState.getStatus() == SagaStatus.IN_PROGRESS) {
            // Store reservation details
            sagaState.getContext().put("inventoryReservationId", event.getReservationId());
            sagaStateRepository.save(sagaState);

            // Publish payment processing request
            eventPublisher.publish(PaymentProcessingRequested.builder()
                .sagaId(event.getSagaId())
                .orderId(sagaState.getOrderId())
                .amount(event.getTotalAmount())
                .paymentInfo(event.getPaymentInfo())
                .build());
        }
    }

    @EventHandler
    public void handlePaymentProcessed(PaymentProcessedEvent event) {
        SagaState sagaState = sagaStateRepository.findBySagaId(event.getSagaId());

        if (sagaState != null && sagaState.getStatus() == SagaStatus.IN_PROGRESS) {
            // Store payment details
            sagaState.getContext().put("paymentTransactionId", event.getTransactionId());
            sagaStateRepository.save(sagaState);

            // Publish shipping arrangement request
            eventPublisher.publish(ShippingArrangementRequested.builder()
                .sagaId(event.getSagaId())
                .orderId(sagaState.getOrderId())
                .shippingAddress(event.getShippingAddress())
                .items(event.getItems())
                .build());
        }
    }

    @EventHandler
    public void handleShippingArranged(ShippingArrangedEvent event) {
        SagaState sagaState = sagaStateRepository.findBySagaId(event.getSagaId());

        if (sagaState != null && sagaState.getStatus() == SagaStatus.IN_PROGRESS) {
            // Complete the saga
            sagaState.setStatus(SagaStatus.COMPLETED);
            sagaState.setEndTime(Instant.now());
            sagaState.getContext().put("trackingNumber", event.getTrackingNumber());
            sagaStateRepository.save(sagaState);

            // Publish order completion event
            eventPublisher.publish(OrderCompletedEvent.builder()
                .sagaId(event.getSagaId())
                .orderId(sagaState.getOrderId())
                .trackingNumber(event.getTrackingNumber())
                .build());
        }
    }

    // Compensation event handlers
    @EventHandler
    public void handleInventoryReservationFailed(InventoryReservationFailedEvent event) {
        compensateOrderSaga(event.getSagaId(), "Inventory reservation failed");
    }

    @EventHandler
    public void handlePaymentProcessingFailed(PaymentProcessingFailedEvent event) {
        SagaState sagaState = sagaStateRepository.findBySagaId(event.getSagaId());
        if (sagaState == null) {
            return; // Unknown saga: nothing to compensate
        }

        // Compensate inventory reservation
        String reservationId = (String) sagaState.getContext().get("inventoryReservationId");
        if (reservationId != null) {
            eventPublisher.publish(InventoryReservationCancellationRequested.builder()
                .reservationId(reservationId)
                .reason("Payment processing failed")
                .build());
        }

        compensateOrderSaga(event.getSagaId(), "Payment processing failed");
    }

    @EventHandler
    public void handleShippingArrangementFailed(ShippingArrangementFailedEvent event) {
        SagaState sagaState = sagaStateRepository.findBySagaId(event.getSagaId());
        if (sagaState == null) {
            return; // Unknown saga: nothing to compensate
        }

        // Compensate payment
        String transactionId = (String) sagaState.getContext().get("paymentTransactionId");
        if (transactionId != null) {
            eventPublisher.publish(PaymentRefundRequested.builder()
                .transactionId(transactionId)
                .reason("Shipping arrangement failed")
                .build());
        }

        // Compensate inventory
        String reservationId = (String) sagaState.getContext().get("inventoryReservationId");
        if (reservationId != null) {
            eventPublisher.publish(InventoryReservationCancellationRequested.builder()
                .reservationId(reservationId)
                .reason("Shipping arrangement failed")
                .build());
        }

        compensateOrderSaga(event.getSagaId(), "Shipping arrangement failed");
    }

    private void compensateOrderSaga(String sagaId, String reason) {
        SagaState sagaState = sagaStateRepository.findBySagaId(sagaId);

        if (sagaState != null) {
            sagaState.setStatus(SagaStatus.COMPENSATED);
            sagaState.setEndTime(Instant.now());
            sagaState.getContext().put("compensationReason", reason);
            sagaStateRepository.save(sagaState);

            eventPublisher.publish(OrderFailedEvent.builder()
                .sagaId(sagaId)
                .orderId(sagaState.getOrderId())
                .reason(reason)
                .build());
        }
    }
}
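Event handlers like the ones above may receive the same event more than once when the message broker delivers at least once, which is why the core principles call for idempotency. One possible guard is sketched below. `IdempotentHandlerSketch` is a hypothetical helper, and the sketch assumes each event carries a unique ID, which the events in this document do not show. It keeps processed IDs in memory only; a production system would more likely persist them in the same local transaction as the state change.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Wraps a handler so that each event ID is processed at most once. */
final class IdempotentHandlerSketch<E> {
    private final Set<String> processedEventIds = ConcurrentHashMap.newKeySet();
    private final Consumer<E> delegate;

    IdempotentHandlerSketch(Consumer<E> delegate) {
        this.delegate = delegate;
    }

    /** Returns true if the event was handled, false if it was a duplicate. */
    boolean handle(String eventId, E event) {
        // add() returns false when the ID is already present, so duplicates are skipped.
        if (!processedEventIds.add(eventId)) {
            return false;
        }
        try {
            delegate.accept(event);
            return true;
        } catch (RuntimeException e) {
            // Forget the ID so that a redelivery can retry the failed event.
            processedEventIds.remove(eventId);
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentHandlerSketch<String> handler =
            new IdempotentHandlerSketch<>(event -> System.out.println("handling " + event));
        System.out.println(handler.handle("evt-1", "InventoryReserved")); // handled: true
        System.out.println(handler.handle("evt-1", "InventoryReserved")); // duplicate: false
    }
}
```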

3. Saga State Machine Implementation

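// State machine-based saga implementation for complex workflows
// Note: in this example SagaState is an enum of workflow states (INITIAL, RESERVING_INVENTORY, ...),
// not the SagaState entity used in the orchestration example.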
@Component
public class OrderSagaStateMachine {

    @Autowired
    private StateMachineFactory<SagaState, SagaEvent> stateMachineFactory;

    @Autowired
    private SagaStateRepository sagaStateRepository;

    public void startSaga(OrderCreatedEvent orderEvent) {
        StateMachine<SagaState, SagaEvent> stateMachine = stateMachineFactory.getStateMachine();

        // Configure saga context
        SagaContext context = SagaContext.builder()
            .orderId(orderEvent.getOrderId())
            .items(orderEvent.getItems())
            .paymentInfo(orderEvent.getPaymentInfo())
            .shippingAddress(orderEvent.getShippingAddress())
            .build();

        stateMachine.getExtendedState().getVariables().put("sagaContext", context);

        // Start the state machine
        stateMachine.start();
        stateMachine.sendEvent(SagaEvent.START_SAGA);
    }

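    @Configuration
    @EnableStateMachineFactory // needed for the StateMachineFactory injected above; @EnableStateMachine builds a single machine
    public static class SagaStateMachineConfig extends StateMachineConfigurerAdapter<SagaState, SagaEvent> {

        // Services called by the actions below
        @Autowired
        private InventoryService inventoryService;

        @Autowired
        private PaymentService paymentService;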

        @Override
        public void configure(StateMachineStateConfigurer<SagaState, SagaEvent> states) throws Exception {
            states
                .withStates()
                    .initial(SagaState.INITIAL)
                    .state(SagaState.RESERVING_INVENTORY)
                    .state(SagaState.PROCESSING_PAYMENT)
                    .state(SagaState.ARRANGING_SHIPPING)
                    .end(SagaState.COMPLETED) // successful completion is also a terminal state
                    .state(SagaState.COMPENSATING)
                    .end(SagaState.COMPENSATED);
        }

        @Override
        public void configure(StateMachineTransitionConfigurer<SagaState, SagaEvent> transitions) throws Exception {
            transitions
                .withExternal()
                    .source(SagaState.INITIAL).target(SagaState.RESERVING_INVENTORY)
                    .event(SagaEvent.START_SAGA)
                    .action(reserveInventoryAction())
                .and()
                .withExternal()
                    .source(SagaState.RESERVING_INVENTORY).target(SagaState.PROCESSING_PAYMENT)
                    .event(SagaEvent.INVENTORY_RESERVED)
                    .action(processPaymentAction())
                .and()
                .withExternal()
                    .source(SagaState.PROCESSING_PAYMENT).target(SagaState.ARRANGING_SHIPPING)
                    .event(SagaEvent.PAYMENT_PROCESSED)
                    .action(arrangeShippingAction())
                .and()
                .withExternal()
                    .source(SagaState.ARRANGING_SHIPPING).target(SagaState.COMPLETED)
                    .event(SagaEvent.SHIPPING_ARRANGED)
                    .action(completeSagaAction())
                .and()
                // Compensation transitions
                .withExternal()
                    .source(SagaState.RESERVING_INVENTORY).target(SagaState.COMPENSATED)
                    .event(SagaEvent.INVENTORY_RESERVATION_FAILED)
                .and()
                .withExternal()
                    .source(SagaState.PROCESSING_PAYMENT).target(SagaState.COMPENSATING)
                    .event(SagaEvent.PAYMENT_PROCESSING_FAILED)
                    .action(compensateInventoryAction())
                .and()
                .withExternal()
                    .source(SagaState.ARRANGING_SHIPPING).target(SagaState.COMPENSATING)
                    .event(SagaEvent.SHIPPING_ARRANGEMENT_FAILED)
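                    .action(compensatePaymentAndInventoryAction())
                .and()
                // Without this transition a compensating saga could never reach its end state.
                // The compensation actions are expected to send COMPENSATION_COMPLETED when done.
                .withExternal()
                    .source(SagaState.COMPENSATING).target(SagaState.COMPENSATED)
                    .event(SagaEvent.COMPENSATION_COMPLETED);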
        }

        @Bean
        public Action<SagaState, SagaEvent> reserveInventoryAction() {
            return context -> {
                SagaContext sagaContext = context.getExtendedState().get("sagaContext", SagaContext.class);

                try {
                    InventoryReservationResult result = inventoryService.reserveInventory(
                        sagaContext.getOrderId(), 
                        sagaContext.getItems()
                    );

                    if (result.isSuccess()) {
                        sagaContext.setInventoryReservationId(result.getReservationId());
                        context.getStateMachine().sendEvent(SagaEvent.INVENTORY_RESERVED);
                    } else {
                        context.getStateMachine().sendEvent(SagaEvent.INVENTORY_RESERVATION_FAILED);
                    }

                } catch (Exception e) {
                    log.error("Inventory reservation failed", e);
                    context.getStateMachine().sendEvent(SagaEvent.INVENTORY_RESERVATION_FAILED);
                }
            };
        }

        @Bean
        public Action<SagaState, SagaEvent> processPaymentAction() {
            return context -> {
                SagaContext sagaContext = context.getExtendedState().get("sagaContext", SagaContext.class);

                try {
                    PaymentResult result = paymentService.processPayment(
                        sagaContext.getOrderId(),
                        sagaContext.getPaymentInfo(),
                        sagaContext.getTotalAmount()
                    );

                    if (result.isSuccess()) {
                        sagaContext.setPaymentTransactionId(result.getTransactionId());
                        context.getStateMachine().sendEvent(SagaEvent.PAYMENT_PROCESSED);
                    } else {
                        context.getStateMachine().sendEvent(SagaEvent.PAYMENT_PROCESSING_FAILED);
                    }

                } catch (Exception e) {
                    log.error("Payment processing failed", e);
                    context.getStateMachine().sendEvent(SagaEvent.PAYMENT_PROCESSING_FAILED);
                }
            };
        }

        // arrangeShippingAction(), completeSagaAction(), compensateInventoryAction() and
        // compensatePaymentAndInventoryAction() are referenced by the transitions above
        // but are not shown in this excerpt.
    }
}

public enum SagaEvent {
    START_SAGA,
    INVENTORY_RESERVED,
    INVENTORY_RESERVATION_FAILED,
    PAYMENT_PROCESSED,
    PAYMENT_PROCESSING_FAILED,
    SHIPPING_ARRANGED,
    SHIPPING_ARRANGEMENT_FAILED,
    COMPENSATION_COMPLETED
}
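The transitions configured above can also be read as a plain lookup table, which may help when reasoning about the workflow without a framework. The sketch below is framework-free and hypothetical: `SagaTransitionTableSketch` and its nested enums mirror the state and event names from this section, and it assumes that COMPENSATION_COMPLETED moves a saga from COMPENSATING to COMPENSATED.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;

final class SagaTransitionTableSketch {
    enum State { INITIAL, RESERVING_INVENTORY, PROCESSING_PAYMENT, ARRANGING_SHIPPING,
                 COMPLETED, COMPENSATING, COMPENSATED }

    enum Event { START_SAGA, INVENTORY_RESERVED, INVENTORY_RESERVATION_FAILED,
                 PAYMENT_PROCESSED, PAYMENT_PROCESSING_FAILED,
                 SHIPPING_ARRANGED, SHIPPING_ARRANGEMENT_FAILED, COMPENSATION_COMPLETED }

    private static final Map<State, Map<Event, State>> TABLE = new EnumMap<>(State.class);

    private static void allow(State from, Event on, State to) {
        TABLE.computeIfAbsent(from, s -> new EnumMap<Event, State>(Event.class)).put(on, to);
    }

    static {
        // Happy path
        allow(State.INITIAL, Event.START_SAGA, State.RESERVING_INVENTORY);
        allow(State.RESERVING_INVENTORY, Event.INVENTORY_RESERVED, State.PROCESSING_PAYMENT);
        allow(State.PROCESSING_PAYMENT, Event.PAYMENT_PROCESSED, State.ARRANGING_SHIPPING);
        allow(State.ARRANGING_SHIPPING, Event.SHIPPING_ARRANGED, State.COMPLETED);
        // Failure paths: nothing to undo if the first step fails
        allow(State.RESERVING_INVENTORY, Event.INVENTORY_RESERVATION_FAILED, State.COMPENSATED);
        allow(State.PROCESSING_PAYMENT, Event.PAYMENT_PROCESSING_FAILED, State.COMPENSATING);
        allow(State.ARRANGING_SHIPPING, Event.SHIPPING_ARRANGEMENT_FAILED, State.COMPENSATING);
        // Assumption: compensation reports back with COMPENSATION_COMPLETED
        allow(State.COMPENSATING, Event.COMPENSATION_COMPLETED, State.COMPENSATED);
    }

    /** Returns the target state, or empty if the event is not valid in the current state. */
    static Optional<State> next(State current, Event event) {
        Map<Event, State> row = TABLE.get(current);
        return row == null ? Optional.empty() : Optional.ofNullable(row.get(event));
    }

    public static void main(String[] args) {
        System.out.println(next(State.PROCESSING_PAYMENT, Event.PAYMENT_PROCESSING_FAILED));
        System.out.println(next(State.COMPLETED, Event.START_SAGA));
    }
}
```

Rejecting events that are not valid in the current state, as `next` does by returning an empty result, is one way to keep late or duplicate messages from moving a finished saga.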

4. Distributed Saga Coordination

// Distributed saga implementation using message queues for coordination
@Service
public class DistributedSagaCoordinator {

    @Autowired
    private MessageQueueService messageQueueService;

    @Autowired
    private SagaDefinitionRepository sagaDefinitionRepository;

    @Autowired
    private SagaInstanceRepository sagaInstanceRepository;

    public String startSaga(String sagaDefinitionId, Map<String, Object> initialContext) {
        SagaDefinition definition = sagaDefinitionRepository.findById(sagaDefinitionId)
            .orElseThrow(() -> new IllegalArgumentException("Saga definition not found: " + sagaDefinitionId));

        String sagaInstanceId = UUID.randomUUID().toString();

        SagaInstance instance = SagaInstance.builder()
            .instanceId(sagaInstanceId)
            .definitionId(sagaDefinitionId)
            .status(SagaInstanceStatus.ACTIVE)
            .currentStepIndex(0)
            .context(new HashMap<>(initialContext))
            .startTime(Instant.now())
            .build();

        sagaInstanceRepository.save(instance);

        // Start execution with first step
        executeSagaStep(instance, definition);

        return sagaInstanceId;
    }

    private void executeSagaStep(SagaInstance instance, SagaDefinition definition) {
        if (instance.getCurrentStepIndex() >= definition.getSteps().size()) {
            // Saga completed successfully
            completeSaga(instance);
            return;
        }

        SagaStep step = definition.getSteps().get(instance.getCurrentStepIndex());

        try {
            // Send message to execute step
            SagaStepExecutionMessage message = SagaStepExecutionMessage.builder()
                .sagaInstanceId(instance.getInstanceId())
                .stepId(step.getStepId())
                .serviceEndpoint(step.getServiceEndpoint())
                .operation(step.getOperation())
                .inputData(prepareStepInput(step, instance.getContext()))
                .compensationData(prepareCompensationData(step, instance.getContext()))
                .build();

            messageQueueService.sendMessage(step.getServiceQueue(), message);

            // Update instance status
            instance.setStatus(SagaInstanceStatus.WAITING_FOR_RESPONSE);
            sagaInstanceRepository.save(instance);

        } catch (Exception e) {
            log.error("Failed to execute saga step: {}", step.getStepId(), e);
            startCompensation(instance, definition);
        }
    }

    @MessageListener(queues = "saga-step-response")
    public void handleSagaStepResponse(SagaStepResponseMessage response) {
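        SagaInstance instance = sagaInstanceRepository.findByInstanceId(response.getSagaInstanceId());
        if (instance == null) {
            // A response for a saga this coordinator does not know about cannot be processed
            log.warn("Ignoring response for unknown saga instance: {}", response.getSagaInstanceId());
            return;
        }
        SagaDefinition definition = sagaDefinitionRepository.findById(instance.getDefinitionId()).orElseThrow();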

        if (response.isSuccess()) {
            // Update context with step results
            instance.getContext().putAll(response.getOutputData());

            // Move to next step
            instance.setCurrentStepIndex(instance.getCurrentStepIndex() + 1);
            instance.setStatus(SagaInstanceStatus.ACTIVE);
            sagaInstanceRepository.save(instance);

            // Continue execution
            executeSagaStep(instance, definition);

        } else {
            // Step failed, start compensation
            log.warn("Saga step failed: {} - {}", response.getStepId(), response.getErrorMessage());
            startCompensation(instance, definition);
        }
    }

    private void startCompensation(SagaInstance instance, SagaDefinition definition) {
        instance.setStatus(SagaInstanceStatus.COMPENSATING);
        sagaInstanceRepository.save(instance);

        // Compensate completed steps in reverse order
        int currentStep = instance.getCurrentStepIndex() - 1; // Last completed step

        compensateSagaStep(instance, definition, currentStep);
    }

    private void compensateSagaStep(SagaInstance instance, SagaDefinition definition, int stepIndex) {
        if (stepIndex < 0) {
            // All compensations completed
            instance.setStatus(SagaInstanceStatus.COMPENSATED);
            instance.setEndTime(Instant.now());
            sagaInstanceRepository.save(instance);

            publishSagaCompensated(instance);
            return;
        }

        SagaStep step = definition.getSteps().get(stepIndex);

        if (step.getCompensationOperation() != null) {
            try {
                SagaStepCompensationMessage message = SagaStepCompensationMessage.builder()
                    .sagaInstanceId(instance.getInstanceId())
                    .stepId(step.getStepId())
                    .compensationOperation(step.getCompensationOperation())
                    .compensationData(instance.getContext())
                    .build();

                messageQueueService.sendMessage(step.getServiceQueue(), message);

            } catch (Exception e) {
                log.error("Failed to send compensation message for step: {}", step.getStepId(), e);
                // Continue with next compensation
                compensateSagaStep(instance, definition, stepIndex - 1);
            }
        } else {
            // No compensation needed, move to previous step
            compensateSagaStep(instance, definition, stepIndex - 1);
        }
    }

    @MessageListener(queues = "saga-compensation-response")
    public void handleSagaCompensationResponse(SagaCompensationResponseMessage response) {
        SagaInstance instance = sagaInstanceRepository.findByInstanceId(response.getSagaInstanceId());
        SagaDefinition definition = sagaDefinitionRepository.findById(instance.getDefinitionId()).orElseThrow();

        if (!response.isSuccess()) {
            log.error("Compensation failed for step: {} - {}", 
                response.getStepId(), response.getErrorMessage());
            // In real implementation, this might trigger manual intervention
        }

        // Find the index of the step that was just compensated (-1 if the step ID is unknown)
        List<SagaStep> steps = definition.getSteps();
        int currentStepIndex = -1;
        for (int i = 0; i < steps.size(); i++) {
            if (steps.get(i).getStepId().equals(response.getStepId())) {
                currentStepIndex = i;
                break;
            }
        }

        // Continue with previous step compensation
        compensateSagaStep(instance, definition, currentStepIndex - 1);
    }
}

@Entity
public class SagaDefinition {
    @Id
    private String definitionId;
    private String name;
    private String description;

    @OneToMany(cascade = CascadeType.ALL, fetch = FetchType.EAGER)
    @OrderBy("stepOrder")
    private List<SagaStep> steps = new ArrayList<>();

    // Getters and setters
}

@Entity
public class SagaStep {
    @Id
    private String stepId;
    private String serviceEndpoint;
    private String serviceQueue;
    private String operation;
    private String compensationOperation;
    private int stepOrder;
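    // Note: JPA cannot persist Map<String, Object> without an explicit mapping;
    // a converter (for example to a JSON column) is assumed here.
    private Map<String, Object> stepConfiguration = new HashMap<>();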

    // Getters and setters
}

@Entity
public class SagaInstance {
    @Id
    private String instanceId;
    private String definitionId;

    @Enumerated(EnumType.STRING)
    private SagaInstanceStatus status;

    private int currentStepIndex;
    private Instant startTime;
    private Instant endTime;

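    // Note: @ElementCollection supports only basic or embeddable map values, so
    // Map<String, Object> will not map as written. A real implementation would
    // typically store the context as serialized JSON (for example via an AttributeConverter).
    @ElementCollection
    private Map<String, Object> context = new HashMap<>();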

    // Getters and setters
}

public enum SagaInstanceStatus {
    ACTIVE,
    WAITING_FOR_RESPONSE,
    COMPLETED,
    COMPENSATING,
    COMPENSATED,
    FAILED
}
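
The orchestrator above spreads its control flow across message listeners, which can make the compensation order hard to see. The following in-memory sketch condenses that flow into one method: steps run in order, and on the first failure the completed steps are compensated in reverse, starting at `currentStepIndex - 1`. The `Step` class, the `run` method, and the trace strings are hypothetical stand-ins for illustration only; messaging, persistence, and retries are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;

class SagaCompensationSketch {

    /** Hypothetical in-memory step: a forward action plus an optional compensation. */
    static final class Step {
        final String id;
        final BooleanSupplier action;   // returns false (or throws) when the step fails
        final Runnable compensation;    // may be null, like getCompensationOperation()

        Step(String id, BooleanSupplier action, Runnable compensation) {
            this.id = id;
            this.action = action;
            this.compensation = compensation;
        }
    }

    /** Runs steps in order; on the first failure, compensates completed steps in reverse. */
    static String run(List<Step> steps, List<String> trace) {
        int currentStepIndex = 0;
        while (currentStepIndex < steps.size()) {
            Step step = steps.get(currentStepIndex);
            boolean success;
            try {
                success = step.action.getAsBoolean();
            } catch (RuntimeException e) {
                success = false;
            }
            if (!success) {
                trace.add("failed:" + step.id);
                // The failed step is not compensated; start from the last completed one.
                for (int i = currentStepIndex - 1; i >= 0; i--) {
                    Step done = steps.get(i);
                    if (done.compensation != null) {
                        done.compensation.run();
                        trace.add("compensated:" + done.id);
                    }
                }
                return "COMPENSATED";
            }
            trace.add("completed:" + step.id);
            currentStepIndex++;
        }
        return "COMPLETED";
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        List<Step> steps = Arrays.asList(
            new Step("reserveInventory", () -> true, () -> {}),
            new Step("processPayment", () -> true, null),      // no compensation defined
            new Step("arrangeShipping", () -> false, () -> {}));
        System.out.println(run(steps, trace) + " " + trace);
    }
}
```

In this run the third step fails, the second step is skipped during rollback because it has no compensation, and only the first step is compensated. This mirrors the `getCompensationOperation() != null` check in `compensateSagaStep`.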

Apache Camel Implementation

1. Orchestration-Based Saga Route

@Component
public class SagaOrchestrationRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:startOrderSaga")
            .routeId("order-saga-orchestration")
            // Route-scoped error handling is declared before the processing steps it protects
            .onException(Exception.class)
                .handled(true)
                .to("direct:compensateSaga")
            .end()
            .log("Starting order saga for order: ${body.orderId}")
            .process(exchange -> {
                OrderSagaRequest request = exchange.getIn().getBody(OrderSagaRequest.class);
                String sagaId = UUID.randomUUID().toString();

                // Keep saga data in exchange properties so every step and compensation can read it
                exchange.setProperty("sagaId", sagaId);
                exchange.setProperty("orderId", request.getOrderId());
                exchange.setProperty("orderItems", request.getItems());
                exchange.setProperty("paymentInfo", request.getPaymentInfo());
                exchange.setProperty("shippingAddress", request.getShippingAddress());
            })
            .to("direct:reserveInventory")
            .to("direct:processPayment")
            .to("direct:arrangeShipping")
            .to("direct:completeSaga");

        from("direct:reserveInventory")
            .log("Reserving inventory for saga: ${exchangeProperty.sagaId}")
            .process(exchange -> {
                String orderId = exchange.getProperty("orderId", String.class);
                @SuppressWarnings("unchecked")
                List<OrderItem> items = exchange.getProperty("orderItems", List.class);

                InventoryReservationResult result = inventoryService.reserveInventory(orderId, items);

                if (!result.isSuccess()) {
                    throw new SagaStepException("Inventory reservation failed: " + result.getErrorMessage());
                }

                exchange.setProperty("inventoryReservationId", result.getReservationId());
            })
            .log("Inventory reserved successfully: ${exchangeProperty.inventoryReservationId}");

        from("direct:processPayment")
            .log("Processing payment for saga: ${exchangeProperty.sagaId}")
            .process(exchange -> {
                String orderId = exchange.getProperty("orderId", String.class);
                PaymentInfo paymentInfo = exchange.getProperty("paymentInfo", PaymentInfo.class);
                BigDecimal totalAmount = calculateTotalAmount(exchange.getProperty("orderItems"));

                PaymentResult result = paymentService.processPayment(orderId, paymentInfo, totalAmount);

                if (!result.isSuccess()) {
                    throw new SagaStepException("Payment processing failed: " + result.getErrorMessage());
                }

                exchange.setProperty("paymentTransactionId", result.getTransactionId());
            })
            .log("Payment processed successfully: ${exchangeProperty.paymentTransactionId}");

        from("direct:arrangeShipping")
            .log("Arranging shipping for saga: ${exchangeProperty.sagaId}")
            .process(exchange -> {
                String orderId = exchange.getProperty("orderId", String.class);
                Address shippingAddress = exchange.getProperty("shippingAddress", Address.class);
                @SuppressWarnings("unchecked")
                List<OrderItem> items = exchange.getProperty("orderItems", List.class);

                ShippingResult result = shippingService.arrangeShipping(orderId, shippingAddress, items);

                if (!result.isSuccess()) {
                    throw new SagaStepException("Shipping arrangement failed: " + result.getErrorMessage());
                }

                exchange.setProperty("trackingNumber", result.getTrackingNumber());
            })
            .log("Shipping arranged successfully: ${exchangeProperty.trackingNumber}");

        from("direct:completeSaga")
            .log("Completing saga: ${exchangeProperty.sagaId}")
            .process(exchange -> {
                String sagaId = exchange.getProperty("sagaId", String.class);
                String orderId = exchange.getProperty("orderId", String.class);
                String trackingNumber = exchange.getProperty("trackingNumber", String.class);

                // Publish order completed event
                OrderCompletedEvent event = OrderCompletedEvent.builder()
                    .sagaId(sagaId)
                    .orderId(orderId)
                    .trackingNumber(trackingNumber)
                    .completedAt(Instant.now())
                    .build();

                eventPublisher.publish(event);

                exchange.getIn().setBody(event);
            })
            .log("Saga completed successfully: ${exchangeProperty.sagaId}");

        // Compensation route
        from("direct:compensateSaga")
            .log("Starting compensation for saga: ${exchangeProperty.sagaId}")
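            // Compensate in reverse step order. Each compensation runs exactly once,
            // and a failed compensation is logged without blocking the remaining ones.
            .doTry()
                .to("direct:compensateShipping")
            .doCatch(Exception.class)
                .log("Shipping compensation failed")
            .end()
            .doTry()
                .to("direct:compensatePayment")
            .doCatch(Exception.class)
                .log("Payment compensation failed")
            .end()
            .doTry()
                .to("direct:compensateInventory")
            .doCatch(Exception.class)
                .log("Inventory compensation failed")
            .end()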
            .process(exchange -> {
                String sagaId = exchange.getProperty("sagaId", String.class);
                String orderId = exchange.getProperty("orderId", String.class);

                OrderFailedEvent event = OrderFailedEvent.builder()
                    .sagaId(sagaId)
                    .orderId(orderId)
                    .reason("Saga compensation completed")
                    .failedAt(Instant.now())
                    .build();

                eventPublisher.publish(event);
            })
            .log("Saga compensation completed: ${exchangeProperty.sagaId}");
    }
}
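
The compensation route above is entered for any failure, including one in the first step, so a compensation may be invoked for a step that never completed. Because each forward route sets its result property (`inventoryReservationId`, `paymentTransactionId`, `trackingNumber`) only after it succeeds, those properties can indicate which compensations are actually needed. The sketch below shows that decision in plain Java. `CompensationPlanner` and `plan` are hypothetical names, and a plain `Map` stands in for the exchange properties.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CompensationPlanner {

    /**
     * Returns the compensation endpoints to call, in reverse step order,
     * for the steps whose result property is present.
     */
    static List<String> plan(Map<String, Object> exchangeProperties) {
        List<String> plan = new ArrayList<>();
        if (exchangeProperties.get("trackingNumber") != null) {
            plan.add("direct:compensateShipping");
        }
        if (exchangeProperties.get("paymentTransactionId") != null) {
            plan.add("direct:compensatePayment");
        }
        if (exchangeProperties.get("inventoryReservationId") != null) {
            plan.add("direct:compensateInventory");
        }
        return plan;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("sagaId", "saga-1");
        props.put("inventoryReservationId", "res-42");
        // Payment failed, so neither paymentTransactionId nor trackingNumber was set.
        System.out.println(plan(props)); // prints [direct:compensateInventory]
    }
}
```

Inside the Camel route itself, the same check could likely be written with `choice()` and `when(exchangeProperty("trackingNumber").isNotNull())` around each compensation call. Alternatively, each compensation route can be written to do nothing when its step never ran.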

2. Choreography-Based Saga Route

@Component
public class SagaChoreographyRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        // Order created event handler
        from("kafka:order-events?groupId=saga-choreography")
            .routeId("saga-choreography-order-created")
            .unmarshal().json(JsonLibrary.Jackson, OrderCreatedEvent.class)
            .filter(simple("${body.eventType} == 'ORDER_CREATED'"))
            .process(exchange -> {
                OrderCreatedEvent event = exchange.getIn().getBody(OrderCreatedEvent.class);
                String sagaId = UUID.randomUUID().toString();

                // Store saga state
                SagaState sagaState = SagaState.builder()
                    .sagaId(sagaId)
                    .orderId(event.getOrderId())
                    .status(SagaStatus.IN_PROGRESS)
                    .startTime(Instant.now())
                    .build();

                sagaStateRepository.save(sagaState);

                // Create inventory reservation request
                InventoryReservationRequested reservationRequest = InventoryReservationRequested.builder()
                    .sagaId(sagaId)
                    .orderId(event.getOrderId())
                    .items(event.getItems())
                    .build();

                exchange.getIn().setBody(reservationRequest);
            })
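            // Log before marshalling: afterwards the body is serialized JSON and ${body.sagaId} cannot be resolved
            .log("Requesting inventory reservation for saga: ${body.sagaId}")
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:inventory-commands");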

        // Inventory reserved event handler
        from("kafka:inventory-events?groupId=saga-choreography")
            .routeId("saga-choreography-inventory-reserved")
            .unmarshal().json(JsonLibrary.Jackson, InventoryReservedEvent.class)
            .filter(simple("${body.eventType} == 'INVENTORY_RESERVED'"))
            .process(exchange -> {
                InventoryReservedEvent event = exchange.getIn().getBody(InventoryReservedEvent.class);

                // Update saga state
                SagaState sagaState = sagaStateRepository.findBySagaId(event.getSagaId());
                if (sagaState != null && sagaState.getStatus() == SagaStatus.IN_PROGRESS) {
                    sagaState.getContext().put("inventoryReservationId", event.getReservationId());
                    sagaStateRepository.save(sagaState);

                    // Create payment processing request
                    PaymentProcessingRequested paymentRequest = PaymentProcessingRequested.builder()
                        .sagaId(event.getSagaId())
                        .orderId(sagaState.getOrderId())
                        .amount(event.getTotalAmount())
                        .paymentInfo(event.getPaymentInfo())
                        .build();

                    exchange.getIn().setBody(paymentRequest);
                } else {
                    exchange.setProperty(Exchange.ROUTE_STOP, true);
                }
            })
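            // Log before marshalling: afterwards the body is serialized JSON and ${body.sagaId} cannot be resolved
            .log("Requesting payment processing for saga: ${body.sagaId}")
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:payment-commands");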

        // Payment processed event handler
        from("kafka:payment-events?groupId=saga-choreography")
            .routeId("saga-choreography-payment-processed")
            .unmarshal().json(JsonLibrary.Jackson, PaymentProcessedEvent.class)
            .filter(simple("${body.eventType} == 'PAYMENT_PROCESSED'"))
            .process(exchange -> {
                PaymentProcessedEvent event = exchange.getIn().getBody(PaymentProcessedEvent.class);

                SagaState sagaState = sagaStateRepository.findBySagaId(event.getSagaId());
                if (sagaState != null && sagaState.getStatus() == SagaStatus.IN_PROGRESS) {
                    sagaState.getContext().put("paymentTransactionId", event.getTransactionId());
                    sagaStateRepository.save(sagaState);

                    ShippingArrangementRequested shippingRequest = ShippingArrangementRequested.builder()
                        .sagaId(event.getSagaId())
                        .orderId(sagaState.getOrderId())
                        .shippingAddress(event.getShippingAddress())
                        .items(event.getItems())
                        .build();

                    exchange.getIn().setBody(shippingRequest);
                } else {
                    exchange.setProperty(Exchange.ROUTE_STOP, true);
                }
            })
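            // Log before marshalling: afterwards the body is serialized JSON and ${body.sagaId} cannot be resolved
            .log("Requesting shipping arrangement for saga: ${body.sagaId}")
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:shipping-commands");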

        // Shipping arranged event handler
        from("kafka:shipping-events?groupId=saga-choreography")
            .routeId("saga-choreography-shipping-arranged")
            .unmarshal().json(JsonLibrary.Jackson, ShippingArrangedEvent.class)
            .filter(simple("${body.eventType} == 'SHIPPING_ARRANGED'"))
            .process(exchange -> {
                ShippingArrangedEvent event = exchange.getIn().getBody(ShippingArrangedEvent.class);

                SagaState sagaState = sagaStateRepository.findBySagaId(event.getSagaId());
                if (sagaState != null && sagaState.getStatus() == SagaStatus.IN_PROGRESS) {
                    // Complete saga
                    sagaState.setStatus(SagaStatus.COMPLETED);
                    sagaState.setEndTime(Instant.now());
                    sagaState.getContext().put("trackingNumber", event.getTrackingNumber());
                    sagaStateRepository.save(sagaState);

                    OrderCompletedEvent completedEvent = OrderCompletedEvent.builder()
                        .sagaId(event.getSagaId())
                        .orderId(sagaState.getOrderId())
                        .trackingNumber(event.getTrackingNumber())
                        .completedAt(Instant.now())
                        .build();

                    exchange.getIn().setBody(completedEvent);
                } else {
                    exchange.setProperty(Exchange.ROUTE_STOP, true);
                }
            })
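            // Log before marshalling: afterwards the body is serialized JSON and ${body.sagaId} cannot be resolved
            .log("Order saga completed: ${body.sagaId}")
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:order-events");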

        // Failure handling routes
        from("kafka:inventory-events?groupId=saga-choreography-failures")
            .routeId("saga-choreography-inventory-failed")
            .unmarshal().json(JsonLibrary.Jackson, InventoryReservationFailedEvent.class)
            .filter(simple("${body.eventType} == 'INVENTORY_RESERVATION_FAILED'"))
            .to("direct:handleSagaFailure")
            .log("Saga failed at inventory reservation: ${body.sagaId}");

        from("kafka:payment-events?groupId=saga-choreography-failures")
            .routeId("saga-choreography-payment-failed")
            .unmarshal().json(JsonLibrary.Jackson, PaymentProcessingFailedEvent.class)
            .filter(simple("${body.eventType} == 'PAYMENT_PROCESSING_FAILED'"))
            .to("direct:compensateInventoryInChoreography")
            .to("direct:handleSagaFailure")
            .log("Saga failed at payment processing: ${body.sagaId}");
    }
}
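
The handlers above check only that the saga is still `IN_PROGRESS`. Kafka consumers commonly see at-least-once delivery, so a redelivered `INVENTORY_RESERVED` event for an in-progress saga would issue a second payment command. One way to make the handlers idempotent is to record each (saga ID, event type) pair and skip repeats. The sketch below keeps that record in memory purely for illustration. `ProcessedEventGuard` and `markProcessed` are hypothetical names, and a real deployment would need a durable store, ideally updated in the same transaction as the saga state.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ProcessedEventGuard {

    // Thread-safe set of "sagaId:eventType" keys that have already been handled
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    /** Returns true the first time a (sagaId, eventType) pair is seen, false on redelivery. */
    boolean markProcessed(String sagaId, String eventType) {
        return processed.add(sagaId + ":" + eventType);
    }

    public static void main(String[] args) {
        ProcessedEventGuard guard = new ProcessedEventGuard();
        System.out.println(guard.markProcessed("saga-1", "INVENTORY_RESERVED")); // prints true
        System.out.println(guard.markProcessed("saga-1", "INVENTORY_RESERVED")); // prints false
        System.out.println(guard.markProcessed("saga-1", "PAYMENT_PROCESSED"));  // prints true
    }
}
```

A handler would call `markProcessed` before building the next command and stop the route when it returns false, in the same place where the existing code stops for a saga that is no longer in progress.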

Best Practices

1. Saga Design and Architecture

2. State Management and Persistence

3. Error Handling and Compensation

4. Performance and Scalability

5. Monitoring and Observability

6. Testing and Validation

The Saga pattern coordinates complex, long-running business processes across distributed services by pairing each local transaction with a compensating action. This gives reliable, scalable, and maintainable transaction coordination without the blocking and tight coupling of traditional distributed transaction mechanisms such as two-phase commit.
