
Message Routing Pattern

Overview

The Message Routing pattern determines the destination of each message at runtime from its content, headers, or configured routing rules. This makes it possible to control message flow dynamically, balance load, and distribute messages by content across multiple endpoints. The pattern is a foundation for messaging architectures that must adapt as business requirements change.

Theoretical Foundation

Message Routing is grounded in graph theory and network routing principles applied to messaging systems. It implements "store-and-forward" semantics with intelligent path determination based on message characteristics and system state. The pattern embodies "content-aware routing" where routing decisions are made based on message content, metadata, and contextual information rather than static configuration.

Core Principles

1. Dynamic Route Determination

Routing decisions are made at runtime based on message content, headers, system state, and configured routing rules, enabling adaptive message flow that responds to changing conditions.

2. Content-Based Message Distribution

Messages are routed based on their content, type, or metadata, enabling fine-grained control over message distribution and processing workflows.

3. Route Optimization and Load Balancing

The routing system optimizes message delivery paths and distributes load across available endpoints to maximize throughput and minimize latency.

4. Fault-Tolerant Routing

Routing logic includes fallback mechanisms, alternative paths, and error handling to ensure message delivery even when primary routes are unavailable.
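
The fallback idea can be sketched as trying destinations in preference order and using a dead letter queue as the last resort. This is a minimal illustration, not a complete implementation: the `FallbackRouting` class, the destination names, and the `dead.letter.queue` name are assumptions, and availability is modeled as a plain predicate.

```java
import java.util.List;
import java.util.function.Predicate;

class FallbackRouting {
    // Hypothetical last-resort destination for messages that cannot be delivered.
    static final String DEAD_LETTER_QUEUE = "dead.letter.queue";

    // Returns the first destination in preference order that is currently available,
    // or the dead letter queue when none is.
    static String choose(List<String> orderedDestinations, Predicate<String> isAvailable) {
        return orderedDestinations.stream()
            .filter(isAvailable)
            .findFirst()
            .orElse(DEAD_LETTER_QUEUE);
    }

    public static void main(String[] args) {
        List<String> routes = List.of("orders.primary", "orders.secondary");
        // Simulate an outage of the primary endpoint.
        System.out.println(choose(routes, d -> !d.equals("orders.primary")));
        // Simulate an outage of every endpoint.
        System.out.println(choose(routes, d -> false));
    }
}
```

In a real router the predicate would typically be backed by health checks or circuit breaker state rather than a fixed lambda.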

Why Message Routing is Essential in Integration Architecture

1. Enterprise Integration Requirements

Complex enterprise environments require sophisticated message routing:
- Multi-system integration: routing messages between diverse enterprise applications
- Legacy system modernization: enabling gradual transition through intelligent routing
- B2B communication: routing messages to appropriate trading partners and systems
- Compliance and governance: ensuring messages follow required processing paths

2. Microservices Architecture Support

Microservices architectures depend on effective message routing:
- Service discovery integration: routing messages to appropriate service instances
- Version management: routing requests to correct service versions
- Canary deployments: gradually routing traffic to new service versions
- Circuit breaker integration: routing around failed services

3. Scalability and Performance Optimization

Message routing enables system scalability:
- Load distribution across multiple processing endpoints
- Geographic routing: directing messages to closest processing centers
- Priority-based routing: ensuring critical messages receive appropriate handling
- Capacity-aware routing: directing messages based on endpoint availability

4. Business Process Flexibility

Dynamic routing supports business agility:
- A/B testing: routing messages to different processing pipelines for testing
- Feature flags: enabling/disabling features through routing configuration
- Business rule implementation: routing messages based on business logic
- Workflow orchestration: directing messages through complex business processes

Benefits in Integration Contexts

1. Architecture Flexibility

2. Operational Excellence

3. Development Productivity

4. Business Value

Integration Architecture Applications

1. Enterprise Service Bus (ESB) Implementation

Message routing forms the core of ESB architectures:
- Service mediation: routing messages between different service interfaces
- Protocol transformation: routing messages through appropriate protocol adapters
- Message transformation: routing messages through transformation pipelines
- Error handling: routing failed messages to appropriate error handling services

2. API Gateway Integration

API gateways use message routing for request distribution:
- Backend service routing: directing API requests to appropriate microservices
- Version routing: managing API versioning through intelligent request routing
- Authentication routing: directing requests through appropriate authentication services
- Rate limiting: routing requests based on quotas and throttling policies

3. Event-Driven Architecture Routing

Event systems require sophisticated routing capabilities:
- Event type routing: distributing events to appropriate event handlers
- Subscriber routing: routing events to interested subscribers
- Event transformation routing: routing events through processing pipelines
- Dead letter routing: handling failed event processing

4. Data Pipeline Orchestration

Data processing systems use routing for pipeline management:
- Data source routing: directing data from various sources to processing systems
- Processing stage routing: orchestrating multi-stage data processing workflows
- Quality assurance routing: directing data through validation and cleansing processes
- Output routing: distributing processed data to appropriate destination systems

How Message Routing Pattern Works

Message routing operates through configurable routing engines that evaluate messages against routing rules and direct them to appropriate destinations:

Basic Routing Architecture

┌─────────────────────────────────────────────────────────────────┐
│                     Message Router                             │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────┐    ┌──────────────┐    ┌─────────────────────┐ │
│  │ Message     │──▶ │ Routing      │──▶ │ Destination         │ │
│  │ Classifier  │    │ Engine       │    │ Selector            │ │
│  └─────────────┘    └──────────────┘    └─────────────────────┘ │
│          │                  │                       │           │
│          ▼                  ▼                       ▼           │
│  ┌─────────────┐    ┌──────────────┐    ┌─────────────────────┐ │
│  │ Content     │    │ Rule         │    │ Load Balancer       │ │
│  │ Analyzer    │    │ Evaluator    │    │                     │ │
│  └─────────────┘    └──────────────┘    └─────────────────────┘ │
│                                                                 │
├─────────────────────────────────────────────────────────────────┤
│                    Routing Outcomes                             │
│                                                                 │
│ Message A ──┬──▶ Service 1                                      │
│             └──▶ Service 2 (Load Balanced)                     │
│                                                                 │
│ Message B ──────▶ Service 3 (Content-based)                    │
│                                                                 │
│ Message C ──┬──▶ Service 4 (Primary)                           │
│             └──▶ Dead Letter Queue (Fallback)                  │
└─────────────────────────────────────────────────────────────────┘

Routing Decision Process

┌────────────────────────────────────────────────────────────┐
│                Routing Decision Flow                       │
├────────────────────────────────────────────────────────────┤
│                                                            │
│ 1. [Message Received] ──▶ 2. [Extract Routing Metadata]    │
│                                       │                    │
│                                       ▼                    │
│ 4. [Apply Filters & Conditions] ◀── 3. [Evaluate Rules]    │
│           │                                                │
│           ▼                                                │
│ 5. [Select Route] ──▶ 6. [Route Message]                   │
│                                                            │
│ Routing Criteria:                                          │
│ • Message content and headers                              │
│ • Destination availability                                 │
│ • Load balancing algorithms                                │
│ • Business rules and policies                              │
│ • Error handling requirements                              │
└────────────────────────────────────────────────────────────┘

Routing Strategies

1. Content-Based Routing

Message Content ──▶ [Content Analyzer] ──▶ Route Decision

if (message.type == "ORDER") → order.processing.queue
if (message.priority == "HIGH") → priority.processing.queue  
if (message.amount > 1000) → manual.review.queue
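
The three rules above can be sketched in Java as an ordered list of predicates where the first match wins. The `ContentBasedRouter` class, the map-based message, and the `default.processing.queue` fallback are assumptions made for illustration; the rules themselves do not say how overlapping matches are resolved, so declaration order is used here. The sketch needs Java 16 or later (records, pattern matching for `instanceof`).

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class ContentBasedRouter {
    // Hypothetical rule type: a condition on the message fields plus a destination.
    record ContentRule(Predicate<Map<String, Object>> condition, String destination) {}

    // Rules are checked in declaration order; the first matching rule decides.
    private static final List<ContentRule> RULES = List.of(
        new ContentRule(m -> "ORDER".equals(m.get("type")), "order.processing.queue"),
        new ContentRule(m -> "HIGH".equals(m.get("priority")), "priority.processing.queue"),
        new ContentRule(m -> m.get("amount") instanceof Number n && n.doubleValue() > 1000,
                        "manual.review.queue"));

    // Assumed fallback destination for messages that match no rule.
    static final String DEFAULT_DESTINATION = "default.processing.queue";

    static String route(Map<String, Object> message) {
        return RULES.stream()
            .filter(rule -> rule.condition().test(message))
            .map(ContentRule::destination)
            .findFirst()
            .orElse(DEFAULT_DESTINATION);
    }

    public static void main(String[] args) {
        System.out.println(route(Map.of("type", "ORDER", "amount", 5000)));
        System.out.println(route(Map.of("type", "PAYMENT", "amount", 5000)));
        System.out.println(route(Map.of("type", "PING")));
    }
}
```

Note that an order with a large amount goes to the order queue under this ordering, not to manual review. If that is not the intended behavior, the rules need explicit priorities or a multi-destination result.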

2. Header-Based Routing

Message Headers ──▶ [Header Evaluator] ──▶ Route Decision

if (headers.region == "EU") → eu.processing.service
if (headers.version == "v2") → v2.api.endpoint
if (headers.tenant == "premium") → premium.service.tier
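
Because header rules compare a named header against a fixed value, they can also be expressed as a lookup table instead of code. The sketch below assumes a hypothetical `HeaderBasedRouter` class and assumes that headers are checked in the order region, version, tenant; that precedence is an illustration choice, not something the rules above specify.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

class HeaderBasedRouter {
    // header name -> (expected value -> destination); insertion order sets precedence.
    private final Map<String, Map<String, String>> table = new LinkedHashMap<>();

    HeaderBasedRouter() {
        table.put("region", Map.of("EU", "eu.processing.service"));
        table.put("version", Map.of("v2", "v2.api.endpoint"));
        table.put("tenant", Map.of("premium", "premium.service.tier"));
    }

    // Returns the destination of the first header that has a configured value,
    // or an empty Optional when no header rule applies.
    Optional<String> route(Map<String, String> headers) {
        for (Map.Entry<String, Map<String, String>> entry : table.entrySet()) {
            String value = headers.get(entry.getKey());
            if (value != null) {
                String destination = entry.getValue().get(value);
                if (destination != null) {
                    return Optional.of(destination);
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        HeaderBasedRouter router = new HeaderBasedRouter();
        System.out.println(router.route(Map.of("region", "EU", "version", "v2")));
        System.out.println(router.route(Map.of("version", "v2")));
        System.out.println(router.route(Map.of("region", "US")));
    }
}
```

A table like this can be loaded from configuration, which lets routes change without redeploying the router.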

3. Load-Balanced Routing

Available Endpoints ──▶ [Load Balancer] ──▶ Endpoint Selection

Round Robin: endpoint1 → endpoint2 → endpoint3 → endpoint1
Weighted: endpoint1 receives 70% of messages, endpoint2 receives 30%
Least Connections: route to endpoint with fewest active connections

4. Conditional Routing

Business Rules ──▶ [Rule Engine] ──▶ Route Decision

if (customer.tier == "VIP") → vip.service.endpoint
if (order.total > threshold) → approval.workflow.service
if (system.time in business_hours) → live.service else batch.service
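
The time-based rule can be sketched with `java.time`. The business hours used here (Monday to Friday, 09:00 to 17:00) and the `BusinessHoursRouter` class are assumptions; the timestamp is passed in as a parameter so the decision can be tested without depending on the system clock.

```java
import java.time.DayOfWeek;
import java.time.LocalDateTime;
import java.time.LocalTime;

class BusinessHoursRouter {
    // Assumed business hours: Monday to Friday, 09:00 (inclusive) to 17:00 (exclusive).
    private static final LocalTime OPEN = LocalTime.of(9, 0);
    private static final LocalTime CLOSE = LocalTime.of(17, 0);

    static boolean isBusinessHours(LocalDateTime when) {
        DayOfWeek day = when.getDayOfWeek();
        boolean weekday = day != DayOfWeek.SATURDAY && day != DayOfWeek.SUNDAY;
        LocalTime time = when.toLocalTime();
        return weekday && !time.isBefore(OPEN) && time.isBefore(CLOSE);
    }

    // Live processing during business hours, batch processing otherwise.
    static String route(LocalDateTime when) {
        return isBusinessHours(when) ? "live.service" : "batch.service";
    }

    public static void main(String[] args) {
        System.out.println(route(LocalDateTime.of(2024, 1, 8, 10, 30))); // a Monday morning
        System.out.println(route(LocalDateTime.of(2024, 1, 6, 10, 30))); // a Saturday morning
    }
}
```

A production version would also need a time zone and a holiday calendar, both of which are left out here.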

Key Components

1. Message Router

Core routing engine that evaluates messages and determines destinations:

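@Component
public class MessageRouter {
    private final List<RoutingRule> routingRules;
    private final RouteEvaluator routeEvaluator;
    private final LoadBalancer loadBalancer;
    private final RouteMetrics routeMetrics;
    private final FallbackHandler fallbackHandler;
    // Used by extractContentAttributes(); the AttributeExtractor type name is assumed
    private final AttributeExtractor jsonAttributeExtractor;
    private final AttributeExtractor xmlAttributeExtractor;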

    public RoutingResult route(Message message, RoutingContext context) {
        try {
            // Extract routing metadata
            RoutingMetadata metadata = extractRoutingMetadata(message, context);

            // Evaluate routing rules
            List<RoutingRule> applicableRules = findApplicableRules(metadata);

            if (applicableRules.isEmpty()) {
                return handleNoMatchingRules(message, metadata);
            }

            // Determine best route
            Route selectedRoute = selectBestRoute(applicableRules, metadata);

            if (selectedRoute == null) {
                return handleNoValidRoute(message, metadata);
            }

            // Apply load balancing if multiple endpoints
            Endpoint targetEndpoint = selectEndpoint(selectedRoute, metadata);

            // Validate endpoint availability
            if (!isEndpointAvailable(targetEndpoint)) {
                return handleUnavailableEndpoint(message, selectedRoute, metadata);
            }

            // Route message
            RoutingResult result = executeRoute(message, targetEndpoint, selectedRoute);

            // Record metrics
            routeMetrics.recordSuccessfulRoute(selectedRoute, targetEndpoint, result);

            return result;

        } catch (Exception e) {
            log.error("Error routing message: {}", e.getMessage(), e);
            routeMetrics.recordRoutingError(message, e);
            return fallbackHandler.handleRoutingError(message, context, e);
        }
    }

    private RoutingMetadata extractRoutingMetadata(Message message, RoutingContext context) {
        return RoutingMetadata.builder()
            .messageType(message.getType())
            .headers(message.getHeaders())
            .contentLength(message.getPayload().length)
            .priority(message.getPriority())
            .timestamp(message.getTimestamp())
            .correlationId(context.getCorrelationId())
            .sourceEndpoint(context.getSourceEndpoint())
            .routingAttributes(extractContentAttributes(message))
            .build();
    }

    private List<RoutingRule> findApplicableRules(RoutingMetadata metadata) {
        return routingRules.stream()
            .filter(rule -> rule.isApplicable(metadata))
            .sorted(Comparator.comparing(RoutingRule::getPriority).reversed())
            .collect(Collectors.toList());
    }

    private Route selectBestRoute(List<RoutingRule> rules, RoutingMetadata metadata) {
        for (RoutingRule rule : rules) {
            List<Route> candidateRoutes = rule.evaluateRoutes(metadata);

            if (!candidateRoutes.isEmpty()) {
                Route selectedRoute = selectOptimalRoute(candidateRoutes, metadata);
                if (selectedRoute != null && isRouteValid(selectedRoute, metadata)) {
                    return selectedRoute;
                }
            }
        }

        return null;
    }

    private Route selectOptimalRoute(List<Route> candidateRoutes, RoutingMetadata metadata) {
        // Apply route selection strategy
        RouteSelectionStrategy strategy = determineSelectionStrategy(metadata);

        switch (strategy) {
            case PRIORITY_BASED:
                return candidateRoutes.stream()
                    .max(Comparator.comparing(Route::getPriority))
                    .orElse(null);
            case PERFORMANCE_BASED:
                return candidateRoutes.stream()
                    .min(Comparator.comparing(this::getRouteLatency))
                    .orElse(null);
            case CAPACITY_BASED:
                return candidateRoutes.stream()
                    .min(Comparator.comparing(this::getRouteUtilization))
                    .orElse(null);
            default:
                return candidateRoutes.get(0);
        }
    }

    private Endpoint selectEndpoint(Route route, RoutingMetadata metadata) {
        List<Endpoint> availableEndpoints = getAvailableEndpoints(route);

        if (availableEndpoints.isEmpty()) {
            throw new NoAvailableEndpointException("No available endpoints for route: " + route.getName());
        }

        if (availableEndpoints.size() == 1) {
            return availableEndpoints.get(0);
        }

        // Apply load balancing
        return loadBalancer.selectEndpoint(availableEndpoints, metadata);
    }

    private RoutingResult executeRoute(Message message, Endpoint endpoint, Route route) {
        RouteExecutor executor = getRouteExecutor(route.getType());

        // RouteExecutor exposes executeRoute(...), see the Route Executor component
        return executor.executeRoute(message, endpoint, route);
    }

    private Map<String, Object> extractContentAttributes(Message message) {
        // Extract routing attributes from message content
        try {
            // Constant-first equals() avoids a NullPointerException when the type is null
            if ("JSON".equals(message.getType())) {
                return jsonAttributeExtractor.extract(message.getPayload());
            } else if ("XML".equals(message.getType())) {
                return xmlAttributeExtractor.extract(message.getPayload());
            } else {
                return Collections.emptyMap();
            }
        } catch (Exception e) {
            log.warn("Failed to extract content attributes: {}", e.getMessage());
            return Collections.emptyMap();
        }
    }
}

2. Routing Rules Engine

Manages and evaluates routing rules:

@Component
public class RoutingRulesEngine {
    private final RuleRepository ruleRepository;
    private final ExpressionEvaluator expressionEvaluator;
    private final RuleCache ruleCache;

    public List<RoutingRule> evaluateRules(RoutingMetadata metadata) {
        List<RoutingRule> cachedRules = ruleCache.getRules(metadata.getCacheKey());

        if (cachedRules != null) {
            return filterApplicableRules(cachedRules, metadata);
        }

        List<RoutingRule> allRules = ruleRepository.getAllActiveRules();
        List<RoutingRule> applicableRules = filterApplicableRules(allRules, metadata);

        ruleCache.putRules(metadata.getCacheKey(), applicableRules);

        return applicableRules;
    }

    private List<RoutingRule> filterApplicableRules(List<RoutingRule> rules, RoutingMetadata metadata) {
        return rules.stream()
            .filter(rule -> evaluateRuleCondition(rule, metadata))
            .sorted(Comparator.comparing(RoutingRule::getPriority).reversed())
            .collect(Collectors.toList());
    }

    private boolean evaluateRuleCondition(RoutingRule rule, RoutingMetadata metadata) {
        try {
            RuleCondition condition = rule.getCondition();
            return expressionEvaluator.evaluate(condition, createEvaluationContext(metadata));
        } catch (Exception e) {
            log.error("Error evaluating rule condition for rule {}: {}", rule.getId(), e.getMessage(), e);
            return false;
        }
    }

    private EvaluationContext createEvaluationContext(RoutingMetadata metadata) {
        EvaluationContext context = new StandardEvaluationContext();

        // Add metadata fields to context
        context.setVariable("messageType", metadata.getMessageType());
        context.setVariable("headers", metadata.getHeaders());
        context.setVariable("contentLength", metadata.getContentLength());
        context.setVariable("priority", metadata.getPriority());
        context.setVariable("timestamp", metadata.getTimestamp());
        context.setVariable("attributes", metadata.getRoutingAttributes());

        // Add time-related values (computed once, when the context is created)
        context.setVariable("now", Instant.now());
        context.setVariable("today", LocalDate.now());
        context.setVariable("isBusinessHours", isBusinessHours());

        return context;
    }

    public RoutingRule createRule(RuleDefinition definition) {
        validateRuleDefinition(definition);

        RoutingRule rule = RoutingRule.builder()
            .id(UUID.randomUUID().toString())
            .name(definition.getName())
            .description(definition.getDescription())
            .condition(compileCondition(definition.getConditionExpression()))
            .routes(definition.getRoutes())
            .priority(definition.getPriority())
            .enabled(definition.isEnabled())
            .createdAt(Instant.now())
            .build();

        ruleRepository.save(rule);
        ruleCache.invalidate(); // Clear cache after rule change

        return rule;
    }

    public void updateRule(String ruleId, RuleDefinition definition) {
        RoutingRule existingRule = ruleRepository.findById(ruleId)
            .orElseThrow(() -> new RuleNotFoundException("Rule not found: " + ruleId));

        validateRuleDefinition(definition);

        RoutingRule updatedRule = existingRule.toBuilder()
            .name(definition.getName())
            .description(definition.getDescription())
            .condition(compileCondition(definition.getConditionExpression()))
            .routes(definition.getRoutes())
            .priority(definition.getPriority())
            .enabled(definition.isEnabled())
            .updatedAt(Instant.now())
            .build();

        ruleRepository.save(updatedRule);
        ruleCache.invalidate();

        log.info("Updated routing rule: {}", ruleId);
    }

    private RuleCondition compileCondition(String expression) {
        try {
            return expressionEvaluator.compile(expression);
        } catch (Exception e) {
            throw new RuleCompilationException("Failed to compile rule condition: " + expression, e);
        }
    }

    private void validateRuleDefinition(RuleDefinition definition) {
        if (definition.getName() == null || definition.getName().trim().isEmpty()) {
            throw new IllegalArgumentException("Rule name cannot be null or empty");
        }

        if (definition.getConditionExpression() == null || definition.getConditionExpression().trim().isEmpty()) {
            throw new IllegalArgumentException("Rule condition cannot be null or empty");
        }

        if (definition.getRoutes() == null || definition.getRoutes().isEmpty()) {
            throw new IllegalArgumentException("Rule must have at least one route");
        }

        // Test compile the condition
        try {
            compileCondition(definition.getConditionExpression());
        } catch (Exception e) {
            throw new IllegalArgumentException("Invalid rule condition expression", e);
        }
    }
}

// Example routing rules
@Component
public class ContactRoutingRules {

    public void setupContactRoutingRules(RoutingRulesEngine rulesEngine) {

        // Route high-priority contacts to premium processing
        rulesEngine.createRule(RuleDefinition.builder()
            .name("High Priority Contact Routing")
            .description("Route high-priority contacts to premium processing service")
            .conditionExpression("headers['priority'] == 'HIGH' || attributes['customerTier'] == 'VIP'")
            .routes(Arrays.asList(
                Route.builder()
                    .name("premium-contact-processing")
                    .endpoint("premium.contact.service")
                    .priority(100)
                    .build()
            ))
            .priority(100)
            .enabled(true)
            .build());

        // Route EU contacts to EU processing center
        rulesEngine.createRule(RuleDefinition.builder()
            .name("EU Contact Geographic Routing")
            .description("Route EU contacts to EU data center for GDPR compliance")
            .conditionExpression("headers['region'] == 'EU' || attributes['country'] in {'DE', 'FR', 'IT', 'ES'}")
            .routes(Arrays.asList(
                Route.builder()
                    .name("eu-contact-processing")
                    .endpoint("eu.contact.service")
                    .priority(90)
                    .build()
            ))
            .priority(90)
            .enabled(true)
            .build());

        // Route large contact batches to batch processing
        rulesEngine.createRule(RuleDefinition.builder()
            .name("Batch Contact Processing")
            .description("Route large contact batches to dedicated batch processing service")
            .conditionExpression("messageType == 'BATCH' && contentLength > 1000000")
            .routes(Arrays.asList(
                Route.builder()
                    .name("batch-contact-processing")
                    .endpoint("batch.contact.service")
                    .priority(80)
                    .build()
            ))
            .priority(80)
            .enabled(true)
            .build());

        // Default contact routing
        rulesEngine.createRule(RuleDefinition.builder()
            .name("Default Contact Routing")
            .description("Default routing for all other contacts")
            .conditionExpression("messageType == 'CONTACT'")
            .routes(Arrays.asList(
                Route.builder()
                    .name("standard-contact-processing")
                    .endpoint("standard.contact.service")
                    .priority(50)
                    .build()
            ))
            .priority(10) // Lowest priority - catch-all
            .enabled(true)
            .build());
    }
}
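
Several of the example rules can match the same message, so the outcome depends on rule priority. The following sketch shows that resolution step in isolation, using plain predicates in place of compiled expressions. The `PriorityRuleResolution` class and its `Rule` record are hypothetical, and the conditions are simplified versions of three of the rules above (the country list and the batch rule are omitted). It needs Java 16 or later.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

class PriorityRuleResolution {
    record Rule(String name, int priority, String endpoint,
                Predicate<Map<String, String>> condition) {}

    // Deliberately listed out of priority order to show that declaration order is irrelevant.
    static final List<Rule> RULES = List.of(
        new Rule("Default Contact Routing", 10, "standard.contact.service",
                 m -> "CONTACT".equals(m.get("messageType"))),
        new Rule("EU Contact Geographic Routing", 90, "eu.contact.service",
                 m -> "EU".equals(m.get("region"))),
        new Rule("High Priority Contact Routing", 100, "premium.contact.service",
                 m -> "HIGH".equals(m.get("priority")) || "VIP".equals(m.get("customerTier"))));

    // Among all matching rules, the one with the highest priority value wins.
    static Optional<String> resolve(Map<String, String> metadata) {
        return RULES.stream()
            .filter(rule -> rule.condition().test(metadata))
            .max(Comparator.comparingInt(Rule::priority))
            .map(Rule::endpoint);
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of("messageType", "CONTACT", "region", "EU", "customerTier", "VIP")));
        System.out.println(resolve(Map.of("messageType", "CONTACT", "region", "EU")));
        System.out.println(resolve(Map.of("messageType", "INVOICE")));
    }
}
```

One consequence worth noticing: a VIP contact from the EU is sent to the premium service rather than the EU service, because the premium rule has the higher priority. If regional processing is a compliance requirement, the geographic rule would need the higher priority.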

3. Load Balancer

Distributes messages across multiple endpoints:

@Component
public class LoadBalancer {
    private final EndpointHealthMonitor healthMonitor;
    private final LoadBalancingStrategy strategy;
    private final EndpointMetrics endpointMetrics;

    public Endpoint selectEndpoint(List<Endpoint> endpoints, RoutingMetadata metadata) {
        List<Endpoint> healthyEndpoints = filterHealthyEndpoints(endpoints);

        if (healthyEndpoints.isEmpty()) {
            throw new NoHealthyEndpointException("No healthy endpoints available");
        }

        if (healthyEndpoints.size() == 1) {
            return healthyEndpoints.get(0);
        }

        return strategy.selectEndpoint(healthyEndpoints, metadata);
    }

    private List<Endpoint> filterHealthyEndpoints(List<Endpoint> endpoints) {
        return endpoints.stream()
            .filter(endpoint -> healthMonitor.isHealthy(endpoint))
            .collect(Collectors.toList());
    }

    // Load balancing strategies
    public interface LoadBalancingStrategy {
        Endpoint selectEndpoint(List<Endpoint> endpoints, RoutingMetadata metadata);
    }

    @Component
    public static class RoundRobinStrategy implements LoadBalancingStrategy {
        private final AtomicLong counter = new AtomicLong(0);

        @Override
        public Endpoint selectEndpoint(List<Endpoint> endpoints, RoutingMetadata metadata) {
            // floorMod keeps the index non-negative even if the counter overflows
            int index = (int) Math.floorMod(counter.getAndIncrement(), (long) endpoints.size());
            return endpoints.get(index);
        }
    }

    @Component
    public static class WeightedRoundRobinStrategy implements LoadBalancingStrategy {
        private final Map<String, WeightedEndpointSelector> selectors = new ConcurrentHashMap<>();

        @Override
        public Endpoint selectEndpoint(List<Endpoint> endpoints, RoutingMetadata metadata) {
            String key = createEndpointGroupKey(endpoints);
            WeightedEndpointSelector selector = selectors.computeIfAbsent(key, 
                k -> new WeightedEndpointSelector(endpoints));

            return selector.selectEndpoint();
        }

        private String createEndpointGroupKey(List<Endpoint> endpoints) {
            return endpoints.stream()
                .map(e -> e.getId() + ":" + e.getWeight())
                .collect(Collectors.joining(","));
        }
    }

    @Component
    public static class LeastConnectionsStrategy implements LoadBalancingStrategy {
        private final EndpointConnectionTracker connectionTracker;

        @Override
        public Endpoint selectEndpoint(List<Endpoint> endpoints, RoutingMetadata metadata) {
            return endpoints.stream()
                .min(Comparator.comparing(endpoint -> 
                    connectionTracker.getActiveConnections(endpoint)))
                .orElseThrow(() -> new IllegalStateException("No endpoints available"));
        }
    }

    @Component
    public static class ResponseTimeStrategy implements LoadBalancingStrategy {
        private final EndpointResponseTimeTracker responseTimeTracker;

        @Override
        public Endpoint selectEndpoint(List<Endpoint> endpoints, RoutingMetadata metadata) {
            return endpoints.stream()
                .min(Comparator.comparing(endpoint -> 
                    responseTimeTracker.getAverageResponseTime(endpoint)))
                .orElseThrow(() -> new IllegalStateException("No endpoints available"));
        }
    }

    @Component
    public static class HashBasedStrategy implements LoadBalancingStrategy {

        @Override
        public Endpoint selectEndpoint(List<Endpoint> endpoints, RoutingMetadata metadata) {
            String hashKey = getHashKey(metadata);
            // floorMod keeps the index non-negative; Math.abs(Integer.MIN_VALUE) is still negative
            int index = Math.floorMod(hashKey.hashCode(), endpoints.size());

            return endpoints.get(index);
        }

        private String getHashKey(RoutingMetadata metadata) {
            // Use correlation ID or other consistent identifier
            String correlationId = metadata.getCorrelationId();
            if (correlationId != null) {
                return correlationId;
            }

            // Fallback to message type
            return metadata.getMessageType();
        }
    }
}
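
The `WeightedEndpointSelector` used by `WeightedRoundRobinStrategy` is not shown. One way such a selector could work is smooth weighted round-robin, sketched below. This is an assumption about a possible implementation, not the actual class: the `SmoothWeightedSelector` name is hypothetical, and endpoints are reduced to string IDs with integer weights.

```java
class SmoothWeightedSelector {
    private final String[] ids;
    private final int[] weights;
    private final int[] current; // running score per endpoint
    private final int totalWeight;

    SmoothWeightedSelector(String[] ids, int[] weights) {
        if (ids.length == 0 || ids.length != weights.length) {
            throw new IllegalArgumentException("ids and weights must be non-empty and the same length");
        }
        int sum = 0;
        for (int w : weights) {
            if (w <= 0) {
                throw new IllegalArgumentException("weights must be positive");
            }
            sum += w;
        }
        this.ids = ids.clone();
        this.weights = weights.clone();
        this.current = new int[ids.length];
        this.totalWeight = sum;
    }

    // Each call raises every score by its weight, picks the highest score,
    // then lowers the winner's score by the total weight.
    synchronized String select() {
        int best = 0;
        for (int i = 0; i < ids.length; i++) {
            current[i] += weights[i];
            if (current[i] > current[best]) {
                best = i;
            }
        }
        current[best] -= totalWeight;
        return ids[best];
    }

    public static void main(String[] args) {
        SmoothWeightedSelector selector = new SmoothWeightedSelector(
            new String[] {"endpoint1", "endpoint2"}, new int[] {7, 3});
        for (int i = 0; i < 10; i++) {
            System.out.print(selector.select() + " ");
        }
        System.out.println();
    }
}
```

With weights 7 and 3, every window of ten selections contains seven picks of `endpoint1` and three of `endpoint2`, interleaved instead of sent in bursts. A random pick against cumulative weights is a simpler alternative when exact proportions per window do not matter.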

4. Route Executor

Executes routing decisions and handles message delivery:

@Component
public class RouteExecutor {
    private final Map<RouteType, MessageSender> messageSenders;
    private final RetryTemplate retryTemplate;
    private final CircuitBreakerRegistry circuitBreakerRegistry;
    private final RouteMetrics routeMetrics;

    public RoutingResult executeRoute(Message message, Endpoint endpoint, Route route) {
        String routeKey = route.getName() + ":" + endpoint.getId();
        CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker(routeKey);

        return circuitBreaker.executeSupplier(() -> {
            Timer.Sample sample = routeMetrics.startRouteTimer(route, endpoint);

            try {
                RoutingResult result = doExecuteRoute(message, endpoint, route);

                sample.stop(Timer.builder("route.execution.duration")
                    .tag("route", route.getName())
                    .tag("endpoint", endpoint.getId())
                    .tag("status", "success")
                    .register(routeMetrics.getMeterRegistry()));

                routeMetrics.recordSuccessfulRoute(route, endpoint);
                return result;

            } catch (Exception e) {
                sample.stop(Timer.builder("route.execution.duration")
                    .tag("route", route.getName())
                    .tag("endpoint", endpoint.getId())
                    .tag("status", "error")
                    .register(routeMetrics.getMeterRegistry()));

                routeMetrics.recordFailedRoute(route, endpoint, e);
                throw e;
            }
        });
    }

    private RoutingResult doExecuteRoute(Message message, Endpoint endpoint, Route route) {
        MessageSender sender = messageSenders.get(route.getType());
        if (sender == null) {
            throw new UnsupportedRouteTypeException("Unsupported route type: " + route.getType());
        }

        return retryTemplate.execute(context -> {
            log.debug("Executing route {} to endpoint {} (attempt {})", 
                     route.getName(), endpoint.getId(), context.getRetryCount() + 1);

            return sender.send(message, endpoint, route);
        });
    }

    // Message senders for different route types
    public interface MessageSender {
        RoutingResult send(Message message, Endpoint endpoint, Route route);
    }

    @Component
    public static class HttpMessageSender implements MessageSender {
        private final RestTemplate restTemplate;

        @Override
        public RoutingResult send(Message message, Endpoint endpoint, Route route) {
            String url = endpoint.getBaseUrl() + route.getPath();

            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);
            headers.putAll(convertHeaders(message.getHeaders()));

            HttpEntity<byte[]> request = new HttpEntity<>(message.getPayload(), headers);

            // Measure the elapsed time of the HTTP call
            long startNanos = System.nanoTime();
            ResponseEntity<String> response = restTemplate.exchange(
                url, HttpMethod.POST, request, String.class);
            Duration elapsed = Duration.ofNanos(System.nanoTime() - startNanos);

            return RoutingResult.builder()
                .success(response.getStatusCode().is2xxSuccessful())
                .endpoint(endpoint)
                .route(route)
                .response(response.getBody())
                .statusCode(response.getStatusCode().value())
                .headers(convertResponseHeaders(response.getHeaders()))
                .executionTime(elapsed)
                .build();
        }
    }

    @Component  
    public static class JmsMessageSender implements MessageSender {
        private final JmsTemplate jmsTemplate;

        @Override
        public RoutingResult send(Message message, Endpoint endpoint, Route route) {
            String destination = endpoint.getId(); // JMS destination name

            jmsTemplate.send(destination, session -> {
                BytesMessage jmsMessage = session.createBytesMessage();
                jmsMessage.writeBytes(message.getPayload());

                // Add headers
                for (Map.Entry<String, String> header : message.getHeaders().entrySet()) {
                    jmsMessage.setStringProperty(header.getKey(), header.getValue());
                }

                return jmsMessage;
            });

            return RoutingResult.builder()
                .success(true)
                .endpoint(endpoint)
                .route(route)
                .executionTime(Duration.ofMillis(10)) // Simplified
                .build();
        }
    }

    @Component
    public static class KafkaMessageSender implements MessageSender {
        private final KafkaTemplate<String, byte[]> kafkaTemplate;

        @Override
        public RoutingResult send(Message message, Endpoint endpoint, Route route) {
            String topic = endpoint.getId(); // Kafka topic name
            String key = message.getHeaders().get("messageKey");

            ProducerRecord<String, byte[]> record = new ProducerRecord<>(
                topic, key, message.getPayload());

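            // Add headers, encoded explicitly as UTF-8 rather than the platform default
            for (Map.Entry<String, String> header : message.getHeaders().entrySet()) {
                record.headers().add(header.getKey(),
                    header.getValue().getBytes(StandardCharsets.UTF_8));
            }

            long startNanos = System.nanoTime();
            try {
                // Blocks until the broker acknowledges the record
                SendResult<String, byte[]> sendResult = kafkaTemplate.send(record).get();

                return RoutingResult.builder()
                    .success(true)
                    .endpoint(endpoint)
                    .route(route)
                    .response("partition=" + sendResult.getRecordMetadata().partition() + 
                             ",offset=" + sendResult.getRecordMetadata().offset())
                    .executionTime(Duration.ofNanos(System.nanoTime() - startNanos))
                    .build();

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // Preserve the interrupt status
                throw new RoutingExecutionException("Interrupted while sending message to Kafka", e);
            } catch (Exception e) {
                throw new RoutingExecutionException("Failed to send message to Kafka", e);
            }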
        }
    }
}

Configuration Parameters

Essential Settings

| Parameter | Description | Typical Values |
|---|---|---|
| Rule Cache Size | Maximum cached routing rules | 1000-10000 |
| Rule Evaluation Timeout | Max time for rule evaluation | 100ms-1s |
| Endpoint Health Check Interval | Frequency of endpoint health checks | 30s-300s |
| Load Balancing Strategy | Algorithm for endpoint selection | round-robin/weighted/least-connections |
| Circuit Breaker Threshold | Failure threshold for circuit breaking | 50%-90% |
| Retry Attempts | Maximum routing retry attempts | 3-10 |
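
Two of the load balancing strategies listed above can be sketched in a few lines. This is a minimal illustration under simplifying assumptions, not the router's actual implementation: `EndpointSelector` and its method names are hypothetical, and endpoints are represented as plain strings.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper illustrating two endpoint-selection strategies.
final class EndpointSelector {
    private final AtomicInteger counter = new AtomicInteger();

    // Round-robin: cycle through the endpoints in list order.
    String roundRobin(List<String> endpoints) {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), endpoints.size());
        return endpoints.get(index);
    }

    // Least-connections: pick the endpoint with the fewest in-flight requests.
    static String leastConnections(Map<String, Integer> activeConnections) {
        return activeConnections.entrySet().stream()
            .min(Map.Entry.comparingByValue())
            .map(Map.Entry::getKey)
            .orElseThrow(() -> new IllegalStateException("no endpoints available"));
    }
}
```

Round-robin needs no knowledge of endpoint state, which makes it cheap but blind to slow endpoints. Least-connections requires the caller to track in-flight requests per endpoint.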

Example Configuration

# Message Routing Configuration
routing.rules.cache-size=5000
routing.rules.cache-ttl=5m
routing.rules.evaluation-timeout=500ms
routing.rules.reload-interval=1m

# Endpoint Management
routing.endpoints.health-check-interval=30s
routing.endpoints.health-check-timeout=5s
routing.endpoints.failure-threshold=5
routing.endpoints.recovery-check-interval=60s

# Load Balancing
routing.load-balancing.strategy=round-robin
routing.load-balancing.weight-adjustment-interval=5m
routing.load-balancing.sticky-sessions=false

# Circuit Breaker
routing.circuit-breaker.failure-rate-threshold=60%
routing.circuit-breaker.wait-duration-in-open-state=30s
routing.circuit-breaker.sliding-window-size=10
routing.circuit-breaker.minimum-number-of-calls=5

# Retry Configuration
routing.retry.max-attempts=3
routing.retry.backoff-delay=1s
routing.retry.backoff-multiplier=2
routing.retry.max-backoff-delay=30s

# Performance Tuning
routing.thread-pool.core-size=10
routing.thread-pool.max-size=50
routing.thread-pool.queue-capacity=1000
routing.message-buffer-size=1000
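
The retry settings above interact in a way that is easier to see in code. The sketch below assumes the delay starts at `backoff-delay`, is multiplied by `backoff-multiplier` after each failed attempt, and is capped at `max-backoff-delay`. `RetryBackoff` is a hypothetical helper, not part of any library.

```java
import java.time.Duration;

// Hypothetical helper: computes the wait before each retry (exponential backoff with a cap).
final class RetryBackoff {
    private final Duration initialDelay;
    private final double multiplier;
    private final Duration maxDelay;

    RetryBackoff(Duration initialDelay, double multiplier, Duration maxDelay) {
        this.initialDelay = initialDelay;
        this.multiplier = multiplier;
        this.maxDelay = maxDelay;
    }

    // retry is 1-based: retry 1 waits initialDelay, retry 2 waits initialDelay * multiplier, ...
    Duration delayBefore(int retry) {
        if (retry < 1) {
            throw new IllegalArgumentException("retry must be >= 1");
        }
        double millis = initialDelay.toMillis() * Math.pow(multiplier, retry - 1);
        // Cap the delay so late retries never wait longer than maxDelay.
        long capped = (long) Math.min(millis, (double) maxDelay.toMillis());
        return Duration.ofMillis(capped);
    }
}
```

With the example values (1s delay, multiplier 2, 30s cap), successive retries wait 1s, 2s, 4s, 8s, 16s, and then 30s from the sixth retry onward.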

Implementation Examples

1. Spring Integration Message Routing

@Configuration
@EnableIntegration
public class SpringIntegrationRoutingConfiguration {

    @Bean
    public IntegrationFlow contactProcessingFlow() {
        return IntegrationFlows
            .from("contactInputChannel")
            .<ContactMessage, String>route(message -> {
                // Content-based routing
                if ("VIP".equals(message.getCustomerTier())) {
                    return "vipProcessingChannel";
                } else if ("BULK".equals(message.getMessageType())) {
                    return "bulkProcessingChannel";
                } else {
                    return "standardProcessingChannel";
                }
            })
            .get();
    }

    @Bean
    public IntegrationFlow headerBasedRoutingFlow() {
        return IntegrationFlows
            .from("headerRoutingInputChannel")
            .routeToRecipients(r -> r
                .recipient("euProcessingChannel", "headers['region'] == 'EU'")
                .recipient("usProcessingChannel", "headers['region'] == 'US'")
                .recipient("apacProcessingChannel", "headers['region'] == 'APAC'")
                .defaultOutputChannel("defaultProcessingChannel"))
            .get();
    }

    @Bean
    public IntegrationFlow loadBalancedRoutingFlow() {
        return IntegrationFlows
            .from("loadBalancedInputChannel")
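            .route(Message.class, message -> {
                // Custom load balancing: hash the correlation ID onto one of 3 channels.
                // floorMod avoids the negative result Math.abs gives for Integer.MIN_VALUE,
                // and Objects.hashCode tolerates a missing header.
                int hash = Objects.hashCode(message.getHeaders().get("correlationId"));
                return "processingChannel" + (Math.floorMod(hash, 3) + 1);
            })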
            .get();
    }

    @ServiceActivator(inputChannel = "vipProcessingChannel")
    public void handleVipContact(ContactMessage message) {
        log.info("Processing VIP contact: {}", message.getContactId());
        // VIP processing logic
    }

    @ServiceActivator(inputChannel = "bulkProcessingChannel")
    public void handleBulkContact(ContactMessage message) {
        log.info("Processing bulk contact: {}", message.getContactId());
        // Bulk processing logic
    }

    @ServiceActivator(inputChannel = "standardProcessingChannel")
    public void handleStandardContact(ContactMessage message) {
        log.info("Processing standard contact: {}", message.getContactId());
        // Standard processing logic
    }
}

2. Apache Camel Routing Implementation

@Configuration
public class CamelRoutingConfiguration {

    @Bean
    public RouteBuilder contactRoutingRoutes() {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {

                // Content-based routing with Apache Camel
                from("jms:queue:contact.input")
                    .routeId("contactContentBasedRouting")
                    .choice()
                        .when(jsonpath("$.customerTier == 'VIP'"))
                            .to("jms:queue:contact.vip.processing")
                        .when(jsonpath("$.messageType == 'BULK'"))
                            .to("jms:queue:contact.bulk.processing")
                        .when(header("priority").isEqualTo("HIGH"))
                            .to("jms:queue:contact.priority.processing")
                        .otherwise()
                            .to("jms:queue:contact.standard.processing")
                    .end();

                // Header-based routing
                from("jms:queue:contact.regional.input")
                    .routeId("contactRegionalRouting")
                    .choice()
                        .when(header("region").isEqualTo("EU"))
                            .to("jms:queue:contact.eu.processing")
                        .when(header("region").isEqualTo("US"))
                            .to("jms:queue:contact.us.processing")
                        .when(header("region").isEqualTo("APAC"))
                            .to("jms:queue:contact.apac.processing")
                        .otherwise()
                            .to("jms:queue:contact.default.processing")
                    .end();

                // Load-balanced routing with error handling
                from("jms:queue:contact.loadbalanced.input")
                    .routeId("contactLoadBalancedRouting")
                    // Route-scoped onException must be declared before the processing steps
                    .onException(Exception.class)
                        .handled(true)
                        .to("jms:queue:contact.errors")
                    .end()
                    .loadBalance()
                    .roundRobin()
                        .to("http://contact-service-1/process?bridgeEndpoint=true")
                        .to("http://contact-service-2/process?bridgeEndpoint=true")
                        .to("http://contact-service-3/process?bridgeEndpoint=true")
                    .end();

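                // Dynamic routing based on database lookup
                from("jms:queue:contact.dynamic.input")
                    .routeId("contactDynamicRouting")
                    // Look up the target endpoint and keep the original body;
                    // outputType=SelectOne returns the single column value
                    .enrich("sql:select processing_endpoint from contact_routing "
                            + "where customer_id = :#customerid?outputType=SelectOne",
                        (original, lookup) -> {
                            original.getIn().setHeader("processingEndpoint",
                                lookup.getIn().getBody(String.class));
                            return original;
                        })
                    .recipientList(header("processingEndpoint"))
                        .parallelProcessing()
                        .stopOnException();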

                // Routing with content filtering and transformation
                from("jms:queue:contact.filtered.input")
                    .routeId("contactFilteredRouting")
                    .filter(jsonpath("$.amount > 1000"))
                    .transform().jsonpath("$.contact")
                    .choice()
                        // The body is now the contact object itself, so paths start at its root
                        .when(jsonpath("$.type == 'PREMIUM'"))
                            .to("jms:queue:contact.premium.processing")
                        .otherwise()
                            .to("jms:queue:contact.standard.processing")
                    .end();
            }
        };
    }

    @Component
    public static class ContactRoutingProcessor implements Processor {

        @Override
        public void process(Exchange exchange) throws Exception {
            ContactMessage contact = exchange.getIn().getBody(ContactMessage.class);

            // Custom routing logic
            String routingKey = determineRoutingKey(contact);
            exchange.getIn().setHeader("routingKey", routingKey);

            log.info("Routing contact {} to {}", contact.getContactId(), routingKey);
        }

        private String determineRoutingKey(ContactMessage contact) {
            if ("VIP".equals(contact.getCustomerTier())) {
                return "vip.processing";
            } else if (contact.getAmount() != null && contact.getAmount() > 1000) {
                return "high.value.processing";
            } else {
                return "standard.processing";
            }
        }
    }
}

3. Custom Message Router Implementation

@Service
public class CustomMessageRoutingService {
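    private final MessageRouter messageRouter;
    private final RoutingRulesEngine rulesEngine;
    private final EndpointRegistry endpointRegistry;
    // Collaborators used by the failure handlers below (type names assumed; constructor injection omitted)
    private final DeadLetterService deadLetterService;
    private final ErrorHandlingService errorHandlingService;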

    public void processMessage(ContactMessage contactMessage) {
        // Convert to internal message format
        Message message = convertToMessage(contactMessage);

        // Create routing context
        RoutingContext context = RoutingContext.builder()
            .correlationId(UUID.randomUUID().toString())
            .sourceEndpoint("contact.input.service")
            .timestamp(Instant.now())
            .build();

        try {
            // Route message
            RoutingResult result = messageRouter.route(message, context);

            if (result.isSuccess()) {
                log.info("Successfully routed message to endpoint: {}", 
                         result.getEndpoint().getId());
            } else {
                log.error("Failed to route message: {}", result.getErrorMessage());
                handleRoutingFailure(message, result);
            }

        } catch (Exception e) {
            log.error("Error routing message: {}", e.getMessage(), e);
            handleRoutingException(message, context, e);
        }
    }

    private Message convertToMessage(ContactMessage contactMessage) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            byte[] payload = mapper.writeValueAsBytes(contactMessage);

            Map<String, String> headers = new HashMap<>();
            headers.put("messageType", "CONTACT");
            headers.put("customerTier", contactMessage.getCustomerTier());
            headers.put("region", contactMessage.getRegion());
            headers.put("priority", contactMessage.getPriority());

            return Message.builder()
                .id(UUID.randomUUID().toString())
                .type("CONTACT")
                .payload(payload)
                .headers(headers)
                .timestamp(Instant.now())
                .priority(determinePriority(contactMessage))
                .build();

        } catch (Exception e) {
            throw new MessageConversionException("Failed to convert contact message", e);
        }
    }

    private int determinePriority(ContactMessage contactMessage) {
        if ("VIP".equals(contactMessage.getCustomerTier())) {
            return 100;
        } else if ("HIGH".equals(contactMessage.getPriority())) {
            return 80;
        } else {
            return 50;
        }
    }

    private void handleRoutingFailure(Message message, RoutingResult result) {
        // Send to dead letter queue or retry
        deadLetterService.handleFailedRouting(message, result);
    }

    private void handleRoutingException(Message message, RoutingContext context, Exception e) {
        // Log and send to error handling service
        errorHandlingService.handleRoutingException(message, context, e);
    }
}

// Dynamic routing rule management
@RestController
@RequestMapping("/api/routing")
public class RoutingRuleController {
    private final RoutingRulesEngine rulesEngine;

    @PostMapping("/rules")
    public ResponseEntity<RoutingRule> createRule(@RequestBody RuleDefinition definition) {
        RoutingRule rule = rulesEngine.createRule(definition);
        return ResponseEntity.ok(rule);
    }

    @PutMapping("/rules/{id}")
    public ResponseEntity<Void> updateRule(@PathVariable String id, 
                                         @RequestBody RuleDefinition definition) {
        rulesEngine.updateRule(id, definition);
        return ResponseEntity.ok().build();
    }

    @DeleteMapping("/rules/{id}")
    public ResponseEntity<Void> deleteRule(@PathVariable String id) {
        rulesEngine.deleteRule(id);
        return ResponseEntity.ok().build();
    }

    @GetMapping("/rules")
    public ResponseEntity<List<RoutingRule>> getAllRules() {
        List<RoutingRule> rules = rulesEngine.getAllRules();
        return ResponseEntity.ok(rules);
    }

    @PostMapping("/rules/{id}/test")
    public ResponseEntity<RoutingTestResult> testRule(@PathVariable String id,
                                                    @RequestBody TestMessage testMessage) {
        RoutingTestResult result = rulesEngine.testRule(id, testMessage);
        return ResponseEntity.ok(result);
    }
}

4. Microservices API Gateway Routing

@Configuration
public class ApiGatewayRouting {

    @Bean
    public RouteLocator customRouteLocator(RouteLocatorBuilder builder) {
        return builder.routes()

            // Content-based routing for contact API
            .route("contact-vip", r -> r
                .path("/api/contacts/**")
                .and()
                .header("Customer-Tier", "VIP")
                .uri("http://vip-contact-service:8080"))

            // Version-based routing
            .route("contact-v2", r -> r
                .path("/api/contacts/**")
                .and()
                .header("API-Version", "v2")
                .uri("http://contact-service-v2:8080"))

            // Geographic routing (must precede the default route, which matches the same path)
            .route("contact-eu", r -> r
                .path("/api/contacts/**")
                .and()
                .header("Region", "EU")
                .uri("http://eu-contact-service:8080"))

            // Default contact routing: catch-all, evaluated after the more specific routes
            .route("contact-default", r -> r
                .path("/api/contacts/**")
                .uri("http://contact-service:8080"))

            // Load balanced routing
            .route("contact-loadbalanced", r -> r
                .path("/api/internal/contacts/**")
                .uri("lb://contact-service-cluster"))

            .build();
    }

    @Component
    public static class CustomRouteFilter implements GlobalFilter {

        @Override
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            ServerHttpRequest request = exchange.getRequest();

            // Extract routing metadata
            String customerTier = request.getHeaders().getFirst("Customer-Tier");
            String region = request.getHeaders().getFirst("Region");
            String contentLength = request.getHeaders().getFirst("Content-Length");

            // Add routing attributes to request
            ServerHttpRequest modifiedRequest = request.mutate()
                .header("X-Routing-Tier", customerTier != null ? customerTier : "STANDARD")
                .header("X-Routing-Region", region != null ? region : "DEFAULT")
                .header("X-Routing-Size", contentLength != null ? contentLength : "0")
                .build();

            return chain.filter(exchange.mutate().request(modifiedRequest).build());
        }
    }
}

Best Practices

1. Routing Rule Design

public class RoutingRuleBestPractices {

    // Use priority-based rule ordering
    public void setupRulePriorities() {
        // Highest priority: Security and compliance rules
        createRule("GDPR-Routing", "headers['dataClassification'] == 'PERSONAL'", 100);

        // High priority: Business-critical routing
        createRule("VIP-Customer", "attributes['customerTier'] == 'VIP'", 90);

        // Medium priority: Geographic and regional routing
        createRule("EU-Routing", "headers['region'] == 'EU'", 70);

        // Low priority: Performance optimization routing
        createRule("Load-Balancing", "messageType == 'STANDARD'", 50);

        // Lowest priority: Default catch-all routing
        createRule("Default-Route", "true", 10);
    }

    // Use efficient rule expressions
    public class EfficientRuleExpressions {

        // Good: Simple, fast evaluation
        String efficientRule = "headers['priority'] == 'HIGH'";

        // Avoid: Complex, slow evaluation
        String inefficientRule = "jsonPath(payload, '$.customer.orders[?(@.amount > 1000)].length') > 5";

        // Better: Pre-computed attributes
        String optimizedRule = "attributes['highValueOrderCount'] > 5";
    }

    // Implement rule testing and validation
    public void validateRoutingRules(List<RoutingRule> rules) {
        for (RoutingRule rule : rules) {
            // Test rule compilation
            try {
                compileRule(rule);
            } catch (Exception e) {
                throw new InvalidRuleException("Rule compilation failed: " + rule.getName(), e);
            }

            // Test rule with sample data
            TestMessage[] testCases = createTestCases();
            for (TestMessage testCase : testCases) {
                try {
                    evaluateRule(rule, testCase);
                } catch (Exception e) {
                    log.warn("Rule evaluation failed for test case: {} - {}", 
                            rule.getName(), e.getMessage());
                }
            }
        }
    }
}

2. Performance Optimization

@Component
public class RoutingPerformanceOptimization {

    // Use rule caching for frequently evaluated rules
    @Cacheable(value = "routingRules", key = "#metadata.cacheKey")
    public List<RoutingRule> getCachedRules(RoutingMetadata metadata) {
        return evaluateRules(metadata);
    }

    // Implement parallel rule evaluation for complex scenarios
    public List<RoutingRule> evaluateRulesInParallel(RoutingMetadata metadata) {
        List<RoutingRule> allRules = getAllRules();

        return allRules.parallelStream()
            .filter(rule -> evaluateRuleCondition(rule, metadata))
            .sorted(Comparator.comparing(RoutingRule::getPriority).reversed())
            .collect(Collectors.toList());
    }

    // Use connection pooling for external endpoints
    @Bean
    public RestTemplate routingRestTemplate() {
        HttpComponentsClientHttpRequestFactory factory = 
            new HttpComponentsClientHttpRequestFactory();

        // Configure connection pooling
        PoolingHttpClientConnectionManager connectionManager = 
            new PoolingHttpClientConnectionManager();
        connectionManager.setMaxTotal(100);
        connectionManager.setDefaultMaxPerRoute(20);

        CloseableHttpClient httpClient = HttpClients.custom()
            .setConnectionManager(connectionManager)
            .build();

        factory.setHttpClient(httpClient);

        RestTemplate restTemplate = new RestTemplate(factory);
        restTemplate.setErrorHandler(new RoutingResponseErrorHandler());

        return restTemplate;
    }

    // Optimize message parsing and attribute extraction
    public Map<String, Object> extractAttributesEfficiently(Message message) {
        // Use streaming JSON parser for large messages
        if (message.getPayload().length > 10000) {
            return streamingJsonExtractor.extract(message.getPayload());
        } else {
            return standardJsonExtractor.extract(message.getPayload());
        }
    }
}

3. Monitoring and Observability

@Component
public class RoutingMonitoring {
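    private final MeterRegistry meterRegistry;
    private final DistributedTracing tracing;
    // One mutable holder per endpoint; a Micrometer gauge samples the object it was registered with
    private final Map<String, AtomicInteger> endpointHealth = new ConcurrentHashMap<>();

    public void recordRoutingMetrics(RoutingResult result) {
        String routeName = result.getRoute().getName();
        String endpointId = result.getEndpoint().getId();
        String status = result.isSuccess() ? "success" : "failure";

        // Record routing count
        meterRegistry.counter("routing.messages", 
                            "route", routeName,
                            "endpoint", endpointId,
                            "status", status)
                    .increment();

        // Record routing latency
        meterRegistry.timer("routing.duration",
                          "route", routeName,
                          "endpoint", endpointId)
                    .record(result.getExecutionTime());

        // Record endpoint health: register the gauge once, then update the held value.
        // Passing a fresh boxed number on every call would not update the gauge.
        endpointHealth.computeIfAbsent(endpointId, id ->
                meterRegistry.gauge("routing.endpoint.health",
                                    Tags.of("endpoint", id),
                                    new AtomicInteger()))
            .set(result.isSuccess() ? 1 : 0);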
    }

    public void traceRouting(Message message, RoutingContext context, RoutingResult result) {
        Span span = tracing.nextSpan()
            .name("message.routing")
            .tag("route.name", result.getRoute().getName())
            .tag("endpoint.id", result.getEndpoint().getId())
            .tag("message.type", message.getType())
            .tag("correlation.id", context.getCorrelationId())
            .start();

        try (Tracer.SpanInScope ws = tracing.tracer().withSpanInScope(span)) {
            if (result.isSuccess()) {
                span.tag("routing.status", "success");
            } else {
                span.tag("routing.status", "failure");
                span.tag("error.message", result.getErrorMessage());
            }
        } finally {
            span.end();
        }
    }

    @EventListener
    public void handleRoutingAlert(RoutingEvent event) {
        if (event.getType() == RoutingEventType.ENDPOINT_DOWN) {
            alertingService.sendAlert(AlertLevel.CRITICAL,
                "Endpoint is down: " + event.getEndpointId());
        }

        if (event.getType() == RoutingEventType.HIGH_ERROR_RATE) {
            alertingService.sendAlert(AlertLevel.WARNING,
                "High error rate detected for route: " + event.getRouteName());
        }
    }
}

Common Pitfalls

1. Rule Conflicts and Ambiguity

Problem: Multiple rules matching the same message causing unpredictable routing
Solution: Use clear rule priorities and comprehensive testing
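
One way to make overlapping rules predictable is to select the winner by priority and break ties on a stable key. The sketch below is an illustration only: `PriorityRuleMatcher` and its nested `Rule` class are hypothetical, and conditions are modeled as predicates over a header map rather than as rule-engine expressions.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical sketch: deterministic selection among several matching rules.
final class PriorityRuleMatcher {

    // Assumed rule shape: a name, a priority (higher wins), and a condition on message headers.
    static final class Rule {
        private final String name;
        private final int priority;
        private final Predicate<Map<String, String>> condition;

        Rule(String name, int priority, Predicate<Map<String, String>> condition) {
            this.name = name;
            this.priority = priority;
            this.condition = condition;
        }

        String getName() { return name; }
        int getPriority() { return priority; }
    }

    // Highest priority first; equal priorities fall back to name order,
    // so the result never depends on the order in which rules were loaded.
    private static final Comparator<Rule> ORDER =
        Comparator.comparingInt(Rule::getPriority).reversed()
                  .thenComparing(Rule::getName);

    static Optional<Rule> select(List<Rule> rules, Map<String, String> headers) {
        return rules.stream()
            .filter(rule -> rule.condition.test(headers))
            .min(ORDER);
    }
}
```

A message that matches both a compliance rule and a VIP rule is then always handled by whichever has the higher priority, and two rules with equal priority resolve the same way on every node.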

2. Performance Degradation from Complex Rules

Problem: Slow rule evaluation affecting message throughput
Solution: Optimize rule expressions and implement caching
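
Caching can be as simple as remembering the routing decision for a given key. The following is a minimal sketch of a bounded LRU cache built on `LinkedHashMap`. `RouteDecisionCache` is a hypothetical class, and it assumes a decision depends only on the cache key, for example a combination of message type and region.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: bounded LRU cache of routing decisions keyed by a string.
final class RouteDecisionCache {
    private final Map<String, String> cache;

    RouteDecisionCache(final int maxEntries) {
        // accessOrder = true makes iteration order least-recently-used first.
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > maxEntries; // Evict once the bound is exceeded
            }
        };
    }

    // Returns the cached decision, or evaluates the rules once and stores the result.
    synchronized String resolve(String cacheKey, Function<String, String> evaluator) {
        String decision = cache.get(cacheKey);
        if (decision == null) {
            decision = evaluator.apply(cacheKey);
            cache.put(cacheKey, decision);
        }
        return decision;
    }

    synchronized int entryCount() {
        return cache.size();
    }
}
```

A cache like this must be cleared whenever rules change, otherwise stale decisions keep being served.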

3. Endpoint Health Management

Problem: Routing to unavailable endpoints causing failures
Solution: Implement comprehensive health checking and circuit breakers
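
A count-based circuit breaker captures the core idea: track recent outcomes, stop sending once the failure rate crosses a threshold, and try again after a wait period. The sketch below is deliberately simplified and hypothetical. It has no half-open state, and `SimpleCircuitBreaker` is not a library class. Its constructor parameters mirror the circuit breaker settings in the example configuration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;

// Hypothetical sketch: count-based sliding window circuit breaker (no half-open state).
final class SimpleCircuitBreaker {
    private final int windowSize;
    private final int minimumCalls;
    private final double failureRateThreshold; // e.g. 0.6 for 60%
    private final long openMillis;
    private final LongSupplier clock;           // Injected so time can be controlled in tests
    private final Deque<Boolean> window = new ArrayDeque<>(); // true = failed call
    private long openedAt = -1;                 // -1 means the breaker is closed

    SimpleCircuitBreaker(int windowSize, int minimumCalls, double failureRateThreshold,
                         long openMillis, LongSupplier clock) {
        this.windowSize = windowSize;
        this.minimumCalls = minimumCalls;
        this.failureRateThreshold = failureRateThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    // Returns true if a call to the endpoint may be attempted now.
    synchronized boolean allowRequest() {
        if (openedAt < 0) {
            return true;
        }
        if (clock.getAsLong() - openedAt >= openMillis) {
            // Wait period elapsed: close again and start with a fresh window.
            openedAt = -1;
            window.clear();
            return true;
        }
        return false;
    }

    // Records the outcome of a call and opens the breaker if the failure rate is too high.
    synchronized void record(boolean success) {
        window.addLast(!success);
        if (window.size() > windowSize) {
            window.removeFirst();
        }
        long failures = window.stream().filter(failed -> failed).count();
        if (window.size() >= minimumCalls
                && (double) failures / window.size() >= failureRateThreshold) {
            openedAt = clock.getAsLong();
        }
    }
}
```

A router would call `allowRequest()` before selecting an endpoint and `record(...)` after each send, skipping endpoints whose breaker is open.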

4. Rule Management Complexity

Problem: Difficult to maintain and debug large numbers of routing rules
Solution: Use rule categorization, testing frameworks, and documentation

5. Security Vulnerabilities in Dynamic Routing

Problem: Routing decisions based on untrusted input enabling attacks
Solution: Validate and sanitize all routing inputs and use allowlisting
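
Allowlisting matters most where a destination is computed from message content or a lookup. As a sketch, a resolved destination URI can be checked against fixed sets of schemes and hosts before anything is sent. `DestinationAllowlist` is a hypothetical class, and the scheme and host values in the usage note are placeholders.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;
import java.util.Set;

// Hypothetical sketch: accept a dynamic destination only if scheme and host are allowlisted.
final class DestinationAllowlist {
    private final Set<String> allowedSchemes;
    private final Set<String> allowedHosts;

    DestinationAllowlist(Set<String> allowedSchemes, Set<String> allowedHosts) {
        this.allowedSchemes = allowedSchemes;
        this.allowedHosts = allowedHosts;
    }

    boolean isAllowed(String destination) {
        if (destination == null) {
            return false;
        }
        try {
            URI uri = new URI(destination.trim());
            String scheme = uri.getScheme();
            String host = uri.getHost();
            // Compare the parsed host, not a substring of the raw text, so that
            // values such as "https://trusted@evil.example.com" are rejected.
            return scheme != null && host != null
                && allowedSchemes.contains(scheme.toLowerCase(Locale.ROOT))
                && allowedHosts.contains(host.toLowerCase(Locale.ROOT));
        } catch (URISyntaxException e) {
            return false; // Unparseable input is never routed
        }
    }
}
```

For example, `new DestinationAllowlist(Set.of("https"), Set.of("contact-service.internal"))` accepts `https://contact-service.internal/process` and rejects `file:///etc/passwd` or any host outside the set.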

Integration in Distributed Systems

Event-Driven Microservices

@Service
public class EventRoutingService {
    private final MessageRouter messageRouter;

    public void routeEvent(DomainEvent event) {
        RoutingContext context = createEventContext(event);
        messageRouter.route(convertEventToMessage(event), context);
    }
}

API Gateway Integration

@Component
public class GatewayRoutingFilter implements GatewayFilter {

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        return determineRoute(exchange.getRequest())
            .flatMap(route -> routeToEndpoint(exchange, route, chain));
    }
}

Conclusion

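The Message Routing pattern is essential for building flexible, scalable distributed systems. It provides dynamic route determination, content-based message distribution, route optimization with load balancing, and fault-tolerant routing.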

When properly implemented with comprehensive rule management, performance optimization, and monitoring, message routing enables adaptive architectures that can evolve with changing business requirements while maintaining high performance and reliability.
