Aggregator Pattern
Overview
The Aggregator pattern is a fundamental composition pattern in enterprise integration architectures that collects and combines related messages or data elements into a single, cohesive message or result. Like a conductor orchestrating multiple musicians into a harmonious symphony, the Aggregator pattern coordinates multiple incoming messages, waits for completion criteria to be met, and then produces a unified response that combines the individual contributions into a meaningful whole.
Theoretical Foundation
The Aggregator pattern is grounded in information integration theory and distributed computation principles, specifically addressing the challenge of scatter-gather operations and parallel processing coordination in distributed systems. The pattern embodies the principle of "temporal data correlation" - collecting related data that arrives at different times and combining it based on correlation identifiers and completion criteria.
Core Principles
1. Message Correlation and Collection
The Aggregator identifies related messages using correlation identifiers and collects them until completion criteria are met:
- Correlation ID matching - grouping messages that belong to the same logical operation
- Temporal correlation - collecting messages within time windows or sequences
- Content-based correlation - grouping based on shared data elements or business keys
- State management - maintaining correlation state across multiple message arrivals
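The correlation-and-collection step can be sketched as a small buffer keyed by correlation ID. This is a minimal illustration, not a production implementation; the class and method names (`CorrelationBuffer`, `add`) are hypothetical, and real aggregators would also track timeouts and persist state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch: collect payloads under a correlation ID until a
// caller-supplied expected count is reached.
public class CorrelationBuffer {
    private final Map<String, List<String>> pending = new HashMap<>();

    // Returns the completed group once the expected count is reached,
    // or null while the aggregation is still collecting.
    public List<String> add(String correlationId, String payload, int expectedCount) {
        List<String> group = pending.computeIfAbsent(correlationId, k -> new ArrayList<>());
        group.add(payload);
        if (group.size() >= expectedCount) {
            return pending.remove(correlationId);
        }
        return null;
    }
}
```

Removing the entry on completion is important: it frees the correlation state so the same ID can be reused and memory does not grow unboundedly.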
2. Completion Detection
The pattern implements various strategies to determine when aggregation is complete:
- Count-based completion - waiting for a specific number of messages
- Timeout-based completion - completing aggregation after a time threshold
- Content-based completion - detecting completion through message content analysis
- Conditional completion - complex completion criteria based on business rules
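The first three strategies can be expressed as simple predicates over an in-flight aggregation. This is a sketch only; the `AggregationSnapshot` record and method names are illustrative, and a conditional strategy would typically combine predicates like these with business rules.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

// Completion strategies expressed as predicates over an aggregation snapshot.
public class CompletionCriteria {
    // Hypothetical snapshot of an in-flight aggregation.
    public record AggregationSnapshot(List<String> messages, Instant startedAt) {}

    // Count-based: complete when the expected number of messages arrived.
    public static boolean countReached(AggregationSnapshot s, int expected) {
        return s.messages().size() >= expected;
    }

    // Timeout-based: complete when the aggregation has been open too long.
    public static boolean timedOut(AggregationSnapshot s, Instant now, Duration limit) {
        return Duration.between(s.startedAt(), now).compareTo(limit) >= 0;
    }

    // Content-based: an end-of-stream marker in the payload signals completion.
    public static boolean endMarkerSeen(AggregationSnapshot s, String marker) {
        return s.messages().contains(marker);
    }
}
```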
3. Data Combination and Synthesis
Once completion criteria are met, the aggregator combines the collected messages:
- Data merging - combining data elements from multiple messages
- Result computation - performing calculations across collected data
- Structure transformation - reformatting aggregated data into the target structure
- Conflict resolution - handling conflicting or duplicate data elements
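Data merging with a conflict-resolution policy can be as simple as folding the collected messages into one map. The sketch below assumes messages are key-value maps and uses last-write-wins as the (illustrative) conflict policy; real systems might prefer timestamps, source priority, or custom merge functions.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Merge collected message payloads; on key conflicts, later messages win.
public class MessageMerger {
    public static Map<String, String> merge(List<Map<String, String>> parts) {
        Map<String, String> merged = new LinkedHashMap<>();
        for (Map<String, String> part : parts) {
            merged.putAll(part); // last-write-wins conflict resolution
        }
        return merged;
    }
}
```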
4. Resource and Memory Management
The pattern manages resources efficiently during aggregation:
- Memory optimization - managing storage for pending aggregations
- Timeout handling - cleaning up incomplete aggregations
- Persistence strategies - handling aggregation state across system restarts
- Scalability considerations - managing large numbers of concurrent aggregations
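Timeout handling typically runs as a periodic sweep that evicts aggregations older than a time-to-live, so abandoned correlations do not leak memory. The `AggregationJanitor` class below is a hypothetical sketch of that sweep; a real implementation would also emit or dead-letter the partial results it evicts.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Periodically evict aggregations whose start time exceeds a TTL.
public class AggregationJanitor {
    private final Map<String, Instant> startTimes = new ConcurrentHashMap<>();
    private final Duration ttl;

    public AggregationJanitor(Duration ttl) {
        this.ttl = ttl;
    }

    public void track(String correlationId, Instant startedAt) {
        startTimes.put(correlationId, startedAt);
    }

    // Removes expired entries; returns how many were evicted.
    public int sweep(Instant now) {
        int before = startTimes.size();
        startTimes.entrySet().removeIf(e ->
            Duration.between(e.getValue(), now).compareTo(ttl) > 0);
        return before - startTimes.size();
    }
}
```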
Why Aggregators are Essential in Integration Architecture
1. Distributed Query and Response Coordination
In distributed systems, aggregators enable:
- Scatter-gather queries - sending requests to multiple services and combining responses
- Parallel processing coordination - orchestrating concurrent operations
- Multi-source data integration - combining data from multiple systems or databases
- Composite API responses - building unified responses from multiple service calls
2. Event-Driven Architecture Support
In event streaming and messaging systems:
- Event correlation - grouping related events for complex event processing
- Workflow coordination - collecting workflow step completions
- Batch processing - accumulating events for batch operations
- State reconstruction - rebuilding entity state from event sequences
3. Real-Time Analytics and Monitoring
For analytics and monitoring use cases:
- Metrics aggregation - combining measurements from multiple sources
- Alert correlation - grouping related alerts to prevent alert storms
- Performance monitoring - collecting performance data across service boundaries
- Business intelligence - aggregating transaction data for reporting
4. Microservices Orchestration
In microservices architectures:
- Saga pattern implementation - coordinating distributed transactions
- Composite service operations - building higher-level services from microservice calls
- Data consistency management - ensuring consistent views across services
- Cross-cutting concern coordination - aggregating logging, security, or audit data
Benefits in Integration Contexts
1. System Performance Optimization
- Parallel processing - multiple operations can execute concurrently
- Reduced latency - overlapping processing time instead of sequential execution
- Batch efficiency - combining multiple operations into efficient batches
- Resource utilization - better utilization of network and processing resources
2. Data Consistency and Completeness
- Complete data views - ensuring all relevant data is collected before processing
- Cross-system consistency - maintaining consistency across multiple data sources
- Duplicate detection - identifying and handling duplicate messages
- Data validation - validating completeness and correctness of aggregated data
3. Business Process Support
- Complex workflow coordination - supporting multi-step business processes
- Decision making - collecting all necessary information for business decisions
- Compliance reporting - aggregating data for regulatory reporting requirements
- Customer experience - providing complete customer views from multiple systems
4. Fault Tolerance and Reliability
- Partial result handling - processing available data when some sources are unavailable
- Retry coordination - coordinating retry strategies across multiple operations
- Graceful degradation - providing best-effort results when complete data isn't available
- State recovery - recovering aggregation state after system failures
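Graceful degradation and partial-result handling can be reduced to a quorum decision at timeout: emit what arrived if a minimum number of responses was met, otherwise fail. The policy class below is an illustrative sketch, not a prescribed API.

```java
import java.util.List;
import java.util.Optional;

// When a timeout fires before all expected responses arrive, emit a
// partial result if a minimum quorum was met; otherwise report failure.
public class PartialResultPolicy {
    public static Optional<List<String>> onTimeout(List<String> received,
                                                   int minimumForPartial) {
        if (received.size() >= minimumForPartial) {
            return Optional.of(received); // best-effort partial result
        }
        return Optional.empty(); // too little data; fail the aggregation
    }
}
```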
Integration Architecture Applications
1. API Gateway and Backend Aggregation
Aggregators in API gateways provide:
- Backend for Frontend (BFF) - combining multiple microservice calls into unified responses
- Mobile optimization - reducing round trips by aggregating data for mobile clients
- Legacy system integration - combining data from multiple legacy systems
- Cross-cutting data aggregation - combining business data with metadata, permissions, etc.
2. Data Pipeline and ETL Processing
In data processing workflows:
- Batch data collection - aggregating streaming data into batches
- Data warehouse loading - combining data from multiple sources for analytics
- Real-time analytics - aggregating streaming events for real-time dashboards
- Data quality assessment - collecting data quality metrics across sources
3. Event Sourcing and CQRS
In event-driven architectures:
- Event stream aggregation - building read models from event streams
- Saga coordination - collecting saga step completions
- Snapshot generation - aggregating events to create entity snapshots
- Cross-aggregate reporting - combining data across aggregate boundaries
4. Monitoring and Observability
For system monitoring:
- Distributed tracing aggregation - collecting trace spans across services
- Metrics correlation - combining metrics from multiple monitoring sources
- Log aggregation - collecting and correlating log entries across systems
- Health check aggregation - combining health status from multiple components
Implementation Patterns
1. Simple Count-Based Aggregator
// Aggregate based on expected message count
@Component
public class CountBasedAggregator {

    private final Map<String, AggregationState> aggregations = new ConcurrentHashMap<>();

    public Optional<AggregatedResult> aggregate(CorrelatedMessage message) {
        String correlationId = message.getCorrelationId();
        AggregationState state = aggregations.computeIfAbsent(correlationId,
            k -> new AggregationState(message.getExpectedCount()));
        state.addMessage(message);
        if (state.isComplete()) {
            AggregatedResult result = buildResult(state.getMessages());
            aggregations.remove(correlationId);
            return Optional.of(result);
        }
        return Optional.empty();
    }

    private AggregatedResult buildResult(List<CorrelatedMessage> messages) {
        return AggregatedResult.builder()
            .correlationId(messages.get(0).getCorrelationId())
            .timestamp(Instant.now())
            .messageCount(messages.size())
            .aggregatedData(combineMessages(messages))
            .build();
    }
}

public class AggregationState {

    private final int expectedCount;
    private final List<CorrelatedMessage> messages = new ArrayList<>();
    private final Instant startTime = Instant.now();

    public AggregationState(int expectedCount) {
        this.expectedCount = expectedCount;
    }

    public void addMessage(CorrelatedMessage message) {
        messages.add(message);
    }

    public List<CorrelatedMessage> getMessages() {
        return messages;
    }

    public boolean isComplete() {
        return messages.size() >= expectedCount;
    }
}
2. Time-Window Aggregator
// Aggregate messages within time windows
@Component
public class TimeWindowAggregator {

    private final Map<String, TimeWindowState> windows = new ConcurrentHashMap<>();
    private final Duration windowDuration = Duration.ofSeconds(60);

    @Scheduled(fixedDelay = 1000) // Check every second
    public void processExpiredWindows() {
        Instant now = Instant.now();
        windows.entrySet().removeIf(entry -> {
            TimeWindowState state = entry.getValue();
            if (state.isExpired(now)) {
                processAggregatedMessages(entry.getKey(), state);
                return true;
            }
            return false;
        });
    }

    public void addMessage(TimeWindowedMessage message) {
        String windowKey = calculateWindowKey(message);
        TimeWindowState state = windows.computeIfAbsent(windowKey,
            k -> new TimeWindowState(calculateWindowStart(message.getTimestamp())));
        state.addMessage(message);
        // Check if window should be completed early (e.g., batch size reached)
        if (state.shouldCompleteEarly()) {
            processAggregatedMessages(windowKey, state);
            windows.remove(windowKey);
        }
    }

    private void processAggregatedMessages(String windowKey, TimeWindowState state) {
        AggregatedTimeWindow result = AggregatedTimeWindow.builder()
            .windowKey(windowKey)
            .windowStart(state.getWindowStart())
            .windowEnd(state.getWindowStart().plus(windowDuration))
            .messageCount(state.getMessageCount())
            .aggregatedMetrics(calculateMetrics(state.getMessages()))
            .build();
        // Send aggregated result for processing
        publishAggregatedResult(result);
    }
}
3. Content-Based Aggregator
// Aggregate based on message content and business rules
@Component
public class ContentBasedAggregator {

    @Autowired
    private CompletionRuleEngine completionRules;

    private final Map<String, ContentAggregationState> aggregations = new ConcurrentHashMap<>();

    public void processMessage(BusinessMessage message) {
        String correlationKey = extractCorrelationKey(message);
        ContentAggregationState state = aggregations.computeIfAbsent(correlationKey,
            k -> new ContentAggregationState());
        state.addMessage(message);
        // Check completion rules
        if (completionRules.isComplete(state)) {
            BusinessTransaction transaction = buildTransaction(state);
            publishTransaction(transaction);
            aggregations.remove(correlationKey);
        }
    }

    private BusinessTransaction buildTransaction(ContentAggregationState state) {
        List<BusinessMessage> messages = state.getMessages();
        return BusinessTransaction.builder()
            .transactionId(generateTransactionId())
            .totalAmount(calculateTotalAmount(messages))
            .participants(extractParticipants(messages))
            .transactionType(determineTransactionType(messages))
            .completedAt(Instant.now())
            .build();
    }
}

@Service
public class CompletionRuleEngine {

    public boolean isComplete(ContentAggregationState state) {
        List<BusinessMessage> messages = state.getMessages();
        // Rule 1: Must have at least one initiator message
        boolean hasInitiator = messages.stream()
            .anyMatch(m -> m.getType() == MessageType.TRANSACTION_INITIATE);
        // Rule 2: All required participants must have responded
        Set<String> requiredParticipants = extractRequiredParticipants(messages);
        Set<String> respondedParticipants = extractRespondedParticipants(messages);
        boolean allParticipantsResponded = respondedParticipants.containsAll(requiredParticipants);
        // Rule 3: No pending approval messages
        boolean noPendingApprovals = messages.stream()
            .noneMatch(m -> m.getStatus() == MessageStatus.PENDING_APPROVAL);
        return hasInitiator && allParticipantsResponded && noPendingApprovals;
    }
}
4. Streaming Aggregator with State
// Aggregate streaming data with persistent state
@Component
public class StreamingAggregator {

    @Autowired
    private AggregationStateRepository stateRepository;

    @EventListener
    public void handleStreamEvent(StreamEvent event) {
        String aggregationKey = event.getAggregationKey();
        AggregationState state = stateRepository.findByKey(aggregationKey)
            .orElse(new AggregationState(aggregationKey));
        updateAggregationState(state, event);
        stateRepository.save(state);
        if (shouldEmitResult(state)) {
            StreamAggregationResult result = buildAggregationResult(state);
            publishResult(result);
            // Reset or remove state based on aggregation type
            if (state.isResetAfterEmit()) {
                state.reset();
                stateRepository.save(state);
            } else {
                stateRepository.delete(state);
            }
        }
    }

    private void updateAggregationState(AggregationState state, StreamEvent event) {
        state.incrementCount();
        state.updateSum(event.getValue());
        state.updateMinMax(event.getValue());
        state.addToWindow(event);
        state.updateTimestamp(event.getTimestamp());
    }

    private boolean shouldEmitResult(AggregationState state) {
        return state.getCount() >= state.getBatchSize() ||
               state.hasExceededTimeWindow() ||
               state.hasSpecialTriggerCondition();
    }
}
Apache Camel Implementation
1. Simple Message Aggregator
@Component
public class SimpleAggregatorRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:aggregateMessages")
            .routeId("simple-aggregator")
            .log("Received message for aggregation: ${body}")
            .aggregate(header("correlationId"))
                .completionSize(3) // Complete when 3 messages received
                .completionTimeout(5000) // Or complete after 5 seconds
                .strategy(new MessageAggregationStrategy())
            .to("direct:processAggregatedResult");

        from("direct:processAggregatedResult")
            .log("Processing aggregated result: ${body}")
            .process(exchange -> {
                AggregatedMessages result = exchange.getIn().getBody(AggregatedMessages.class);
                // Process the aggregated result
                log.info("Processed {} messages in aggregation {}",
                    result.getMessages().size(), result.getCorrelationId());
            });
    }
}

public class MessageAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            // First message in aggregation
            AggregatedMessages aggregated = new AggregatedMessages();
            aggregated.setCorrelationId(newExchange.getIn().getHeader("correlationId", String.class));
            aggregated.addMessage(newExchange.getIn().getBody());
            newExchange.getIn().setBody(aggregated);
            return newExchange;
        }
        // Add new message to existing aggregation
        AggregatedMessages aggregated = oldExchange.getIn().getBody(AggregatedMessages.class);
        aggregated.addMessage(newExchange.getIn().getBody());
        oldExchange.getIn().setBody(aggregated);
        return oldExchange;
    }
}
2. Time-Window Aggregator
@Component
public class TimeWindowAggregatorRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:timeWindowAggregate")
            .routeId("time-window-aggregator")
            .aggregate(header("windowKey"))
                .completionInterval(60000) // Complete every 60 seconds
                .strategy(new TimeWindowAggregationStrategy())
            .to("direct:processTimeWindow");

        from("direct:processTimeWindow")
            .log("Processing time window: ${body}")
            .process(exchange -> {
                TimeWindowResult window = exchange.getIn().getBody(TimeWindowResult.class);
                // Calculate window statistics
                WindowStatistics stats = WindowStatistics.builder()
                    .windowStart(window.getWindowStart())
                    .windowEnd(window.getWindowEnd())
                    .messageCount(window.getMessages().size())
                    .averageValue(calculateAverage(window.getMessages()))
                    .totalValue(calculateTotal(window.getMessages()))
                    .build();
                exchange.getIn().setBody(stats);
            })
            .to("direct:publishStatistics");
    }
}

public class TimeWindowAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Instant now = Instant.now();
        if (oldExchange == null) {
            TimeWindowResult window = new TimeWindowResult();
            window.setWindowStart(now);
            window.addMessage(newExchange.getIn().getBody());
            newExchange.getIn().setBody(window);
            return newExchange;
        }
        TimeWindowResult window = oldExchange.getIn().getBody(TimeWindowResult.class);
        window.addMessage(newExchange.getIn().getBody());
        window.setLastUpdate(now);
        oldExchange.getIn().setBody(window);
        return oldExchange;
    }
}
3. Conditional Completion Aggregator
@Component
public class ConditionalAggregatorRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:conditionalAggregate")
            .routeId("conditional-aggregator")
            .aggregate(header("transactionId"))
                .strategy(new BusinessTransactionAggregationStrategy())
                .completionPredicate(new TransactionCompletionPredicate())
                .completionTimeout(30000) // Fallback timeout
            .to("direct:processCompletedTransaction");

        from("direct:processCompletedTransaction")
            .log("Processing completed transaction: ${body}")
            .choice()
                .when(method(TransactionValidator.class, "isValid"))
                    .to("direct:commitTransaction")
                .otherwise()
                    .to("direct:rejectTransaction")
            .end();
    }
}

public class TransactionCompletionPredicate implements Predicate {

    @Override
    public boolean matches(Exchange exchange) {
        BusinessTransaction transaction = exchange.getIn().getBody(BusinessTransaction.class);
        // Check if all required steps are completed
        boolean allStepsCompleted = transaction.getSteps().stream()
            .allMatch(step -> step.getStatus() == StepStatus.COMPLETED);
        // Check if all approvals are received
        boolean allApprovalsReceived = transaction.getApprovals().size() >=
            transaction.getRequiredApprovals();
        // Check if no errors occurred
        boolean noErrors = transaction.getErrors().isEmpty();
        return allStepsCompleted && allApprovalsReceived && noErrors;
    }
}
4. Scatter-Gather Aggregator
@Component
public class ScatterGatherAggregatorRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:scatterGatherRequest")
            .routeId("scatter-gather-aggregator")
            .process(exchange -> {
                // Set correlation ID for tracking
                String correlationId = UUID.randomUUID().toString();
                exchange.getIn().setHeader("correlationId", correlationId);
                exchange.getIn().setHeader("expectedResponses", 3);
            })
            .multicast()
                .parallelProcessing()
                .to("direct:serviceA", "direct:serviceB", "direct:serviceC")
            .end()
            .aggregate(header("correlationId"))
                .strategy(new ScatterGatherAggregationStrategy())
                .completionSize(header("expectedResponses"))
                .completionTimeout(10000) // 10 second timeout
            .to("direct:processGatheredResponses");

        // Individual service routes
        from("direct:serviceA")
            .log("Calling service A")
            .process(new ServiceAProcessor())
            .setHeader("serviceResponse", constant("serviceA"));

        from("direct:serviceB")
            .log("Calling service B")
            .process(new ServiceBProcessor())
            .setHeader("serviceResponse", constant("serviceB"));

        from("direct:serviceC")
            .log("Calling service C")
            .process(new ServiceCProcessor())
            .setHeader("serviceResponse", constant("serviceC"));

        from("direct:processGatheredResponses")
            .log("Processing gathered responses: ${body}")
            .process(exchange -> {
                GatheredResponses responses = exchange.getIn().getBody(GatheredResponses.class);
                // Combine responses into final result
                CombinedResult result = CombinedResult.builder()
                    .correlationId(responses.getCorrelationId())
                    .responseCount(responses.getResponses().size())
                    .combinedData(combineServiceData(responses.getResponses()))
                    .processingTime(calculateProcessingTime(responses))
                    .build();
                exchange.getIn().setBody(result);
            })
            .to("direct:sendFinalResponse");
    }
}
5. Persistent State Aggregator
@Component
public class PersistentAggregatorRoute extends RouteBuilder {

    @Autowired
    private AggregationRepository aggregationRepository;

    @Override
    public void configure() throws Exception {
        from("direct:persistentAggregate")
            .routeId("persistent-aggregator")
            .aggregate(header("businessKey"))
                .strategy(new PersistentAggregationStrategy())
                .completionPredicate(new BusinessCompletionPredicate())
                .aggregationRepository(aggregationRepository)
                .completionTimeout(300000) // 5 minute timeout
            .to("direct:processPersistentResult");

        from("direct:processPersistentResult")
            .log("Processing persistent aggregation result: ${body}")
            .process(exchange -> {
                PersistentAggregationResult result =
                    exchange.getIn().getBody(PersistentAggregationResult.class);
                // Process the long-running aggregation result
                log.info("Completed aggregation for key: {} with {} messages",
                    result.getBusinessKey(), result.getMessageCount());
            })
            .to("direct:persistResult");
    }
}
@Configuration
public class AggregationConfiguration {

    @Bean
    public AggregationRepository aggregationRepository(DataSource dataSource,
            PlatformTransactionManager transactionManager) {
        // Use a database-backed aggregation repository so aggregation state
        // survives system restarts. JdbcAggregationRepository's setters return
        // void, so it is configured stepwise rather than via a fluent chain.
        JdbcAggregationRepository repository =
            new JdbcAggregationRepository(transactionManager, "aggregation", dataSource);
        repository.setReturnOldExchange(false);
        return repository;
    }
}
Best Practices
1. Memory and Resource Management
- Implement proper timeout mechanisms to prevent memory leaks from incomplete aggregations
- Use persistent storage for long-running aggregations that span system restarts
- Monitor aggregation state size and implement size limits
- Clean up expired aggregations regularly
- Consider using streaming aggregation for very large data sets
2. Correlation and Completion Strategy
- Design robust correlation keys that uniquely identify related messages
- Implement flexible completion criteria (count, timeout, content-based)
- Handle partial results gracefully when some expected messages don't arrive
- Use idempotent aggregation to handle duplicate messages
- Implement compensation logic for failed aggregations
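Idempotent aggregation usually means tracking the IDs of messages already absorbed, so redelivered duplicates do not distort the aggregate. The collector below is a minimal sketch under that assumption; in production the seen-ID set would itself need eviction or persistence.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Drop duplicate deliveries by message ID before adding to the aggregate.
public class IdempotentCollector {
    private final Set<String> seenIds = new HashSet<>();
    private final List<String> payloads = new ArrayList<>();

    // Returns true if the message was new and accepted.
    public boolean add(String messageId, String payload) {
        if (!seenIds.add(messageId)) {
            return false; // duplicate delivery; ignore
        }
        payloads.add(payload);
        return true;
    }

    public List<String> payloads() {
        return payloads;
    }
}
```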
3. Performance Optimization
- Use efficient data structures for aggregation state storage
- Consider parallel processing where aggregations are independent
- Implement batch processing for high-volume scenarios
- Use appropriate caching strategies for frequently accessed aggregation state
- Monitor and optimize aggregation completion criteria
4. Error Handling and Recovery
- Implement dead letter queues for messages that can't be aggregated
- Provide detailed logging for aggregation lifecycle events
- Handle aggregation timeouts gracefully with partial results
- Implement retry mechanisms for transient failures
- Ensure aggregation state consistency across failures
5. Monitoring and Observability
- Track aggregation metrics (completion rate, average aggregation time, etc.)
- Monitor memory usage and resource consumption
- Implement distributed tracing through aggregation boundaries
- Alert on unusual aggregation patterns or failures
- Provide visibility into aggregation state and progress
The Aggregator pattern is essential for building sophisticated integration solutions that coordinate distributed operations and provide unified views of data from multiple sources while managing the complexity of temporal coordination and state management.