
Splitter Pattern

Overview

The Splitter pattern is a fundamental decomposition pattern in enterprise integration architectures that divides a single composite message into multiple individual messages for separate processing. Like a prism that separates white light into its constituent colors, the Splitter pattern takes complex, compound messages and breaks them into smaller, manageable pieces that can be processed independently, in parallel, or through different processing paths based on their individual characteristics.

Theoretical Foundation

The Splitter pattern draws on divide-and-conquer algorithms and parallel processing theory, and addresses message decomposition and workload distribution in distributed systems. It embodies the principle of "granular processing": breaking large or complex operations into smaller units that can be processed efficiently and reliably.

Core Principles

1. Message Decomposition

The Splitter analyzes composite messages and separates them into constituent parts:
- Structural splitting: dividing messages based on data structure (arrays, collections, nested objects)
- Logical splitting: separating messages based on business logic or processing requirements
- Size-based splitting: breaking large messages into smaller chunks for efficient processing
- Content-based splitting: dividing messages based on content characteristics or data types

2. Context Preservation

During splitting, the pattern maintains important contextual information:
- Correlation tracking: preserving correlation IDs to track related split messages
- Sequence ordering: maintaining order information when sequence matters
- Parent message metadata: preserving headers, properties, and context from the original message
- Processing instructions: carrying forward processing directives and routing information
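
As an illustration of why the correlation ID, sequence number, and total count travel with every part, here is a minimal sketch of the receiving side deciding when all parts of one original message have arrived. `SplitPart` and `SplitTracker` are hypothetical names introduced for this example, and the sketch assumes sequence numbers start at 1.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical envelope for one part produced by a splitter. */
final class SplitPart {
    final String correlationId;
    final int sequenceNumber; // 1-based position within the original message
    final int totalCount;     // how many parts the original was split into
    final String payload;

    SplitPart(String correlationId, int sequenceNumber, int totalCount, String payload) {
        this.correlationId = correlationId;
        this.sequenceNumber = sequenceNumber;
        this.totalCount = totalCount;
        this.payload = payload;
    }
}

/** Tracks which parts of each original message have been seen so far. */
class SplitTracker {
    private final Map<String, BitSet> seen = new HashMap<>();

    /** Records a part; returns true once every part for its correlation ID has arrived. */
    boolean record(SplitPart part) {
        if (part.sequenceNumber < 1 || part.sequenceNumber > part.totalCount) {
            throw new IllegalArgumentException("sequence number out of range");
        }
        BitSet bits = seen.computeIfAbsent(part.correlationId, id -> new BitSet(part.totalCount));
        bits.set(part.sequenceNumber - 1); // a duplicate delivery just sets the same bit again
        boolean complete = bits.cardinality() == part.totalCount;
        if (complete) {
            seen.remove(part.correlationId); // release state once the group is complete
        }
        return complete;
    }
}
```

Parts may arrive in any order and more than once. The tracker reports completion only when every sequence number from 1 to the total has been seen.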

3. Independent Processing Enablement

Split messages are designed for independent processing:
- Atomic operations: each split message can be processed as a complete unit
- Parallel processing: split messages can be processed concurrently
- Isolated failures: failure of one split message doesn't affect others
- Individual routing: each split message can follow different processing paths
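
The combination of parallel processing and isolated failures can be sketched with the standard `java.util.concurrent` classes. This is a minimal illustration, not a framework integration: `IsolatedProcessor` and `Outcome` are hypothetical names, and the handler stands in for whatever business logic processes one part.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical helper: runs a handler over split parts without letting one failure stop the rest. */
class IsolatedProcessor {

    /** Outcome of one part: a result, or the error that part raised. */
    static final class Outcome<R> {
        final int index;
        final R result;        // null when the part failed
        final Throwable error; // null when the part succeeded

        Outcome(int index, R result, Throwable error) {
            this.index = index;
            this.result = result;
            this.error = error;
        }

        boolean succeeded() {
            return error == null;
        }
    }

    /** Processes every part on a fixed-size pool and returns outcomes in the original order. */
    static <T, R> List<Outcome<R>> processAll(List<T> parts, Function<T, R> handler, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<R>> futures = new ArrayList<>();
            for (T part : parts) {
                Callable<R> task = () -> handler.apply(part);
                futures.add(pool.submit(task));
            }
            List<Outcome<R>> outcomes = new ArrayList<>();
            for (int i = 0; i < futures.size(); i++) {
                try {
                    outcomes.add(new Outcome<>(i, futures.get(i).get(), null));
                } catch (ExecutionException e) {
                    // The handler threw for this part only; keep going with the others
                    outcomes.add(new Outcome<>(i, null, e.getCause()));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    outcomes.add(new Outcome<>(i, null, e));
                }
            }
            return outcomes;
        } finally {
            pool.shutdown();
        }
    }
}
```

A caller can then retry or dead-letter only the failed indexes, while the successful parts continue downstream.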

4. Scalability and Performance Optimization

The pattern enhances system scalability through:
- Load distribution: spreading processing load across multiple workers or services
- Resource utilization: making better use of available processing capacity
- Throughput improvement: processing several smaller messages concurrently, which can finish sooner than handling one large message serially
- Memory management: reducing memory pressure by processing smaller message chunks

Why Splitters are Essential in Integration Architecture

1. Batch Processing and Data Integration

In data processing scenarios, splitters enable:
- File processing: breaking large files into processable records or chunks
- Bulk operations: dividing bulk updates into individual operations
- Data migration: splitting large datasets for incremental migration
- ETL optimization: processing data in parallel streams for better performance

2. Microservices Architecture Support

In microservices environments, splitters facilitate:
- Service granularity: matching message size to service processing capabilities
- Independent scaling: allowing services to scale based on individual message processing rates
- Fault isolation: isolating failures to individual message processing
- Service specialization: routing different message types to specialized services

3. Event-Driven and Streaming Architecture

For event processing and streaming systems:
- Event stream decomposition: breaking compound events into individual event streams
- Parallel event processing: processing multiple events concurrently
- Event routing: sending different event types to appropriate processors
- Stream partitioning: distributing events across processing partitions

4. Legacy System Integration

When integrating with legacy systems:
- Message size adaptation: splitting large messages to match legacy system limits
- Protocol limitations: accommodating size restrictions in older protocols
- Processing capacity: matching message size to legacy system processing capabilities
- Resource constraints: working within memory and processing limitations
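
When a legacy limit is expressed in bytes but the payload is text, a naive byte split can cut a multi-byte character in half. The following sketch shows one way to respect a byte limit while keeping each piece valid text. It assumes UTF-8 encoding, and `Utf8Splitter` is a hypothetical helper name, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits text so each piece encodes to at most maxBytes of UTF-8. */
class Utf8Splitter {

    static List<String> split(String text, int maxBytes) {
        if (maxBytes < 4) {
            // A single code point can need up to 4 bytes in UTF-8
            throw new IllegalArgumentException("maxBytes must be at least 4");
        }
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int currentBytes = 0;
        for (int i = 0; i < text.length(); ) {
            int codePoint = text.codePointAt(i);
            int size = utf8Length(codePoint);
            if (currentBytes + size > maxBytes) {
                // Adding this character would exceed the limit: close the current piece
                parts.add(current.toString());
                current.setLength(0);
                currentBytes = 0;
            }
            current.appendCodePoint(codePoint);
            currentBytes += size;
            i += Character.charCount(codePoint); // advance by 1 or 2 chars (surrogate pairs)
        }
        if (current.length() > 0) {
            parts.add(current.toString());
        }
        return parts;
    }

    /** Number of bytes UTF-8 needs for one code point. */
    private static int utf8Length(int codePoint) {
        if (codePoint < 0x80) return 1;
        if (codePoint < 0x800) return 2;
        if (codePoint < 0x10000) return 3;
        return 4;
    }
}
```

Concatenating the pieces in order reproduces the original text, so the receiving side needs no special handling beyond ordering.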

Benefits in Integration Contexts

1. Performance Enhancement

2. Scalability and Elasticity

3. Fault Tolerance and Reliability

4. Processing Flexibility

Integration Architecture Applications

1. API Gateway and Request Processing

Splitters in API gateways enable:
- Bulk request decomposition: breaking bulk API requests into individual operations
- Batch operation processing: processing array parameters as separate operations
- Multi-resource operations: splitting requests that affect multiple resources
- Parallel backend calls: splitting client requests into multiple backend service calls

2. Message Queue and Event Processing

In messaging systems:
- Queue optimization: splitting large messages for better queue performance
- Consumer distribution: distributing split messages across multiple consumers
- Event decomposition: breaking complex business events into atomic events
- Dead letter handling: isolating problematic parts of composite messages

3. Data Pipeline and ETL

In data processing workflows:
- File processing: splitting large files into manageable chunks
- Record-level processing: converting batch operations to record-level operations
- Parallel data transformation: processing data elements in parallel streams
- Data validation: validating individual data elements independently

4. Workflow and Process Orchestration

For business process management:
- Workflow step decomposition: breaking complex workflows into individual steps
- Task distribution: distributing work across multiple workers or services
- Parallel approval processes: splitting approval requests for concurrent processing
- Activity monitoring: tracking progress of individual workflow components

Implementation Patterns

1. Array/Collection Splitter

// Split arrays and collections into individual elements
@Component
public class CollectionSplitter {

    public List<IndividualMessage> splitCollection(CollectionMessage collectionMessage) {
        List<IndividualMessage> splitMessages = new ArrayList<>();
        String correlationId = collectionMessage.getCorrelationId();

        List<Object> items = collectionMessage.getItems();
        for (int i = 0; i < items.size(); i++) {
            IndividualMessage splitMessage = IndividualMessage.builder()
                .correlationId(correlationId)
                .sequenceNumber(i + 1)
                .totalCount(items.size())
                .payload(items.get(i))
                .originalMessageId(collectionMessage.getMessageId())
                .timestamp(Instant.now())
                .build();

            splitMessages.add(splitMessage);
        }

        return splitMessages;
    }
}

// Usage in service
@Service
public class OrderProcessingService {

    @Autowired
    private CollectionSplitter collectionSplitter;

    public void processOrderBatch(OrderBatch orderBatch) {
        List<IndividualMessage> orderMessages = collectionSplitter.splitCollection(
            CollectionMessage.builder()
                .correlationId(orderBatch.getBatchId())
                .items(orderBatch.getOrders())
                .messageId(orderBatch.getMessageId())
                .build()
        );

        // Process each order individually
        orderMessages.forEach(this::processIndividualOrder);
    }
}

2. Size-Based Content Splitter

// Split large messages into size-based chunks
@Component
public class SizeBasedSplitter {

    private static final int MAX_CHUNK_SIZE = 64 * 1024; // 64KB chunks

    public List<MessageChunk> splitBySize(LargeMessage largeMessage) {
        List<MessageChunk> chunks = new ArrayList<>();
        byte[] content = largeMessage.getContent();
        String correlationId = largeMessage.getCorrelationId();

        int totalChunks = (int) Math.ceil((double) content.length / MAX_CHUNK_SIZE);

        for (int i = 0; i < totalChunks; i++) {
            int startIndex = i * MAX_CHUNK_SIZE;
            int endIndex = Math.min(startIndex + MAX_CHUNK_SIZE, content.length);

            byte[] chunkContent = Arrays.copyOfRange(content, startIndex, endIndex);

            MessageChunk chunk = MessageChunk.builder()
                .correlationId(correlationId)
                .chunkNumber(i + 1)
                .totalChunks(totalChunks)
                .chunkSize(chunkContent.length)
                .content(chunkContent)
                .originalMessageId(largeMessage.getMessageId())
                .checksum(calculateChecksum(chunkContent))
                .build();

            chunks.add(chunk);
        }

        return chunks;
    }

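    // MD5 is used here only to detect accidental corruption of a chunk;
    // it is not suitable for protecting against deliberate tampering.
    private String calculateChecksum(byte[] content) {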
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(content);
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("MD5 algorithm not available", e);
        }
    }
}
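
To show what the chunk number, total count, and checksum are for, here is a minimal sketch of the receiving side putting chunks back together. `Chunk` and `ChunkReassembler` are hypothetical stand-ins for `MessageChunk` and its consumer. The sketch assumes 1-based chunk numbers and the same Base64-encoded MD5 checksum as the splitter above.

```java
import java.io.ByteArrayOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Comparator;
import java.util.List;

/** Hypothetical minimal chunk carrying only the fields needed for reassembly. */
final class Chunk {
    final int chunkNumber; // 1-based
    final int totalChunks;
    final byte[] content;
    final String checksum; // Base64-encoded MD5 of content

    Chunk(int chunkNumber, int totalChunks, byte[] content, String checksum) {
        this.chunkNumber = chunkNumber;
        this.totalChunks = totalChunks;
        this.content = content;
        this.checksum = checksum;
    }
}

class ChunkReassembler {

    /** Rebuilds the original content; rejects missing, duplicate, or corrupted chunks. */
    static byte[] reassemble(List<Chunk> received) {
        if (received.isEmpty()) {
            throw new IllegalArgumentException("no chunks received");
        }
        int total = received.get(0).totalChunks;
        if (received.size() != total) {
            throw new IllegalStateException("expected " + total + " chunks, got " + received.size());
        }
        List<Chunk> ordered = new ArrayList<>(received);
        ordered.sort(Comparator.comparingInt(c -> c.chunkNumber)); // arrival order is not guaranteed
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < ordered.size(); i++) {
            Chunk c = ordered.get(i);
            if (c.chunkNumber != i + 1) {
                throw new IllegalStateException("missing or duplicate chunk " + (i + 1));
            }
            if (!checksum(c.content).equals(c.checksum)) {
                throw new IllegalStateException("checksum mismatch in chunk " + c.chunkNumber);
            }
            out.write(c.content, 0, c.content.length);
        }
        return out.toByteArray();
    }

    static String checksum(byte[] content) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            return Base64.getEncoder().encodeToString(md.digest(content));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 algorithm not available", e);
        }
    }
}
```

In a real flow the chunks would first be grouped by correlation ID. That grouping is omitted here to keep the sketch short.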

3. Content-Based Logical Splitter

// Split messages based on content type and business logic
@Component
public class ContentBasedSplitter {

    public List<TypedMessage> splitByContent(CompositeBusinessMessage compositeMessage) {
        List<TypedMessage> splitMessages = new ArrayList<>();
        String correlationId = compositeMessage.getCorrelationId();

        // Split customer-related data
        if (compositeMessage.getCustomerData() != null) {
            TypedMessage customerMessage = TypedMessage.builder()
                .correlationId(correlationId)
                .messageType(MessageType.CUSTOMER_UPDATE)
                .payload(compositeMessage.getCustomerData())
                .processingPriority(Priority.HIGH)
                .targetSystem("CRM")
                .build();
            splitMessages.add(customerMessage);
        }

        // Split order-related data
        if (compositeMessage.getOrderData() != null) {
            TypedMessage orderMessage = TypedMessage.builder()
                .correlationId(correlationId)
                .messageType(MessageType.ORDER_PROCESSING)
                .payload(compositeMessage.getOrderData())
                .processingPriority(Priority.MEDIUM)
                .targetSystem("ORDER_MANAGEMENT")
                .build();
            splitMessages.add(orderMessage);
        }

        // Split financial data
        if (compositeMessage.getFinancialData() != null) {
            TypedMessage financialMessage = TypedMessage.builder()
                .correlationId(correlationId)
                .messageType(MessageType.FINANCIAL_TRANSACTION)
                .payload(compositeMessage.getFinancialData())
                .processingPriority(Priority.HIGH)
                .targetSystem("FINANCIAL_SYSTEM")
                .requiresApproval(true)
                .build();
            splitMessages.add(financialMessage);
        }

        return splitMessages;
    }
}

4. Streaming File Splitter

// Split large files into streaming chunks for memory-efficient processing
@Component
public class StreamingFileSplitter {

    private static final int BUFFER_SIZE = 8192;
    private static final long MAX_CHUNK_SIZE = 1024 * 1024; // 1MB chunks

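    // CompletableFuture.runAsync already moves the work off the calling thread
    // (onto the common ForkJoinPool), so Spring's @Async is not needed on top of it.
    public CompletableFuture<Void> splitFileAsync(FileMessage fileMessage, 
                                                  Consumer<FileChunk> chunkProcessor) {
        return CompletableFuture.runAsync(() -> {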
            try (InputStream inputStream = fileMessage.getInputStream()) {
                splitFileStream(inputStream, fileMessage.getCorrelationId(), chunkProcessor);
            } catch (IOException e) {
                throw new RuntimeException("Error processing file stream", e);
            }
        });
    }

    private void splitFileStream(InputStream inputStream, String correlationId, 
                                Consumer<FileChunk> chunkProcessor) throws IOException {

        byte[] buffer = new byte[BUFFER_SIZE];
        ByteArrayOutputStream chunkBuffer = new ByteArrayOutputStream();
        int chunkNumber = 1;
        int bytesRead;

        while ((bytesRead = inputStream.read(buffer)) != -1) {
            chunkBuffer.write(buffer, 0, bytesRead);

            if (chunkBuffer.size() >= MAX_CHUNK_SIZE) {
                processChunk(chunkBuffer, correlationId, chunkNumber++, chunkProcessor);
                chunkBuffer.reset();
            }
        }

        // Process final chunk if any data remains
        if (chunkBuffer.size() > 0) {
            processChunk(chunkBuffer, correlationId, chunkNumber, chunkProcessor);
        }
    }

    private void processChunk(ByteArrayOutputStream chunkBuffer, String correlationId, 
                             int chunkNumber, Consumer<FileChunk> chunkProcessor) {

        FileChunk chunk = FileChunk.builder()
            .correlationId(correlationId)
            .chunkNumber(chunkNumber)
            .chunkSize(chunkBuffer.size())
            .content(chunkBuffer.toByteArray())
            .timestamp(Instant.now())
            .build();

        chunkProcessor.accept(chunk);
    }
}

Apache Camel Implementation

1. Simple Array Splitter

@Component
public class ArraySplitterRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:splitArray")
            .routeId("array-splitter")
            .log("Splitting array message: ${body}")
            .split(jsonpath("$.items[*]")) // Split JSON array
                .streaming() // Use streaming for large arrays
                .parallelProcessing() // Process split messages in parallel
                .to("direct:processIndividualItem")
            .end()
            .log("Array splitting completed");

        from("direct:processIndividualItem")
            .log("Processing individual item: ${body}")
            .process(exchange -> {
                // Add correlation information
                String correlationId = exchange.getIn().getHeader("correlationId", String.class);
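                Integer splitIndex = exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);
                // In streaming mode the total is only known on the last split exchange,
                // so splitSize can be null for every other item.
                Integer splitSize = exchange.getProperty(Exchange.SPLIT_SIZE, Integer.class);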

                exchange.getIn().setHeader("originalCorrelationId", correlationId);
                exchange.getIn().setHeader("splitIndex", splitIndex);
                exchange.getIn().setHeader("splitTotal", splitSize);
            })
            .to("direct:businessProcessing");
    }
}

2. File Content Splitter

@Component
public class FileSplitterRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        // The file component's "include" option expects a regular expression;
        // antInclude accepts a glob such as *.csv
        from("file:input?antInclude=*.csv")
            .routeId("file-content-splitter")
            .log("Processing file: ${header.CamelFileName}")
            .split(body().tokenize("\n"))
                .streaming()
                // Skip the header row, which is the split exchange with index 0
                .filter(exchangeProperty(Exchange.SPLIT_INDEX).isGreaterThan(0))
                    .process(exchange -> {
                        // Add file context to each split line
                        String fileName = exchange.getIn().getHeader("CamelFileName", String.class);
                        Integer lineNumber = exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);

                        exchange.getIn().setHeader("sourceFileName", fileName);
                        // SPLIT_INDEX is 0-based; +1 gives the 1-based line number in the file
                        exchange.getIn().setHeader("lineNumber", lineNumber + 1);
                    })
                    .to("direct:processCSVLine")
                .end() // end of filter
            .end() // end of split
            .log("File processing completed: ${header.CamelFileName}");

        from("direct:processCSVLine")
            .log("Processing CSV line ${header.lineNumber} from ${header.sourceFileName}")
            .process(exchange -> {
                String csvLine = exchange.getIn().getBody(String.class);
                // Naive split: does not handle quoted fields that contain commas
                String[] fields = csvLine.split(",");

                // Convert CSV line to structured data
                CSVRecord record = CSVRecord.builder()
                    .sourceFile(exchange.getIn().getHeader("sourceFileName", String.class))
                    .lineNumber(exchange.getIn().getHeader("lineNumber", Integer.class))
                    .fields(Arrays.asList(fields))
                    .build();

                exchange.getIn().setBody(record);
            })
            .to("direct:validateAndProcess");
    }
}

3. Size-Based Chunk Splitter

@Component
public class ChunkSplitterRoute extends RouteBuilder {

    private static final int CHUNK_SIZE = 1000; // Process in chunks of 1000 items

    @Override
    public void configure() throws Exception {
        from("direct:splitLargeDataset")
            .routeId("chunk-splitter")
            .log("Splitting large dataset into chunks")
            .process(exchange -> {
                List<?> largeDataset = exchange.getIn().getBody(List.class);
                String correlationId = UUID.randomUUID().toString();
                exchange.getIn().setHeader("correlationId", correlationId);
                exchange.getIn().setHeader("totalItems", largeDataset.size());
            })
            .split(method(ChunkSplitterBean.class, "createChunks"))
                .process(exchange -> {
                    DataChunk chunk = exchange.getIn().getBody(DataChunk.class);
                    exchange.getIn().setHeader("chunkNumber", chunk.getChunkNumber());
                    exchange.getIn().setHeader("chunkSize", chunk.getItems().size());
                })
                .to("direct:processChunk")
            .end()
            .log("Large dataset processing completed");

        from("direct:processChunk")
            .log("Processing chunk ${header.chunkNumber} with ${header.chunkSize} items")
            .process(exchange -> {
                DataChunk chunk = exchange.getIn().getBody(DataChunk.class);

                // Process each item in the chunk
                List<ProcessedItem> processedItems = chunk.getItems().stream()
                    .map(this::processItem)
                    .collect(Collectors.toList());

                ChunkResult result = ChunkResult.builder()
                    .correlationId(chunk.getCorrelationId())
                    .chunkNumber(chunk.getChunkNumber())
                    .processedItems(processedItems)
                    .processingTime(Duration.between(chunk.getStartTime(), Instant.now()))
                    .build();

                exchange.getIn().setBody(result);
            })
            .to("direct:aggregateChunkResults");
    }
}

@Component
public class ChunkSplitterBean {

    // Declared here because the CHUNK_SIZE constant in ChunkSplitterRoute is private to that class
    private static final int CHUNK_SIZE = 1000;

    public List<DataChunk> createChunks(List<?> dataset, @Header("correlationId") String correlationId) {
        List<DataChunk> chunks = new ArrayList<>();
        // Ceiling division: a partial final chunk still counts as a chunk
        int totalChunks = (int) Math.ceil((double) dataset.size() / CHUNK_SIZE);

        for (int i = 0; i < dataset.size(); i += CHUNK_SIZE) {
            int endIndex = Math.min(i + CHUNK_SIZE, dataset.size());
            List<?> chunkItems = dataset.subList(i, endIndex);

            DataChunk chunk = DataChunk.builder()
                .correlationId(correlationId)
                .chunkNumber((i / CHUNK_SIZE) + 1)
                .totalChunks(totalChunks)
                .items(chunkItems)
                .startTime(Instant.now())
                .build();

            chunks.add(chunk);
        }

        return chunks;
    }
}
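
The chunk arithmetic in `createChunks` can be checked in isolation. The sketch below is a framework-free version with a hypothetical `Partitioner` class. It uses integer ceiling division instead of `Math.ceil`, and it copies each `subList`, because a `subList` is only a view that becomes unusable if the source list is later structurally modified.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper mirroring the chunking logic without any framework types. */
class Partitioner {

    /** Number of chunks needed for totalItems, counting a partial final chunk. */
    static int chunkCount(int totalItems, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        return (totalItems + chunkSize - 1) / chunkSize; // integer ceiling division
    }

    /** Splits items into consecutive chunks of at most chunkSize elements. */
    static <T> List<List<T>> partition(List<T> items, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>(chunkCount(items.size(), chunkSize));
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            chunks.add(new ArrayList<>(items.subList(start, end))); // copy, not a view
        }
        return chunks;
    }
}
```

For example, 2,500 items with a chunk size of 1,000 give three chunks of 1,000, 1,000, and 500 items, and an empty dataset gives no chunks at all.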

4. Conditional Content Splitter

@Component
public class ConditionalSplitterRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:conditionalSplit")
            .routeId("conditional-splitter")
            .log("Analyzing message for conditional splitting")
            .choice()
                .when(jsonpath("$.messageType == 'COMPOSITE'"))
                    .to("direct:splitCompositeMessage")
                .when(jsonpath("$.messageType == 'BATCH'"))
                    .to("direct:splitBatchMessage")
                .when(jsonpath("$.messageType == 'HIERARCHICAL'"))
                    .to("direct:splitHierarchicalMessage")
                .otherwise()
                    .log("Message doesn't require splitting")
                    .to("direct:processAtomic")
            .end();

        from("direct:splitCompositeMessage")
            .log("Splitting composite message")
            .process(exchange -> {
                CompositeMessage composite = exchange.getIn().getBody(CompositeMessage.class);
                List<AtomicMessage> atomicMessages = new ArrayList<>();

                // Split customer data
                if (composite.getCustomerData() != null) {
                    AtomicMessage customerMsg = createAtomicMessage(
                        composite, "CUSTOMER", composite.getCustomerData());
                    atomicMessages.add(customerMsg);
                }

                // Split order data
                if (composite.getOrderData() != null) {
                    AtomicMessage orderMsg = createAtomicMessage(
                        composite, "ORDER", composite.getOrderData());
                    atomicMessages.add(orderMsg);
                }

                exchange.getIn().setBody(atomicMessages);
            })
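            .split(body())
                // Expose the sub-type as a header so the next route can branch on it
                // (assumes AtomicMessage has a getMessageSubType() accessor)
                .setHeader("messageSubType", simple("${body.messageSubType}"))
                .to("direct:processAtomicMessage")
            .end();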

        from("direct:processAtomicMessage")
            .log("Processing atomic message: ${header.messageSubType}")
            .choice()
                .when(header("messageSubType").isEqualTo("CUSTOMER"))
                    .to("direct:processCustomer")
                .when(header("messageSubType").isEqualTo("ORDER"))
                    .to("direct:processOrder")
                .otherwise()
                    .to("direct:processGeneric")
            .end();
    }

    private AtomicMessage createAtomicMessage(CompositeMessage composite, 
                                            String subType, Object data) {
        return AtomicMessage.builder()
            .correlationId(composite.getCorrelationId())
            .messageSubType(subType)
            .payload(data)
            .originalMessageId(composite.getMessageId())
            .timestamp(Instant.now())
            .build();
    }
}

5. Streaming XML Splitter

@Component
public class XMLStreamingSplitterRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:splitXMLStream")
            .routeId("xml-streaming-splitter")
            .log("Starting XML streaming split")
            // stax(...) is a static import from org.apache.camel.component.stax.StAXBuilder (camel-stax)
            .split(stax(Contact.class))
                .streaming()
                .process(exchange -> {
                    Contact contact = exchange.getIn().getBody(Contact.class);

                    // Add processing context
                    Integer splitIndex = exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);
                    exchange.getIn().setHeader("contactSequence", splitIndex + 1);
                    exchange.getIn().setHeader("contactId", contact.getId());
                })
                .to("direct:processContact")
            .end()
            .log("XML streaming split completed");

        from("direct:processContact")
            .log("Processing contact ${header.contactId} (sequence ${header.contactSequence})")
            .marshal().json(JsonLibrary.Jackson) // Convert to JSON for further processing (jacksonXml() would produce XML)
            .to("direct:validateContact");
    }
}

// XML Contact class for StAX processing
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class Contact {
    @XmlElement
    private String id;
    @XmlElement
    private String firstName;
    @XmlElement
    private String lastName;
    @XmlElement
    private String email;

    // getters and setters
}

Best Practices

1. Memory Management and Performance

2. Correlation and Tracking

3. Error Handling and Recovery

4. Processing Strategy

5. Monitoring and Observability

The Splitter pattern is crucial for building scalable, high-performance integration solutions that can handle large, complex messages efficiently while enabling parallel processing and improving overall system throughput and resilience.
