
Process Manager

Overview

The Process Manager pattern coordinates complex, stateful business workflows by maintaining process state and managing the flow of messages between participating services. Like a conductor directing an orchestra, where each musician must play the right part at the right time, the Process Manager tracks progress, manages state transitions, and handles exceptional conditions so that multi-step business processes execute correctly. It is most useful for long-running processes that span multiple services and need visibility, control, and reliability across the whole workflow lifecycle.

Theoretical Foundation

The Process Manager pattern draws on workflow management theory, finite state machines, and distributed coordination. It combines ideas from business process management (BPM), event-driven architecture, and distributed systems to manage processes that span multiple services, systems, and organizational boundaries. It addresses three recurring challenges: keeping state consistent, handling process variations, and ensuring that long-running workflows complete reliably in distributed environments.

Core Principles

1. Stateful Orchestration

Managing complex business process state and coordination:
- Process state management - maintaining complete workflow state throughout execution
- State transitions - managing transitions between process steps based on business rules
- Process correlation - tracking related messages and events across the workflow
- Context preservation - maintaining business context and data throughout the process
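The state-transition idea above can be sketched as an explicit transition table. This is a minimal illustration, not a prescribed implementation: the class name `ProcessStateMachine` and the step names are hypothetical, chosen to resemble a loan workflow.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Illustrative sketch: a process instance guarded by an explicit transition table. */
class ProcessStateMachine {

    // Hypothetical step names for a loan-style workflow.
    enum Step { CREDIT_CHECK, DOCUMENT_VERIFICATION, UNDERWRITING, COMPLETED, REJECTED }

    // Each step maps to the set of steps that may legally follow it.
    private static final Map<Step, Set<Step>> ALLOWED = new EnumMap<>(Step.class);
    static {
        ALLOWED.put(Step.CREDIT_CHECK, EnumSet.of(Step.DOCUMENT_VERIFICATION, Step.REJECTED));
        ALLOWED.put(Step.DOCUMENT_VERIFICATION, EnumSet.of(Step.UNDERWRITING, Step.REJECTED));
        ALLOWED.put(Step.UNDERWRITING, EnumSet.of(Step.COMPLETED, Step.REJECTED));
        ALLOWED.put(Step.COMPLETED, EnumSet.noneOf(Step.class));
        ALLOWED.put(Step.REJECTED, EnumSet.noneOf(Step.class));
    }

    private final String processId; // correlation key for this process instance
    private Step current = Step.CREDIT_CHECK;

    ProcessStateMachine(String processId) {
        this.processId = processId;
    }

    Step current() {
        return current;
    }

    /** Moves to the next step, or throws if the transition is not in the table. */
    void transitionTo(Step next) {
        if (!ALLOWED.get(current).contains(next)) {
            throw new IllegalStateException(
                "Process " + processId + ": illegal transition " + current + " -> " + next);
        }
        current = next;
    }
}
```

Keeping the allowed transitions in one table makes illegal jumps (for example, from credit check straight to completion) fail loudly instead of silently corrupting process state.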

2. Message-Driven Coordination

Coordinating services through intelligent message management:
- Message routing - directing messages to appropriate services based on process state
- Response correlation - matching responses to outstanding requests using correlation identifiers
- Timeout management - handling delayed or missing responses from participating services
- Message ordering - ensuring messages are processed in the correct sequence
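Response correlation and timeout management can be combined in a small registry of outstanding requests. The sketch below assumes a hypothetical `PendingRequests` class and passes the current time in explicitly so the behaviour is easy to follow; a real process manager would typically persist this information alongside the process state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative sketch: tracks outstanding requests by correlation id and deadline. */
class PendingRequests {

    // correlationId -> deadline in epoch milliseconds
    private final Map<String, Long> deadlines = new ConcurrentHashMap<>();

    /** Registers an outstanding request that must be answered within timeoutMillis. */
    void register(String correlationId, long nowMillis, long timeoutMillis) {
        deadlines.put(correlationId, nowMillis + timeoutMillis);
    }

    /**
     * Returns true only if the response matches an outstanding request that has
     * not yet expired. Unknown, duplicate, and late responses are rejected.
     */
    boolean accept(String correlationId, long nowMillis) {
        Long deadline = deadlines.remove(correlationId);
        return deadline != null && nowMillis <= deadline;
    }

    /** Removes and returns the correlation ids whose deadline has passed. */
    List<String> expire(long nowMillis) {
        List<String> expired = new ArrayList<>();
        deadlines.entrySet().removeIf(entry -> {
            if (entry.getValue() < nowMillis) {
                expired.add(entry.getKey());
                return true;
            }
            return false;
        });
        return expired;
    }
}
```

A periodic job could call `expire` and raise a timeout event for each returned id, while message handlers call `accept` before acting on a response.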

3. Process Flow Control

Managing the execution flow of complex business processes:
- Conditional branching - executing different paths based on business conditions
- Parallel execution - coordinating concurrent process steps for improved performance
- Process synchronization - coordinating parallel branches and collecting results
- Loop handling - managing iterative process steps and batch processing scenarios
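Process synchronization for parallel branches amounts to a join: the process may advance only once every branch has reported back. The following sketch uses a hypothetical `BranchJoin` helper; the branch ids could be, for instance, the item ids of an order.

```java
import java.util.HashSet;
import java.util.Set;

/** Illustrative sketch: a join that fires once all parallel branches are complete. */
class BranchJoin {

    private final Set<String> pending;

    BranchJoin(Set<String> branchIds) {
        this.pending = new HashSet<>(branchIds);
    }

    /**
     * Records the completion of one branch. Returns true exactly once, when the
     * last outstanding branch completes; duplicates and unknown ids return false.
     */
    synchronized boolean complete(String branchId) {
        boolean wasPending = pending.remove(branchId);
        return wasPending && pending.isEmpty();
    }
}
```

Because `complete` returns true only for the final branch, the caller can use the return value as the trigger for the next process step without risking a double start when a duplicate message arrives.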

4. Exception and Error Handling

Comprehensive strategies for managing process failures and exceptions:
- Error detection - identifying failures and exceptional conditions during process execution
- Process compensation - undoing completed steps when process failures occur
- Alternative flows - executing alternative process paths when primary flows fail
- Human intervention - escalating complex exceptions to human operators for resolution
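Process compensation is commonly implemented by recording an undo action for each completed step and running those actions in reverse order on failure. The sketch below is one way to do that; `CompensationLog` is a hypothetical name, and real undo actions would send release or refund requests rather than run in-process.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative sketch: records undo actions and runs them most-recent-first. */
class CompensationLog {

    // Used as a stack: the most recently completed step is undone first.
    private final Deque<Runnable> undoActions = new ArrayDeque<>();

    /** Call after a step completes, passing the action that would undo it. */
    void recordCompleted(Runnable undoAction) {
        undoActions.push(undoAction);
    }

    /** Runs all recorded undo actions in reverse order; returns how many ran. */
    int compensate() {
        int count = 0;
        while (!undoActions.isEmpty()) {
            undoActions.pop().run();
            count++;
        }
        return count;
    }
}
```

Undoing steps in reverse order mirrors how the steps built on each other: a payment taken after an inventory reservation is refunded before the reservation is released.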

Why Process Manager is Essential in Integration Architecture

1. Complex Workflow Coordination

In enterprise integration scenarios, the Process Manager pattern provides:
- Multi-service coordination - orchestrating interactions between numerous independent services
- Long-running process management - handling business processes that span days, weeks, or months
- State consistency - maintaining consistent process state across distributed services
- Process visibility - providing complete visibility into workflow progress and status

2. Business Process Automation

For sophisticated business workflow scenarios:
- Approval workflows - managing multi-level approval processes with conditional routing
- Order fulfillment - coordinating complex order processing across multiple systems
- Customer onboarding - orchestrating multi-step customer registration and verification processes
- Supply chain management - coordinating procurement, manufacturing, and distribution workflows

3. System Integration

Enabling seamless integration across heterogeneous systems:
- Legacy system integration - coordinating with existing systems through standardized interfaces
- Third-party service integration - managing interactions with external service providers
- Cross-platform coordination - orchestrating processes across different technology platforms
- Multi-tenant process management - managing workflows for multiple organizational units

4. Operational Excellence

Enhancing operational efficiency and reliability:
- Process monitoring - providing real-time visibility into workflow execution status
- Error handling - systematic approaches to handling and recovering from failures
- Performance optimization - optimizing workflow execution through parallel processing and caching
- Compliance support - maintaining audit trails and supporting regulatory compliance requirements

Benefits in Integration Contexts

1. Technical Advantages

2. Business Value

3. Integration Enablement

4. Maintainability and Evolution

Integration Architecture Applications

1. Financial Services

Process Manager applications in financial workflows:

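import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

// Domain types (ProcessState, ProcessStep, the service and event classes) and the
// scheduleTimeout/publish* helpers are application-specific and not shown here.
@Component
public class LoanApplicationProcessManager {

    private static final Logger log =
            LoggerFactory.getLogger(LoanApplicationProcessManager.class);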

    @Autowired
    private ProcessStateRepository processStateRepository;

    @Autowired
    private CreditCheckService creditCheckService;

    @Autowired
    private DocumentVerificationService documentService;

    @Autowired
    private UnderwritingService underwritingService;

    @EventListener
    public void handleLoanApplicationSubmitted(LoanApplicationSubmittedEvent event) {
        ProcessState state = new ProcessState();
        state.setProcessId(event.getProcessId());
        state.setApplicationId(event.getApplicationId());
        state.setCurrentStep(ProcessStep.CREDIT_CHECK_INITIATED);
        state.setProcessData(event.getApplicationData());

        processStateRepository.save(state);

        // Initiate credit check
        CreditCheckRequest request = new CreditCheckRequest();
        request.setProcessId(event.getProcessId());
        request.setCustomerSSN(event.getApplicationData().getCustomerSSN());
        request.setRequestedAmount(event.getApplicationData().getLoanAmount());

        creditCheckService.initiateCheck(request);

        // Set timeout for credit check response
        scheduleTimeout(event.getProcessId(), CREDIT_CHECK_TIMEOUT, 
                       ProcessTimeoutType.CREDIT_CHECK);
    }

    @EventListener
    public void handleCreditCheckCompleted(CreditCheckCompletedEvent event) {
        ProcessState state = processStateRepository.findByProcessId(event.getProcessId());

        // Ignore responses for unknown processes or for a step that is no longer pending
        if (state == null || state.getCurrentStep() != ProcessStep.CREDIT_CHECK_INITIATED) {
            log.warn("Unexpected credit check response for process {}", event.getProcessId());
            return;
        }

        state.setCurrentStep(ProcessStep.CREDIT_CHECK_COMPLETED);
        state.addProcessData("creditScore", event.getCreditScore());
        state.addProcessData("creditCheckResult", event.getResult());

        if (event.getResult() == CreditCheckResult.APPROVED) {
            // Proceed to document verification
            state.setCurrentStep(ProcessStep.DOCUMENT_VERIFICATION_INITIATED);
            processStateRepository.save(state);

            DocumentVerificationRequest docRequest = new DocumentVerificationRequest();
            docRequest.setProcessId(event.getProcessId());
            docRequest.setDocuments(state.getProcessData().getDocuments());

            documentService.initiateVerification(docRequest);

            scheduleTimeout(event.getProcessId(), DOCUMENT_VERIFICATION_TIMEOUT,
                           ProcessTimeoutType.DOCUMENT_VERIFICATION);
        } else {
            // Handle credit check failure
            state.setCurrentStep(ProcessStep.PROCESS_REJECTED);
            state.setRejectionReason("Credit check failed: " + event.getReason());
            processStateRepository.save(state);

            publishApplicationRejectedEvent(event.getProcessId(), event.getReason());
        }
    }

    @EventListener
    public void handleDocumentVerificationCompleted(DocumentVerificationCompletedEvent event) {
        ProcessState state = processStateRepository.findByProcessId(event.getProcessId());

        // Ignore responses for unknown processes or for a step that is no longer pending
        if (state == null
                || state.getCurrentStep() != ProcessStep.DOCUMENT_VERIFICATION_INITIATED) {
            log.warn("Unexpected document verification response for process {}", 
                    event.getProcessId());
            return;
        }

        state.setCurrentStep(ProcessStep.DOCUMENT_VERIFICATION_COMPLETED);
        state.addProcessData("documentVerificationResult", event.getResult());

        if (event.getResult() == VerificationResult.VERIFIED) {
            // Proceed to underwriting
            state.setCurrentStep(ProcessStep.UNDERWRITING_INITIATED);
            processStateRepository.save(state);

            UnderwritingRequest underwritingRequest = new UnderwritingRequest();
            underwritingRequest.setProcessId(event.getProcessId());
            underwritingRequest.setCreditScore(state.getProcessData().getCreditScore());
            underwritingRequest.setApplicationData(state.getProcessData());
            underwritingRequest.setDocumentationStatus(event.getResult());

            underwritingService.initiateUnderwriting(underwritingRequest);

            scheduleTimeout(event.getProcessId(), UNDERWRITING_TIMEOUT,
                           ProcessTimeoutType.UNDERWRITING);
        } else {
            // Handle document verification failure
            state.setCurrentStep(ProcessStep.PROCESS_REJECTED);
            state.setRejectionReason("Document verification failed");
            processStateRepository.save(state);

            publishApplicationRejectedEvent(event.getProcessId(), 
                                          "Document verification failed");
        }
    }

    @EventListener
    public void handleUnderwritingCompleted(UnderwritingCompletedEvent event) {
        ProcessState state = processStateRepository.findByProcessId(event.getProcessId());

        // Ignore responses for unknown processes or for a step that is no longer pending
        if (state == null || state.getCurrentStep() != ProcessStep.UNDERWRITING_INITIATED) {
            log.warn("Unexpected underwriting response for process {}", event.getProcessId());
            return;
        }

        state.setCurrentStep(ProcessStep.UNDERWRITING_COMPLETED);
        state.addProcessData("underwritingResult", event.getResult());
        state.addProcessData("approvedAmount", event.getApprovedAmount());
        state.addProcessData("interestRate", event.getInterestRate());

        if (event.getResult() == UnderwritingResult.APPROVED) {
            state.setCurrentStep(ProcessStep.PROCESS_COMPLETED);
            processStateRepository.save(state);

            publishApplicationApprovedEvent(event.getProcessId(), 
                                          event.getApprovedAmount(), 
                                          event.getInterestRate());
        } else {
            state.setCurrentStep(ProcessStep.PROCESS_REJECTED);
            state.setRejectionReason("Underwriting declined: " + event.getReason());
            processStateRepository.save(state);

            publishApplicationRejectedEvent(event.getProcessId(), event.getReason());
        }
    }

    @EventListener
    public void handleProcessTimeout(ProcessTimeoutEvent event) {
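        ProcessState state = processStateRepository.findByProcessId(event.getProcessId());
        if (state == null) {
            log.warn("Timeout received for unknown process {}", event.getProcessId());
            return;
        }

        // Act only if the process is still waiting on the step that timed out;
        // a timeout that fires after the response has arrived is ignored
        switch (event.getTimeoutType()) {
            case CREDIT_CHECK:
                if (state.getCurrentStep() == ProcessStep.CREDIT_CHECK_INITIATED) {
                    handleCreditCheckTimeout(state);
                }
                break;
            case DOCUMENT_VERIFICATION:
                if (state.getCurrentStep() == ProcessStep.DOCUMENT_VERIFICATION_INITIATED) {
                    handleDocumentVerificationTimeout(state);
                }
                break;
            case UNDERWRITING:
                if (state.getCurrentStep() == ProcessStep.UNDERWRITING_INITIATED) {
                    handleUnderwritingTimeout(state);
                }
                break;
            default:
                log.warn("Unhandled timeout type {} for process {}",
                        event.getTimeoutType(), event.getProcessId());
                break;
        }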
    }
}
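The listing above calls `scheduleTimeout` without showing it. One possible shape for such a helper is sketched below, using the JDK's `ScheduledExecutorService`. This is an assumption for illustration only: the class name `TimeoutScheduler` and its callback are hypothetical, and because the timers live in memory they are lost on restart, so a long-running process would more likely rely on persisted deadlines or a durable scheduler.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical in-memory timeout scheduler; timers do not survive a JVM restart. */
class TimeoutScheduler {

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final Map<String, ScheduledFuture<?>> timers = new ConcurrentHashMap<>();
    private final Consumer<String> onTimeout;

    /** onTimeout receives the process id when a timer fires (e.g. to publish an event). */
    TimeoutScheduler(Consumer<String> onTimeout) {
        this.onTimeout = onTimeout;
    }

    /** Schedules the timeout for a process, replacing any timer already pending. */
    void schedule(String processId, long delayMillis) {
        ScheduledFuture<?> timer = executor.schedule(() -> {
            timers.remove(processId);
            onTimeout.accept(processId);
        }, delayMillis, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> previous = timers.put(processId, timer);
        if (previous != null) {
            previous.cancel(false);
        }
    }

    /** Cancels the pending timeout; returns true if it was cancelled before firing. */
    boolean cancel(String processId) {
        ScheduledFuture<?> timer = timers.remove(processId);
        return timer != null && timer.cancel(false);
    }

    /** Stops the scheduler thread and discards timers that have not fired. */
    void shutdown() {
        executor.shutdownNow();
    }
}
```

A handler that receives the awaited response would call `cancel` for that process, so that only genuinely overdue steps produce a timeout event.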

2. E-commerce Order Processing

Process Manager for complex order fulfillment:

@Component
public class OrderFulfillmentProcessManager {

    @Autowired
    private ProcessStateRepository processStateRepository;

    @Autowired
    private InventoryService inventoryService;

    @Autowired
    private PaymentService paymentService;

    @Autowired
    private ShippingService shippingService;

    @Autowired
    private NotificationService notificationService;

    @EventListener
    public void handleOrderPlaced(OrderPlacedEvent event) {
        ProcessState state = new ProcessState();
        state.setProcessId(event.getOrderId());
        state.setCurrentStep(ProcessStep.INVENTORY_RESERVATION_INITIATED);
        state.setProcessData(event.getOrderData());

        processStateRepository.save(state);

        // Start parallel inventory reservations for all items
        for (OrderItem item : event.getOrderData().getItems()) {
            InventoryReservationRequest request = new InventoryReservationRequest();
            request.setProcessId(event.getOrderId());
            request.setItemId(item.getItemId());
            request.setQuantity(item.getQuantity());
            request.setReservationId(generateReservationId());

            inventoryService.reserveInventory(request);
        }

        scheduleTimeout(event.getOrderId(), INVENTORY_RESERVATION_TIMEOUT,
                       ProcessTimeoutType.INVENTORY_RESERVATION);
    }

    @EventListener
    public void handleInventoryReserved(InventoryReservedEvent event) {
        ProcessState state = processStateRepository.findByProcessId(event.getProcessId());

        // Ignore reservations that arrive after the process has left the
        // reservation step, so that payment is not initiated twice
        if (state == null
                || state.getCurrentStep() != ProcessStep.INVENTORY_RESERVATION_INITIATED) {
            return;
        }

        // Track inventory reservations
        state.addProcessData("reservedItems." + event.getItemId(), event.getReservationId());

        // Check if all items are reserved
        OrderData orderData = state.getProcessData();
        boolean allItemsReserved = orderData.getItems().stream()
            .allMatch(item -> orderData.containsKey("reservedItems." + item.getItemId()));

        if (!allItemsReserved) {
            // Persist the partial progress and wait for the remaining reservations
            processStateRepository.save(state);
            return;
        }

        // Proceed to payment processing
        state.setCurrentStep(ProcessStep.PAYMENT_PROCESSING_INITIATED);
        processStateRepository.save(state);

        PaymentRequest paymentRequest = new PaymentRequest();
        paymentRequest.setProcessId(event.getProcessId());
        paymentRequest.setAmount(orderData.getTotalAmount());
        paymentRequest.setPaymentMethod(orderData.getPaymentMethod());
        paymentRequest.setCustomerId(orderData.getCustomerId());

        paymentService.processPayment(paymentRequest);

        scheduleTimeout(event.getProcessId(), PAYMENT_PROCESSING_TIMEOUT,
                       ProcessTimeoutType.PAYMENT_PROCESSING);
    }

    @EventListener
    public void handleInventoryReservationFailed(InventoryReservationFailedEvent event) {
        ProcessState state = processStateRepository.findByProcessId(event.getProcessId());

        // With parallel reservations several failures may arrive; compensate only once
        if (state == null
                || state.getCurrentStep() != ProcessStep.INVENTORY_RESERVATION_INITIATED) {
            return;
        }

        // Handle inventory reservation failure
        state.setCurrentStep(ProcessStep.INVENTORY_COMPENSATION_INITIATED);
        state.addProcessData("failureReason", event.getReason());
        processStateRepository.save(state);

        // Release any successful reservations
        compensateInventoryReservations(state);

        // Notify customer of order failure
        state.setCurrentStep(ProcessStep.PROCESS_FAILED);
        processStateRepository.save(state);

        publishOrderFailedEvent(event.getProcessId(), "Inventory not available: " + event.getReason());
    }

    @EventListener
    public void handlePaymentProcessed(PaymentProcessedEvent event) {
        ProcessState state = processStateRepository.findByProcessId(event.getProcessId());

        // Ignore duplicate or late payment results
        if (state == null
                || state.getCurrentStep() != ProcessStep.PAYMENT_PROCESSING_INITIATED) {
            return;
        }

        if (event.getResult() == PaymentResult.SUCCESS) {
            state.setCurrentStep(ProcessStep.SHIPPING_INITIATED);
            state.addProcessData("paymentId", event.getPaymentId());

            // Persist the transition before calling the shipping service
            processStateRepository.save(state);

            // Create shipping request
            ShippingRequest shippingRequest = new ShippingRequest();
            shippingRequest.setProcessId(event.getProcessId());
            shippingRequest.setOrderData(state.getProcessData());
            shippingRequest.setShippingAddress(state.getProcessData().getShippingAddress());

            shippingService.initiateShipping(shippingRequest);

            scheduleTimeout(event.getProcessId(), SHIPPING_INITIATION_TIMEOUT,
                           ProcessTimeoutType.SHIPPING_INITIATION);
        } else {
            // Payment failed - compensate inventory reservations
            state.setCurrentStep(ProcessStep.PAYMENT_COMPENSATION_INITIATED);
            state.addProcessData("paymentFailureReason", event.getFailureReason());

            compensateInventoryReservations(state);

            // Save once, after the final state for this branch is known
            state.setCurrentStep(ProcessStep.PROCESS_FAILED);
            processStateRepository.save(state);

            publishOrderFailedEvent(event.getProcessId(), "Payment failed: " + event.getFailureReason());
        }
    }

    @EventListener
    public void handleShippingInitiated(ShippingInitiatedEvent event) {
        ProcessState state = processStateRepository.findByProcessId(event.getProcessId());

        state.setCurrentStep(ProcessStep.ORDER_FULFILLED);
        state.addProcessData("trackingNumber", event.getTrackingNumber());
        state.addProcessData("estimatedDelivery", event.getEstimatedDelivery());

        processStateRepository.save(state);

        // Send order confirmation to customer
        publishOrderFulfilledEvent(event.getProcessId(), 
                                 event.getTrackingNumber(), 
                                 event.getEstimatedDelivery());
    }

    private void compensateInventoryReservations(ProcessState state) {
        OrderData orderData = state.getProcessData();

        for (OrderItem item : orderData.getItems()) {
            String reservationKey = "reservedItems." + item.getItemId();
            if (state.getProcessData().containsKey(reservationKey)) {
                String reservationId = state.getProcessData().get(reservationKey);

                InventoryReleaseRequest releaseRequest = new InventoryReleaseRequest();
                releaseRequest.setProcessId(state.getProcessId());
                releaseRequest.setItemId(item.getItemId());
                releaseRequest.setReservationId(reservationId);

                inventoryService.releaseReservation(releaseRequest);
            }
        }
    }
}
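Because inventory reservations are requested in parallel, several events for the same order can be handled at the same time, and a plain load-modify-save cycle may lose an update. One common safeguard, sketched here as an assumption rather than as part of the listing above, is optimistic versioning: a save succeeds only if the state has not changed since it was loaded. The `VersionedStateStore` class is hypothetical and keeps state in memory purely for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative sketch: process state store with optimistic version checks. */
class VersionedStateStore {

    /** Immutable snapshot of a process step together with its version number. */
    static final class Versioned {
        final String step;
        final long version;

        Versioned(String step, long version) {
            this.step = step;
            this.version = version;
        }
    }

    private final Map<String, Versioned> store = new ConcurrentHashMap<>();

    void create(String processId, String initialStep) {
        store.put(processId, new Versioned(initialStep, 0L));
    }

    Versioned load(String processId) {
        return store.get(processId);
    }

    /**
     * Saves the new step only if the stored version still equals expectedVersion.
     * Returns false when another handler updated the process in the meantime;
     * the caller should then reload the state and re-evaluate the event.
     */
    boolean save(String processId, String newStep, long expectedVersion) {
        Versioned current = store.get(processId);
        if (current == null || current.version != expectedVersion) {
            return false;
        }
        // replace() is atomic: it succeeds only if `current` is still the stored value
        return store.replace(processId, current, new Versioned(newStep, expectedVersion + 1));
    }
}
```

With a relational database the same idea is usually expressed as a version column checked in the `UPDATE` statement; JPA offers this through the `@Version` annotation.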

3. Apache Camel Integration

Camel-based Process Manager implementation:

@Component
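public class CamelProcessManagerRoute extends RouteBuilder {

    // Repository used by the route processors below (assumed to be a Spring bean)
    @Autowired
    private ProcessStateRepository processStateRepository;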

    @Override
    public void configure() throws Exception {

        // Order processing workflow
        from("kafka:order-events?groupId=process-manager")
            .routeId("order-process-manager")
            .unmarshal().json(JsonLibrary.Jackson, OrderEvent.class)
            .choice()
                .when(simple("${body.eventType} == 'ORDER_PLACED'"))
                    .to("direct:initiateOrderProcess")
                .when(simple("${body.eventType} == 'INVENTORY_RESERVED'"))
                    .to("direct:handleInventoryReserved")
                .when(simple("${body.eventType} == 'PAYMENT_PROCESSED'"))
                    .to("direct:handlePaymentProcessed")
                .when(simple("${body.eventType} == 'SHIPPING_INITIATED'"))
                    .to("direct:handleShippingInitiated")
                .otherwise()
                    .log("Unknown order event type: ${body.eventType}")
            .end();

        from("direct:initiateOrderProcess")
            .routeId("initiate-order-process")
            .process(exchange -> {
                OrderEvent orderEvent = exchange.getIn().getBody(OrderEvent.class);
                ProcessState state = new ProcessState();
                state.setProcessId(orderEvent.getOrderId());
                state.setCurrentStep("INVENTORY_CHECK_INITIATED");
                state.setProcessData(orderEvent.getOrderData());

                // Save process state
                processStateRepository.save(state);

                exchange.getIn().setBody(state);
            })
            .multicast().parallelProcessing()
                .to("direct:checkInventoryAvailability")
                .to("direct:validatePaymentMethod")
            .end()
            .log("Order process initiated for order: ${body.processId}");

        from("direct:checkInventoryAvailability")
            .routeId("check-inventory-availability")
            .process(exchange -> {
                ProcessState state = exchange.getIn().getBody(ProcessState.class);
                OrderData orderData = state.getProcessData();

                // Build one request per item; setting the body inside the loop
                // would keep only the last item's request
                List<InventoryCheckRequest> requests = new ArrayList<>();
                for (OrderItem item : orderData.getItems()) {
                    InventoryCheckRequest request = new InventoryCheckRequest();
                    request.setProcessId(state.getProcessId());
                    request.setItemId(item.getItemId());
                    request.setQuantity(item.getQuantity());

                    requests.add(request);
                }
                exchange.getIn().setBody(requests);
            })
            // Send each request as its own Kafka message
            .split(body())
                .to("kafka:inventory-check-requests")
                .log("Inventory check initiated for process: ${body.processId}")
            .end();

        from("direct:handleInventoryReserved")
            .routeId("handle-inventory-reserved")
            .process(exchange -> {
                OrderEvent event = exchange.getIn().getBody(OrderEvent.class);
                ProcessState state = processStateRepository.findByProcessId(event.getProcessId());

                // Update process state
                state.addInventoryReservation(event.getItemId(), event.getReservationId());

                // Check if all items are reserved
                if (state.allItemsReserved()) {
                    state.setCurrentStep("PAYMENT_PROCESSING_INITIATED");

                    PaymentRequest paymentRequest = new PaymentRequest();
                    paymentRequest.setProcessId(event.getProcessId());
                    paymentRequest.setAmount(state.getProcessData().getTotalAmount());

                    exchange.getIn().setBody(paymentRequest);

                    processStateRepository.save(state);
                } else {
                    exchange.getIn().setBody(null);
                }
            })
            .choice()
                .when(body().isNotNull())
                    .to("kafka:payment-processing-requests")
                    .log("Payment processing initiated for process: ${body.processId}")
                .otherwise()
                    .log("Waiting for more inventory reservations")
            .end();

        from("direct:handlePaymentProcessed")
            .routeId("handle-payment-processed")
            .process(exchange -> {
                OrderEvent event = exchange.getIn().getBody(OrderEvent.class);
                ProcessState state = processStateRepository.findByProcessId(event.getProcessId());

                // Constant-first comparison avoids a NullPointerException if the result is missing
                if ("SUCCESS".equals(event.getPaymentResult())) {
                    state.setCurrentStep("SHIPPING_INITIATED");
                    state.addProcessData("paymentId", event.getPaymentId());

                    ShippingRequest shippingRequest = new ShippingRequest();
                    shippingRequest.setProcessId(event.getProcessId());
                    shippingRequest.setOrderData(state.getProcessData());

                    exchange.getIn().setBody(shippingRequest);

                    processStateRepository.save(state);
                } else {
                    // Handle payment failure
                    exchange.getIn().setHeader("compensationRequired", true);
                    exchange.getIn().setBody(state);
                }
            })
            .choice()
                .when(header("compensationRequired").isEqualTo(true))
                    .to("direct:compensateOrderProcess")
                .otherwise()
                    .to("kafka:shipping-requests")
                    .log("Shipping initiated for process: ${body.processId}")
            .end();

        from("direct:compensateOrderProcess")
            .routeId("compensate-order-process")
            .process(exchange -> {
                ProcessState state = exchange.getIn().getBody(ProcessState.class);

                // Build one release request per reserved item
                List<InventoryReleaseRequest> releaseRequests = new ArrayList<>();
                for (String itemId : state.getReservedItems().keySet()) {
                    String reservationId = state.getReservedItems().get(itemId);

                    InventoryReleaseRequest releaseRequest = new InventoryReleaseRequest();
                    releaseRequest.setProcessId(state.getProcessId());
                    releaseRequest.setItemId(itemId);
                    releaseRequest.setReservationId(reservationId);

                    releaseRequests.add(releaseRequest);
                }

                state.setCurrentStep("PROCESS_COMPENSATED");
                processStateRepository.save(state);

                // Replace the body so the release requests, not the process state,
                // are what gets sent to the inventory topic
                exchange.getIn().setBody(releaseRequests);
            })
            // Send each release request as its own Kafka message
            .split(body())
                .to("kafka:inventory-release-requests")
                .log("Inventory release requested for process: ${body.processId}")
            .end()
            .log("Order process compensated");

        // Process timeout handling
        from("timer:processTimeoutChecker?period=60000")
            .routeId("process-timeout-checker")
            .process(exchange -> {
                List<ProcessState> timedOutProcesses = processStateRepository.findTimedOutProcesses();
                exchange.getIn().setBody(timedOutProcesses);
            })
            .split(body())
            .process(exchange -> {
                ProcessState state = exchange.getIn().getBody(ProcessState.class);

                ProcessTimeoutEvent timeoutEvent = new ProcessTimeoutEvent();
                timeoutEvent.setProcessId(state.getProcessId());
                timeoutEvent.setCurrentStep(state.getCurrentStep());
                timeoutEvent.setTimeoutDuration(state.getTimeoutDuration());

                exchange.getIn().setBody(timeoutEvent);
            })
            .to("kafka:process-timeout-events")
            .log("Process timeout detected for process: ${body.processId}");
    }
}

Best Practices

1. Process Design and Architecture

2. State Management and Persistence

3. Message Coordination and Correlation

4. Error Handling and Compensation

5. Performance and Scalability

6. Monitoring and Observability

The Process Manager pattern is essential for orchestrating complex, stateful business workflows in distributed enterprise integration architectures, providing centralized coordination, state management, and reliable process execution across multiple services and systems.
