Message Routing Pattern
Overview
The Message Routing pattern determines the destination of each message from its content, headers, or configured routing rules. Because the router makes this decision rather than the sender, the same mechanism supports dynamic flow control, load balancing, and content-based distribution across multiple endpoints. Routing rules can then change with business requirements without modifying producers or consumers.
Theoretical Foundation
Message Routing applies network routing ideas to messaging systems: endpoints and channels form a graph, and the router picks a path through it for each message. It uses store-and-forward semantics, in which the router receives the whole message before forwarding it, and it chooses the path from message characteristics and current system state. This is content-aware routing: decisions depend on message content, metadata, and context rather than on fixed producer-to-consumer wiring.
Core Principles
1. Dynamic Route Determination
Routing decisions are made at runtime based on message content, headers, system state, and configured routing rules, enabling adaptive message flow that responds to changing conditions.
2. Content-Based Message Distribution
Messages are routed based on their content, type, or metadata, enabling fine-grained control over message distribution and processing workflows.
3. Route Optimization and Load Balancing
The routing system optimizes message delivery paths and distributes load across available endpoints to maximize throughput and minimize latency.
4. Fault-Tolerant Routing
Routing logic includes fallback mechanisms, alternative paths, and error handling to ensure message delivery even when primary routes are unavailable.
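The fallback behaviour in principle 4 can be sketched in plain Java. This is a minimal illustration under stated assumptions, not a production design: `FallbackRouter`, the availability predicate, and the `dead.letter.queue` name are hypothetical.

```java
import java.util.List;
import java.util.function.Predicate;

/** Picks the first available destination from an ordered list, else a dead-letter destination. */
final class FallbackRouter {
    static final String DEAD_LETTER = "dead.letter.queue"; // hypothetical name

    private final Predicate<String> isAvailable; // hypothetical health check

    FallbackRouter(Predicate<String> isAvailable) {
        this.isAvailable = isAvailable;
    }

    /** Candidates are ordered: primary first, then alternatives. */
    String resolve(List<String> orderedCandidates) {
        for (String destination : orderedCandidates) {
            if (isAvailable.test(destination)) {
                return destination;
            }
        }
        return DEAD_LETTER; // no route is reachable, so park the message
    }

    public static void main(String[] args) {
        // Pretend the primary is down; the router should fall through to the secondary.
        FallbackRouter router = new FallbackRouter(d -> !d.equals("service.primary"));
        System.out.println(router.resolve(List.of("service.primary", "service.secondary")));
    }
}
```

Keeping the candidate list ordered makes the failover policy explicit: the caller decides which alternative is tried first.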
Why Message Routing is Essential in Integration Architecture
1. Enterprise Integration Requirements
Complex enterprise environments require sophisticated message routing:
- Multi-system integration routing messages between diverse enterprise applications
- Legacy system modernization enabling gradual transition through intelligent routing
- B2B communication routing messages to appropriate trading partners and systems
- Compliance and governance ensuring messages follow required processing paths
2. Microservices Architecture Support
Microservices architectures depend on effective message routing:
- Service discovery integration routing messages to appropriate service instances
- Version management routing requests to correct service versions
- Canary deployments gradually routing traffic to new service versions
- Circuit breaker integration routing around failed services
3. Scalability and Performance Optimization
Message routing enables system scalability:
- Load distribution across multiple processing endpoints
- Geographic routing directing messages to closest processing centers
- Priority-based routing ensuring critical messages receive appropriate handling
- Capacity-aware routing directing messages based on endpoint availability
4. Business Process Flexibility
Dynamic routing supports business agility:
- A/B testing routing messages to different processing pipelines for testing
- Feature flags enabling/disabling features through routing configuration
- Business rule implementation routing messages based on business logic
- Workflow orchestration directing messages through complex business processes
Benefits in Integration Contexts
1. Architecture Flexibility
- Loose coupling between message producers and consumers
- Runtime reconfiguration enabling dynamic routing changes without system restarts
- Technology independence routing messages between different platforms and protocols
- Gradual system evolution enabling incremental changes without disrupting existing flows
2. Operational Excellence
- High availability through redundant routing paths and failover mechanisms
- Performance monitoring with detailed routing metrics and analytics
- Troubleshooting support through comprehensive routing logs and tracing
- Capacity management based on routing patterns and endpoint utilization
3. Development Productivity
- Simplified integration through declarative routing configuration
- Testing flexibility enabling isolated testing of individual routing paths
- Debugging capabilities with detailed routing trace information
- Rapid prototyping through quick routing rule modifications
4. Business Value
- Faster time-to-market through flexible integration capabilities
- Cost optimization through efficient resource utilization
- Risk mitigation through redundant routing and fallback mechanisms
- Compliance assurance through controlled message routing paths
Integration Architecture Applications
1. Enterprise Service Bus (ESB) Implementation
Message routing forms the core of ESB architectures:
- Service mediation routing messages between different service interfaces
- Protocol transformation routing messages through appropriate protocol adapters
- Message transformation routing messages through transformation pipelines
- Error handling routing failed messages to appropriate error handling services
2. API Gateway Integration
API gateways use message routing for request distribution:
- Backend service routing directing API requests to appropriate microservices
- Version routing managing API versioning through intelligent request routing
- Authentication routing directing requests through appropriate authentication services
- Rate limiting routing requests based on quotas and throttling policies
3. Event-Driven Architecture Routing
Event systems require sophisticated routing capabilities:
- Event type routing distributing events to appropriate event handlers
- Subscriber routing delivering events to interested subscribers
- Event transformation routing passing events through processing pipelines
- Dead letter routing handling failed event processing
4. Data Pipeline Orchestration
Data processing systems use routing for pipeline management:
- Data source routing directing data from various sources to processing systems
- Processing stage routing orchestrating multi-stage data processing workflows
- Quality assurance routing directing data through validation and cleansing processes
- Output routing distributing processed data to appropriate destination systems
How Message Routing Pattern Works
Message routing operates through configurable routing engines that evaluate messages against routing rules and direct them to appropriate destinations:
Basic Routing Architecture
┌─────────────────────────────────────────────────────────────────┐
│                         Message Router                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────┐    ┌──────────────┐    ┌─────────────────────┐ │
│  │   Message   │──▶ │   Routing    │──▶ │     Destination     │ │
│  │  Classifier │    │    Engine    │    │      Selector       │ │
│  └─────────────┘    └──────────────┘    └─────────────────────┘ │
│         │                  │                       │            │
│         ▼                  ▼                       ▼            │
│  ┌─────────────┐    ┌──────────────┐    ┌─────────────────────┐ │
│  │   Content   │    │     Rule     │    │    Load Balancer    │ │
│  │   Analyzer  │    │   Evaluator  │    │                     │ │
│  └─────────────┘    └──────────────┘    └─────────────────────┘ │
│                                                                 │
├─────────────────────────────────────────────────────────────────┤
│                        Routing Outcomes                         │
│                                                                 │
│  Message A ──┬──▶ Service 1                                     │
│              └──▶ Service 2 (Load Balanced)                     │
│                                                                 │
│  Message B ──────▶ Service 3 (Content-based)                    │
│                                                                 │
│  Message C ──┬──▶ Service 4 (Primary)                           │
│              └──▶ Dead Letter Queue (Fallback)                  │
└─────────────────────────────────────────────────────────────────┘
Routing Decision Process
┌────────────────────────────────────────────────────────────┐
│                   Routing Decision Flow                    │
├────────────────────────────────────────────────────────────┤
│                                                            │
│  1. [Message Received]                                     │
│            ▼                                               │
│  2. [Extract Routing Metadata]                             │
│            ▼                                               │
│  3. [Evaluate Rules]                                       │
│            ▼                                               │
│  4. [Apply Filters & Conditions]                           │
│            ▼                                               │
│  5. [Select Route]                                         │
│            ▼                                               │
│  6. [Route Message]                                        │
│                                                            │
│  Routing Criteria:                                         │
│  • Message content and headers                             │
│  • Destination availability                                │
│  • Load balancing algorithms                               │
│  • Business rules and policies                             │
│  • Error handling requirements                             │
└────────────────────────────────────────────────────────────┘
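The six steps of the decision flow can be sketched as a single method. This is a simplified illustration, not the router implementation described later: the `Rule` shape, the use of headers as the only routing metadata, and the availability set are assumptions made for the sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

/** Sketch of the decision flow; all names here are hypothetical. */
final class RoutingDecisionFlow {

    /** A rule pairs a condition over message headers with a destination and a priority. */
    static final class Rule {
        final Predicate<Map<String, String>> condition;
        final String destination;
        final int priority;

        Rule(Predicate<Map<String, String>> condition, String destination, int priority) {
            this.condition = condition;
            this.destination = destination;
            this.priority = priority;
        }
    }

    static Optional<String> decide(Map<String, String> headers, List<Rule> rules,
                                   Set<String> availableDestinations) {
        // Steps 1-2: the message has arrived; its headers serve as routing metadata.
        // Step 3: evaluate every rule against the metadata.
        List<Rule> matching = new ArrayList<>();
        for (Rule rule : rules) {
            if (rule.condition.test(headers)) {
                matching.add(rule);
            }
        }
        // Step 4: drop rules whose destination is currently unavailable.
        matching.removeIf(rule -> !availableDestinations.contains(rule.destination));
        // Step 5: select the highest-priority remaining rule.
        // Step 6: the caller sends the message to the returned destination.
        return matching.stream()
                .max(Comparator.comparingInt(rule -> rule.priority))
                .map(rule -> rule.destination);
    }

    public static void main(String[] args) {
        List<Rule> rules = List.of(
                new Rule(h -> "EU".equals(h.get("region")), "eu.processing.service", 90),
                new Rule(h -> true, "default.service", 10));
        System.out.println(decide(Map.of("region", "EU"), rules,
                Set.of("eu.processing.service", "default.service")));
    }
}
```

An empty `Optional` signals that no rule produced a reachable destination, which is the point where a fallback or dead-letter policy would take over.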
Routing Strategies
1. Content-Based Routing
Message Content ──▶ [Content Analyzer] ──▶ Route Decision
if (message.type == "ORDER") → order.processing.queue
if (message.priority == "HIGH") → priority.processing.queue
if (message.amount > 1000) → manual.review.queue
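A first-match version of these content rules might look like the sketch below. The `Order` class, the default queue, and the rule order are assumptions: the pseudocode does not say which rule wins when several match, so this sketch checks rules in the order they were registered.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

/** First-match content-based router; a sketch with hypothetical names. */
final class ContentBasedRouter {

    /** Minimal message shape carrying the three fields used in the pseudocode. */
    static final class Order {
        final String type;
        final String priority;
        final double amount;

        Order(String type, String priority, double amount) {
            this.type = type;
            this.priority = priority;
            this.amount = amount;
        }
    }

    // LinkedHashMap preserves insertion order, so rules are checked in sequence.
    private final Map<Predicate<Order>, String> rules = new LinkedHashMap<>();
    private final String defaultQueue;

    ContentBasedRouter(String defaultQueue) {
        this.defaultQueue = defaultQueue;
    }

    ContentBasedRouter when(Predicate<Order> condition, String queue) {
        rules.put(condition, queue);
        return this;
    }

    String route(Order message) {
        for (Map.Entry<Predicate<Order>, String> rule : rules.entrySet()) {
            if (rule.getKey().test(message)) {
                return rule.getValue();
            }
        }
        return defaultQueue; // nothing matched
    }

    public static void main(String[] args) {
        ContentBasedRouter router = new ContentBasedRouter("default.queue")
                .when(m -> m.amount > 1000, "manual.review.queue")
                .when(m -> "HIGH".equals(m.priority), "priority.processing.queue")
                .when(m -> "ORDER".equals(m.type), "order.processing.queue");
        System.out.println(router.route(new Order("ORDER", "LOW", 2500)));
    }
}
```

Registering the most specific rules first matters here: with the type check first, a large order would never reach manual review.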
2. Header-Based Routing
Message Headers ──▶ [Header Evaluator] ──▶ Route Decision
if (headers.region == "EU") → eu.processing.service
if (headers.version == "v2") → v2.api.endpoint
if (headers.tenant == "premium") → premium.service.tier
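Header rules of this kind reduce to a small lookup table. The sketch below is one possible shape, assuming string headers, top-to-bottom evaluation, and a caller-supplied default destination; none of these details come from the pseudocode itself.

```java
import java.util.Map;

/** Table-driven header router; a sketch with hypothetical names. */
final class HeaderBasedRouter {

    // Checked top to bottom: {header name, expected value, destination}.
    private static final String[][] RULES = {
            {"region", "EU", "eu.processing.service"},
            {"version", "v2", "v2.api.endpoint"},
            {"tenant", "premium", "premium.service.tier"},
    };

    static String route(Map<String, String> headers, String defaultDestination) {
        for (String[] rule : RULES) {
            // equals() on the expected value tolerates a missing header (null).
            if (rule[1].equals(headers.get(rule[0]))) {
                return rule[2];
            }
        }
        return defaultDestination;
    }

    public static void main(String[] args) {
        System.out.println(route(Map.of("tenant", "premium"), "default.service"));
    }
}
```

Because the payload is never parsed, header-based routing is usually cheaper than content-based routing.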
3. Load-Balanced Routing
Available Endpoints ──▶ [Load Balancer] ──▶ Endpoint Selection
Round Robin: endpoint1 → endpoint2 → endpoint3 → endpoint1
Weighted: endpoint1 receives 70% of messages, endpoint2 receives 30%
Least Connections: route to endpoint with fewest active connections
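Weighted distribution can be made deterministic with the smooth weighted round-robin technique, sketched below. The class name and the integer weights 7 and 3 (standing in for the 70%/30% split) are assumptions for illustration; a random weighted pick would be an equally valid choice.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Smooth weighted round-robin: each pick favours the endpoint with the highest running score. */
final class WeightedRoundRobin {
    private final Map<String, Integer> weights = new LinkedHashMap<>();
    private final Map<String, Integer> current = new LinkedHashMap<>();
    private int totalWeight;

    synchronized WeightedRoundRobin add(String endpoint, int weight) {
        if (weight <= 0) {
            throw new IllegalArgumentException("weight must be positive");
        }
        Integer previous = weights.put(endpoint, weight);
        totalWeight += weight - (previous == null ? 0 : previous);
        current.putIfAbsent(endpoint, 0);
        return this;
    }

    synchronized String next() {
        if (weights.isEmpty()) {
            throw new IllegalStateException("no endpoints registered");
        }
        String best = null;
        for (Map.Entry<String, Integer> entry : weights.entrySet()) {
            // Every endpoint earns its weight on each round.
            int score = current.get(entry.getKey()) + entry.getValue();
            current.put(entry.getKey(), score);
            if (best == null || score > current.get(best)) {
                best = entry.getKey();
            }
        }
        // The winner pays back the total, which spreads its picks out over time.
        current.put(best, current.get(best) - totalWeight);
        return best;
    }

    public static void main(String[] args) {
        WeightedRoundRobin balancer = new WeightedRoundRobin().add("endpoint1", 7).add("endpoint2", 3);
        for (int i = 0; i < 10; i++) {
            System.out.println(balancer.next());
        }
    }
}
```

Over any full cycle of ten picks, `endpoint1` is chosen seven times and `endpoint2` three times, without long runs of the same endpoint.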
4. Conditional Routing
Business Rules ──▶ [Rule Engine] ──▶ Route Decision
if (customer.tier == "VIP") → vip.service.endpoint
if (order.total > threshold) → approval.workflow.service
if (system.time in business_hours) → live.service else batch.service
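The business-hours rule is easiest to test when the clock is injected. The sketch below assumes business hours of 09:00 to 17:00, Monday to Friday, in the clock's time zone; the pseudocode does not define them, so treat these values as placeholders.

```java
import java.time.Clock;
import java.time.DayOfWeek;
import java.time.LocalTime;
import java.time.ZonedDateTime;

/** Time-based conditional routing; business hours are an assumption of this sketch. */
final class BusinessHoursRouter {
    private static final LocalTime OPEN = LocalTime.of(9, 0);   // assumed opening time
    private static final LocalTime CLOSE = LocalTime.of(17, 0); // assumed closing time

    private final Clock clock;

    BusinessHoursRouter(Clock clock) {
        this.clock = clock;
    }

    String route() {
        ZonedDateTime now = ZonedDateTime.now(clock);
        DayOfWeek day = now.getDayOfWeek();
        boolean weekday = day != DayOfWeek.SATURDAY && day != DayOfWeek.SUNDAY;
        LocalTime time = now.toLocalTime();
        boolean open = !time.isBefore(OPEN) && time.isBefore(CLOSE);
        return weekday && open ? "live.service" : "batch.service";
    }

    public static void main(String[] args) {
        System.out.println(new BusinessHoursRouter(Clock.systemDefaultZone()).route());
    }
}
```

Passing a `Clock` rather than calling `ZonedDateTime.now()` directly lets a test pin the time with `Clock.fixed(...)`.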
Key Components
1. Message Router
Core routing engine that evaluates messages and determines destinations:
@Component
public class MessageRouter {
private final List<RoutingRule> routingRules;
private final RouteEvaluator routeEvaluator;
private final LoadBalancer loadBalancer;
private final RouteMetrics routeMetrics;
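private final FallbackHandler fallbackHandler;
// Extractors used by extractContentAttributes(); the AttributeExtractor type name is assumed
private final AttributeExtractor jsonAttributeExtractor;
private final AttributeExtractor xmlAttributeExtractor;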
public RoutingResult route(Message message, RoutingContext context) {
try {
// Extract routing metadata
RoutingMetadata metadata = extractRoutingMetadata(message, context);
// Evaluate routing rules
List<RoutingRule> applicableRules = findApplicableRules(metadata);
if (applicableRules.isEmpty()) {
return handleNoMatchingRules(message, metadata);
}
// Determine best route
Route selectedRoute = selectBestRoute(applicableRules, metadata);
if (selectedRoute == null) {
return handleNoValidRoute(message, metadata);
}
// Apply load balancing if multiple endpoints
Endpoint targetEndpoint = selectEndpoint(selectedRoute, metadata);
// Validate endpoint availability
if (!isEndpointAvailable(targetEndpoint)) {
return handleUnavailableEndpoint(message, selectedRoute, metadata);
}
// Route message
RoutingResult result = executeRoute(message, targetEndpoint, selectedRoute);
// Record metrics
routeMetrics.recordSuccessfulRoute(selectedRoute, targetEndpoint, result);
return result;
} catch (Exception e) {
log.error("Error routing message: {}", e.getMessage(), e);
routeMetrics.recordRoutingError(message, e);
return fallbackHandler.handleRoutingError(message, context, e);
}
}
private RoutingMetadata extractRoutingMetadata(Message message, RoutingContext context) {
return RoutingMetadata.builder()
.messageType(message.getType())
.headers(message.getHeaders())
.contentLength(message.getPayload().length)
.priority(message.getPriority())
.timestamp(message.getTimestamp())
.correlationId(context.getCorrelationId())
.sourceEndpoint(context.getSourceEndpoint())
.routingAttributes(extractContentAttributes(message))
.build();
}
private List<RoutingRule> findApplicableRules(RoutingMetadata metadata) {
return routingRules.stream()
.filter(rule -> rule.isApplicable(metadata))
.sorted(Comparator.comparing(RoutingRule::getPriority).reversed())
.collect(Collectors.toList());
}
private Route selectBestRoute(List<RoutingRule> rules, RoutingMetadata metadata) {
for (RoutingRule rule : rules) {
List<Route> candidateRoutes = rule.evaluateRoutes(metadata);
if (!candidateRoutes.isEmpty()) {
Route selectedRoute = selectOptimalRoute(candidateRoutes, metadata);
if (selectedRoute != null && isRouteValid(selectedRoute, metadata)) {
return selectedRoute;
}
}
}
return null;
}
private Route selectOptimalRoute(List<Route> candidateRoutes, RoutingMetadata metadata) {
// Apply route selection strategy
RouteSelectionStrategy strategy = determineSelectionStrategy(metadata);
switch (strategy) {
case PRIORITY_BASED:
return candidateRoutes.stream()
.max(Comparator.comparing(Route::getPriority))
.orElse(null);
case PERFORMANCE_BASED:
return candidateRoutes.stream()
.min(Comparator.comparing(this::getRouteLatency))
.orElse(null);
case CAPACITY_BASED:
return candidateRoutes.stream()
.min(Comparator.comparing(this::getRouteUtilization))
.orElse(null);
default:
return candidateRoutes.get(0);
}
}
private Endpoint selectEndpoint(Route route, RoutingMetadata metadata) {
List<Endpoint> availableEndpoints = getAvailableEndpoints(route);
if (availableEndpoints.isEmpty()) {
throw new NoAvailableEndpointException("No available endpoints for route: " + route.getName());
}
if (availableEndpoints.size() == 1) {
return availableEndpoints.get(0);
}
// Apply load balancing
return loadBalancer.selectEndpoint(availableEndpoints, metadata);
}
private RoutingResult executeRoute(Message message, Endpoint endpoint, Route route) {
RouteExecutor executor = getRouteExecutor(route.getType());
return executor.executeRoute(message, endpoint, route);
}
private Map<String, Object> extractContentAttributes(Message message) {
// Extract routing attributes from message content
try {
// Constant-first equals() avoids a NullPointerException when the type is missing
if ("JSON".equals(message.getType())) {
return jsonAttributeExtractor.extract(message.getPayload());
} else if ("XML".equals(message.getType())) {
return xmlAttributeExtractor.extract(message.getPayload());
} else {
return Collections.emptyMap();
}
} catch (Exception e) {
log.warn("Failed to extract content attributes: {}", e.getMessage());
return Collections.emptyMap();
}
}
}
2. Routing Rules Engine
Manages and evaluates routing rules:
@Component
public class RoutingRulesEngine {
private final RuleRepository ruleRepository;
private final ExpressionEvaluator expressionEvaluator;
private final RuleCache ruleCache;
public List<RoutingRule> evaluateRules(RoutingMetadata metadata) {
List<RoutingRule> cachedRules = ruleCache.getRules(metadata.getCacheKey());
if (cachedRules != null) {
return filterApplicableRules(cachedRules, metadata);
}
List<RoutingRule> allRules = ruleRepository.getAllActiveRules();
List<RoutingRule> applicableRules = filterApplicableRules(allRules, metadata);
ruleCache.putRules(metadata.getCacheKey(), applicableRules);
return applicableRules;
}
private List<RoutingRule> filterApplicableRules(List<RoutingRule> rules, RoutingMetadata metadata) {
return rules.stream()
.filter(rule -> evaluateRuleCondition(rule, metadata))
.sorted(Comparator.comparing(RoutingRule::getPriority).reversed())
.collect(Collectors.toList());
}
private boolean evaluateRuleCondition(RoutingRule rule, RoutingMetadata metadata) {
try {
RuleCondition condition = rule.getCondition();
return expressionEvaluator.evaluate(condition, createEvaluationContext(metadata));
} catch (Exception e) {
log.error("Error evaluating rule condition for rule {}: {}", rule.getId(), e.getMessage(), e);
return false;
}
}
private EvaluationContext createEvaluationContext(RoutingMetadata metadata) {
EvaluationContext context = new StandardEvaluationContext();
// Add metadata fields to context
context.setVariable("messageType", metadata.getMessageType());
context.setVariable("headers", metadata.getHeaders());
context.setVariable("contentLength", metadata.getContentLength());
context.setVariable("priority", metadata.getPriority());
context.setVariable("timestamp", metadata.getTimestamp());
context.setVariable("attributes", metadata.getRoutingAttributes());
// Add time-related helper values (plain variables, evaluated once per context)
context.setVariable("now", Instant.now());
context.setVariable("today", LocalDate.now());
context.setVariable("isBusinessHours", isBusinessHours());
return context;
}
public RoutingRule createRule(RuleDefinition definition) {
validateRuleDefinition(definition);
RoutingRule rule = RoutingRule.builder()
.id(UUID.randomUUID().toString())
.name(definition.getName())
.description(definition.getDescription())
.condition(compileCondition(definition.getConditionExpression()))
.routes(definition.getRoutes())
.priority(definition.getPriority())
.enabled(definition.isEnabled())
.createdAt(Instant.now())
.build();
ruleRepository.save(rule);
ruleCache.invalidate(); // Clear cache after rule change
return rule;
}
public void updateRule(String ruleId, RuleDefinition definition) {
RoutingRule existingRule = ruleRepository.findById(ruleId)
.orElseThrow(() -> new RuleNotFoundException("Rule not found: " + ruleId));
validateRuleDefinition(definition);
RoutingRule updatedRule = existingRule.toBuilder()
.name(definition.getName())
.description(definition.getDescription())
.condition(compileCondition(definition.getConditionExpression()))
.routes(definition.getRoutes())
.priority(definition.getPriority())
.enabled(definition.isEnabled())
.updatedAt(Instant.now())
.build();
ruleRepository.save(updatedRule);
ruleCache.invalidate();
log.info("Updated routing rule: {}", ruleId);
}
private RuleCondition compileCondition(String expression) {
try {
return expressionEvaluator.compile(expression);
} catch (Exception e) {
throw new RuleCompilationException("Failed to compile rule condition: " + expression, e);
}
}
private void validateRuleDefinition(RuleDefinition definition) {
if (definition.getName() == null || definition.getName().trim().isEmpty()) {
throw new IllegalArgumentException("Rule name cannot be null or empty");
}
if (definition.getConditionExpression() == null || definition.getConditionExpression().trim().isEmpty()) {
throw new IllegalArgumentException("Rule condition cannot be null or empty");
}
if (definition.getRoutes() == null || definition.getRoutes().isEmpty()) {
throw new IllegalArgumentException("Rule must have at least one route");
}
// Test compile the condition
try {
compileCondition(definition.getConditionExpression());
} catch (Exception e) {
throw new IllegalArgumentException("Invalid rule condition expression", e);
}
}
}
// Example routing rules
@Component
public class ContactRoutingRules {
public void setupContactRoutingRules(RoutingRulesEngine rulesEngine) {
// Route high-priority contacts to premium processing
rulesEngine.createRule(RuleDefinition.builder()
.name("High Priority Contact Routing")
.description("Route high-priority contacts to premium processing service")
.conditionExpression("headers['priority'] == 'HIGH' || attributes['customerTier'] == 'VIP'")
.routes(Arrays.asList(
Route.builder()
.name("premium-contact-processing")
.endpoint("premium.contact.service")
.priority(100)
.build()
))
.priority(100)
.enabled(true)
.build());
// Route EU contacts to EU processing center
rulesEngine.createRule(RuleDefinition.builder()
.name("EU Contact Geographic Routing")
.description("Route EU contacts to EU data center for GDPR compliance")
.conditionExpression("headers['region'] == 'EU' || attributes['country'] in {'DE', 'FR', 'IT', 'ES'}")
.routes(Arrays.asList(
Route.builder()
.name("eu-contact-processing")
.endpoint("eu.contact.service")
.priority(90)
.build()
))
.priority(90)
.enabled(true)
.build());
// Route large contact batches to batch processing
rulesEngine.createRule(RuleDefinition.builder()
.name("Batch Contact Processing")
.description("Route large contact batches to dedicated batch processing service")
.conditionExpression("messageType == 'BATCH' && contentLength > 1000000")
.routes(Arrays.asList(
Route.builder()
.name("batch-contact-processing")
.endpoint("batch.contact.service")
.priority(80)
.build()
))
.priority(80)
.enabled(true)
.build());
// Default contact routing
rulesEngine.createRule(RuleDefinition.builder()
.name("Default Contact Routing")
.description("Default routing for all other contacts")
.conditionExpression("messageType == 'CONTACT'")
.routes(Arrays.asList(
Route.builder()
.name("standard-contact-processing")
.endpoint("standard.contact.service")
.priority(50)
.build()
))
.priority(10) // Lowest priority: evaluated last, for CONTACT messages no other rule claimed
.enabled(true)
.build());
}
}
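The interaction between rule priority and cache invalidation can be shown without Spring. The following is a simplified sketch, assuming rules keyed only by message type; `CachingRuleRegistry` and its methods are hypothetical and much narrower than the `RoutingRulesEngine` above.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/** Caches the priority-ordered destinations per message type and clears the cache on rule changes. */
final class CachingRuleRegistry {

    private static final class Rule {
        final String messageType;
        final String destination;
        final int priority;

        Rule(String messageType, String destination, int priority) {
            this.messageType = messageType;
            this.destination = destination;
            this.priority = priority;
        }
    }

    private final List<Rule> rules = new CopyOnWriteArrayList<>();
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final AtomicInteger cacheMisses = new AtomicInteger();

    void addRule(String messageType, String destination, int priority) {
        rules.add(new Rule(messageType, destination, priority));
        cache.clear(); // any rule change can alter any cached answer
    }

    /** Destinations for a message type, highest priority first. */
    List<String> destinationsFor(String messageType) {
        return cache.computeIfAbsent(messageType, type -> {
            cacheMisses.incrementAndGet();
            return rules.stream()
                    .filter(rule -> rule.messageType.equals(type))
                    .sorted(Comparator.comparingInt((Rule rule) -> rule.priority).reversed())
                    .map(rule -> rule.destination)
                    .collect(Collectors.toList());
        });
    }

    int cacheMisses() {
        return cacheMisses.get();
    }

    public static void main(String[] args) {
        CachingRuleRegistry registry = new CachingRuleRegistry();
        registry.addRule("CONTACT", "standard.contact.service", 10);
        registry.addRule("CONTACT", "premium.contact.service", 100);
        System.out.println(registry.destinationsFor("CONTACT"));
    }
}
```

Clearing the whole cache is the bluntest correct option; finer-grained invalidation only pays off when rules change often.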
3. Load Balancer
Distributes messages across multiple endpoints:
@Component
public class LoadBalancer {
private final EndpointHealthMonitor healthMonitor;
private final LoadBalancingStrategy strategy;
private final EndpointMetrics endpointMetrics;
public Endpoint selectEndpoint(List<Endpoint> endpoints, RoutingMetadata metadata) {
List<Endpoint> healthyEndpoints = filterHealthyEndpoints(endpoints);
if (healthyEndpoints.isEmpty()) {
throw new NoHealthyEndpointException("No healthy endpoints available");
}
if (healthyEndpoints.size() == 1) {
return healthyEndpoints.get(0);
}
return strategy.selectEndpoint(healthyEndpoints, metadata);
}
private List<Endpoint> filterHealthyEndpoints(List<Endpoint> endpoints) {
return endpoints.stream()
.filter(endpoint -> healthMonitor.isHealthy(endpoint))
.collect(Collectors.toList());
}
// Load balancing strategies
public interface LoadBalancingStrategy {
Endpoint selectEndpoint(List<Endpoint> endpoints, RoutingMetadata metadata);
}
@Component
public static class RoundRobinStrategy implements LoadBalancingStrategy {
private final AtomicLong counter = new AtomicLong(0);
@Override
public Endpoint selectEndpoint(List<Endpoint> endpoints, RoutingMetadata metadata) {
int index = (int) (counter.getAndIncrement() % endpoints.size());
return endpoints.get(index);
}
}
@Component
public static class WeightedRoundRobinStrategy implements LoadBalancingStrategy {
private final Map<String, WeightedEndpointSelector> selectors = new ConcurrentHashMap<>();
@Override
public Endpoint selectEndpoint(List<Endpoint> endpoints, RoutingMetadata metadata) {
String key = createEndpointGroupKey(endpoints);
WeightedEndpointSelector selector = selectors.computeIfAbsent(key,
k -> new WeightedEndpointSelector(endpoints));
return selector.selectEndpoint();
}
private String createEndpointGroupKey(List<Endpoint> endpoints) {
return endpoints.stream()
.map(e -> e.getId() + ":" + e.getWeight())
.collect(Collectors.joining(","));
}
}
@Component
public static class LeastConnectionsStrategy implements LoadBalancingStrategy {
private final EndpointConnectionTracker connectionTracker;
@Override
public Endpoint selectEndpoint(List<Endpoint> endpoints, RoutingMetadata metadata) {
return endpoints.stream()
.min(Comparator.comparing(endpoint ->
connectionTracker.getActiveConnections(endpoint)))
.orElseThrow(() -> new IllegalStateException("No endpoints available"));
}
}
@Component
public static class ResponseTimeStrategy implements LoadBalancingStrategy {
private final EndpointResponseTimeTracker responseTimeTracker;
@Override
public Endpoint selectEndpoint(List<Endpoint> endpoints, RoutingMetadata metadata) {
return endpoints.stream()
.min(Comparator.comparing(endpoint ->
responseTimeTracker.getAverageResponseTime(endpoint)))
.orElseThrow(() -> new IllegalStateException("No endpoints available"));
}
}
@Component
public static class HashBasedStrategy implements LoadBalancingStrategy {
@Override
public Endpoint selectEndpoint(List<Endpoint> endpoints, RoutingMetadata metadata) {
String hashKey = getHashKey(metadata);
// floorMod keeps the index non-negative; Math.abs(Integer.MIN_VALUE) is still negative
int index = Math.floorMod(hashKey.hashCode(), endpoints.size());
return endpoints.get(index);
}
private String getHashKey(RoutingMetadata metadata) {
// Use correlation ID or other consistent identifier
String correlationId = metadata.getCorrelationId();
if (correlationId != null) {
return correlationId;
}
// Fallback to message type
return metadata.getMessageType();
}
}
}
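Hash-based selection keeps messages with the same key on the same endpoint. One detail is worth a standalone sketch: `Math.abs(int)` returns a negative number for `Integer.MIN_VALUE`, so an index computed as `Math.abs(hash) % size` can be negative, while `Math.floorMod` always stays in range. The class and method names below are hypothetical.

```java
import java.util.List;

/** Sticky endpoint selection by key hash; a sketch with hypothetical names. */
final class StickyEndpointSelector {

    /** Maps any int hash, including negative ones, to an index in [0, size). */
    static int indexFor(int hash, int size) {
        return Math.floorMod(hash, size);
    }

    static <T> T select(List<T> endpoints, String key) {
        if (endpoints.isEmpty()) {
            throw new IllegalArgumentException("no endpoints");
        }
        return endpoints.get(indexFor(key.hashCode(), endpoints.size()));
    }

    public static void main(String[] args) {
        List<String> endpoints = List.of("endpoint1", "endpoint2", "endpoint3");
        // The same correlation ID always lands on the same endpoint.
        System.out.println(select(endpoints, "order-42"));
        System.out.println(select(endpoints, "order-42"));
    }
}
```

Note that plain modulo hashing remaps most keys when the endpoint list changes size; consistent hashing is the usual remedy when that matters.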
4. Route Executor
Executes routing decisions and handles message delivery:
@Component
public class RouteExecutor {
private final Map<RouteType, MessageSender> messageSenders;
private final RetryTemplate retryTemplate;
private final CircuitBreakerRegistry circuitBreakerRegistry;
private final RouteMetrics routeMetrics;
public RoutingResult executeRoute(Message message, Endpoint endpoint, Route route) {
String routeKey = route.getName() + ":" + endpoint.getId();
CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker(routeKey);
return circuitBreaker.executeSupplier(() -> {
Timer.Sample sample = routeMetrics.startRouteTimer(route, endpoint);
try {
RoutingResult result = doExecuteRoute(message, endpoint, route);
sample.stop(Timer.builder("route.execution.duration")
.tag("route", route.getName())
.tag("endpoint", endpoint.getId())
.tag("status", "success")
.register(routeMetrics.getMeterRegistry()));
routeMetrics.recordSuccessfulRoute(route, endpoint);
return result;
} catch (Exception e) {
sample.stop(Timer.builder("route.execution.duration")
.tag("route", route.getName())
.tag("endpoint", endpoint.getId())
.tag("status", "error")
.register(routeMetrics.getMeterRegistry()));
routeMetrics.recordFailedRoute(route, endpoint, e);
throw e;
}
});
}
private RoutingResult doExecuteRoute(Message message, Endpoint endpoint, Route route) {
MessageSender sender = messageSenders.get(route.getType());
if (sender == null) {
throw new UnsupportedRouteTypeException("Unsupported route type: " + route.getType());
}
return retryTemplate.execute(context -> {
log.debug("Executing route {} to endpoint {} (attempt {})",
route.getName(), endpoint.getId(), context.getRetryCount() + 1);
return sender.send(message, endpoint, route);
});
}
// Message senders for different route types
public interface MessageSender {
RoutingResult send(Message message, Endpoint endpoint, Route route);
}
@Component
public static class HttpMessageSender implements MessageSender {
private final RestTemplate restTemplate;
@Override
public RoutingResult send(Message message, Endpoint endpoint, Route route) {
String url = endpoint.getBaseUrl() + route.getPath();
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.putAll(convertHeaders(message.getHeaders()));
HttpEntity<byte[]> request = new HttpEntity<>(message.getPayload(), headers);
long startNanos = System.nanoTime(); // time only the HTTP exchange
ResponseEntity<String> response = restTemplate.exchange(
url, HttpMethod.POST, request, String.class);
Duration elapsed = Duration.ofNanos(System.nanoTime() - startNanos);
return RoutingResult.builder()
.success(response.getStatusCode().is2xxSuccessful())
.endpoint(endpoint)
.route(route)
.response(response.getBody())
.statusCode(response.getStatusCode().value())
.headers(convertResponseHeaders(response.getHeaders()))
.executionTime(elapsed) // elapsed time, not the wall-clock timestamp
.build();
}
}
@Component
public static class JmsMessageSender implements MessageSender {
private final JmsTemplate jmsTemplate;
@Override
public RoutingResult send(Message message, Endpoint endpoint, Route route) {
String destination = endpoint.getId(); // JMS destination name
jmsTemplate.send(destination, session -> {
BytesMessage jmsMessage = session.createBytesMessage();
jmsMessage.writeBytes(message.getPayload());
// Add headers
for (Map.Entry<String, String> header : message.getHeaders().entrySet()) {
jmsMessage.setStringProperty(header.getKey(), header.getValue());
}
return jmsMessage;
});
return RoutingResult.builder()
.success(true)
.endpoint(endpoint)
.route(route)
.executionTime(Duration.ofMillis(10)) // Placeholder value, not a measured duration
.build();
}
}
@Component
public static class KafkaMessageSender implements MessageSender {
private final KafkaTemplate<String, byte[]> kafkaTemplate;
@Override
public RoutingResult send(Message message, Endpoint endpoint, Route route) {
String topic = endpoint.getId(); // Kafka topic name
String key = message.getHeaders().get("messageKey");
ProducerRecord<String, byte[]> record = new ProducerRecord<>(
topic, key, message.getPayload());
// Add headers
for (Map.Entry<String, String> header : message.getHeaders().entrySet()) {
record.headers().add(header.getKey(), header.getValue().getBytes(StandardCharsets.UTF_8));
}
try {
SendResult<String, byte[]> sendResult = kafkaTemplate.send(record).get();
return RoutingResult.builder()
.success(true)
.endpoint(endpoint)
.route(route)
.response("partition=" + sendResult.getRecordMetadata().partition() +
",offset=" + sendResult.getRecordMetadata().offset())
.executionTime(Duration.ofMillis(5)) // Placeholder value, not a measured duration
.build();
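} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // preserve the interrupt status for callers
throw new RoutingExecutionException("Interrupted while sending message to Kafka", e);
} catch (ExecutionException e) {
throw new RoutingExecutionException("Failed to send message to Kafka", e);
}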
}
}
}
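The senders above mark `executionTime` as simplified in places. One way to measure it uniformly is a small timing wrapper around the send call, sketched here with a hypothetical `TimedCall` helper that is not part of any library.

```java
import java.time.Duration;
import java.util.function.Supplier;

/** Runs a call and records how long it took; a sketch with hypothetical names. */
final class TimedCall<T> {
    final T value;
    final Duration elapsed;

    private TimedCall(T value, Duration elapsed) {
        this.value = value;
        this.elapsed = elapsed;
    }

    static <T> TimedCall<T> of(Supplier<T> call) {
        // nanoTime() is monotonic, so the difference is a valid elapsed time
        // even if the wall clock is adjusted while the call runs.
        long start = System.nanoTime();
        T value = call.get();
        return new TimedCall<>(value, Duration.ofNanos(System.nanoTime() - start));
    }

    public static void main(String[] args) {
        TimedCall<String> result = TimedCall.of(() -> "sent");
        System.out.println(result.value + " in " + result.elapsed.toNanos() + " ns");
    }
}
```

A sender could wrap its transport call in `TimedCall.of(...)` and pass `elapsed` to the result builder.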
Configuration Parameters
Essential Settings
| Parameter | Description | Typical Values |
|---|---|---|
| Rule Cache Size | Maximum cached routing rules | 1000-10000 |
| Rule Evaluation Timeout | Max time for rule evaluation | 100ms-1s |
| Endpoint Health Check Interval | Frequency of endpoint health checks | 30s-300s |
| Load Balancing Strategy | Algorithm for endpoint selection | round-robin/weighted/least-connections |
| Circuit Breaker Threshold | Failure threshold for circuit breaking | 50%-90% |
| Retry Attempts | Maximum routing retry attempts | 3-10 |
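To make the circuit breaker threshold concrete, the sketch below tracks a count-based sliding window of call outcomes and reports when the failure rate reaches the threshold. It is a simplified model with hypothetical names, not the Resilience4j implementation; the window size, minimum call count, and the choice to open at exactly the threshold are assumptions of the sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Count-based sliding window that decides when a circuit should open. */
final class FailureRateWindow {
    private final int windowSize;
    private final int minimumCalls;
    private final double thresholdPercent;
    private final Deque<Boolean> outcomes = new ArrayDeque<>(); // true = success
    private int failures;

    FailureRateWindow(int windowSize, int minimumCalls, double thresholdPercent) {
        this.windowSize = windowSize;
        this.minimumCalls = minimumCalls;
        this.thresholdPercent = thresholdPercent;
    }

    synchronized void record(boolean success) {
        outcomes.addLast(success);
        if (!success) {
            failures++;
        }
        // Evict the oldest outcome once the window is full.
        if (outcomes.size() > windowSize && !outcomes.removeFirst()) {
            failures--;
        }
    }

    synchronized boolean shouldOpen() {
        if (outcomes.size() < minimumCalls) {
            return false; // too few samples to judge
        }
        return 100.0 * failures / outcomes.size() >= thresholdPercent;
    }

    public static void main(String[] args) {
        FailureRateWindow window = new FailureRateWindow(10, 5, 60.0);
        for (int i = 0; i < 5; i++) {
            window.record(i < 2); // two successes, then three failures
        }
        System.out.println("open circuit? " + window.shouldOpen());
    }
}
```

The minimum call count prevents a single early failure from opening the circuit.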
Example Configuration
# Message Routing Configuration
routing.rules.cache-size=5000
routing.rules.cache-ttl=5m
routing.rules.evaluation-timeout=500ms
routing.rules.reload-interval=1m
# Endpoint Management
routing.endpoints.health-check-interval=30s
routing.endpoints.health-check-timeout=5s
routing.endpoints.failure-threshold=5
routing.endpoints.recovery-check-interval=60s
# Load Balancing
routing.load-balancing.strategy=round-robin
routing.load-balancing.weight-adjustment-interval=5m
routing.load-balancing.sticky-sessions=false
# Circuit Breaker
routing.circuit-breaker.failure-rate-threshold=60%
routing.circuit-breaker.wait-duration-in-open-state=30s
routing.circuit-breaker.sliding-window-size=10
routing.circuit-breaker.minimum-number-of-calls=5
# Retry Configuration
routing.retry.max-attempts=3
routing.retry.backoff-delay=1s
routing.retry.backoff-multiplier=2
routing.retry.max-backoff-delay=30s
# Performance Tuning
routing.thread-pool.core-size=10
routing.thread-pool.max-size=50
routing.thread-pool.queue-capacity=1000
routing.message-buffer-size=1000
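The retry settings describe an exponential backoff with a cap. The sketch below computes the resulting delay schedule, assuming that `max-attempts` counts the initial attempt (so three attempts mean two delays); that interpretation and the `BackoffSchedule` name are assumptions, since the `routing.retry.*` keys are specific to this example.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Computes capped exponential backoff delays; a sketch with hypothetical names. */
final class BackoffSchedule {

    /** Delays to wait before each retry; the first attempt has no delay. */
    static List<Duration> delays(int maxAttempts, Duration initial, double multiplier, Duration max) {
        List<Duration> result = new ArrayList<>();
        double nextMillis = initial.toMillis();
        for (int retry = 1; retry < maxAttempts; retry++) {
            // Cap each delay at the configured maximum.
            result.add(Duration.ofMillis((long) Math.min(nextMillis, max.toMillis())));
            nextMillis *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the example configuration: 3 attempts, 1s delay, x2, 30s cap.
        System.out.println(delays(3, Duration.ofSeconds(1), 2.0, Duration.ofSeconds(30)));
    }
}
```

With the example values the schedule is 1s then 2s; the 30s cap only takes effect from the sixth retry onward.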
Implementation Examples
1. Spring Integration Message Routing
@Configuration
@EnableIntegration
public class SpringIntegrationRoutingConfiguration {
@Bean
public IntegrationFlow contactProcessingFlow() {
return IntegrationFlows
.from("contactInputChannel")
.<ContactMessage, String>route(message -> {
// Content-based routing
if ("VIP".equals(message.getCustomerTier())) {
return "vipProcessingChannel";
} else if ("BULK".equals(message.getMessageType())) {
return "bulkProcessingChannel";
} else {
return "standardProcessingChannel";
}
})
.get();
}
@Bean
public IntegrationFlow headerBasedRoutingFlow() {
return IntegrationFlows
.from("headerRoutingInputChannel")
.routeToRecipients(r -> r
.recipient("euProcessingChannel", "headers['region'] == 'EU'")
.recipient("usProcessingChannel", "headers['region'] == 'US'")
.recipient("apacProcessingChannel", "headers['region'] == 'APAC'")
.defaultOutputChannel("defaultProcessingChannel"))
.get();
}
@Bean
public IntegrationFlow loadBalancedRoutingFlow() {
return IntegrationFlows
.from("loadBalancedInputChannel")
.route(Message.class, message -> {
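// Custom load balancing logic: hash the correlation ID so related messages share a channel
// Objects.hashCode tolerates a missing header; floorMod keeps the index non-negative
int hash = Objects.hashCode(message.getHeaders().get("correlationId"));
return "processingChannel" + (Math.floorMod(hash, 3) + 1); // Route to one of 3 channels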
})
.get();
}
@ServiceActivator(inputChannel = "vipProcessingChannel")
public void handleVipContact(ContactMessage message) {
log.info("Processing VIP contact: {}", message.getContactId());
// VIP processing logic
}
@ServiceActivator(inputChannel = "bulkProcessingChannel")
public void handleBulkContact(ContactMessage message) {
log.info("Processing bulk contact: {}", message.getContactId());
// Bulk processing logic
}
@ServiceActivator(inputChannel = "standardProcessingChannel")
public void handleStandardContact(ContactMessage message) {
log.info("Processing standard contact: {}", message.getContactId());
// Standard processing logic
}
}
2. Apache Camel Routing Implementation
@Configuration
public class CamelRoutingConfiguration {
@Bean
public RouteBuilder contactRoutingRoutes() {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
// Content-based routing with Apache Camel
from("jms:queue:contact.input")
.routeId("contactContentBasedRouting")
.choice()
.when(jsonpath("$.customerTier == 'VIP'"))
.to("jms:queue:contact.vip.processing")
.when(jsonpath("$.messageType == 'BULK'"))
.to("jms:queue:contact.bulk.processing")
.when(header("priority").isEqualTo("HIGH"))
.to("jms:queue:contact.priority.processing")
.otherwise()
.to("jms:queue:contact.standard.processing")
.end();
// Header-based routing
from("jms:queue:contact.regional.input")
.routeId("contactRegionalRouting")
.choice()
.when(header("region").isEqualTo("EU"))
.to("jms:queue:contact.eu.processing")
.when(header("region").isEqualTo("US"))
.to("jms:queue:contact.us.processing")
.when(header("region").isEqualTo("APAC"))
.to("jms:queue:contact.apac.processing")
.otherwise()
.to("jms:queue:contact.default.processing")
.end();
// Load-balanced routing with route-scoped error handling
from("jms:queue:contact.loadbalanced.input")
.routeId("contactLoadBalancedRouting")
// A route-scoped onException must be declared before the route's processing steps
.onException(Exception.class)
.handled(true)
.to("jms:queue:contact.errors")
.end()
.loadBalance()
.roundRobin()
.to("http://contact-service-1/process?bridgeEndpoint=true")
.to("http://contact-service-2/process?bridgeEndpoint=true")
.to("http://contact-service-3/process?bridgeEndpoint=true")
.end();
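// Dynamic routing based on database lookup
from("jms:queue:contact.dynamic.input")
.routeId("contactDynamicRouting")
// enrich() merges the lookup result into the original exchange; outputType=SelectOne
// returns the single processing_endpoint column value instead of a list of rows
.enrich("sql:select processing_endpoint from contact_routing where customer_id = :#customerid?outputType=SelectOne",
(original, lookup) -> {
original.getIn().setHeader("processingEndpoint", lookup.getIn().getBody(String.class));
return original;
})
// The endpoint comes from data, so check it against an allowlist before routing in production
.recipientList(header("processingEndpoint"))
.parallelProcessing()
.stopOnException();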
// Routing with content filtering and transformation
from("jms:queue:contact.filtered.input")
.routeId("contactFilteredRouting")
.filter(jsonpath("$.amount > 1000"))
.transform().jsonpath("$.contact")
.choice()
// After the transform the body is the contact object itself, so the path is relative to it
.when(jsonpath("$.type == 'PREMIUM'"))
.to("jms:queue:contact.premium.processing")
.otherwise()
.to("jms:queue:contact.standard.processing")
.end();
}
};
}
@Component
public static class ContactRoutingProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
ContactMessage contact = exchange.getIn().getBody(ContactMessage.class);
// Custom routing logic
String routingKey = determineRoutingKey(contact);
exchange.getIn().setHeader("routingKey", routingKey);
log.info("Routing contact {} to {}", contact.getContactId(), routingKey);
}
private String determineRoutingKey(ContactMessage contact) {
if ("VIP".equals(contact.getCustomerTier())) {
return "vip.processing";
} else if (contact.getAmount() != null && contact.getAmount() > 1000) {
return "high.value.processing";
} else {
return "standard.processing";
}
}
}
}
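The round-robin distribution used in contactLoadBalancedRouting can be illustrated without Camel. The sketch below rotates through a fixed endpoint list and skips endpoints that a caller-supplied health check rejects. RoundRobinSelector and its method names are assumptions made for illustration. Camel's own load balancer is configured through the DSL shown above and does not expose this class.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

/** Hypothetical round-robin selector with a simple failover to the next healthy endpoint. */
final class RoundRobinSelector {
    private final List<String> endpoints;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSelector(List<String> endpoints) {
        if (endpoints.isEmpty()) {
            throw new IllegalArgumentException("at least one endpoint is required");
        }
        this.endpoints = List.copyOf(endpoints);
    }

    /** Returns the next endpoint in rotation that passes the health check, if any. */
    Optional<String> next(Predicate<String> isHealthy) {
        int start = counter.getAndIncrement();
        for (int i = 0; i < endpoints.size(); i++) {
            // floorMod keeps the index valid even after the counter overflows
            String candidate = endpoints.get(Math.floorMod(start + i, endpoints.size()));
            if (isHealthy.test(candidate)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }
}
```

An empty result means every endpoint failed the health check. A caller would typically send the message to an error queue in that case.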
3. Custom Message Router Implementation
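@Service
@RequiredArgsConstructor // Lombok (assumed, as for the log field): injects the final fields
public class CustomMessageRoutingService {
private final MessageRouter messageRouter;
private final RoutingRulesEngine rulesEngine;
private final EndpointRegistry endpointRegistry;
// Collaborators used by the failure handlers below; the type names are assumptions
private final DeadLetterService deadLetterService;
private final ErrorHandlingService errorHandlingService;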
public void processMessage(ContactMessage contactMessage) {
// Convert to internal message format
Message message = convertToMessage(contactMessage);
// Create routing context
RoutingContext context = RoutingContext.builder()
.correlationId(UUID.randomUUID().toString())
.sourceEndpoint("contact.input.service")
.timestamp(Instant.now())
.build();
try {
// Route message
RoutingResult result = messageRouter.route(message, context);
if (result.isSuccess()) {
log.info("Successfully routed message to endpoint: {}",
result.getEndpoint().getId());
} else {
log.error("Failed to route message: {}", result.getErrorMessage());
handleRoutingFailure(message, result);
}
} catch (Exception e) {
log.error("Error routing message: {}", e.getMessage(), e);
handleRoutingException(message, context, e);
}
}
private Message convertToMessage(ContactMessage contactMessage) {
try {
// ObjectMapper is thread-safe once configured; prefer a shared instance over one per message
ObjectMapper mapper = new ObjectMapper();
byte[] payload = mapper.writeValueAsBytes(contactMessage);
Map<String, String> headers = new HashMap<>();
headers.put("messageType", "CONTACT");
headers.put("customerTier", contactMessage.getCustomerTier());
headers.put("region", contactMessage.getRegion());
headers.put("priority", contactMessage.getPriority());
return Message.builder()
.id(UUID.randomUUID().toString())
.type("CONTACT")
.payload(payload)
.headers(headers)
.timestamp(Instant.now())
.priority(determinePriority(contactMessage))
.build();
} catch (Exception e) {
throw new MessageConversionException("Failed to convert contact message", e);
}
}
private int determinePriority(ContactMessage contactMessage) {
if ("VIP".equals(contactMessage.getCustomerTier())) {
return 100;
} else if ("HIGH".equals(contactMessage.getPriority())) {
return 80;
} else {
return 50;
}
}
private void handleRoutingFailure(Message message, RoutingResult result) {
// Send to dead letter queue or retry
deadLetterService.handleFailedRouting(message, result);
}
private void handleRoutingException(Message message, RoutingContext context, Exception e) {
// Log and send to error handling service
errorHandlingService.handleRoutingException(message, context, e);
}
}
// Dynamic routing rule management
@RestController
@RequestMapping("/api/routing")
public class RoutingRuleController {
private final RoutingRulesEngine rulesEngine;
@PostMapping("/rules")
public ResponseEntity<RoutingRule> createRule(@RequestBody RuleDefinition definition) {
RoutingRule rule = rulesEngine.createRule(definition);
return ResponseEntity.ok(rule);
}
@PutMapping("/rules/{id}")
public ResponseEntity<Void> updateRule(@PathVariable String id,
@RequestBody RuleDefinition definition) {
rulesEngine.updateRule(id, definition);
return ResponseEntity.ok().build();
}
@DeleteMapping("/rules/{id}")
public ResponseEntity<Void> deleteRule(@PathVariable String id) {
rulesEngine.deleteRule(id);
return ResponseEntity.ok().build();
}
@GetMapping("/rules")
public ResponseEntity<List<RoutingRule>> getAllRules() {
List<RoutingRule> rules = rulesEngine.getAllRules();
return ResponseEntity.ok(rules);
}
@PostMapping("/rules/{id}/test")
public ResponseEntity<RoutingTestResult> testRule(@PathVariable String id,
@RequestBody TestMessage testMessage) {
RoutingTestResult result = rulesEngine.testRule(id, testMessage);
return ResponseEntity.ok(result);
}
}
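The controller delegates to a RoutingRulesEngine whose implementation is not shown. As a rough illustration of what runtime rule management involves, the sketch below keeps rules in a concurrent map so they can be created, replaced, deleted, and dry-run tested while messages are being routed. InMemoryRuleRegistry, its Rule record, and the use of header maps as rule input are all assumptions, not the engine's actual API.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

/** Hypothetical in-memory rule store that supports changes at runtime. */
final class InMemoryRuleRegistry {
    record Rule(String id, int priority, Predicate<Map<String, String>> condition, String target) {}

    private final Map<String, Rule> rules = new ConcurrentHashMap<>();

    /** Creates the rule, or replaces an existing rule with the same id. */
    void put(Rule rule) {
        rules.put(rule.id(), rule);
    }

    /** Returns true if a rule with this id existed and was removed. */
    boolean delete(String id) {
        return rules.remove(id) != null;
    }

    /** Returns a snapshot of all rules, highest priority first. */
    List<Rule> all() {
        return rules.values().stream()
                .sorted(Comparator.comparingInt(Rule::priority).reversed())
                .toList();
    }

    /** Dry run: reports whether the rule would match the sample headers. */
    boolean test(String id, Map<String, String> sampleHeaders) {
        Rule rule = rules.get(id);
        if (rule == null) {
            throw new IllegalArgumentException("Unknown rule: " + id);
        }
        return rule.condition().test(sampleHeaders);
    }
}
```

A production engine would also persist rules and restrict who may change them, since a rule change redirects live traffic.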
4. Microservices API Gateway Routing
@Configuration
public class ApiGatewayRouting {
@Bean
public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
return builder.routes()
// Content-based routing for contact API
.route("contact-vip", r -> r
.path("/api/contacts/**")
.and()
.header("Customer-Tier", "VIP")
.uri("http://vip-contact-service:8080"))
// Version-based routing
.route("contact-v2", r -> r
.path("/api/contacts/**")
.and()
.header("API-Version", "v2")
.uri("http://contact-service-v2:8080"))
// Geographic routing (declared before the default route, because the first matching route wins)
.route("contact-eu", r -> r
.path("/api/contacts/**")
.and()
.header("Region", "EU")
.uri("http://eu-contact-service:8080"))
// Default contact routing: catch-all for /api/contacts/**, so it must come last
.route("contact-default", r -> r
.path("/api/contacts/**")
.uri("http://contact-service:8080"))
// Load balanced routing
.route("contact-loadbalanced", r -> r
.path("/api/internal/contacts/**")
.uri("lb://contact-service-cluster"))
.build();
}
@Component
public static class CustomRouteFilter implements GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// Extract routing metadata
String customerTier = request.getHeaders().getFirst("Customer-Tier");
String region = request.getHeaders().getFirst("Region");
String contentLength = request.getHeaders().getFirst("Content-Length");
// Add routing attributes to request
ServerHttpRequest modifiedRequest = request.mutate()
.header("X-Routing-Tier", customerTier != null ? customerTier : "STANDARD")
.header("X-Routing-Region", region != null ? region : "DEFAULT")
.header("X-Routing-Size", contentLength != null ? contentLength : "0")
.build();
return chain.filter(exchange.mutate().request(modifiedRequest).build());
}
}
}
Best Practices
1. Routing Rule Design
public class RoutingRuleBestPractices {
// Use priority-based rule ordering
public void setupRulePriorities() {
// Highest priority: Security and compliance rules
createRule("GDPR-Routing", "headers['dataClassification'] == 'PERSONAL'", 100);
// High priority: Business-critical routing
createRule("VIP-Customer", "attributes['customerTier'] == 'VIP'", 90);
// Medium priority: Geographic and regional routing
createRule("EU-Routing", "headers['region'] == 'EU'", 70);
// Low priority: Performance optimization routing
createRule("Load-Balancing", "messageType == 'STANDARD'", 50);
// Lowest priority: Default catch-all routing
createRule("Default-Route", "true", 10);
}
// Use efficient rule expressions
public class EfficientRuleExpressions {
// Good: Simple, fast evaluation
String efficientRule = "headers['priority'] == 'HIGH'";
// Avoid: Complex, slow evaluation
String inefficientRule = "jsonPath(payload, '$.customer.orders[?(@.amount > 1000)].length') > 5";
// Better: Pre-computed attributes
String optimizedRule = "attributes['highValueOrderCount'] > 5";
}
// Implement rule testing and validation
public void validateRoutingRules(List<RoutingRule> rules) {
for (RoutingRule rule : rules) {
// Test rule compilation
try {
compileRule(rule);
} catch (Exception e) {
throw new InvalidRuleException("Rule compilation failed: " + rule.getName(), e);
}
// Test rule with sample data
TestMessage[] testCases = createTestCases();
for (TestMessage testCase : testCases) {
try {
evaluateRule(rule, testCase);
} catch (Exception e) {
log.warn("Rule evaluation failed for test case: {} - {}",
rule.getName(), e.getMessage());
}
}
}
}
}
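The priority scheme in setupRulePriorities implies an evaluation step that sorts rules by priority and returns the first match, with the catch-all rule acting as the default. A minimal sketch of that step follows. PriorityRuleEvaluator, the Rule record, and the queue names in the usage note are hypothetical, and rules are modelled as predicates over a header map rather than as expression strings.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical evaluator: the highest-priority matching rule wins. */
final class PriorityRuleEvaluator {
    record Rule(String name, int priority, Predicate<Map<String, String>> condition, String target) {}

    private final List<Rule> ordered;

    PriorityRuleEvaluator(List<Rule> rules) {
        // Sort once at construction time so that each evaluation is a simple scan
        this.ordered = rules.stream()
                .sorted(Comparator.comparingInt(Rule::priority).reversed())
                .toList();
    }

    /** Returns the highest-priority rule whose condition matches, if any. */
    Optional<Rule> firstMatch(Map<String, String> headers) {
        return ordered.stream()
                .filter(rule -> rule.condition().test(headers))
                .findFirst();
    }
}
```

With a compliance rule at priority 100, an EU rule at 70, and a catch-all at 10, a message carrying both region=EU and dataClassification=PERSONAL resolves to the compliance rule, and a message with no headers falls through to the catch-all.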
2. Performance Optimization
@Component
public class RoutingPerformanceOptimization {
// Use rule caching for frequently evaluated rules
@Cacheable(value = "routingRules", key = "#metadata.cacheKey")
public List<RoutingRule> getCachedRules(RoutingMetadata metadata) {
return evaluateRules(metadata);
}
// Implement parallel rule evaluation for complex scenarios
public List<RoutingRule> evaluateRulesInParallel(RoutingMetadata metadata) {
List<RoutingRule> allRules = getAllRules();
return allRules.parallelStream()
.filter(rule -> evaluateRuleCondition(rule, metadata))
.sorted(Comparator.comparing(RoutingRule::getPriority).reversed())
.collect(Collectors.toList());
}
// Use connection pooling for external endpoints
@Bean
public RestTemplate routingRestTemplate() {
HttpComponentsClientHttpRequestFactory factory =
new HttpComponentsClientHttpRequestFactory();
// Configure connection pooling
PoolingHttpClientConnectionManager connectionManager =
new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(100);
connectionManager.setDefaultMaxPerRoute(20);
CloseableHttpClient httpClient = HttpClients.custom()
.setConnectionManager(connectionManager)
.build();
factory.setHttpClient(httpClient);
RestTemplate restTemplate = new RestTemplate(factory);
restTemplate.setErrorHandler(new RoutingResponseErrorHandler());
return restTemplate;
}
// Optimize message parsing and attribute extraction
public Map<String, Object> extractAttributesEfficiently(Message message) {
// Use streaming JSON parser for large messages
if (message.getPayload().length > 10000) {
return streamingJsonExtractor.extract(message.getPayload());
} else {
return standardJsonExtractor.extract(message.getPayload());
}
}
}
3. Monitoring and Observability
@Component
public class RoutingMonitoring {
private final MeterRegistry meterRegistry;
private final DistributedTracing tracing;
// One gauge value per endpoint. Micrometer gauges keep only a weak reference to the object
// they sample, so the values are held here and updated in place.
private final Map<String, AtomicInteger> endpointHealth = new ConcurrentHashMap<>();
public void recordRoutingMetrics(RoutingResult result) {
String routeName = result.getRoute().getName();
String endpointId = result.getEndpoint().getId();
String status = result.isSuccess() ? "success" : "failure";
// Record routing count
meterRegistry.counter("routing.messages",
"route", routeName,
"endpoint", endpointId,
"status", status)
.increment();
// Record routing latency
meterRegistry.timer("routing.duration",
"route", routeName,
"endpoint", endpointId)
.record(result.getExecutionTime());
// Record endpoint health (1 = last routing attempt succeeded, 0 = it failed)
endpointHealth.computeIfAbsent(endpointId, id ->
meterRegistry.gauge("routing.endpoint.health", Tags.of("endpoint", id), new AtomicInteger()))
.set(result.isSuccess() ? 1 : 0);
}
public void traceRouting(Message message, RoutingContext context, RoutingResult result) {
Span span = tracing.nextSpan()
.name("message.routing")
.tag("route.name", result.getRoute().getName())
.tag("endpoint.id", result.getEndpoint().getId())
.tag("message.type", message.getType())
.tag("correlation.id", context.getCorrelationId())
.start();
try (Tracer.SpanInScope ws = tracing.tracer().withSpanInScope(span)) {
if (result.isSuccess()) {
span.tag("routing.status", "success");
} else {
span.tag("routing.status", "failure");
span.tag("error.message", result.getErrorMessage());
}
} finally {
span.finish(); // Brave's Span API (matching withSpanInScope); Micrometer Tracing uses end()
}
}
@EventListener
public void handleRoutingAlert(RoutingEvent event) {
if (event.getType() == RoutingEventType.ENDPOINT_DOWN) {
alertingService.sendAlert(AlertLevel.CRITICAL,
"Endpoint is down: " + event.getEndpointId());
}
if (event.getType() == RoutingEventType.HIGH_ERROR_RATE) {
alertingService.sendAlert(AlertLevel.WARNING,
"High error rate detected for route: " + event.getRouteName());
}
}
}
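The HIGH_ERROR_RATE event handled above has to be produced somewhere. One simple way to derive it, sketched here under assumptions, is a fixed-size window over the most recent routing outcomes for a route. ErrorRateWindow, the window size, and the threshold are illustrative choices and not part of any monitoring library.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical count-based sliding window over recent routing outcomes. */
final class ErrorRateWindow {
    private final int size;
    private final double threshold;
    private final Deque<Boolean> outcomes = new ArrayDeque<>();
    private int failures;

    ErrorRateWindow(int size, double threshold) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
        this.threshold = threshold;
    }

    /**
     * Records one outcome. Returns true when the window is full and the
     * share of failures in it is strictly above the threshold.
     */
    synchronized boolean record(boolean success) {
        outcomes.addLast(success);
        if (!success) {
            failures++;
        }
        // Evict the oldest outcome once the window is over capacity
        if (outcomes.size() > size && !outcomes.removeFirst()) {
            failures--;
        }
        return outcomes.size() == size && (double) failures / size > threshold;
    }
}
```

Requiring a full window before alerting avoids raising an alert on the first failed message after startup.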
Common Pitfalls
1. Rule Conflicts and Ambiguity
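Problem: When several rules match the same message and nothing defines which one wins, the destination depends on evaluation order and becomes unpredictable.
Solution: Give every rule an explicit priority, avoid ties between rules that can match the same message, and test the rule set against representative sample messages.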
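One way to catch ambiguous rules before deployment is to run sample messages through the rule set and flag any sample for which more than one rule ties at the highest matching priority. The sketch below assumes rules are predicates over a header map. RuleConflictDetector and its Rule record are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical check for rules that tie on priority for the same message. */
final class RuleConflictDetector {
    record Rule(String name, int priority, Predicate<Map<String, String>> condition) {}

    /**
     * Returns the names of the matching rules that share the highest priority,
     * sorted alphabetically. More than one name means the routing is ambiguous.
     */
    static List<String> topMatches(List<Rule> rules, Map<String, String> sample) {
        List<Rule> matches = rules.stream()
                .filter(rule -> rule.condition().test(sample))
                .toList();
        int top = matches.stream().mapToInt(Rule::priority).max().orElse(Integer.MIN_VALUE);
        return matches.stream()
                .filter(rule -> rule.priority() == top)
                .map(Rule::name)
                .sorted()
                .toList();
    }
}
```

Running this over a suite of representative messages in a build step turns an intermittent production surprise into a failing test.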
2. Performance Degradation from Complex Rules
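Problem: Expensive rule expressions, such as JSONPath queries over large payloads, are evaluated for every message and reduce throughput.
Solution: Route on headers or pre-computed attributes where possible, and cache compiled expressions and rule lookups.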
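Caching the compiled form of a rule expression is one of the cheaper optimizations. The sketch below assumes, purely for illustration, that rule conditions are regular expressions applied to a header value. CompiledRuleCache is a hypothetical name, and the compilation counter exists only to make the caching visible.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

/** Hypothetical cache that compiles each rule expression at most once. */
final class CompiledRuleCache {
    private final Map<String, Pattern> compiled = new ConcurrentHashMap<>();
    private final AtomicInteger compilations = new AtomicInteger();

    /** Returns true if the header value fully matches the rule's regular expression. */
    boolean matches(String regex, String headerValue) {
        // computeIfAbsent compiles on first use and reuses the Pattern afterwards
        Pattern pattern = compiled.computeIfAbsent(regex, r -> {
            compilations.incrementAndGet();
            return Pattern.compile(r);
        });
        return headerValue != null && pattern.matcher(headerValue).matches();
    }

    /** Number of compilations performed so far. */
    int compilationCount() {
        return compilations.get();
    }
}
```

The same idea applies to JSONPath or expression-language rules: parse once when the rule is created or first used, then evaluate the parsed form per message.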
3. Endpoint Health Management
Problem: Messages routed to an unavailable endpoint fail or pile up behind timeouts.
Solution: Track endpoint health, open a circuit breaker after repeated failures, and fall back to an alternative route or a dead letter queue.
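A circuit breaker for a single endpoint can be reduced to a small state machine: it opens after a number of consecutive failures, rejects traffic during a cool-down period, and then lets a trial request through. The following is a simplified sketch with an injectable clock so that it can be tested deterministically. EndpointCircuitBreaker and its parameters are assumptions, and libraries such as Resilience4j provide more complete implementations.

```java
import java.util.function.LongSupplier;

/** Hypothetical minimal circuit breaker for one endpoint. */
final class EndpointCircuitBreaker {
    private final int failureThreshold;
    private final long coolDownMillis;
    private final LongSupplier clock;
    private int consecutiveFailures;
    private boolean open;
    private long openedAt;

    EndpointCircuitBreaker(int failureThreshold, long coolDownMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.coolDownMillis = coolDownMillis;
        this.clock = clock;
    }

    /** True when the breaker is closed, or open but past its cool-down (trial request). */
    synchronized boolean allowRequest() {
        return !open || clock.getAsLong() - openedAt >= coolDownMillis;
    }

    /** A success closes the breaker and resets the failure count. */
    synchronized void recordSuccess() {
        consecutiveFailures = 0;
        open = false;
    }

    /** A failure at or beyond the threshold opens the breaker and restarts the cool-down. */
    synchronized void recordFailure() {
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            open = true;
            openedAt = clock.getAsLong();
        }
    }
}
```

A router would call allowRequest() before selecting the endpoint and report the outcome afterwards. In production the clock would typically be System::currentTimeMillis.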
4. Rule Management Complexity
Problem: A large rule set becomes difficult to maintain and debug.
Solution: Group rules by category (compliance, business, regional, default), keep automated tests for each rule, and document the intent of each rule.
5. Security Vulnerabilities in Dynamic Routing
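Problem: Routing decisions based on untrusted input, such as client-supplied headers or endpoint names, let an attacker steer messages to unintended destinations.
Solution: Validate and sanitize all routing inputs, and resolve dynamic destinations only against an allowlist.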
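For dynamically resolved HTTP endpoints, allowlisting can be as simple as parsing the candidate URI and comparing its scheme and host against fixed sets. The sketch below shows that check. EndpointAllowlist and the host name in the usage note are hypothetical, and the sets are assumed to contain lower-case values. Opaque URIs such as jms:queue:... have no host component and would need a separate check on the destination name.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;
import java.util.Set;

/** Hypothetical allowlist check for dynamically resolved HTTP endpoints. */
final class EndpointAllowlist {
    private final Set<String> allowedSchemes;
    private final Set<String> allowedHosts;

    EndpointAllowlist(Set<String> allowedSchemes, Set<String> allowedHosts) {
        this.allowedSchemes = Set.copyOf(allowedSchemes);
        this.allowedHosts = Set.copyOf(allowedHosts);
    }

    /** Returns true only if the candidate parses and both scheme and host are allowed. */
    boolean isAllowed(String candidate) {
        if (candidate == null) {
            return false;
        }
        try {
            URI uri = new URI(candidate.trim());
            String scheme = uri.getScheme();
            String host = uri.getHost();
            // Compare the parsed host, not a substring, so user-info tricks do not pass
            return scheme != null && host != null
                    && allowedSchemes.contains(scheme.toLowerCase(Locale.ROOT))
                    && allowedHosts.contains(host.toLowerCase(Locale.ROOT));
        } catch (URISyntaxException e) {
            return false;
        }
    }
}
```

With https and contact-service.internal allowed, https://contact-service.internal/process passes, while https://contact-service.internal@evil.example.com/ is rejected because its parsed host is evil.example.com.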
Integration in Distributed Systems
Event-Driven Microservices
@Service
public class EventRoutingService {
public void routeEvent(DomainEvent event) {
RoutingContext context = createEventContext(event);
messageRouter.route(convertEventToMessage(event), context);
}
}
API Gateway Integration
@Component
public class GatewayRoutingFilter implements GatewayFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return determineRoute(exchange.getRequest())
.flatMap(route -> routeToEndpoint(exchange, route, chain));
}
}
Conclusion
The Message Routing pattern is essential for building flexible, scalable distributed systems. It provides:
- Dynamic Message Distribution: Intelligent routing based on content and context
- System Decoupling: Loose coupling between message producers and consumers
- Operational Flexibility: Runtime configuration changes without system restarts
- Performance Optimization: Load balancing and efficient resource utilization
When properly implemented with comprehensive rule management, performance optimization, and monitoring, message routing enables adaptive architectures that can evolve with changing business requirements while maintaining high performance and reliability.