
Aggregator Pattern

Overview

The Aggregator pattern is a fundamental composition pattern in enterprise integration architectures: it collects related messages or data elements and combines them into a single, cohesive message or result. Like a conductor bringing individual musicians together into a single performance, the Aggregator coordinates multiple incoming messages, waits for completion criteria to be met, and then produces a unified response that combines the individual contributions into a meaningful whole.

Theoretical Foundation

The Aggregator pattern is grounded in information integration theory and distributed computation principles, specifically addressing the challenge of scatter-gather operations and parallel processing coordination in distributed systems. The pattern embodies the principle of "temporal data correlation" - collecting related data that arrives at different times and combining it based on correlation identifiers and completion criteria.

Core Principles

1. Message Correlation and Collection

The Aggregator identifies related messages using correlation identifiers and collects them until completion criteria are met:

- Correlation ID matching - grouping messages that belong to the same logical operation
- Temporal correlation - collecting messages within time windows or sequences
- Content-based correlation - grouping based on shared data elements or business keys
- State management - maintaining correlation state across multiple message arrivals
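A minimal sketch of correlation-based collection. The `Message` record and its `correlationId` field are illustrative assumptions, not part of any particular framework:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical message type carrying a correlation identifier
record Message(String correlationId, String payload) {}

public class CorrelationCollector {
    private final Map<String, List<Message>> pending = new ConcurrentHashMap<>();

    // Group each arriving message under its correlation ID and
    // return the messages collected so far for that group
    public List<Message> collect(Message message) {
        return pending.compute(message.correlationId(), (id, group) -> {
            List<Message> g = (group == null) ? new ArrayList<>() : group;
            g.add(message);
            return g;
        });
    }

    public static void main(String[] args) {
        CorrelationCollector collector = new CorrelationCollector();
        collector.collect(new Message("order-1", "item A"));
        collector.collect(new Message("order-2", "item B"));
        List<Message> group = collector.collect(new Message("order-1", "item C"));
        System.out.println(group.size()); // two messages correlated under "order-1"
    }
}
```

A real aggregator would layer completion checks and cleanup on top of this grouping step; the sketch isolates only the correlation concern.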

2. Completion Detection

The pattern implements various strategies to determine when aggregation is complete:

- Count-based completion - waiting for a specific number of messages
- Timeout-based completion - completing aggregation after a time threshold
- Content-based completion - detecting completion through message content analysis
- Conditional completion - complex completion criteria based on business rules
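These strategies can be expressed behind a single interface, which keeps the aggregator itself agnostic about how completion is decided. The `CompletionStrategy` interface below is an illustrative sketch, not an API from any library:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

// Hypothetical completion check over the messages collected so far
interface CompletionStrategy<T> {
    boolean isComplete(List<T> collected, Instant startedAt);
}

public class CompletionStrategies {
    // Count-based: complete once the expected number of messages arrived
    static <T> CompletionStrategy<T> countBased(int expected) {
        return (collected, startedAt) -> collected.size() >= expected;
    }

    // Timeout-based: complete once the aggregation has been open too long
    static <T> CompletionStrategy<T> timeoutBased(Duration maxWait) {
        return (collected, startedAt) ->
            Duration.between(startedAt, Instant.now()).compareTo(maxWait) >= 0;
    }

    // Conditional composition: complete when any criterion is met
    @SafeVarargs
    static <T> CompletionStrategy<T> anyOf(CompletionStrategy<T>... strategies) {
        return (collected, startedAt) -> {
            for (CompletionStrategy<T> s : strategies) {
                if (s.isComplete(collected, startedAt)) return true;
            }
            return false;
        };
    }

    public static void main(String[] args) {
        CompletionStrategy<String> done =
            anyOf(countBased(2), timeoutBased(Duration.ofSeconds(30)));
        System.out.println(done.isComplete(List.of("a"), Instant.now()));
        System.out.println(done.isComplete(List.of("a", "b"), Instant.now()));
    }
}
```

Composing count and timeout checks this way mirrors the common production setup: a count criterion as the happy path, with a timeout as the fallback so stalled aggregations still complete.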

3. Data Combination and Synthesis

Once completion criteria are met, the aggregator combines the collected messages:

- Data merging - combining data elements from multiple messages
- Result computation - performing calculations across collected data
- Structure transformation - reformatting aggregated data into target structure
- Conflict resolution - handling conflicting or duplicate data elements
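A minimal merge sketch showing one possible conflict-resolution policy, last-write-wins, where a key appearing in a later message overwrites the earlier value. The field names are invented for illustration:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MessageMerger {
    // Merge partial results in arrival order; for conflicting keys,
    // the value from the later message wins (last-write-wins policy)
    static Map<String, String> merge(List<Map<String, String>> parts) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map<String, String> part : parts) {
            result.putAll(part);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> merged = merge(List.of(
            Map.of("customer", "C-42", "status", "pending"),
            Map.of("status", "approved", "total", "99.50")));
        System.out.println(merged.get("status")); // later value wins
        System.out.println(merged.size());
    }
}
```

Other policies (first-write-wins, numeric max, or a domain-specific reconciliation rule) slot into the same shape by replacing the `putAll` loop with a per-key decision.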

4. Resource and Memory Management

The pattern manages resources efficiently during aggregation:

- Memory optimization - managing storage for pending aggregations
- Timeout handling - cleaning up incomplete aggregations
- Persistence strategies - handling aggregation state across system restarts
- Scalability considerations - managing large numbers of concurrent aggregations
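The timeout-cleanup concern can be handled by a periodic sweep that discards aggregations that never completed. This is a simplified sketch with invented types; a production version would typically run the sweep on a scheduler and emit metrics about evictions:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AggregationJanitor {
    // Hypothetical pending-aggregation entry tracking when it was opened
    record Pending(Instant startedAt, int messageCount) {}

    private final Map<String, Pending> pending = new ConcurrentHashMap<>();
    private final Duration maxAge;

    AggregationJanitor(Duration maxAge) { this.maxAge = maxAge; }

    void track(String correlationId, Pending entry) {
        pending.put(correlationId, entry);
    }

    // Periodic sweep: discard aggregations that never completed in time,
    // freeing the memory their collected messages were holding
    int evictExpired(Instant now) {
        int before = pending.size();
        pending.values().removeIf(p ->
            Duration.between(p.startedAt(), now).compareTo(maxAge) > 0);
        return before - pending.size();
    }

    public static void main(String[] args) {
        AggregationJanitor janitor = new AggregationJanitor(Duration.ofMinutes(5));
        Instant now = Instant.now();
        janitor.track("stale", new Pending(now.minus(Duration.ofMinutes(10)), 1));
        janitor.track("fresh", new Pending(now, 2));
        System.out.println(janitor.evictExpired(now)); // only the stale entry goes
    }
}
```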

Why Aggregators are Essential in Integration Architecture

1. Distributed Query and Response Coordination

In distributed systems, aggregators enable:

- Scatter-gather queries - sending requests to multiple services and combining responses
- Parallel processing coordination - orchestrating concurrent operations
- Multi-source data integration - combining data from multiple systems or databases
- Composite API responses - building unified responses from multiple service calls
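The scatter-gather flow can be sketched with plain `CompletableFuture`s: fan the request out, wait for all replies, then combine them. The "service" calls here are stand-ins for real remote invocations:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ScatterGather {
    // Fan a request out to several (simulated) services and gather the
    // responses once all of them have completed
    static List<String> scatterGather(List<CompletableFuture<String>> calls) {
        return CompletableFuture
            .allOf(calls.toArray(CompletableFuture[]::new))
            .thenApply(v -> calls.stream()
                .map(CompletableFuture::join) // safe: allOf already completed
                .collect(Collectors.toList()))
            .join();
    }

    public static void main(String[] args) {
        List<String> responses = scatterGather(List.of(
            CompletableFuture.supplyAsync(() -> "inventory: 12 units"),
            CompletableFuture.supplyAsync(() -> "price: 19.99"),
            CompletableFuture.supplyAsync(() -> "eta: 2 days")));
        System.out.println(responses.size());
        System.out.println(responses.get(1));
    }
}
```

Note that the result list preserves the order of the requests, not the order replies arrive in; a messaging-based aggregator (as in the Camel examples later) has to recover that ordering from correlation metadata instead.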

2. Event-Driven Architecture Support

In event streaming and messaging systems:

- Event correlation - grouping related events for complex event processing
- Workflow coordination - collecting workflow step completions
- Batch processing - accumulating events for batch operations
- State reconstruction - rebuilding entity state from event sequences

3. Real-Time Analytics and Monitoring

For analytics and monitoring use cases:

- Metrics aggregation - combining measurements from multiple sources
- Alert correlation - grouping related alerts to prevent alert storms
- Performance monitoring - collecting performance data across service boundaries
- Business intelligence - aggregating transaction data for reporting

4. Microservices Orchestration

In microservices architectures:

- Saga pattern implementation - coordinating distributed transactions
- Composite service operations - building higher-level services from microservice calls
- Data consistency management - ensuring consistent views across services
- Cross-cutting concern coordination - aggregating logging, security, or audit data

Benefits in Integration Contexts

1. System Performance Optimization

2. Data Consistency and Completeness

3. Business Process Support

4. Fault Tolerance and Reliability

Integration Architecture Applications

1. API Gateway and Backend Aggregation

Aggregators in API gateways provide:

- Backend for Frontend (BFF) - combining multiple microservice calls into unified responses
- Mobile optimization - reducing round trips by aggregating data for mobile clients
- Legacy system integration - combining data from multiple legacy systems
- Cross-cutting data aggregation - combining business data with metadata, permissions, etc.

2. Data Pipeline and ETL Processing

In data processing workflows:

- Batch data collection - aggregating streaming data into batches
- Data warehouse loading - combining data from multiple sources for analytics
- Real-time analytics - aggregating streaming events for real-time dashboards
- Data quality assessment - collecting data quality metrics across sources

3. Event Sourcing and CQRS

In event-driven architectures:

- Event stream aggregation - building read models from event streams
- Saga coordination - collecting saga step completions
- Snapshot generation - aggregating events to create entity snapshots
- Cross-aggregate reporting - combining data across aggregate boundaries

4. Monitoring and Observability

For system monitoring:

- Distributed tracing aggregation - collecting trace spans across services
- Metrics correlation - combining metrics from multiple monitoring sources
- Log aggregation - collecting and correlating log entries across systems
- Health check aggregation - combining health status from multiple components

Implementation Patterns

1. Simple Count-Based Aggregator

// Aggregate based on expected message count
@Component
public class CountBasedAggregator {

    private final Map<String, AggregationState> aggregations = new ConcurrentHashMap<>();

    public Optional<AggregatedResult> aggregate(CorrelatedMessage message) {
        String correlationId = message.getCorrelationId();

        AggregationState state = aggregations.computeIfAbsent(correlationId, 
            k -> new AggregationState(message.getExpectedCount()));

        state.addMessage(message);

        if (state.isComplete()) {
            AggregatedResult result = buildResult(state.getMessages());
            aggregations.remove(correlationId);
            return Optional.of(result);
        }

        return Optional.empty();
    }

    private AggregatedResult buildResult(List<CorrelatedMessage> messages) {
        return AggregatedResult.builder()
            .correlationId(messages.get(0).getCorrelationId())
            .timestamp(Instant.now())
            .messageCount(messages.size())
            .aggregatedData(combineMessages(messages))
            .build();
    }
}

public class AggregationState {
    private final int expectedCount;
    private final List<CorrelatedMessage> messages = new ArrayList<>();
    private final Instant startTime = Instant.now();

    public AggregationState(int expectedCount) { this.expectedCount = expectedCount; }

    public void addMessage(CorrelatedMessage message) { messages.add(message); }

    public List<CorrelatedMessage> getMessages() { return messages; }

    public boolean isComplete() {
        return messages.size() >= expectedCount;
    }
}

2. Time-Window Aggregator

// Aggregate messages within time windows
@Component
public class TimeWindowAggregator {

    private final Map<String, TimeWindowState> windows = new ConcurrentHashMap<>();
    private final Duration windowDuration = Duration.ofSeconds(60); // window length

    @Scheduled(fixedDelay = 1000) // Check every second
    public void processExpiredWindows() {
        Instant now = Instant.now();

        windows.entrySet().removeIf(entry -> {
            TimeWindowState state = entry.getValue();
            if (state.isExpired(now)) {
                processAggregatedMessages(entry.getKey(), state);
                return true;
            }
            return false;
        });
    }

    public void addMessage(TimeWindowedMessage message) {
        String windowKey = calculateWindowKey(message);

        TimeWindowState state = windows.computeIfAbsent(windowKey,
            k -> new TimeWindowState(calculateWindowStart(message.getTimestamp())));

        state.addMessage(message);

        // Check if window should be completed early (e.g., batch size reached)
        if (state.shouldCompleteEarly()) {
            processAggregatedMessages(windowKey, state);
            windows.remove(windowKey);
        }
    }

    private void processAggregatedMessages(String windowKey, TimeWindowState state) {
        AggregatedTimeWindow result = AggregatedTimeWindow.builder()
            .windowKey(windowKey)
            .windowStart(state.getWindowStart())
            .windowEnd(state.getWindowStart().plus(windowDuration))
            .messageCount(state.getMessageCount())
            .aggregatedMetrics(calculateMetrics(state.getMessages()))
            .build();

        // Send aggregated result for processing
        publishAggregatedResult(result);
    }
}

3. Content-Based Aggregator

// Aggregate based on message content and business rules
@Component
public class ContentBasedAggregator {

    @Autowired
    private CompletionRuleEngine completionRules;

    private final Map<String, ContentAggregationState> aggregations = new ConcurrentHashMap<>();

    public void processMessage(BusinessMessage message) {
        String correlationKey = extractCorrelationKey(message);

        ContentAggregationState state = aggregations.computeIfAbsent(correlationKey,
            k -> new ContentAggregationState());

        state.addMessage(message);

        // Check completion rules
        if (completionRules.isComplete(state)) {
            BusinessTransaction transaction = buildTransaction(state);
            publishTransaction(transaction);
            aggregations.remove(correlationKey);
        }
    }

    private BusinessTransaction buildTransaction(ContentAggregationState state) {
        List<BusinessMessage> messages = state.getMessages();

        return BusinessTransaction.builder()
            .transactionId(generateTransactionId())
            .totalAmount(calculateTotalAmount(messages))
            .participants(extractParticipants(messages))
            .transactionType(determineTransactionType(messages))
            .completedAt(Instant.now())
            .build();
    }
}

@Service
public class CompletionRuleEngine {

    public boolean isComplete(ContentAggregationState state) {
        List<BusinessMessage> messages = state.getMessages();

        // Rule 1: Must have at least one initiator message
        boolean hasInitiator = messages.stream()
            .anyMatch(m -> m.getType() == MessageType.TRANSACTION_INITIATE);

        // Rule 2: All required participants must have responded
        Set<String> requiredParticipants = extractRequiredParticipants(messages);
        Set<String> respondedParticipants = extractRespondedParticipants(messages);

        boolean allParticipantsResponded = respondedParticipants.containsAll(requiredParticipants);

        // Rule 3: No pending approval messages
        boolean noPendingApprovals = messages.stream()
            .noneMatch(m -> m.getStatus() == MessageStatus.PENDING_APPROVAL);

        return hasInitiator && allParticipantsResponded && noPendingApprovals;
    }
}

4. Streaming Aggregator with State

// Aggregate streaming data with persistent state
@Component
public class StreamingAggregator {

    @Autowired
    private AggregationStateRepository stateRepository;

    @EventListener
    public void handleStreamEvent(StreamEvent event) {
        String aggregationKey = event.getAggregationKey();

        AggregationState state = stateRepository.findByKey(aggregationKey)
            .orElse(new AggregationState(aggregationKey));

        updateAggregationState(state, event);
        stateRepository.save(state);

        if (shouldEmitResult(state)) {
            StreamAggregationResult result = buildAggregationResult(state);
            publishResult(result);

            // Reset or remove state based on aggregation type
            if (state.isResetAfterEmit()) {
                state.reset();
                stateRepository.save(state);
            } else {
                stateRepository.delete(state);
            }
        }
    }

    private void updateAggregationState(AggregationState state, StreamEvent event) {
        state.incrementCount();
        state.updateSum(event.getValue());
        state.updateMinMax(event.getValue());
        state.addToWindow(event);
        state.updateTimestamp(event.getTimestamp());
    }

    private boolean shouldEmitResult(AggregationState state) {
        return state.getCount() >= state.getBatchSize() || 
               state.hasExceededTimeWindow() ||
               state.hasSpecialTriggerCondition();
    }
}

Apache Camel Implementation

1. Simple Message Aggregator

@Component
public class SimpleAggregatorRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:aggregateMessages")
            .routeId("simple-aggregator")
            .log("Received message for aggregation: ${body}")
            .aggregate(header("correlationId"))
                .completionSize(3) // Complete when 3 messages received
                .completionTimeout(5000) // Or complete after 5 seconds
                .strategy(new MessageAggregationStrategy())
                .to("direct:processAggregatedResult");

        from("direct:processAggregatedResult")
            .log("Processing aggregated result: ${body}")
            .process(exchange -> {
                AggregatedMessages result = exchange.getIn().getBody(AggregatedMessages.class);
                // Process the aggregated result
                log.info("Processed {} messages in aggregation {}", 
                    result.getMessages().size(), result.getCorrelationId());
            });
    }
}

public class MessageAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            // First message in aggregation
            AggregatedMessages aggregated = new AggregatedMessages();
            aggregated.setCorrelationId(newExchange.getIn().getHeader("correlationId", String.class));
            aggregated.addMessage(newExchange.getIn().getBody());

            newExchange.getIn().setBody(aggregated);
            return newExchange;
        }

        // Add new message to existing aggregation
        AggregatedMessages aggregated = oldExchange.getIn().getBody(AggregatedMessages.class);
        aggregated.addMessage(newExchange.getIn().getBody());

        oldExchange.getIn().setBody(aggregated);
        return oldExchange;
    }
}

2. Time-Window Aggregator

@Component
public class TimeWindowAggregatorRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:timeWindowAggregate")
            .routeId("time-window-aggregator")
            .aggregate(header("windowKey"))
                .completionInterval(60000) // Complete every 60 seconds
                .strategy(new TimeWindowAggregationStrategy())
                .to("direct:processTimeWindow");

        from("direct:processTimeWindow")
            .log("Processing time window: ${body}")
            .process(exchange -> {
                TimeWindowResult window = exchange.getIn().getBody(TimeWindowResult.class);

                // Calculate window statistics
                WindowStatistics stats = WindowStatistics.builder()
                    .windowStart(window.getWindowStart())
                    .windowEnd(window.getWindowEnd())
                    .messageCount(window.getMessages().size())
                    .averageValue(calculateAverage(window.getMessages()))
                    .totalValue(calculateTotal(window.getMessages()))
                    .build();

                exchange.getIn().setBody(stats);
            })
            .to("direct:publishStatistics");
    }
}

public class TimeWindowAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Instant now = Instant.now();

        if (oldExchange == null) {
            TimeWindowResult window = new TimeWindowResult();
            window.setWindowStart(now);
            window.addMessage(newExchange.getIn().getBody());

            newExchange.getIn().setBody(window);
            return newExchange;
        }

        TimeWindowResult window = oldExchange.getIn().getBody(TimeWindowResult.class);
        window.addMessage(newExchange.getIn().getBody());
        window.setLastUpdate(now);

        oldExchange.getIn().setBody(window);
        return oldExchange;
    }
}

3. Conditional Completion Aggregator

@Component
public class ConditionalAggregatorRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:conditionalAggregate")
            .routeId("conditional-aggregator")
            .aggregate(header("transactionId"))
                .strategy(new BusinessTransactionAggregationStrategy())
                .completionPredicate(new TransactionCompletionPredicate())
                .completionTimeout(30000) // Fallback timeout
                .to("direct:processCompletedTransaction");

        from("direct:processCompletedTransaction")
            .log("Processing completed transaction: ${body}")
            .choice()
                .when(method(TransactionValidator.class, "isValid"))
                    .to("direct:commitTransaction")
                .otherwise()
                    .to("direct:rejectTransaction")
            .end();
    }
}

public class TransactionCompletionPredicate implements Predicate {

    @Override
    public boolean matches(Exchange exchange) {
        BusinessTransaction transaction = exchange.getIn().getBody(BusinessTransaction.class);

        // Check if all required steps are completed
        boolean allStepsCompleted = transaction.getSteps().stream()
            .allMatch(step -> step.getStatus() == StepStatus.COMPLETED);

        // Check if all approvals are received
        boolean allApprovalsReceived = transaction.getApprovals().size() >= 
            transaction.getRequiredApprovals();

        // Check if no errors occurred
        boolean noErrors = transaction.getErrors().isEmpty();

        return allStepsCompleted && allApprovalsReceived && noErrors;
    }
}

4. Scatter-Gather Aggregator

@Component
public class ScatterGatherAggregatorRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:scatterGatherRequest")
            .routeId("scatter-gather-aggregator")
            .process(exchange -> {
                // Set correlation ID for tracking
                String correlationId = UUID.randomUUID().toString();
                exchange.getIn().setHeader("correlationId", correlationId);
                exchange.getIn().setHeader("expectedResponses", 3);
            })
            .multicast()
                .parallelProcessing()
                .to("direct:serviceA", "direct:serviceB", "direct:serviceC")
            .end()
            .aggregate(header("correlationId"))
                .strategy(new ScatterGatherAggregationStrategy())
                .completionSize(header("expectedResponses"))
                .completionTimeout(10000) // 10 second timeout
                .to("direct:processGatheredResponses");

        // Individual service routes
        from("direct:serviceA")
            .log("Calling service A")
            .process(new ServiceAProcessor())
            .setHeader("serviceResponse", constant("serviceA"));

        from("direct:serviceB")
            .log("Calling service B")
            .process(new ServiceBProcessor())
            .setHeader("serviceResponse", constant("serviceB"));

        from("direct:serviceC")
            .log("Calling service C")
            .process(new ServiceCProcessor())
            .setHeader("serviceResponse", constant("serviceC"));

        from("direct:processGatheredResponses")
            .log("Processing gathered responses: ${body}")
            .process(exchange -> {
                GatheredResponses responses = exchange.getIn().getBody(GatheredResponses.class);

                // Combine responses into final result
                CombinedResult result = CombinedResult.builder()
                    .correlationId(responses.getCorrelationId())
                    .responseCount(responses.getResponses().size())
                    .combinedData(combineServiceData(responses.getResponses()))
                    .processingTime(calculateProcessingTime(responses))
                    .build();

                exchange.getIn().setBody(result);
            })
            .to("direct:sendFinalResponse");
    }
}

5. Persistent State Aggregator

@Component
public class PersistentAggregatorRoute extends RouteBuilder {

    @Autowired
    private AggregationRepository aggregationRepository;

    @Override
    public void configure() throws Exception {
        from("direct:persistentAggregate")
            .routeId("persistent-aggregator")
            .aggregate(header("businessKey"))
                .strategy(new PersistentAggregationStrategy())
                .completionPredicate(new BusinessCompletionPredicate())
                .aggregationRepository(aggregationRepository)
                .completionTimeout(300000) // 5 minute timeout
                .to("direct:processPersistentResult");

        from("direct:processPersistentResult")
            .log("Processing persistent aggregation result: ${body}")
            .process(exchange -> {
                PersistentAggregationResult result = 
                    exchange.getIn().getBody(PersistentAggregationResult.class);

                // Process the long-running aggregation result
                log.info("Completed aggregation for key: {} with {} messages", 
                    result.getBusinessKey(), result.getMessageCount());
            })
            .to("direct:persistResult");
    }
}

@Configuration
public class AggregationConfiguration {

    @Bean
    public AggregationRepository aggregationRepository() {
        // Use a database-backed aggregation repository so state survives restarts
        JdbcAggregationRepository repository = new JdbcAggregationRepository();
        repository.setDataSource(dataSource());
        repository.setRepositoryName("aggregation");
        repository.setReturnOldExchange(false);
        return repository;
    }
}

Best Practices

1. Memory and Resource Management

2. Correlation and Completion Strategy

3. Performance Optimization

4. Error Handling and Recovery

5. Monitoring and Observability

The Aggregator pattern is essential for building sophisticated integration solutions that coordinate distributed operations and provide unified views of data from multiple sources while managing the complexity of temporal coordination and state management.
