This site is in English. Use your browser's built-in translate feature to read it in your language.

Compensation Transactions

Overview

The Compensation Transactions pattern is a sophisticated error handling and recovery mechanism in enterprise integration architectures that enables the undoing of completed business operations through semantically inverse operations when distributed transactions fail or need to be reversed. Like a careful financial accountant who maintains detailed records and can reverse any transaction through proper correcting entries, the Compensation Transactions pattern provides a systematic approach to maintaining data consistency and business integrity in distributed systems by defining and executing compensating actions that semantically undo the effects of previously completed operations. This pattern is essential for managing long-running business processes, handling distributed transaction failures, and maintaining system consistency in environments where traditional ACID transactions cannot be used.

Theoretical Foundation

The Compensation Transactions pattern is grounded in distributed systems theory, transaction processing principles, and semantic consistency models. It incorporates concepts from compensating transaction theory, eventual consistency, distributed consensus, and business process management to provide a robust framework for handling failures and maintaining consistency in distributed environments. The pattern addresses the fundamental challenges of distributed transaction management where traditional two-phase commit protocols are impractical due to long-running operations, network partitions, or the involvement of external systems that don't support distributed transaction protocols.

Core Principles

1. Semantic Compensation

Implementing business-logic-based reversal of operations: - Semantic equivalence - compensating actions achieve business-equivalent reversal rather than technical rollback - Business logic integration - compensation logic integrated with business domain knowledge - State-aware compensation - compensations consider current system state and business context - Idempotent operations - compensation actions can be safely retried without side effects

2. Compensation Orchestration

Managing the coordination and execution of compensating actions: - Compensation ordering - executing compensations in reverse order of original operations - Parallel compensation - executing independent compensations concurrently for efficiency - Compensation chaining - handling cascading compensations across multiple services - Compensation state management - tracking compensation progress and handling partial failures

3. Forward and Backward Recovery

Supporting different recovery strategies based on failure scenarios: - Forward recovery - attempting to complete the original transaction despite failures - Backward recovery - systematically undoing completed operations through compensation - Mixed recovery - combining forward and backward recovery strategies - Selective compensation - compensating only specific operations while preserving others

4. Compensation Design Patterns

Implementing proven approaches to compensation logic: - Reversible operations - designing operations that can be semantically undone - Compensating resources - maintaining resources needed for future compensation - Compensation workflows - orchestrating complex multi-step compensations - Compensation persistence - maintaining compensation state for reliability

Why Compensation Transactions are Essential in Integration Architecture

1. Distributed Transaction Management

In distributed system architectures, compensation transactions provide: - Long-running transaction support - managing transactions that span extended time periods - Cross-system consistency - maintaining consistency across systems that don't support 2PC - Failure recovery - systematic recovery from partial failures in distributed operations - Business process integrity - ensuring business-level consistency despite technical failures

2. Microservices and Service Orchestration

For microservices architectures requiring coordination: - Service autonomy preservation - maintaining service independence while providing consistency - Distributed workflow management - coordinating complex workflows across multiple services - Failure isolation - limiting the impact of failures to specific service boundaries - Business process reliability - ensuring business processes complete successfully or fail gracefully

3. Legacy and External System Integration

Enabling integration with systems that have limited transaction capabilities: - Legacy system integration - working with systems that don't support modern transaction protocols - Third-party service integration - managing consistency with external service providers - Hybrid architecture support - maintaining consistency across on-premises and cloud systems - API-based integration - ensuring consistency in REST and other stateless API interactions

4. Business Process Management

Supporting sophisticated business process requirements: - Regulatory compliance - maintaining audit trails and supporting regulatory rollback requirements - Business rule enforcement - ensuring business rules are maintained during error recovery - Customer experience - providing clear and consistent customer experiences during failures - Process optimization - optimizing process performance through intelligent compensation strategies

Benefits in Integration Contexts

1. Technical Advantages

2. Business Value

3. Integration Enablement

4. Compliance and Governance

Integration Architecture Applications

1. Travel Booking System

Comprehensive compensation handling for travel reservations:

// Travel Booking Service with Compensation
@Component
public class TravelBookingService {

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Autowired
    private BookingRepository bookingRepository;

    @Autowired
    private CompensationManager compensationManager;

    public BookingResult processBooking(TravelBookingRequest request) {
        String bookingId = UUID.randomUUID().toString();

        try {
            // Create booking record
            TravelBooking booking = new TravelBooking();
            booking.setBookingId(bookingId);
            booking.setCustomerId(request.getCustomerId());
            booking.setFlightDetails(request.getFlightDetails());
            booking.setHotelDetails(request.getHotelDetails());
            booking.setCarRentalDetails(request.getCarRentalDetails());
            booking.setStatus(BookingStatus.IN_PROGRESS);

            bookingRepository.save(booking);

            // Start compensation context
            CompensationContext context = compensationManager.createContext(bookingId);

            // Execute booking steps with compensation registration
            boolean success = true;

            // Step 1: Reserve flight
            if (request.getFlightDetails() != null) {
                FlightReservationResult flightResult = reserveFlight(request.getFlightDetails(), context);
                if (!flightResult.isSuccess()) {
                    success = false;
                } else {
                    booking.setFlightReservationId(flightResult.getReservationId());
                }
            }

            // Step 2: Reserve hotel
            if (success && request.getHotelDetails() != null) {
                HotelReservationResult hotelResult = reserveHotel(request.getHotelDetails(), context);
                if (!hotelResult.isSuccess()) {
                    success = false;
                } else {
                    booking.setHotelReservationId(hotelResult.getReservationId());
                }
            }

            // Step 3: Reserve car rental
            if (success && request.getCarRentalDetails() != null) {
                CarRentalResult carResult = reserveCarRental(request.getCarRentalDetails(), context);
                if (!carResult.isSuccess()) {
                    success = false;
                } else {
                    booking.setCarRentalReservationId(carResult.getReservationId());
                }
            }

            // Step 4: Process payment
            if (success) {
                PaymentResult paymentResult = processPayment(request.getPaymentDetails(), context);
                if (!paymentResult.isSuccess()) {
                    success = false;
                } else {
                    booking.setPaymentId(paymentResult.getPaymentId());
                }
            }

            if (success) {
                // Complete booking
                booking.setStatus(BookingStatus.CONFIRMED);
                bookingRepository.save(booking);

                // Confirm all reservations
                compensationManager.confirmAll(context);

                log.info("Travel booking completed successfully: {}", bookingId);
                return BookingResult.success(booking);

            } else {
                // Execute compensations
                booking.setStatus(BookingStatus.FAILED);
                bookingRepository.save(booking);

                compensationManager.executeCompensations(context);

                log.warn("Travel booking failed, compensations executed: {}", bookingId);
                return BookingResult.failure("Booking failed during processing");
            }

        } catch (Exception e) {
            log.error("Error during travel booking: {}", bookingId, e);

            // Execute compensations on exception
            try {
                CompensationContext context = compensationManager.getContext(bookingId);
                if (context != null) {
                    compensationManager.executeCompensations(context);
                }
            } catch (Exception compensationError) {
                log.error("Error during compensation execution for booking: {}", bookingId, compensationError);
            }

            return BookingResult.failure("Booking failed due to system error");
        }
    }

    private FlightReservationResult reserveFlight(FlightDetails flightDetails, CompensationContext context) {
        try {
            FlightReservationRequest request = new FlightReservationRequest();
            request.setFlightNumber(flightDetails.getFlightNumber());
            request.setPassengerDetails(flightDetails.getPassengers());
            request.setSeatClass(flightDetails.getSeatClass());

            FlightReservationResponse response = flightReservationService.reserveFlight(request);

            if (response.isSuccess()) {
                // Register compensation action
                compensationManager.addCompensation(context, 
                    new FlightCancellationCompensation(response.getReservationId()));

                log.info("Flight reserved successfully: {}", response.getReservationId());
                return FlightReservationResult.success(response.getReservationId());

            } else {
                log.warn("Flight reservation failed: {}", response.getErrorMessage());
                return FlightReservationResult.failure(response.getErrorMessage());
            }

        } catch (Exception e) {
            log.error("Error during flight reservation", e);
            return FlightReservationResult.failure("Flight reservation system error");
        }
    }

    private HotelReservationResult reserveHotel(HotelDetails hotelDetails, CompensationContext context) {
        try {
            HotelReservationRequest request = new HotelReservationRequest();
            request.setHotelId(hotelDetails.getHotelId());
            request.setCheckInDate(hotelDetails.getCheckInDate());
            request.setCheckOutDate(hotelDetails.getCheckOutDate());
            request.setRoomType(hotelDetails.getRoomType());
            request.setGuestDetails(hotelDetails.getGuests());

            HotelReservationResponse response = hotelReservationService.reserveHotel(request);

            if (response.isSuccess()) {
                // Register compensation action
                compensationManager.addCompensation(context, 
                    new HotelCancellationCompensation(response.getReservationId()));

                log.info("Hotel reserved successfully: {}", response.getReservationId());
                return HotelReservationResult.success(response.getReservationId());

            } else {
                log.warn("Hotel reservation failed: {}", response.getErrorMessage());
                return HotelReservationResult.failure(response.getErrorMessage());
            }

        } catch (Exception e) {
            log.error("Error during hotel reservation", e);
            return HotelReservationResult.failure("Hotel reservation system error");
        }
    }

    private CarRentalResult reserveCarRental(CarRentalDetails carDetails, CompensationContext context) {
        try {
            CarRentalRequest request = new CarRentalRequest();
            request.setCarType(carDetails.getCarType());
            request.setPickupLocation(carDetails.getPickupLocation());
            request.setDropoffLocation(carDetails.getDropoffLocation());
            request.setPickupDate(carDetails.getPickupDate());
            request.setDropoffDate(carDetails.getDropoffDate());
            request.setDriverDetails(carDetails.getDriverDetails());

            CarRentalResponse response = carRentalService.reserveCar(request);

            if (response.isSuccess()) {
                // Register compensation action
                compensationManager.addCompensation(context, 
                    new CarRentalCancellationCompensation(response.getReservationId()));

                log.info("Car rental reserved successfully: {}", response.getReservationId());
                return CarRentalResult.success(response.getReservationId());

            } else {
                log.warn("Car rental reservation failed: {}", response.getErrorMessage());
                return CarRentalResult.failure(response.getErrorMessage());
            }

        } catch (Exception e) {
            log.error("Error during car rental reservation", e);
            return CarRentalResult.failure("Car rental system error");
        }
    }

    private PaymentResult processPayment(PaymentDetails paymentDetails, CompensationContext context) {
        try {
            PaymentRequest request = new PaymentRequest();
            request.setAmount(paymentDetails.getTotalAmount());
            request.setPaymentMethod(paymentDetails.getPaymentMethod());
            request.setCurrency(paymentDetails.getCurrency());
            request.setBillingAddress(paymentDetails.getBillingAddress());

            PaymentResponse response = paymentService.processPayment(request);

            if (response.isSuccess()) {
                // Register compensation action
                compensationManager.addCompensation(context, 
                    new PaymentRefundCompensation(response.getPaymentId(), paymentDetails.getTotalAmount()));

                log.info("Payment processed successfully: {}", response.getPaymentId());
                return PaymentResult.success(response.getPaymentId());

            } else {
                log.warn("Payment processing failed: {}", response.getErrorMessage());
                return PaymentResult.failure(response.getErrorMessage());
            }

        } catch (Exception e) {
            log.error("Error during payment processing", e);
            return PaymentResult.failure("Payment system error");
        }
    }
}

// Compensation Manager
@Component
public class CompensationManager {

    private final Map<String, CompensationContext> activeContexts = new ConcurrentHashMap<>();

    @Autowired
    private CompensationRepository compensationRepository;

    public CompensationContext createContext(String contextId) {
        CompensationContext context = new CompensationContext(contextId);
        activeContexts.put(contextId, context);

        // Persist compensation context
        CompensationRecord record = new CompensationRecord();
        record.setContextId(contextId);
        record.setStatus(CompensationStatus.ACTIVE);
        record.setCreatedAt(Instant.now());

        compensationRepository.save(record);

        log.info("Compensation context created: {}", contextId);
        return context;
    }

    public void addCompensation(CompensationContext context, CompensationAction action) {
        context.addCompensation(action);

        // Persist compensation action
        CompensationActionRecord actionRecord = new CompensationActionRecord();
        actionRecord.setContextId(context.getContextId());
        actionRecord.setActionType(action.getClass().getSimpleName());
        actionRecord.setActionData(action.serialize());
        actionRecord.setSequenceNumber(context.getCompensationCount());
        actionRecord.setStatus(CompensationActionStatus.REGISTERED);

        compensationActionRepository.save(actionRecord);

        log.debug("Compensation action registered: {} for context: {}", 
                action.getClass().getSimpleName(), context.getContextId());
    }

    public void executeCompensations(CompensationContext context) {
        log.info("Executing compensations for context: {}", context.getContextId());

        List<CompensationAction> actions = context.getCompensations();

        // Execute compensations in reverse order
        Collections.reverse(actions);

        for (CompensationAction action : actions) {
            try {
                log.info("Executing compensation: {} for context: {}", 
                        action.getClass().getSimpleName(), context.getContextId());

                action.execute();

                // Update action status
                updateCompensationActionStatus(context.getContextId(), action, 
                                             CompensationActionStatus.COMPLETED);

                log.info("Compensation completed: {} for context: {}", 
                        action.getClass().getSimpleName(), context.getContextId());

            } catch (Exception e) {
                log.error("Compensation failed: {} for context: {}", 
                         action.getClass().getSimpleName(), context.getContextId(), e);

                // Update action status
                updateCompensationActionStatus(context.getContextId(), action, 
                                             CompensationActionStatus.FAILED);

                // Continue with other compensations despite individual failures
            }
        }

        // Update context status
        CompensationRecord record = compensationRepository.findByContextId(context.getContextId());
        record.setStatus(CompensationStatus.COMPLETED);
        record.setCompletedAt(Instant.now());
        compensationRepository.save(record);

        // Remove from active contexts
        activeContexts.remove(context.getContextId());

        log.info("All compensations executed for context: {}", context.getContextId());
    }

    public void confirmAll(CompensationContext context) {
        log.info("Confirming all operations for context: {}", context.getContextId());

        // Mark all compensation actions as confirmed (no longer needed)
        List<CompensationActionRecord> actions = compensationActionRepository
            .findByContextIdOrderBySequenceNumber(context.getContextId());

        for (CompensationActionRecord action : actions) {
            action.setStatus(CompensationActionStatus.CONFIRMED);
        }

        compensationActionRepository.saveAll(actions);

        // Update context status
        CompensationRecord record = compensationRepository.findByContextId(context.getContextId());
        record.setStatus(CompensationStatus.CONFIRMED);
        record.setConfirmedAt(Instant.now());
        compensationRepository.save(record);

        // Remove from active contexts
        activeContexts.remove(context.getContextId());

        log.info("All operations confirmed for context: {}", context.getContextId());
    }

    public CompensationContext getContext(String contextId) {
        return activeContexts.get(contextId);
    }
}

// Compensation Actions
public abstract class CompensationAction {
    protected String actionId;
    protected Instant createdAt;

    public CompensationAction() {
        this.actionId = UUID.randomUUID().toString();
        this.createdAt = Instant.now();
    }

    public abstract void execute() throws CompensationException;

    public abstract String serialize();

    public String getActionId() {
        return actionId;
    }

    public Instant getCreatedAt() {
        return createdAt;
    }
}

public class FlightCancellationCompensation extends CompensationAction {

    private final String flightReservationId;

    @Autowired
    private FlightReservationService flightReservationService;

    public FlightCancellationCompensation(String flightReservationId) {
        super();
        this.flightReservationId = flightReservationId;
    }

    @Override
    public void execute() throws CompensationException {
        try {
            FlightCancellationRequest request = new FlightCancellationRequest();
            request.setReservationId(flightReservationId);
            request.setReason("Booking compensation");

            FlightCancellationResponse response = flightReservationService.cancelReservation(request);

            if (!response.isSuccess()) {
                throw new CompensationException("Failed to cancel flight reservation: " + 
                                              response.getErrorMessage());
            }

            log.info("Flight reservation cancelled successfully: {}", flightReservationId);

        } catch (Exception e) {
            log.error("Error cancelling flight reservation: {}", flightReservationId, e);
            throw new CompensationException("Flight cancellation failed", e);
        }
    }

    @Override
    public String serialize() {
        return String.format("FlightCancellation:{\"reservationId\":\"%s\"}", flightReservationId);
    }
}

public class HotelCancellationCompensation extends CompensationAction {

    private final String hotelReservationId;

    @Autowired
    private HotelReservationService hotelReservationService;

    public HotelCancellationCompensation(String hotelReservationId) {
        super();
        this.hotelReservationId = hotelReservationId;
    }

    @Override
    public void execute() throws CompensationException {
        try {
            HotelCancellationRequest request = new HotelCancellationRequest();
            request.setReservationId(hotelReservationId);
            request.setReason("Booking compensation");

            HotelCancellationResponse response = hotelReservationService.cancelReservation(request);

            if (!response.isSuccess()) {
                throw new CompensationException("Failed to cancel hotel reservation: " + 
                                              response.getErrorMessage());
            }

            log.info("Hotel reservation cancelled successfully: {}", hotelReservationId);

        } catch (Exception e) {
            log.error("Error cancelling hotel reservation: {}", hotelReservationId, e);
            throw new CompensationException("Hotel cancellation failed", e);
        }
    }

    @Override
    public String serialize() {
        return String.format("HotelCancellation:{\"reservationId\":\"%s\"}", hotelReservationId);
    }
}

public class PaymentRefundCompensation extends CompensationAction {

    private final String paymentId;
    private final BigDecimal refundAmount;

    @Autowired
    private PaymentService paymentService;

    public PaymentRefundCompensation(String paymentId, BigDecimal refundAmount) {
        super();
        this.paymentId = paymentId;
        this.refundAmount = refundAmount;
    }

    @Override
    public void execute() throws CompensationException {
        try {
            RefundRequest request = new RefundRequest();
            request.setPaymentId(paymentId);
            request.setRefundAmount(refundAmount);
            request.setReason("Booking compensation");

            RefundResponse response = paymentService.processRefund(request);

            if (!response.isSuccess()) {
                throw new CompensationException("Failed to process refund: " + 
                                              response.getErrorMessage());
            }

            log.info("Payment refunded successfully: {} for amount: {}", paymentId, refundAmount);

        } catch (Exception e) {
            log.error("Error processing refund for payment: {}", paymentId, e);
            throw new CompensationException("Payment refund failed", e);
        }
    }

    @Override
    public String serialize() {
        return String.format("PaymentRefund:{\"paymentId\":\"%s\",\"amount\":\"%s\"}", 
                           paymentId, refundAmount.toString());
    }
}

2. Banking Transfer Compensation

Comprehensive compensation for multi-step banking operations:

// Banking Transfer Service with Compensation
@Component
public class BankingTransferService {

    @Autowired
    private CompensationManager compensationManager;

    @Autowired
    private AccountService accountService;

    @Autowired
    private AuditService auditService;

    public TransferResult processTransfer(TransferRequest request) {
        String transferId = UUID.randomUUID().toString();
        CompensationContext context = compensationManager.createContext(transferId);

        try {
            // Step 1: Validate accounts
            AccountValidationResult validation = validateAccounts(request, context);
            if (!validation.isValid()) {
                return TransferResult.failure("Account validation failed: " + validation.getErrorMessage());
            }

            // Step 2: Place hold on source account
            HoldResult holdResult = placeAccountHold(request.getFromAccount(), 
                                                   request.getAmount(), context);
            if (!holdResult.isSuccess()) {
                compensationManager.executeCompensations(context);
                return TransferResult.failure("Failed to place hold on source account");
            }

            // Step 3: Check transfer limits and compliance
            ComplianceResult complianceResult = checkTransferCompliance(request, context);
            if (!complianceResult.isApproved()) {
                compensationManager.executeCompensations(context);
                return TransferResult.failure("Transfer failed compliance check: " + 
                                            complianceResult.getReason());
            }

            // Step 4: Execute the actual transfer
            TransferExecutionResult executionResult = executeTransfer(request, context);
            if (!executionResult.isSuccess()) {
                compensationManager.executeCompensations(context);
                return TransferResult.failure("Transfer execution failed: " + 
                                            executionResult.getErrorMessage());
            }

            // Step 5: Send notifications
            NotificationResult notificationResult = sendTransferNotifications(request, context);
            if (!notificationResult.isSuccess()) {
                // Non-critical failure, log but don't compensate the transfer
                log.warn("Failed to send transfer notifications for transfer: {}", transferId);
            }

            // All steps completed successfully
            compensationManager.confirmAll(context);

            log.info("Transfer completed successfully: {}", transferId);
            return TransferResult.success(transferId, executionResult.getTransactionReference());

        } catch (Exception e) {
            log.error("Error during transfer processing: {}", transferId, e);
            compensationManager.executeCompensations(context);
            return TransferResult.failure("Transfer failed due to system error");
        }
    }

    private HoldResult placeAccountHold(String accountNumber, BigDecimal amount, 
                                       CompensationContext context) {
        try {
            AccountHoldRequest request = new AccountHoldRequest();
            request.setAccountNumber(accountNumber);
            request.setHoldAmount(amount);
            request.setHoldReason("Transfer processing");
            request.setExpiryTime(Instant.now().plus(Duration.ofMinutes(30)));

            AccountHoldResponse response = accountService.placeHold(request);

            if (response.isSuccess()) {
                // Register compensation to release the hold
                compensationManager.addCompensation(context, 
                    new AccountHoldReleaseCompensation(response.getHoldId()));

                log.info("Account hold placed successfully: {} for amount: {}", 
                        response.getHoldId(), amount);
                return HoldResult.success(response.getHoldId());

            } else {
                log.warn("Failed to place account hold: {}", response.getErrorMessage());
                return HoldResult.failure(response.getErrorMessage());
            }

        } catch (Exception e) {
            log.error("Error placing account hold for account: {}", accountNumber, e);
            return HoldResult.failure("Account hold system error");
        }
    }

    private TransferExecutionResult executeTransfer(TransferRequest request, 
                                                  CompensationContext context) {
        try {
            BankingTransferRequest transferRequest = new BankingTransferRequest();
            transferRequest.setFromAccount(request.getFromAccount());
            transferRequest.setToAccount(request.getToAccount());
            transferRequest.setAmount(request.getAmount());
            transferRequest.setReference(request.getReference());
            transferRequest.setCurrency(request.getCurrency());

            BankingTransferResponse response = accountService.executeTransfer(transferRequest);

            if (response.isSuccess()) {
                // Register compensation to reverse the transfer if needed
                compensationManager.addCompensation(context, 
                    new TransferReversalCompensation(response.getTransactionId(), 
                                                   request.getFromAccount(), 
                                                   request.getToAccount(), 
                                                   request.getAmount()));

                log.info("Transfer executed successfully: {} with transaction ID: {}", 
                        request.getReference(), response.getTransactionId());
                return TransferExecutionResult.success(response.getTransactionId(), 
                                                     response.getTransactionReference());

            } else {
                log.warn("Transfer execution failed: {}", response.getErrorMessage());
                return TransferExecutionResult.failure(response.getErrorMessage());
            }

        } catch (Exception e) {
            log.error("Error executing transfer: {}", request.getReference(), e);
            return TransferExecutionResult.failure("Transfer execution system error");
        }
    }
}

// Account Hold Release Compensation
public class AccountHoldReleaseCompensation extends CompensationAction {

    private final String holdId;

    @Autowired
    private AccountService accountService;

    public AccountHoldReleaseCompensation(String holdId) {
        super();
        this.holdId = holdId;
    }

    @Override
    public void execute() throws CompensationException {
        try {
            AccountHoldReleaseRequest request = new AccountHoldReleaseRequest();
            request.setHoldId(holdId);
            request.setReleaseReason("Transfer compensation");

            AccountHoldReleaseResponse response = accountService.releaseHold(request);

            if (!response.isSuccess()) {
                throw new CompensationException("Failed to release account hold: " + 
                                              response.getErrorMessage());
            }

            log.info("Account hold released successfully: {}", holdId);

        } catch (Exception e) {
            log.error("Error releasing account hold: {}", holdId, e);
            throw new CompensationException("Account hold release failed", e);
        }
    }

    @Override
    public String serialize() {
        return String.format("AccountHoldRelease:{\"holdId\":\"%s\"}", holdId);
    }
}

// Transfer Reversal Compensation
public class TransferReversalCompensation extends CompensationAction {

    private final String originalTransactionId;
    private final String originalFromAccount;
    private final String originalToAccount;
    private final BigDecimal originalAmount;

    @Autowired
    private AccountService accountService;

    @Autowired
    private AuditService auditService;

    public TransferReversalCompensation(String originalTransactionId,
                                      String originalFromAccount,
                                      String originalToAccount,
                                      BigDecimal originalAmount) {
        super();
        this.originalTransactionId = originalTransactionId;
        this.originalFromAccount = originalFromAccount;
        this.originalToAccount = originalToAccount;
        this.originalAmount = originalAmount;
    }

    @Override
    public void execute() throws CompensationException {
        try {
            // Create reversal transfer (from destination back to source)
            BankingTransferRequest reversalRequest = new BankingTransferRequest();
            reversalRequest.setFromAccount(originalToAccount);
            reversalRequest.setToAccount(originalFromAccount);
            reversalRequest.setAmount(originalAmount);
            reversalRequest.setReference("REVERSAL:" + originalTransactionId);
            reversalRequest.setOriginalTransactionId(originalTransactionId);

            BankingTransferResponse response = accountService.executeReversal(reversalRequest);

            if (!response.isSuccess()) {
                throw new CompensationException("Failed to execute transfer reversal: " + 
                                              response.getErrorMessage());
            }

            // Log reversal for audit purposes
            auditService.logTransferReversal(originalTransactionId, 
                                           response.getTransactionId(),
                                           "Compensation reversal");

            log.info("Transfer reversed successfully. Original: {}, Reversal: {}", 
                    originalTransactionId, response.getTransactionId());

        } catch (Exception e) {
            log.error("Error executing transfer reversal for transaction: {}", 
                     originalTransactionId, e);
            throw new CompensationException("Transfer reversal failed", e);
        }
    }

    @Override
    public String serialize() {
        return String.format("TransferReversal:{\"originalTxId\":\"%s\",\"fromAccount\":\"%s\",\"toAccount\":\"%s\",\"amount\":\"%s\"}", 
                           originalTransactionId, originalFromAccount, originalToAccount, originalAmount.toString());
    }
}

3. Apache Camel Compensation Integration

Event-driven compensation using Apache Camel:

@Component
public class CamelCompensationRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        // Compensation event processing
        from("kafka:compensation-events?groupId=compensation-processor")
            .routeId("compensation-event-processor")
            .unmarshal().json(JsonLibrary.Jackson, CompensationEvent.class)
            .choice()
                .when(simple("${body.eventType} == 'COMPENSATION_REQUIRED'"))
                    .to("direct:initiateCompensation")
                .when(simple("${body.eventType} == 'COMPENSATION_COMPLETED'"))
                    .to("direct:handleCompensationCompleted")
                .when(simple("${body.eventType} == 'COMPENSATION_FAILED'"))
                    .to("direct:handleCompensationFailed")
                .otherwise()
                    .log("Unknown compensation event: ${body.eventType}")
            .end();

        from("direct:initiateCompensation")
            .routeId("initiate-compensation")
            .process(exchange -> {
                CompensationEvent event = exchange.getIn().getBody(CompensationEvent.class);

                // Load compensation context
                CompensationContext context = compensationManager.getContext(event.getContextId());
                if (context == null) {
                    // Try to restore context from persistence
                    context = compensationManager.restoreContext(event.getContextId());
                }

                if (context != null) {
                    exchange.getIn().setBody(context);
                    exchange.getIn().setHeader("compensationRequired", true);
                } else {
                    exchange.getIn().setHeader("compensationRequired", false);
                    log.warn("Compensation context not found: {}", event.getContextId());
                }
            })
            .choice()
                .when(header("compensationRequired").isEqualTo(true))
                    .to("direct:executeCompensationSequence")
                .otherwise()
                    .log("No compensation context found for: ${body.contextId}")
            .end();

        from("direct:executeCompensationSequence")
            .routeId("execute-compensation-sequence")
            .process(exchange -> {
                CompensationContext context = exchange.getIn().getBody(CompensationContext.class);
                List<CompensationAction> actions = context.getCompensations();

                // Reverse the order for compensation
                Collections.reverse(actions);

                exchange.getIn().setBody(actions);
                exchange.getIn().setHeader("contextId", context.getContextId());
            })
            .split(body())
            .parallelProcessing(false) // Sequential execution for compensations
            .process(exchange -> {
                CompensationAction action = exchange.getIn().getBody(CompensationAction.class);
                String contextId = (String) exchange.getIn().getHeader("contextId");

                try {
                    log.info("Executing compensation action: {} for context: {}", 
                            action.getClass().getSimpleName(), contextId);

                    action.execute();

                    // Create success event
                    CompensationActionEvent successEvent = new CompensationActionEvent();
                    successEvent.setContextId(contextId);
                    successEvent.setActionId(action.getActionId());
                    successEvent.setActionType(action.getClass().getSimpleName());
                    successEvent.setStatus(CompensationActionStatus.COMPLETED);
                    successEvent.setTimestamp(Instant.now());

                    exchange.getIn().setBody(successEvent);
                    exchange.getIn().setHeader("actionStatus", "SUCCESS");

                    log.info("Compensation action completed: {} for context: {}", 
                            action.getClass().getSimpleName(), contextId);

                } catch (Exception e) {
                    log.error("Compensation action failed: {} for context: {}", 
                             action.getClass().getSimpleName(), contextId, e);

                    // Create failure event
                    CompensationActionEvent failureEvent = new CompensationActionEvent();
                    failureEvent.setContextId(contextId);
                    failureEvent.setActionId(action.getActionId());
                    failureEvent.setActionType(action.getClass().getSimpleName());
                    failureEvent.setStatus(CompensationActionStatus.FAILED);
                    failureEvent.setErrorMessage(e.getMessage());
                    failureEvent.setTimestamp(Instant.now());

                    exchange.getIn().setBody(failureEvent);
                    exchange.getIn().setHeader("actionStatus", "FAILED");
                }
            })
            .to("kafka:compensation-action-events")
            .log("Compensation action event published: ${body.actionType} - ${header.actionStatus}");

        // Compensation monitoring and retry
        from("timer:compensationMonitor?period=60000")
            .routeId("compensation-monitor")
            .process(exchange -> {
                // Find failed compensations that need retry
                List<CompensationContext> failedCompensations = 
                    compensationManager.getFailedCompensations();

                exchange.getIn().setBody(failedCompensations);
            })
            .split(body())
            .filter(simple("${body.retryCount} < 3")) // Max 3 retries
            .process(exchange -> {
                CompensationContext context = exchange.getIn().getBody(CompensationContext.class);

                // Increment retry count
                context.incrementRetryCount();

                CompensationEvent retryEvent = new CompensationEvent();
                retryEvent.setEventType("COMPENSATION_RETRY");
                retryEvent.setContextId(context.getContextId());
                retryEvent.setRetryAttempt(context.getRetryCount());
                retryEvent.setTimestamp(Instant.now());

                exchange.getIn().setBody(retryEvent);
            })
            .to("kafka:compensation-events")
            .log("Compensation retry initiated for context: ${body.contextId}");

        // Compensation reporting
        from("kafka:compensation-action-events?groupId=compensation-reporting")
            .routeId("compensation-reporting")
            .unmarshal().json(JsonLibrary.Jackson, CompensationActionEvent.class)
            .process(exchange -> {
                CompensationActionEvent event = exchange.getIn().getBody(CompensationActionEvent.class);

                // Create compensation report entry
                CompensationReport report = new CompensationReport();
                report.setContextId(event.getContextId());
                report.setActionId(event.getActionId());
                report.setActionType(event.getActionType());
                report.setStatus(event.getStatus());
                report.setExecutionTime(event.getTimestamp());

                if (event.getStatus() == CompensationActionStatus.FAILED) {
                    report.setErrorMessage(event.getErrorMessage());
                }

                compensationReportRepository.save(report);

                exchange.getIn().setBody(report);
            })
            .choice()
                .when(simple("${body.status} == 'FAILED'"))
                    .to("direct:sendCompensationFailureAlert")
                .otherwise()
                    .log("Compensation action reported: ${body.actionType}")
            .end();

        from("direct:sendCompensationFailureAlert")
            .routeId("send-compensation-failure-alert")
            .process(exchange -> {
                CompensationReport report = exchange.getIn().getBody(CompensationReport.class);

                AlertMessage alert = new AlertMessage();
                alert.setAlertType("COMPENSATION_FAILURE");
                alert.setSeverity(AlertSeverity.HIGH);
                alert.setMessage(String.format("Compensation action failed: %s for context: %s", 
                                              report.getActionType(), report.getContextId()));
                alert.setTimestamp(Instant.now());

                exchange.getIn().setBody(alert);
            })
            .to("kafka:system-alerts")
            .log("Compensation failure alert sent for context: ${body.contextId}");

        // Compensation analytics
        from("timer:compensationAnalytics?period=3600000") // Hourly
            .routeId("compensation-analytics")
            .process(exchange -> {
                // Generate compensation analytics
                CompensationAnalytics analytics = new CompensationAnalytics();
                analytics.setAnalysisTime(Instant.now());

                // Calculate metrics
                analytics.setTotalCompensations(compensationReportRepository.countTodaysCompensations());
                analytics.setSuccessfulCompensations(compensationReportRepository.countSuccessfulCompensations());
                analytics.setFailedCompensations(compensationReportRepository.countFailedCompensations());
                analytics.setAverageCompensationTime(compensationReportRepository.getAverageCompensationTime());

                // Top failure reasons
                analytics.setTopFailureReasons(compensationReportRepository.getTopFailureReasons());

                exchange.getIn().setBody(analytics);
            })
            .to("kafka:compensation-analytics")
            .log("Compensation analytics generated: ${body.totalCompensations} total compensations");
    }
}

Best Practices

1. Compensation Design and Implementation

2. Compensation Ordering and Dependencies

3. State Management and Persistence

4. Error Handling and Recovery

5. Performance and Scalability

6. Testing and Validation

The Compensation Transactions pattern is essential for maintaining data consistency and business integrity in distributed enterprise integration architectures, providing systematic approaches to error recovery and business process reliability without the limitations of traditional distributed transaction mechanisms.

← Back to All Patterns