Splitter Pattern
Overview
The Splitter pattern is a fundamental decomposition pattern in enterprise integration architectures that divides a single composite message into multiple individual messages for separate processing. Like a prism that separates white light into its constituent colors, the Splitter pattern takes complex, compound messages and breaks them into smaller, manageable pieces that can be processed independently, in parallel, or through different processing paths based on their individual characteristics.
Theoretical Foundation
The Splitter pattern is grounded in divide-and-conquer algorithms and parallel processing theory, specifically addressing the challenge of message decomposition and workload distribution in distributed systems. The pattern embodies the principle of "granular processing" - breaking down large or complex operations into smaller, more manageable units that can be processed efficiently and reliably.
Core Principles
1. Message Decomposition
The Splitter analyzes composite messages and intelligently separates them into constituent parts:
- Structural splitting - dividing messages based on data structure (arrays, collections, nested objects)
- Logical splitting - separating messages based on business logic or processing requirements
- Size-based splitting - breaking large messages into smaller chunks for efficient processing
- Content-based splitting - dividing messages based on content characteristics or data types
2. Context Preservation
During splitting, the pattern maintains important contextual information:
- Correlation tracking - preserving correlation IDs to track related split messages
- Sequence ordering - maintaining order information when sequence matters
- Parent message metadata - preserving headers, properties, and context from the original message
- Processing instructions - carrying forward processing directives and routing information
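The sketch below shows one minimal shape for a split-message envelope that carries this context. The field names are illustrative assumptions, not a specific framework's API; they mirror the IndividualMessage builder used in the implementation examples later in this section.

// Illustrative split-message envelope; the field names are assumptions,
// not a specific framework's API
public record SplitEnvelope<T>(
    String correlationId,              // ties the split back to its parent message
    int sequenceNumber,                // 1-based position of this split
    int totalCount,                    // total number of splits produced
    Map<String, String> parentHeaders, // headers and properties carried from the original
    T payload) {
}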
3. Independent Processing Enablement
Split messages are designed for independent processing:
- Atomic operations - each split message can be processed as a complete unit
- Parallel processing - split messages can be processed concurrently
- Isolated failures - failure of one split message doesn't affect others
- Individual routing - each split message can follow different processing paths
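A rough sketch of these properties in plain Java follows: each split message is processed concurrently, and a failure is contained to its own message. SplitResult and the process method are assumed helpers for the example, not part of the pattern itself.

// Illustrative only: parallel processing with per-message failure isolation.
// SplitResult and process(...) are assumed helpers, not part of the pattern.
public class ParallelSplitProcessor {

    private final ExecutorService pool = Executors.newFixedThreadPool(8);

    public List<SplitResult> processAll(List<Object> splitMessages) {
        List<CompletableFuture<SplitResult>> futures = splitMessages.stream()
            .map(msg -> CompletableFuture
                .supplyAsync(() -> process(msg), pool)              // each split runs independently
                .exceptionally(ex -> SplitResult.failure(msg, ex))) // a failure stays isolated
            .collect(Collectors.toList());
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    private SplitResult process(Object msg) {
        // application-specific processing of one split message
        return SplitResult.success(msg);
    }
}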
4. Scalability and Performance Optimization
The pattern enhances system scalability through:
- Load distribution - spreading processing load across multiple workers or services
- Resource utilization - making better use of available processing capacity
- Throughput improvement - processing multiple smaller messages faster than one large message
- Memory management - reducing memory pressure by processing smaller message chunks
Why Splitters are Essential in Integration Architecture
1. Batch Processing and Data Integration
In data processing scenarios, splitters enable:
- File processing - breaking large files into processable records or chunks
- Bulk operations - dividing bulk updates into individual operations
- Data migration - splitting large datasets for incremental migration
- ETL optimization - processing data in parallel streams for better performance
2. Microservices Architecture Support
In microservices environments, splitters facilitate:
- Service granularity - matching message size to service processing capabilities
- Independent scaling - allowing services to scale based on individual message processing rates
- Fault isolation - isolating failures to individual message processing
- Service specialization - routing different message types to specialized services
3. Event-Driven and Streaming Architecture
For event processing and streaming systems:
- Event stream decomposition - breaking compound events into individual event streams
- Parallel event processing - processing multiple events concurrently
- Event routing - sending different event types to appropriate processors
- Stream partitioning - distributing events across processing partitions
4. Legacy System Integration
When integrating with legacy systems:
- Message size adaptation - splitting large messages to match legacy system limits
- Protocol limitations - accommodating size restrictions in older protocols
- Processing capacity - matching message size to legacy system processing capabilities
- Resource constraints - working within memory and processing limitations
Benefits in Integration Contexts
1. Performance Enhancement
- Parallel processing - simultaneous processing of split messages improves throughput
- Resource optimization - better utilization of multi-core systems and distributed resources
- Memory efficiency - processing smaller messages reduces memory requirements
- Response time improvement - faster processing of individual components
2. Scalability and Elasticity
- Horizontal scaling - split messages can be distributed across multiple processing instances
- Load balancing - even distribution of processing load
- Auto-scaling support - better matching of resource allocation to processing demand
- Capacity planning - easier to plan capacity for smaller, uniform message sizes
3. Fault Tolerance and Reliability
- Failure isolation - failure in processing one split message doesn't affect others
- Partial success handling - some split messages can succeed even if others fail
- Retry granularity - individual failed messages can be retried without reprocessing successful ones
- Error analysis - easier to identify and analyze specific failure causes
4. Processing Flexibility
- Different processing paths - split messages can follow different business logic
- Conditional processing - some split messages may require different validation or transformation
- Priority handling - high-priority split messages can be processed first
- Routing optimization - split messages can be routed to most appropriate processors
Integration Architecture Applications
1. API Gateway and Request Processing
Splitters in API gateways enable:
- Bulk request decomposition - breaking bulk API requests into individual operations
- Batch operation processing - processing array parameters as separate operations
- Multi-resource operations - splitting requests that affect multiple resources
- Parallel backend calls - splitting client requests into multiple backend service calls
2. Message Queue and Event Processing
In messaging systems:
- Queue optimization - splitting large messages for better queue performance
- Consumer distribution - distributing split messages across multiple consumers
- Event decomposition - breaking complex business events into atomic events
- Dead letter handling - isolating problematic parts of composite messages
3. Data Pipeline and ETL
In data processing workflows:
- File processing - splitting large files into manageable chunks
- Record-level processing - converting batch operations to record-level operations
- Parallel data transformation - processing data elements in parallel streams
- Data validation - validating individual data elements independently
4. Workflow and Process Orchestration
For business process management:
- Workflow step decomposition - breaking complex workflows into individual steps
- Task distribution - distributing work across multiple workers or services
- Parallel approval processes - splitting approval requests for concurrent processing
- Activity monitoring - tracking progress of individual workflow components
Implementation Patterns
1. Array/Collection Splitter
// Split arrays and collections into individual elements
@Component
public class CollectionSplitter {

    public List<IndividualMessage> splitCollection(CollectionMessage collectionMessage) {
        List<IndividualMessage> splitMessages = new ArrayList<>();
        String correlationId = collectionMessage.getCorrelationId();
        List<Object> items = collectionMessage.getItems();

        for (int i = 0; i < items.size(); i++) {
            IndividualMessage splitMessage = IndividualMessage.builder()
                .correlationId(correlationId)
                .sequenceNumber(i + 1)
                .totalCount(items.size())
                .payload(items.get(i))
                .originalMessageId(collectionMessage.getMessageId())
                .timestamp(Instant.now())
                .build();
            splitMessages.add(splitMessage);
        }
        return splitMessages;
    }
}
// Usage in service
@Service
public class OrderProcessingService {

    @Autowired
    private CollectionSplitter collectionSplitter;

    public void processOrderBatch(OrderBatch orderBatch) {
        List<IndividualMessage> orderMessages = collectionSplitter.splitCollection(
            CollectionMessage.builder()
                .correlationId(orderBatch.getBatchId())
                .items(orderBatch.getOrders())
                .messageId(orderBatch.getMessageId())
                .build()
        );
        // Process each order individually
        orderMessages.forEach(this::processIndividualOrder);
    }
}
2. Size-Based Content Splitter
// Split large messages into size-based chunks
@Component
public class SizeBasedSplitter {

    private static final int MAX_CHUNK_SIZE = 64 * 1024; // 64KB chunks

    public List<MessageChunk> splitBySize(LargeMessage largeMessage) {
        List<MessageChunk> chunks = new ArrayList<>();
        byte[] content = largeMessage.getContent();
        String correlationId = largeMessage.getCorrelationId();
        int totalChunks = (int) Math.ceil((double) content.length / MAX_CHUNK_SIZE);

        for (int i = 0; i < totalChunks; i++) {
            int startIndex = i * MAX_CHUNK_SIZE;
            int endIndex = Math.min(startIndex + MAX_CHUNK_SIZE, content.length);
            byte[] chunkContent = Arrays.copyOfRange(content, startIndex, endIndex);

            MessageChunk chunk = MessageChunk.builder()
                .correlationId(correlationId)
                .chunkNumber(i + 1)
                .totalChunks(totalChunks)
                .chunkSize(chunkContent.length)
                .content(chunkContent)
                .originalMessageId(largeMessage.getMessageId())
                .checksum(calculateChecksum(chunkContent))
                .build();
            chunks.add(chunk);
        }
        return chunks;
    }

    private String calculateChecksum(byte[] content) {
        try {
            // MD5 is acceptable here as a fast integrity check; it is not
            // suitable for security-sensitive hashing
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(content);
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("MD5 algorithm not available", e);
        }
    }
}
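On the receiving side, a size-based splitter usually needs a matching reassembler. The following is a minimal sketch under the assumption that MessageChunk exposes the accessors implied by its builder above; the checksum function is passed in so the same routine the splitter used can verify each chunk before the payload is stitched back together.

// A minimal reassembly sketch for the chunks produced above (assumes the
// MessageChunk accessors implied by its builder)
@Component
public class ChunkReassembler {

    private final Map<String, ConcurrentSkipListMap<Integer, MessageChunk>> pending =
        new ConcurrentHashMap<>();

    public Optional<byte[]> accept(MessageChunk chunk, Function<byte[], String> checksumFn) {
        // Verify integrity before accepting the chunk
        if (!chunk.getChecksum().equals(checksumFn.apply(chunk.getContent()))) {
            throw new IllegalStateException("Checksum mismatch for chunk " + chunk.getChunkNumber());
        }
        ConcurrentSkipListMap<Integer, MessageChunk> chunks = pending
            .computeIfAbsent(chunk.getCorrelationId(), id -> new ConcurrentSkipListMap<>());
        chunks.put(chunk.getChunkNumber(), chunk);
        if (chunks.size() < chunk.getTotalChunks()) {
            return Optional.empty(); // still waiting for remaining chunks
        }
        // All chunks present: concatenate in chunk-number order
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        chunks.values().forEach(c -> out.writeBytes(c.getContent()));
        pending.remove(chunk.getCorrelationId());
        return Optional.of(out.toByteArray());
    }
}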
3. Content-Based Logical Splitter
// Split messages based on content type and business logic
@Component
public class ContentBasedSplitter {

    public List<TypedMessage> splitByContent(CompositeBusinessMessage compositeMessage) {
        List<TypedMessage> splitMessages = new ArrayList<>();
        String correlationId = compositeMessage.getCorrelationId();

        // Split customer-related data
        if (compositeMessage.getCustomerData() != null) {
            TypedMessage customerMessage = TypedMessage.builder()
                .correlationId(correlationId)
                .messageType(MessageType.CUSTOMER_UPDATE)
                .payload(compositeMessage.getCustomerData())
                .processingPriority(Priority.HIGH)
                .targetSystem("CRM")
                .build();
            splitMessages.add(customerMessage);
        }

        // Split order-related data
        if (compositeMessage.getOrderData() != null) {
            TypedMessage orderMessage = TypedMessage.builder()
                .correlationId(correlationId)
                .messageType(MessageType.ORDER_PROCESSING)
                .payload(compositeMessage.getOrderData())
                .processingPriority(Priority.MEDIUM)
                .targetSystem("ORDER_MANAGEMENT")
                .build();
            splitMessages.add(orderMessage);
        }

        // Split financial data
        if (compositeMessage.getFinancialData() != null) {
            TypedMessage financialMessage = TypedMessage.builder()
                .correlationId(correlationId)
                .messageType(MessageType.FINANCIAL_TRANSACTION)
                .payload(compositeMessage.getFinancialData())
                .processingPriority(Priority.HIGH)
                .targetSystem("FINANCIAL_SYSTEM")
                .requiresApproval(true)
                .build();
            splitMessages.add(financialMessage);
        }
        return splitMessages;
    }
}
4. Streaming File Splitter
// Split large files into streaming chunks for memory-efficient processing
@Component
public class StreamingFileSplitter {

    private static final int BUFFER_SIZE = 8192;
    private static final long MAX_CHUNK_SIZE = 1024 * 1024; // 1MB chunks

    @Async
    public CompletableFuture<Void> splitFileAsync(FileMessage fileMessage,
                                                  Consumer<FileChunk> chunkProcessor) {
        // @Async already runs this method on Spring's task executor, so the work
        // is done inline rather than re-dispatched through CompletableFuture.runAsync
        try (InputStream inputStream = fileMessage.getInputStream()) {
            splitFileStream(inputStream, fileMessage.getCorrelationId(), chunkProcessor);
            return CompletableFuture.completedFuture(null);
        } catch (IOException e) {
            return CompletableFuture.failedFuture(
                new RuntimeException("Error processing file stream", e));
        }
    }

    private void splitFileStream(InputStream inputStream, String correlationId,
                                 Consumer<FileChunk> chunkProcessor) throws IOException {
        byte[] buffer = new byte[BUFFER_SIZE];
        ByteArrayOutputStream chunkBuffer = new ByteArrayOutputStream();
        int chunkNumber = 1;
        int bytesRead;

        while ((bytesRead = inputStream.read(buffer)) != -1) {
            chunkBuffer.write(buffer, 0, bytesRead);
            if (chunkBuffer.size() >= MAX_CHUNK_SIZE) {
                processChunk(chunkBuffer, correlationId, chunkNumber++, chunkProcessor);
                chunkBuffer.reset();
            }
        }
        // Process final chunk if any data remains
        if (chunkBuffer.size() > 0) {
            processChunk(chunkBuffer, correlationId, chunkNumber, chunkProcessor);
        }
    }

    private void processChunk(ByteArrayOutputStream chunkBuffer, String correlationId,
                              int chunkNumber, Consumer<FileChunk> chunkProcessor) {
        FileChunk chunk = FileChunk.builder()
            .correlationId(correlationId)
            .chunkNumber(chunkNumber)
            .chunkSize(chunkBuffer.size())
            .content(chunkBuffer.toByteArray())
            .timestamp(Instant.now())
            .build();
        chunkProcessor.accept(chunk);
    }
}
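One way the streaming splitter might be wired into a service is sketched below. The chunk handler simply logs as a stand-in for a real publish or persist step, and @Slf4j (Lombok) is assumed for the logger.

// Illustrative usage of the streaming splitter (the chunk handler is a placeholder)
@Slf4j // Lombok logger, assumed for this sketch
@Service
public class FileIngestionService {

    @Autowired
    private StreamingFileSplitter splitter;

    public void ingest(FileMessage fileMessage) {
        splitter.splitFileAsync(fileMessage, chunk ->
                // publish, persist, or process each FileChunk as it is produced
                log.info("chunk {} ({} bytes) for {}",
                    chunk.getChunkNumber(), chunk.getChunkSize(), chunk.getCorrelationId()))
            .exceptionally(ex -> {
                log.error("File split failed", ex);
                return null;
            });
    }
}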
Apache Camel Implementation
1. Simple Array Splitter
@Component
public class ArraySplitterRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:splitArray")
            .routeId("array-splitter")
            .log("Splitting array message: ${body}")
            .split(jsonpath("$.items[*]"))   // Split JSON array
                .streaming()                 // Use streaming for large arrays
                .parallelProcessing()        // Process split messages in parallel
                .to("direct:processIndividualItem")
            .end()
            .log("Array splitting completed");

        from("direct:processIndividualItem")
            .log("Processing individual item: ${body}")
            .process(exchange -> {
                // Add correlation information
                String correlationId = exchange.getIn().getHeader("correlationId", String.class);
                Integer splitIndex = exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);
                Integer splitSize = exchange.getProperty(Exchange.SPLIT_SIZE, Integer.class);
                exchange.getIn().setHeader("originalCorrelationId", correlationId);
                exchange.getIn().setHeader("splitIndex", splitIndex);
                exchange.getIn().setHeader("splitTotal", splitSize);
            })
            .to("direct:businessProcessing");
    }
}
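For reference, the route above expects a body whose shape matches the "$.items[*]" expression. An illustrative trigger using Camel's ProducerTemplate follows; the client class and the JSON body are example assumptions.

// Illustrative trigger for the array-splitter route; the JSON body is an
// example shape matching "$.items[*]"
@Service
public class ArraySplitClient {

    @Autowired
    private ProducerTemplate producerTemplate;

    public void submit() {
        producerTemplate.sendBodyAndHeader("direct:splitArray",
            "{\"items\": [{\"id\": 1}, {\"id\": 2}, {\"id\": 3}]}",
            "correlationId", UUID.randomUUID().toString());
    }
}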
2. File Content Splitter
@Component
public class FileSplitterRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("file:input?include=*.csv")
            .routeId("file-content-splitter")
            .log("Processing file: ${header.CamelFileName}")
            .split(body().tokenize("\n"))
                .streaming()
                // Skip the header row (split index 0)
                .filter(exchangeProperty(Exchange.SPLIT_INDEX).isGreaterThan(0))
                    .process(exchange -> {
                        // Add file context to each split line
                        String fileName = exchange.getIn().getHeader("CamelFileName", String.class);
                        Integer lineNumber = exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);
                        exchange.getIn().setHeader("sourceFileName", fileName);
                        // split index is 0-based, so +1 gives the actual file line number
                        exchange.getIn().setHeader("lineNumber", lineNumber + 1);
                    })
                    .to("direct:processCSVLine")
                .end()
            .end()
            .log("File processing completed: ${header.CamelFileName}");

        from("direct:processCSVLine")
            .log("Processing CSV line ${header.lineNumber} from ${header.sourceFileName}")
            .process(exchange -> {
                String csvLine = exchange.getIn().getBody(String.class);
                // Naive split; use a real CSV parser if fields may contain quoted commas
                String[] fields = csvLine.split(",");
                // Convert CSV line to structured data
                CSVRecord record = CSVRecord.builder()
                    .sourceFile(exchange.getIn().getHeader("sourceFileName", String.class))
                    .lineNumber(exchange.getIn().getHeader("lineNumber", Integer.class))
                    .fields(Arrays.asList(fields))
                    .build();
                exchange.getIn().setBody(record);
            })
            .to("direct:validateAndProcess");
    }
}
3. Size-Based Chunk Splitter
@Component
public class ChunkSplitterRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:splitLargeDataset")
            .routeId("chunk-splitter")
            .log("Splitting large dataset into chunks")
            .process(exchange -> {
                List<?> largeDataset = exchange.getIn().getBody(List.class);
                String correlationId = UUID.randomUUID().toString();
                exchange.getIn().setHeader("correlationId", correlationId);
                exchange.getIn().setHeader("totalItems", largeDataset.size());
            })
            .split(method(ChunkSplitterBean.class, "createChunks"))
                .process(exchange -> {
                    DataChunk chunk = exchange.getIn().getBody(DataChunk.class);
                    exchange.getIn().setHeader("chunkNumber", chunk.getChunkNumber());
                    exchange.getIn().setHeader("chunkSize", chunk.getItems().size());
                })
                .to("direct:processChunk")
            .end()
            .log("Large dataset processing completed");

        from("direct:processChunk")
            .log("Processing chunk ${header.chunkNumber} with ${header.chunkSize} items")
            .process(exchange -> {
                DataChunk chunk = exchange.getIn().getBody(DataChunk.class);
                // Process each item in the chunk (processItem is application-specific)
                List<ProcessedItem> processedItems = chunk.getItems().stream()
                    .map(this::processItem)
                    .collect(Collectors.toList());
                ChunkResult result = ChunkResult.builder()
                    .correlationId(chunk.getCorrelationId())
                    .chunkNumber(chunk.getChunkNumber())
                    .processedItems(processedItems)
                    .processingTime(Duration.between(chunk.getStartTime(), Instant.now()))
                    .build();
                exchange.getIn().setBody(result);
            })
            .to("direct:aggregateChunkResults");
    }
}
@Component
public class ChunkSplitterBean {

    // The chunk size lives with the bean that uses it
    private static final int CHUNK_SIZE = 1000; // Process in chunks of 1000 items

    public List<DataChunk> createChunks(List<?> dataset, @Header("correlationId") String correlationId) {
        List<DataChunk> chunks = new ArrayList<>();
        for (int i = 0; i < dataset.size(); i += CHUNK_SIZE) {
            int endIndex = Math.min(i + CHUNK_SIZE, dataset.size());
            // Note: subList returns a view; copy it if the source list is mutated later
            List<?> chunkItems = dataset.subList(i, endIndex);
            DataChunk chunk = DataChunk.builder()
                .correlationId(correlationId)
                .chunkNumber((i / CHUNK_SIZE) + 1)
                .totalChunks((int) Math.ceil((double) dataset.size() / CHUNK_SIZE))
                .items(chunkItems)
                .startTime(Instant.now())
                .build();
            chunks.add(chunk);
        }
        return chunks;
    }
}
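The chunk routes hand their results to "direct:aggregateChunkResults", which is not defined above. One possible shape for it is sketched here: ChunkResult bodies are collected per correlation ID and the aggregation completes on a timeout (or on completionSize when the chunk count is propagated as a header). The completion criteria and the downstream endpoint name are assumptions.

// A possible implementation of the "direct:aggregateChunkResults" endpoint
// referenced above; completion criteria and endpoint names are assumptions
@Component
public class ChunkAggregatorRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:aggregateChunkResults")
            .routeId("chunk-aggregator")
            .aggregate(header("correlationId"), (oldExchange, newExchange) -> {
                // Collect ChunkResult bodies into a single list per correlation ID
                if (oldExchange == null) {
                    List<ChunkResult> results = new ArrayList<>();
                    results.add(newExchange.getIn().getBody(ChunkResult.class));
                    newExchange.getIn().setBody(results);
                    return newExchange;
                }
                List<ChunkResult> results = oldExchange.getIn().getBody(List.class);
                results.add(newExchange.getIn().getBody(ChunkResult.class));
                return oldExchange;
            })
            .completionTimeout(30_000) // or completionSize(...) when the chunk count is known
            .log("Chunk aggregation completed for ${header.correlationId}")
            .to("direct:finalizeDataset");
    }
}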
4. Conditional Content Splitter
@Component
public class ConditionalSplitterRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:conditionalSplit")
            .routeId("conditional-splitter")
            .log("Analyzing message for conditional splitting")
            .choice()
                // JsonPath predicates use filter-expression syntax
                .when(jsonpath("$[?(@.messageType == 'COMPOSITE')]"))
                    .to("direct:splitCompositeMessage")
                .when(jsonpath("$[?(@.messageType == 'BATCH')]"))
                    .to("direct:splitBatchMessage")
                .when(jsonpath("$[?(@.messageType == 'HIERARCHICAL')]"))
                    .to("direct:splitHierarchicalMessage")
                .otherwise()
                    .log("Message doesn't require splitting")
                    .to("direct:processAtomic")
            .end();

        from("direct:splitCompositeMessage")
            .log("Splitting composite message")
            .process(exchange -> {
                CompositeMessage composite = exchange.getIn().getBody(CompositeMessage.class);
                List<AtomicMessage> atomicMessages = new ArrayList<>();
                // Split customer data
                if (composite.getCustomerData() != null) {
                    atomicMessages.add(createAtomicMessage(
                        composite, "CUSTOMER", composite.getCustomerData()));
                }
                // Split order data
                if (composite.getOrderData() != null) {
                    atomicMessages.add(createAtomicMessage(
                        composite, "ORDER", composite.getOrderData()));
                }
                exchange.getIn().setBody(atomicMessages);
            })
            .split(body())
                .process(exchange -> {
                    // Expose the subtype as a header so the next route can route on it
                    AtomicMessage msg = exchange.getIn().getBody(AtomicMessage.class);
                    exchange.getIn().setHeader("messageSubType", msg.getMessageSubType());
                })
                .to("direct:processAtomicMessage")
            .end();

        from("direct:processAtomicMessage")
            .log("Processing atomic message: ${header.messageSubType}")
            .choice()
                .when(header("messageSubType").isEqualTo("CUSTOMER"))
                    .to("direct:processCustomer")
                .when(header("messageSubType").isEqualTo("ORDER"))
                    .to("direct:processOrder")
                .otherwise()
                    .to("direct:processGeneric")
            .end();
    }

    private AtomicMessage createAtomicMessage(CompositeMessage composite,
                                              String subType, Object data) {
        return AtomicMessage.builder()
            .correlationId(composite.getCorrelationId())
            .messageSubType(subType)
            .payload(data)
            .originalMessageId(composite.getMessageId())
            .timestamp(Instant.now())
            .build();
    }
}
5. Streaming XML Splitter
@Component
public class XMLStreamingSplitterRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:splitXMLStream")
            .routeId("xml-streaming-splitter")
            .log("Starting XML streaming split")
            .split(stax(Contact.class))
                .streaming()
                .process(exchange -> {
                    Contact contact = exchange.getIn().getBody(Contact.class);
                    // Add processing context
                    Integer splitIndex = exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);
                    exchange.getIn().setHeader("contactSequence", splitIndex + 1);
                    exchange.getIn().setHeader("contactId", contact.getId());
                })
                .to("direct:processContact")
            .end()
            .log("XML streaming split completed");

        from("direct:processContact")
            .log("Processing contact ${header.contactId} (sequence ${header.contactSequence})")
            .marshal().json(JsonLibrary.Jackson) // Convert each Contact to JSON for further processing
            .to("direct:validateContact");
    }
}
// XML Contact class for StAX processing
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class Contact {

    @XmlElement
    private String id;

    @XmlElement
    private String firstName;

    @XmlElement
    private String lastName;

    @XmlElement
    private String email;

    // getters and setters
}
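For completeness, an illustrative feeder route for the streaming splitter above. The stax() builder comes from the camel-stax component, which must be on the classpath; the file endpoint here is an assumption.

// Illustrative feeder route for the streaming XML splitter; requires the
// camel-stax component on the classpath
@Component
public class XMLFileFeedRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("file:input/contacts?include=*.xml")
            .to("direct:splitXMLStream");
    }
}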
Best Practices
1. Memory Management and Performance
- Use streaming splits for large datasets to avoid memory exhaustion
- Implement appropriate chunk sizes based on available memory and processing capacity
- Consider parallel processing for independent split messages
- Monitor memory usage during split operations
- Use backpressure mechanisms when split message production exceeds consumption
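A minimal backpressure sketch follows, assuming a simple in-process producer/consumer handoff: a bounded queue blocks the splitter when consumers fall behind, capping the number of in-flight split messages. All names are illustrative.

// Minimal backpressure sketch: a bounded queue blocks the producing splitter
// when consumers fall behind (all names here are illustrative)
public class BackpressuredSplitPipeline {

    private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1_000);

    public void produce(List<Object> splitMessages) throws InterruptedException {
        for (Object msg : splitMessages) {
            queue.put(msg); // blocks when the queue is full, throttling the splitter
        }
    }

    public void startConsumers(int workers, Consumer<Object> processor) {
        ExecutorService pool = Executors.newFixedThreadPool(workers); // shutdown omitted for brevity
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        processor.accept(queue.take()); // waits for the next split message
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
    }
}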
2. Correlation and Tracking
- Preserve correlation IDs from original messages in all split messages
- Include sequence information (split index, total count) for proper ordering
- Maintain parent message metadata for traceability and debugging
- Implement proper correlation strategies for downstream aggregation
- Use unique identifiers for each split message
3. Error Handling and Recovery
- Implement individual error handling for each split message
- Provide dead letter queues for split messages that can't be processed (see the Camel sketch after this list)
- Consider partial success scenarios where some splits succeed and others fail
- Implement retry mechanisms at the split message level
- Maintain error correlation back to original composite messages
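A hedged Camel sketch combining per-split retries with a dead letter endpoint; the endpoint and bean names are illustrative, not prescribed by the pattern.

// Illustrative split-level error handling: each split message gets its own
// retries, then lands on a dead letter endpoint without disturbing its siblings
@Component
public class SplitErrorHandlingRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        errorHandler(deadLetterChannel("seda:splitDeadLetters")
            .maximumRedeliveries(3)
            .redeliveryDelay(1000)
            .useOriginalMessage()); // park the original split payload, not a half-processed body

        from("direct:processSplitMessage")   // illustrative endpoint name
            .to("bean:splitMessageHandler"); // may throw; failures stay local to this split
    }
}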
4. Processing Strategy
- Choose appropriate splitting strategies based on message characteristics
- Consider downstream processing capabilities when determining split size
- Implement conditional splitting based on message content and business rules
- Use appropriate splitting patterns (sequential vs. parallel) based on requirements
- Balance splitting granularity with processing overhead
5. Monitoring and Observability
- Track splitting metrics (split count, processing time, success rate)
- Monitor split message processing progress and completion
- Implement distributed tracing through split message boundaries
- Alert on unusual splitting patterns or failures
- Provide visibility into split message lifecycle and status
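As one concrete option (an assumption, not something the pattern mandates), Micrometer-style meters can capture the basic splitting metrics listed above.

// Illustrative splitting metrics with Micrometer (one possible choice)
@Component
public class SplitterMetrics {

    private final Counter splitCounter;
    private final Timer splitTimer;

    public SplitterMetrics(MeterRegistry registry) {
        this.splitCounter = registry.counter("splitter.messages.split");
        this.splitTimer = registry.timer("splitter.split.duration");
    }

    public <T> T recordSplit(Supplier<T> splitOperation, int resultingSplits) {
        T result = splitTimer.record(splitOperation); // time the split itself
        splitCounter.increment(resultingSplits);      // count the split messages produced
        return result;
    }
}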
The Splitter pattern is crucial for building scalable, high-performance integration solutions that can handle large, complex messages efficiently while enabling parallel processing and improving overall system throughput and resilience.