Process Manager
Overview
The Process Manager pattern is a sophisticated orchestration mechanism in enterprise integration architectures that coordinates complex, stateful business workflows by maintaining process state and managing the flow of messages between participating services. Like a conductor directing an orchestra where each musician must play their part at the right time and in the right order, the Process Manager ensures that multi-step business processes execute correctly by tracking progress, managing state transitions, and handling exceptional conditions. This pattern is essential for managing long-running, stateful business processes that require coordination between multiple services while maintaining visibility, control, and reliability throughout the entire workflow lifecycle.
Theoretical Foundation
The Process Manager pattern is grounded in workflow management theory, finite state machine principles, and distributed coordination patterns. It incorporates concepts from business process management (BPM), state machine design, event-driven architecture, and distributed systems coordination to provide a robust framework for managing complex business processes that span multiple services, systems, and organizational boundaries. The pattern addresses the challenges of maintaining state consistency, handling process variations, and ensuring reliable completion of long-running workflows in distributed environments.
Core Principles
1. Stateful Orchestration
Managing complex business process state and coordination: - Process state management - maintaining complete workflow state throughout execution - State transitions - managing transitions between process steps based on business rules - Process correlation - tracking related messages and events across the workflow - Context preservation - maintaining business context and data throughout the process
2. Message-Driven Coordination
Coordinating services through intelligent message management: - Message routing - directing messages to appropriate services based on process state - Response correlation - matching responses to outstanding requests using correlation identifiers - Timeout management - handling delayed or missing responses from participating services - Message ordering - ensuring messages are processed in the correct sequence
3. Process Flow Control
Managing the execution flow of complex business processes: - Conditional branching - executing different paths based on business conditions - Parallel execution - coordinating concurrent process steps for improved performance - Process synchronization - coordinating parallel branches and collecting results - Loop handling - managing iterative process steps and batch processing scenarios
4. Exception and Error Handling
Comprehensive strategies for managing process failures and exceptions: - Error detection - identifying failures and exceptional conditions during process execution - Process compensation - undoing completed steps when process failures occur - Alternative flows - executing alternative process paths when primary flows fail - Human intervention - escalating complex exceptions to human operators for resolution
Why Process Manager is Essential in Integration Architecture
1. Complex Workflow Coordination
In enterprise integration scenarios, the Process Manager pattern provides: - Multi-service coordination - orchestrating interactions between numerous independent services - Long-running process management - handling business processes that span days, weeks, or months - State consistency - maintaining consistent process state across distributed services - Process visibility - providing complete visibility into workflow progress and status
2. Business Process Automation
For sophisticated business workflow scenarios: - Approval workflows - managing multi-level approval processes with conditional routing - Order fulfillment - coordinating complex order processing across multiple systems - Customer onboarding - orchestrating multi-step customer registration and verification processes - Supply chain management - coordinating procurement, manufacturing, and distribution workflows
3. System Integration
Enabling seamless integration across heterogeneous systems: - Legacy system integration - coordinating with existing systems through standardized interfaces - Third-party service integration - managing interactions with external service providers - Cross-platform coordination - orchestrating processes across different technology platforms - Multi-tenant process management - managing workflows for multiple organizational units
4. Operational Excellence
Enhancing operational efficiency and reliability: - Process monitoring - providing real-time visibility into workflow execution status - Error handling - systematic approaches to handling and recovering from failures - Performance optimization - optimizing workflow execution through parallel processing and caching - Compliance support - maintaining audit trails and supporting regulatory compliance requirements
Benefits in Integration Contexts
1. Technical Advantages
- Centralized coordination - single point of control for complex multi-service workflows
- State management - reliable persistence and recovery of process state information
- Message correlation - accurate tracking and correlation of related messages and events
- Exception handling - sophisticated error handling and recovery mechanisms
2. Business Value
- Process visibility - complete transparency into business process execution and status
- Business agility - ability to rapidly modify and adapt business processes
- Compliance assurance - maintaining comprehensive audit trails for regulatory compliance
- Operational efficiency - optimized resource utilization through intelligent workflow coordination
3. Integration Enablement
- Service orchestration - coordinating complex interactions between multiple services
- Protocol mediation - handling communication between services using different protocols
- Data transformation - managing data format conversions throughout the workflow
- Transaction coordination - ensuring transactional consistency across distributed operations
4. Maintainability and Evolution
- Process modularity - clear separation between process logic and service implementations
- Change management - ability to modify process flows without changing individual services
- Testing support - comprehensive testing capabilities for complex workflow scenarios
- Version management - managing multiple versions of business processes simultaneously
Integration Architecture Applications
1. Financial Services
Process Manager applications in financial workflows:
@Component
public class LoanApplicationProcessManager {
@Autowired
private ProcessStateRepository processStateRepository;
@Autowired
private CreditCheckService creditCheckService;
@Autowired
private DocumentVerificationService documentService;
@Autowired
private UnderwritingService underwritingService;
@EventListener
public void handleLoanApplicationSubmitted(LoanApplicationSubmittedEvent event) {
ProcessState state = new ProcessState();
state.setProcessId(event.getProcessId());
state.setApplicationId(event.getApplicationId());
state.setCurrentStep(ProcessStep.CREDIT_CHECK_INITIATED);
state.setProcessData(event.getApplicationData());
processStateRepository.save(state);
// Initiate credit check
CreditCheckRequest request = new CreditCheckRequest();
request.setProcessId(event.getProcessId());
request.setCustomerSSN(event.getApplicationData().getCustomerSSN());
request.setRequestedAmount(event.getApplicationData().getLoanAmount());
creditCheckService.initiateCheck(request);
// Set timeout for credit check response
scheduleTimeout(event.getProcessId(), CREDIT_CHECK_TIMEOUT,
ProcessTimeoutType.CREDIT_CHECK);
}
@EventListener
public void handleCreditCheckCompleted(CreditCheckCompletedEvent event) {
ProcessState state = processStateRepository.findByProcessId(event.getProcessId());
if (state.getCurrentStep() != ProcessStep.CREDIT_CHECK_INITIATED) {
log.warn("Unexpected credit check response for process {}", event.getProcessId());
return;
}
state.setCurrentStep(ProcessStep.CREDIT_CHECK_COMPLETED);
state.addProcessData("creditScore", event.getCreditScore());
state.addProcessData("creditCheckResult", event.getResult());
if (event.getResult() == CreditCheckResult.APPROVED) {
// Proceed to document verification
state.setCurrentStep(ProcessStep.DOCUMENT_VERIFICATION_INITIATED);
processStateRepository.save(state);
DocumentVerificationRequest docRequest = new DocumentVerificationRequest();
docRequest.setProcessId(event.getProcessId());
docRequest.setDocuments(state.getProcessData().getDocuments());
documentService.initiateVerification(docRequest);
scheduleTimeout(event.getProcessId(), DOCUMENT_VERIFICATION_TIMEOUT,
ProcessTimeoutType.DOCUMENT_VERIFICATION);
} else {
// Handle credit check failure
state.setCurrentStep(ProcessStep.PROCESS_REJECTED);
state.setRejectionReason("Credit check failed: " + event.getReason());
processStateRepository.save(state);
publishApplicationRejectedEvent(event.getProcessId(), event.getReason());
}
}
@EventListener
public void handleDocumentVerificationCompleted(DocumentVerificationCompletedEvent event) {
ProcessState state = processStateRepository.findByProcessId(event.getProcessId());
if (state.getCurrentStep() != ProcessStep.DOCUMENT_VERIFICATION_INITIATED) {
log.warn("Unexpected document verification response for process {}",
event.getProcessId());
return;
}
state.setCurrentStep(ProcessStep.DOCUMENT_VERIFICATION_COMPLETED);
state.addProcessData("documentVerificationResult", event.getResult());
if (event.getResult() == VerificationResult.VERIFIED) {
// Proceed to underwriting
state.setCurrentStep(ProcessStep.UNDERWRITING_INITIATED);
processStateRepository.save(state);
UnderwritingRequest underwritingRequest = new UnderwritingRequest();
underwritingRequest.setProcessId(event.getProcessId());
underwritingRequest.setCreditScore(state.getProcessData().getCreditScore());
underwritingRequest.setApplicationData(state.getProcessData());
underwritingRequest.setDocumentationStatus(event.getResult());
underwritingService.initiateUnderwriting(underwritingRequest);
scheduleTimeout(event.getProcessId(), UNDERWRITING_TIMEOUT,
ProcessTimeoutType.UNDERWRITING);
} else {
// Handle document verification failure
state.setCurrentStep(ProcessStep.PROCESS_REJECTED);
state.setRejectionReason("Document verification failed");
processStateRepository.save(state);
publishApplicationRejectedEvent(event.getProcessId(),
"Document verification failed");
}
}
@EventListener
public void handleUnderwritingCompleted(UnderwritingCompletedEvent event) {
ProcessState state = processStateRepository.findByProcessId(event.getProcessId());
if (state.getCurrentStep() != ProcessStep.UNDERWRITING_INITIATED) {
log.warn("Unexpected underwriting response for process {}", event.getProcessId());
return;
}
state.setCurrentStep(ProcessStep.UNDERWRITING_COMPLETED);
state.addProcessData("underwritingResult", event.getResult());
state.addProcessData("approvedAmount", event.getApprovedAmount());
state.addProcessData("interestRate", event.getInterestRate());
if (event.getResult() == UnderwritingResult.APPROVED) {
state.setCurrentStep(ProcessStep.PROCESS_COMPLETED);
processStateRepository.save(state);
publishApplicationApprovedEvent(event.getProcessId(),
event.getApprovedAmount(),
event.getInterestRate());
} else {
state.setCurrentStep(ProcessStep.PROCESS_REJECTED);
state.setRejectionReason("Underwriting declined: " + event.getReason());
processStateRepository.save(state);
publishApplicationRejectedEvent(event.getProcessId(), event.getReason());
}
}
@EventListener
public void handleProcessTimeout(ProcessTimeoutEvent event) {
ProcessState state = processStateRepository.findByProcessId(event.getProcessId());
switch (event.getTimeoutType()) {
case CREDIT_CHECK:
if (state.getCurrentStep() == ProcessStep.CREDIT_CHECK_INITIATED) {
handleCreditCheckTimeout(state);
}
break;
case DOCUMENT_VERIFICATION:
if (state.getCurrentStep() == ProcessStep.DOCUMENT_VERIFICATION_INITIATED) {
handleDocumentVerificationTimeout(state);
}
break;
case UNDERWRITING:
if (state.getCurrentStep() == ProcessStep.UNDERWRITING_INITIATED) {
handleUnderwritingTimeout(state);
}
break;
}
}
}
2. E-commerce Order Processing
Process Manager for complex order fulfillment:
@Component
public class OrderFulfillmentProcessManager {
@Autowired
private ProcessStateRepository processStateRepository;
@Autowired
private InventoryService inventoryService;
@Autowired
private PaymentService paymentService;
@Autowired
private ShippingService shippingService;
@Autowired
private NotificationService notificationService;
@EventListener
public void handleOrderPlaced(OrderPlacedEvent event) {
ProcessState state = new ProcessState();
state.setProcessId(event.getOrderId());
state.setCurrentStep(ProcessStep.INVENTORY_RESERVATION_INITIATED);
state.setProcessData(event.getOrderData());
processStateRepository.save(state);
// Start parallel inventory reservations for all items
for (OrderItem item : event.getOrderData().getItems()) {
InventoryReservationRequest request = new InventoryReservationRequest();
request.setProcessId(event.getOrderId());
request.setItemId(item.getItemId());
request.setQuantity(item.getQuantity());
request.setReservationId(generateReservationId());
inventoryService.reserveInventory(request);
}
scheduleTimeout(event.getOrderId(), INVENTORY_RESERVATION_TIMEOUT,
ProcessTimeoutType.INVENTORY_RESERVATION);
}
@EventListener
public void handleInventoryReserved(InventoryReservedEvent event) {
ProcessState state = processStateRepository.findByProcessId(event.getProcessId());
// Track inventory reservations
state.addProcessData("reservedItems." + event.getItemId(), event.getReservationId());
// Check if all items are reserved
OrderData orderData = state.getProcessData();
boolean allItemsReserved = orderData.getItems().stream()
.allMatch(item -> state.getProcessData()
.containsKey("reservedItems." + item.getItemId()));
if (allItemsReserved) {
// Proceed to payment processing
state.setCurrentStep(ProcessStep.PAYMENT_PROCESSING_INITIATED);
processStateRepository.save(state);
PaymentRequest paymentRequest = new PaymentRequest();
paymentRequest.setProcessId(event.getProcessId());
paymentRequest.setAmount(orderData.getTotalAmount());
paymentRequest.setPaymentMethod(orderData.getPaymentMethod());
paymentRequest.setCustomerId(orderData.getCustomerId());
paymentService.processPayment(paymentRequest);
scheduleTimeout(event.getProcessId(), PAYMENT_PROCESSING_TIMEOUT,
ProcessTimeoutType.PAYMENT_PROCESSING);
}
processStateRepository.save(state);
}
@EventListener
public void handleInventoryReservationFailed(InventoryReservationFailedEvent event) {
ProcessState state = processStateRepository.findByProcessId(event.getProcessId());
// Handle inventory reservation failure
state.setCurrentStep(ProcessStep.INVENTORY_COMPENSATION_INITIATED);
state.addProcessData("failureReason", event.getReason());
// Release any successful reservations
compensateInventoryReservations(state);
// Notify customer of order failure
state.setCurrentStep(ProcessStep.PROCESS_FAILED);
processStateRepository.save(state);
publishOrderFailedEvent(event.getProcessId(), "Inventory not available: " + event.getReason());
}
@EventListener
public void handlePaymentProcessed(PaymentProcessedEvent event) {
ProcessState state = processStateRepository.findByProcessId(event.getProcessId());
if (event.getResult() == PaymentResult.SUCCESS) {
state.setCurrentStep(ProcessStep.SHIPPING_INITIATED);
state.addProcessData("paymentId", event.getPaymentId());
// Create shipping request
ShippingRequest shippingRequest = new ShippingRequest();
shippingRequest.setProcessId(event.getProcessId());
shippingRequest.setOrderData(state.getProcessData());
shippingRequest.setShippingAddress(state.getProcessData().getShippingAddress());
shippingService.initiateShipping(shippingRequest);
scheduleTimeout(event.getProcessId(), SHIPPING_INITIATION_TIMEOUT,
ProcessTimeoutType.SHIPPING_INITIATION);
} else {
// Payment failed - compensate inventory reservations
state.setCurrentStep(ProcessStep.PAYMENT_COMPENSATION_INITIATED);
state.addProcessData("paymentFailureReason", event.getFailureReason());
compensateInventoryReservations(state);
state.setCurrentStep(ProcessStep.PROCESS_FAILED);
processStateRepository.save(state);
publishOrderFailedEvent(event.getProcessId(), "Payment failed: " + event.getFailureReason());
}
processStateRepository.save(state);
}
@EventListener
public void handleShippingInitiated(ShippingInitiatedEvent event) {
ProcessState state = processStateRepository.findByProcessId(event.getProcessId());
state.setCurrentStep(ProcessStep.ORDER_FULFILLED);
state.addProcessData("trackingNumber", event.getTrackingNumber());
state.addProcessData("estimatedDelivery", event.getEstimatedDelivery());
processStateRepository.save(state);
// Send order confirmation to customer
publishOrderFulfilledEvent(event.getProcessId(),
event.getTrackingNumber(),
event.getEstimatedDelivery());
}
private void compensateInventoryReservations(ProcessState state) {
OrderData orderData = state.getProcessData();
for (OrderItem item : orderData.getItems()) {
String reservationKey = "reservedItems." + item.getItemId();
if (state.getProcessData().containsKey(reservationKey)) {
String reservationId = state.getProcessData().get(reservationKey);
InventoryReleaseRequest releaseRequest = new InventoryReleaseRequest();
releaseRequest.setProcessId(state.getProcessId());
releaseRequest.setItemId(item.getItemId());
releaseRequest.setReservationId(reservationId);
inventoryService.releaseReservation(releaseRequest);
}
}
}
}
3. Apache Camel Integration
Camel-based Process Manager implementation:
@Component
public class CamelProcessManagerRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
// Order processing workflow
from("kafka:order-events?groupId=process-manager")
.routeId("order-process-manager")
.unmarshal().json(JsonLibrary.Jackson, OrderEvent.class)
.choice()
.when(simple("${body.eventType} == 'ORDER_PLACED'"))
.to("direct:initiateOrderProcess")
.when(simple("${body.eventType} == 'INVENTORY_RESERVED'"))
.to("direct:handleInventoryReserved")
.when(simple("${body.eventType} == 'PAYMENT_PROCESSED'"))
.to("direct:handlePaymentProcessed")
.when(simple("${body.eventType} == 'SHIPPING_INITIATED'"))
.to("direct:handleShippingInitiated")
.otherwise()
.log("Unknown order event type: ${body.eventType}")
.end();
from("direct:initiateOrderProcess")
.routeId("initiate-order-process")
.process(exchange -> {
OrderEvent orderEvent = exchange.getIn().getBody(OrderEvent.class);
ProcessState state = new ProcessState();
state.setProcessId(orderEvent.getOrderId());
state.setCurrentStep("INVENTORY_CHECK_INITIATED");
state.setProcessData(orderEvent.getOrderData());
// Save process state
processStateRepository.save(state);
exchange.getIn().setBody(state);
})
.multicast().parallelProcessing()
.to("direct:checkInventoryAvailability")
.to("direct:validatePaymentMethod")
.end()
.log("Order process initiated for order: ${body.processId}");
from("direct:checkInventoryAvailability")
.routeId("check-inventory-availability")
.process(exchange -> {
ProcessState state = exchange.getIn().getBody(ProcessState.class);
OrderData orderData = state.getProcessData();
for (OrderItem item : orderData.getItems()) {
InventoryCheckRequest request = new InventoryCheckRequest();
request.setProcessId(state.getProcessId());
request.setItemId(item.getItemId());
request.setQuantity(item.getQuantity());
exchange.getIn().setBody(request);
}
})
.to("kafka:inventory-check-requests")
.log("Inventory check initiated for process: ${body.processId}");
from("direct:handleInventoryReserved")
.routeId("handle-inventory-reserved")
.process(exchange -> {
OrderEvent event = exchange.getIn().getBody(OrderEvent.class);
ProcessState state = processStateRepository.findByProcessId(event.getProcessId());
// Update process state
state.addInventoryReservation(event.getItemId(), event.getReservationId());
// Check if all items are reserved
if (state.allItemsReserved()) {
state.setCurrentStep("PAYMENT_PROCESSING_INITIATED");
PaymentRequest paymentRequest = new PaymentRequest();
paymentRequest.setProcessId(event.getProcessId());
paymentRequest.setAmount(state.getProcessData().getTotalAmount());
exchange.getIn().setBody(paymentRequest);
processStateRepository.save(state);
} else {
exchange.getIn().setBody(null);
}
})
.choice()
.when(body().isNotNull())
.to("kafka:payment-processing-requests")
.log("Payment processing initiated for process: ${body.processId}")
.otherwise()
.log("Waiting for more inventory reservations")
.end();
from("direct:handlePaymentProcessed")
.routeId("handle-payment-processed")
.process(exchange -> {
OrderEvent event = exchange.getIn().getBody(OrderEvent.class);
ProcessState state = processStateRepository.findByProcessId(event.getProcessId());
if (event.getPaymentResult().equals("SUCCESS")) {
state.setCurrentStep("SHIPPING_INITIATED");
state.addProcessData("paymentId", event.getPaymentId());
ShippingRequest shippingRequest = new ShippingRequest();
shippingRequest.setProcessId(event.getProcessId());
shippingRequest.setOrderData(state.getProcessData());
exchange.getIn().setBody(shippingRequest);
processStateRepository.save(state);
} else {
// Handle payment failure
exchange.getIn().setHeader("compensationRequired", true);
exchange.getIn().setBody(state);
}
})
.choice()
.when(header("compensationRequired").isEqualTo(true))
.to("direct:compensateOrderProcess")
.otherwise()
.to("kafka:shipping-requests")
.log("Shipping initiated for process: ${body.processId}")
.end();
from("direct:compensateOrderProcess")
.routeId("compensate-order-process")
.process(exchange -> {
ProcessState state = exchange.getIn().getBody(ProcessState.class);
// Release inventory reservations
for (String itemId : state.getReservedItems().keySet()) {
String reservationId = state.getReservedItems().get(itemId);
InventoryReleaseRequest releaseRequest = new InventoryReleaseRequest();
releaseRequest.setProcessId(state.getProcessId());
releaseRequest.setItemId(itemId);
releaseRequest.setReservationId(reservationId);
// Send release request
}
state.setCurrentStep("PROCESS_COMPENSATED");
processStateRepository.save(state);
})
.to("kafka:inventory-release-requests")
.log("Order process compensated for process: ${body.processId}");
// Process timeout handling
from("timer:processTimeoutChecker?period=60000")
.routeId("process-timeout-checker")
.process(exchange -> {
List<ProcessState> timedOutProcesses = processStateRepository.findTimedOutProcesses();
exchange.getIn().setBody(timedOutProcesses);
})
.split(body())
.process(exchange -> {
ProcessState state = exchange.getIn().getBody(ProcessState.class);
ProcessTimeoutEvent timeoutEvent = new ProcessTimeoutEvent();
timeoutEvent.setProcessId(state.getProcessId());
timeoutEvent.setCurrentStep(state.getCurrentStep());
timeoutEvent.setTimeoutDuration(state.getTimeoutDuration());
exchange.getIn().setBody(timeoutEvent);
})
.to("kafka:process-timeout-events")
.log("Process timeout detected for process: ${body.processId}");
}
}
Best Practices
1. Process Design and Architecture
- Design processes with clear state transitions and well-defined steps
- Implement comprehensive process state persistence to handle system failures
- Use correlation identifiers to track related messages and events
- Design for process versioning to handle evolving business requirements
- Implement process instance isolation to prevent cross-contamination
2. State Management and Persistence
- Persist process state consistently using transactional updates
- Implement process state snapshots for long-running workflows
- Use event sourcing for complete process audit trails
- Design process state recovery mechanisms for system restart scenarios
- Implement process state cleanup for completed workflows
3. Message Coordination and Correlation
- Use unique correlation identifiers for all related messages
- Implement message deduplication to handle duplicate deliveries
- Design timeout handling for all external service interactions
- Use message versioning to handle evolving service interfaces
- Implement comprehensive message logging for debugging and monitoring
4. Error Handling and Compensation
- Design comprehensive error handling for all process steps
- Implement compensation logic for reversible operations
- Use dead letter queues for unprocessable messages
- Plan for manual intervention in exceptional scenarios
- Implement process recovery mechanisms for partial failures
5. Performance and Scalability
- Use parallel processing where business logic allows
- Implement process step caching where appropriate and safe
- Design for horizontal scaling of process manager instances
- Use asynchronous messaging for improved throughput
- Monitor and optimize process execution performance
6. Monitoring and Observability
- Implement comprehensive process execution logging
- Provide real-time dashboards for process monitoring
- Create alerts for process failures and timeout scenarios
- Track process completion rates and performance metrics
- Implement distributed tracing for complex process debugging
The Process Manager pattern is essential for orchestrating complex, stateful business workflows in distributed enterprise integration architectures, providing centralized coordination, state management, and reliable process execution across multiple services and systems.
← Back to All Patterns