This site is in English. Use your browser's built-in translate feature to read it in your language.

Event Streaming Pattern

Overview

The Event Streaming pattern enables continuous, real-time processing of unbounded data streams through persistent, append-only logs of events. It provides a foundational architecture for building reactive, event-driven systems that can process massive volumes of data with low latency while maintaining strong consistency and durability guarantees. Event streaming transforms traditional batch processing paradigms into continuous, real-time data flows.

Theoretical Foundation

Event Streaming is rooted in distributed systems theory, stream processing principles, and event sourcing concepts. It implements the "log-structured data" paradigm where events are stored in immutable, ordered sequences that can be replayed and processed by multiple consumers. The pattern embodies "stream processing semantics" where data flows continuously through processing pipelines rather than being processed in discrete batches.

Core Principles

1. Immutable Event Logs

Events are stored in append-only logs that preserve the complete history of state changes, enabling temporal queries, debugging, and audit capabilities while ensuring data integrity.

2. Stream-Table Duality

Every stream can be viewed as a changelog of a table, and every table can be viewed as a snapshot of a stream, enabling flexible data modeling and processing patterns.

3. Temporal Processing

Events carry temporal information enabling time-based operations like windowing, late data handling, and temporal joins across multiple streams.

4. Scalable Partitioning

Event streams are horizontally partitioned to enable parallel processing while maintaining ordering guarantees within partitions.

Why Event Streaming is Essential in Integration Architecture

1. Real-Time Business Intelligence

Modern businesses require immediate insights and reactions: - Real-time analytics for operational decision making - Live dashboards showing current business state - Immediate alerting for critical business events - Fraud detection requiring millisecond response times

2. Microservices Data Integration

Event streaming enables effective microservices architectures: - Service state synchronization across distributed systems - Event-driven communication between loosely coupled services - Data consistency through event sourcing patterns - Service independence while maintaining data coherence

3. IoT and Edge Computing

Supporting massive-scale sensor and device integration: - High-throughput ingestion from millions of IoT devices - Real-time stream processing at the edge - Time series data management for sensor readings - Anomaly detection in continuous data streams

4. Modern Data Architecture

Event streaming forms the backbone of modern data platforms: - Data lake integration through streaming ingestion - Real-time ETL replacing batch processing workflows - Change data capture (CDC) from operational databases - Multi-cloud data replication with low latency

Benefits in Integration Contexts

1. Temporal Flexibility

2. Scalability and Performance

3. Integration Simplification

4. Operational Excellence

Integration Architecture Applications

1. Real-Time Data Pipelines

Event streaming powers modern data engineering: - Change data capture from operational databases to analytical systems - Real-time ETL transforming data as it flows through the system - Data mesh architectures enabling domain-owned data products - Stream-to-batch integration for hybrid processing scenarios

2. Event-Driven Microservices

Enables sophisticated microservices choreography: - Saga pattern implementation for distributed transaction management - CQRS (Command Query Responsibility Segregation) with event sourcing - Service mesh integration with event-driven communication - Polyglot persistence synchronized through event streams

3. Real-Time Analytics and ML

Supports advanced analytics and machine learning: - Streaming analytics for real-time business metrics - Online machine learning with continuous model updates - Feature engineering on live data streams - Real-time recommendations based on user behavior streams

4. Enterprise Application Integration

Modernizes enterprise integration patterns: - Legacy system integration through event streaming adapters - Multi-cloud data synchronization with low latency - B2B integration with partner systems through standardized event formats - Supply chain visibility through end-to-end event tracking

How Event Streaming Pattern Works

Event streaming operates through distributed, fault-tolerant logs that enable continuous data processing:

Stream Architecture Overview

┌─────────────────────────────────────────────────────────────────────┐
│                     Event Streaming Platform                       │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Producers                 Event Logs                Consumers      │
│  ┌──────────┐              ┌─────────────┐          ┌─────────────┐ │
│  │ App A    │─────────────▶│ Topic A     │─────────▶│ Analytics   │ │
│  │          │              │ [P0][P1][P2]│          │ Service     │ │
│  └──────────┘              └─────────────┘          └─────────────┘ │
│                                   │                        │        │
│  ┌──────────┐              ┌─────────────┐          ┌─────────────┐ │
│  │ App B    │─────────────▶│ Topic B     │─────────▶│ ML Pipeline │ │
│  │          │              │ [P0][P1]    │          │             │ │
│  └──────────┘              └─────────────┘          └─────────────┘ │
│                                   │                        │        │
│  ┌──────────┐              ┌─────────────┐          ┌─────────────┐ │
│  │ IoT Hub  │─────────────▶│ Topic C     │─────────▶│ Alerting    │ │
│  │          │              │ [P0][P1][P2]│          │ System      │ │
│  └──────────┘              └─────────────┘          └─────────────┘ │
└─────────────────────────────────────────────────────────────────────┘

Event Stream Lifecycle

┌───────────────────────────────────────────────────────────────┐
│                    Event Stream Lifecycle                    │
├───────────────────────────────────────────────────────────────┤
│                                                               │
│ 1. [Event Production] ──▶ 2. [Stream Partitioning] ──▶       │
│                                      │                       │
│                                      ▼                       │
│ 6. [Consumer Commit] ◀── 5. [Event Processing] ◀──           │
│                                      │                       │
│                                      ▼                       │
│                           3. [Persistent Storage] ──▶        │
│                                      │                       │
│                                      ▼                       │
│                           4. [Consumer Subscription]         │
│                                                               │
│ Stream Features:                                              │
│ • Append-only logs                                            │
│ • Configurable retention (time/size)                         │
│ • Multiple consumer groups                                    │
│ • Replay capability                                           │
│ • Fault tolerance through replication                        │
└───────────────────────────────────────────────────────────────┘

Stream Processing Topologies

1. Linear Processing Pipeline

Event Source ──▶ [Transform] ──▶ [Filter] ──▶ [Enrich] ──▶ Sink

2. Fan-out Processing

                    ┌──▶ [Analytics Pipeline]
Event Source ──▶ [Split] ──▶ [ML Pipeline]
                    └──▶ [Alerting Pipeline]

3. Stream Joins

Stream A ──┐
           ├──▶ [Join] ──▶ Enriched Stream
Stream B ──┘

4. Aggregation Windows

Event Stream ──▶ [Windowing] ──▶ [Aggregate] ──▶ Result Stream
              (5-min windows)   (sum, count, avg)

Key Components

1. Stream Producer

Publishes events to streaming platforms:

@Component
public class EventStreamProducer {
    private final StreamingPlatform streamingPlatform;
    private final EventSerializer eventSerializer;
    private final StreamMetrics streamMetrics;
    private final SchemaRegistry schemaRegistry;

    public StreamResult publishEvent(String streamName, Object event) {
        return publishEvent(streamName, event, StreamOptions.defaultOptions());
    }

    public StreamResult publishEvent(String streamName, Object event, StreamOptions options) {
        try {
            // Validate event schema
            validateEventSchema(event, streamName);

            // Serialize event
            byte[] serializedEvent = eventSerializer.serialize(event);

            // Create stream record
            StreamRecord record = StreamRecord.builder()
                .streamName(streamName)
                .eventId(generateEventId())
                .timestamp(Instant.now())
                .partitionKey(options.getPartitionKey())
                .payload(serializedEvent)
                .headers(buildHeaders(event, options))
                .build();

            // Publish to stream
            StreamResult result = streamingPlatform.publish(record);

            // Record metrics
            streamMetrics.recordEventPublished(streamName, serializedEvent.length);

            log.debug("Published event to stream {}: partition={}, offset={}", 
                     streamName, result.getPartition(), result.getOffset());

            return result;

        } catch (Exception e) {
            streamMetrics.recordPublishError(streamName);
            log.error("Failed to publish event to stream {}: {}", streamName, e.getMessage(), e);
            throw new StreamPublishException("Failed to publish event", e);
        }
    }

    public CompletableFuture<StreamResult> publishEventAsync(String streamName, 
                                                           Object event, 
                                                           StreamOptions options) {
        return CompletableFuture.supplyAsync(() -> publishEvent(streamName, event, options));
    }

    public StreamResult publishEventBatch(String streamName, 
                                        List<Object> events, 
                                        StreamOptions options) {
        try {
            List<StreamRecord> records = events.stream()
                .map(event -> {
                    try {
                        validateEventSchema(event, streamName);
                        byte[] serializedEvent = eventSerializer.serialize(event);

                        return StreamRecord.builder()
                            .streamName(streamName)
                            .eventId(generateEventId())
                            .timestamp(Instant.now())
                            .partitionKey(options.getPartitionKey())
                            .payload(serializedEvent)
                            .headers(buildHeaders(event, options))
                            .build();
                    } catch (Exception e) {
                        throw new StreamPublishException("Failed to prepare event for batch", e);
                    }
                })
                .collect(Collectors.toList());

            StreamResult result = streamingPlatform.publishBatch(records);
            streamMetrics.recordBatchPublished(streamName, events.size());

            return result;

        } catch (Exception e) {
            streamMetrics.recordBatchError(streamName);
            throw new StreamPublishException("Failed to publish event batch", e);
        }
    }

    private void validateEventSchema(Object event, String streamName) {
        try {
            Schema schema = schemaRegistry.getLatestSchema(streamName);
            if (schema != null && !schema.isCompatible(event)) {
                throw new SchemaValidationException(
                    "Event does not conform to schema for stream: " + streamName);
            }
        } catch (SchemaNotFoundException e) {
            log.warn("No schema found for stream {}, skipping validation", streamName);
        }
    }

    private Map<String, String> buildHeaders(Object event, StreamOptions options) {
        Map<String, String> headers = new HashMap<>();
        headers.put("eventType", event.getClass().getSimpleName());
        headers.put("producerId", options.getProducerId());
        headers.put("schemaVersion", getEventSchemaVersion(event));

        if (options.getCorrelationId() != null) {
            headers.put("correlationId", options.getCorrelationId());
        }

        if (options.getCausationId() != null) {
            headers.put("causationId", options.getCausationId());
        }

        return headers;
    }

    private String generateEventId() {
        return UUID.randomUUID().toString();
    }

    private String getEventSchemaVersion(Object event) {
        SchemaVersion annotation = event.getClass().getAnnotation(SchemaVersion.class);
        return annotation != null ? annotation.value() : "1.0";
    }
}

2. Stream Consumer

Consumes and processes events from streams:

@Component
public abstract class EventStreamConsumer {
    private final StreamingPlatform streamingPlatform;
    private final EventDeserializer eventDeserializer;
    private final StreamMetrics streamMetrics;

    private volatile boolean running = false;
    private StreamSubscription subscription;
    private final ExecutorService processingExecutor;

    public void start() {
        this.running = true;

        StreamSubscription.Builder subscriptionBuilder = StreamSubscription.builder()
            .consumerId(getConsumerId())
            .consumerGroup(getConsumerGroup())
            .streamNames(getStreamNames())
            .processingGuarantee(getProcessingGuarantee())
            .offsetResetStrategy(getOffsetResetStrategy())
            .maxPollRecords(getMaxPollRecords())
            .sessionTimeout(getSessionTimeout());

        this.subscription = streamingPlatform.subscribe(subscriptionBuilder.build());

        // Start processing threads
        int concurrency = getProcessingConcurrency();
        for (int i = 0; i < concurrency; i++) {
            processingExecutor.submit(this::processEvents);
        }

        log.info("Started stream consumer {} for streams: {}", 
                 getConsumerId(), getStreamNames());
    }

    public void stop() {
        running = false;

        if (subscription != null) {
            subscription.close();
        }

        processingExecutor.shutdown();
        try {
            if (!processingExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
                processingExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            processingExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }

        log.info("Stopped stream consumer {}", getConsumerId());
    }

    private void processEvents() {
        while (running) {
            try {
                List<StreamRecord> records = subscription.poll(getPollTimeout());

                if (records.isEmpty()) {
                    continue;
                }

                // Process records in batch or individually based on configuration
                if (supportsBatchProcessing()) {
                    processBatch(records);
                } else {
                    for (StreamRecord record : records) {
                        processRecord(record);
                    }
                }

                // Commit offsets based on processing guarantee
                if (getProcessingGuarantee() == ProcessingGuarantee.AT_LEAST_ONCE) {
                    subscription.commitSync();
                }

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                log.error("Error in stream processing loop: {}", e.getMessage(), e);
                handleProcessingError(e);
            }
        }
    }

    private void processRecord(StreamRecord record) {
        String streamName = record.getStreamName();
        long startTime = System.nanoTime();

        try {
            // Deserialize event
            Object event = eventDeserializer.deserialize(
                record.getPayload(), getEventType(record));

            // Create processing context
            StreamProcessingContext context = StreamProcessingContext.builder()
                .streamName(streamName)
                .partition(record.getPartition())
                .offset(record.getOffset())
                .timestamp(record.getTimestamp())
                .eventId(record.getEventId())
                .headers(record.getHeaders())
                .event(event)
                .build();

            // Process event
            ProcessingResult result = handleEvent(context);

            if (result.isSuccess()) {
                streamMetrics.recordEventProcessed(streamName);
            } else {
                handleProcessingFailure(record, result.getError());
            }

        } catch (Exception e) {
            log.error("Error processing event from stream {}: {}", streamName, e.getMessage(), e);
            handleProcessingFailure(record, e);
        } finally {
            long duration = System.nanoTime() - startTime;
            streamMetrics.recordProcessingLatency(streamName, Duration.ofNanos(duration));
        }
    }

    private void processBatch(List<StreamRecord> records) {
        try {
            List<StreamProcessingContext> contexts = records.stream()
                .map(record -> {
                    try {
                        Object event = eventDeserializer.deserialize(
                            record.getPayload(), getEventType(record));

                        return StreamProcessingContext.builder()
                            .streamName(record.getStreamName())
                            .partition(record.getPartition())
                            .offset(record.getOffset())
                            .timestamp(record.getTimestamp())
                            .eventId(record.getEventId())
                            .headers(record.getHeaders())
                            .event(event)
                            .build();
                    } catch (Exception e) {
                        throw new StreamProcessingException("Failed to prepare context for batch", e);
                    }
                })
                .collect(Collectors.toList());

            ProcessingResult result = handleEventBatch(contexts);

            if (result.isSuccess()) {
                streamMetrics.recordBatchProcessed(records.get(0).getStreamName(), records.size());
            } else {
                // Handle batch failure - may need individual record processing
                handleBatchProcessingFailure(records, result.getError());
            }

        } catch (Exception e) {
            log.error("Error processing event batch: {}", e.getMessage(), e);
            handleBatchProcessingFailure(records, e);
        }
    }

    protected abstract ProcessingResult handleEvent(StreamProcessingContext context);
    protected abstract String getConsumerId();
    protected abstract String getConsumerGroup();
    protected abstract List<String> getStreamNames();
    protected abstract ProcessingGuarantee getProcessingGuarantee();
    protected abstract OffsetResetStrategy getOffsetResetStrategy();
    protected abstract int getMaxPollRecords();
    protected abstract Duration getPollTimeout();
    protected abstract Duration getSessionTimeout();
    protected abstract int getProcessingConcurrency();
    protected abstract Class<?> getEventType(StreamRecord record);

    // Optional overrides
    protected boolean supportsBatchProcessing() { return false; }
    protected ProcessingResult handleEventBatch(List<StreamProcessingContext> contexts) {
        throw new UnsupportedOperationException("Batch processing not supported");
    }

    protected void handleProcessingError(Exception error) {
        // Default: sleep before retry
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    protected void handleProcessingFailure(StreamRecord record, Exception error) {
        streamMetrics.recordProcessingError(record.getStreamName());
        // Default: log error (can be overridden for DLQ, etc.)
        log.error("Failed to process event {} from stream {}: {}", 
                 record.getEventId(), record.getStreamName(), error.getMessage());
    }

    protected void handleBatchProcessingFailure(List<StreamRecord> records, Exception error) {
        // Default: process records individually
        for (StreamRecord record : records) {
            try {
                processRecord(record);
            } catch (Exception e) {
                handleProcessingFailure(record, e);
            }
        }
    }
}

// Example concrete consumer implementation
@Component
public class ContactEventStreamConsumer extends EventStreamConsumer {
    private final ContactEventHandler contactEventHandler;

    @Override
    protected ProcessingResult handleEvent(StreamProcessingContext context) {
        ContactEvent contactEvent = (ContactEvent) context.getEvent();

        try {
            contactEventHandler.handleContactEvent(contactEvent, context);
            return ProcessingResult.success();
        } catch (ContactValidationException e) {
            return ProcessingResult.failure("Validation error: " + e.getMessage());
        } catch (ContactBusinessException e) {
            return ProcessingResult.failure("Business logic error: " + e.getMessage());
        }
    }

    @Override
    protected String getConsumerId() {
        return "contact-event-consumer";
    }

    @Override
    protected String getConsumerGroup() {
        return "contact-processing-group";
    }

    @Override
    protected List<String> getStreamNames() {
        return Arrays.asList("contact.events", "contact.lifecycle");
    }

    @Override
    protected ProcessingGuarantee getProcessingGuarantee() {
        return ProcessingGuarantee.AT_LEAST_ONCE;
    }

    @Override
    protected OffsetResetStrategy getOffsetResetStrategy() {
        return OffsetResetStrategy.EARLIEST;
    }

    @Override
    protected int getMaxPollRecords() {
        return 100;
    }

    @Override
    protected Duration getPollTimeout() {
        return Duration.ofSeconds(5);
    }

    @Override
    protected Duration getSessionTimeout() {
        return Duration.ofSeconds(30);
    }

    @Override
    protected int getProcessingConcurrency() {
        return 3;
    }

    @Override
    protected Class<?> getEventType(StreamRecord record) {
        String eventType = record.getHeaders().get("eventType");
        return eventType != null ? getEventClassByType(eventType) : ContactEvent.class;
    }

    private Class<?> getEventClassByType(String eventType) {
        switch (eventType) {
            case "ContactCreated": return ContactCreatedEvent.class;
            case "ContactUpdated": return ContactUpdatedEvent.class;
            case "ContactDeleted": return ContactDeletedEvent.class;
            default: return ContactEvent.class;
        }
    }
}

3. Stream Processor

Handles stream transformations and analytics:

@Component
public class StreamProcessor {
    private final StreamingPlatform streamingPlatform;
    private final StreamTopologyBuilder topologyBuilder;

    public StreamTopology buildContactProcessingTopology() {
        StreamTopologyBuilder builder = new StreamTopologyBuilder();

        // Source stream
        StreamNode<ContactEvent> contactEvents = builder
            .source("contact-events-source", "contact.events", ContactEvent.class);

        // Filter valid events
        StreamNode<ContactEvent> validEvents = contactEvents
            .filter("filter-valid", event -> isValidContactEvent(event));

        // Transform and enrich
        StreamNode<EnrichedContactEvent> enrichedEvents = validEvents
            .map("enrich-contact", event -> enrichContactEvent(event));

        // Branch processing
        Map<String, StreamNode<EnrichedContactEvent>> branches = enrichedEvents
            .branch("branch-by-type",
                    Predicate.named("created", e -> e.getEventType() == EventType.CREATED),
                    Predicate.named("updated", e -> e.getEventType() == EventType.UPDATED),
                    Predicate.named("deleted", e -> e.getEventType() == EventType.DELETED));

        // Process created contacts
        branches.get("created")
            .process("process-created", new ContactCreatedProcessor())
            .to("contact.processed.created");

        // Process updated contacts with windowing
        branches.get("updated")
            .groupBy("group-by-contact", contact -> contact.getContactId())
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .aggregate("aggregate-updates", 
                      () -> new ContactUpdateAggregate(),
                      (key, value, aggregate) -> aggregate.add(value),
                      Materialized.as("contact-updates-store"))
            .toStream()
            .map("flatten-aggregates", (windowedKey, aggregate) -> 
                 aggregate.toContactUpdateSummary(windowedKey.key()))
            .to("contact.update.summaries");

        // Process deleted contacts
        branches.get("deleted")
            .process("process-deleted", new ContactDeletedProcessor())
            .to("contact.processed.deleted");

        return builder.build();
    }

    public void startProcessing(StreamTopology topology) {
        StreamProcessor processor = streamingPlatform.createProcessor(topology);
        processor.start();

        // Add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("Shutting down stream processor...");
            processor.stop();
        }));
    }

    private boolean isValidContactEvent(ContactEvent event) {
        return event != null && 
               event.getContactId() != null && 
               event.getEventType() != null &&
               event.getTimestamp() != null;
    }

    private EnrichedContactEvent enrichContactEvent(ContactEvent event) {
        // Enrich with additional data (lookup services, etc.)
        ContactProfile profile = lookupContactProfile(event.getContactId());
        ContactMetadata metadata = generateContactMetadata(event);

        return EnrichedContactEvent.builder()
            .baseEvent(event)
            .contactProfile(profile)
            .metadata(metadata)
            .enrichedAt(Instant.now())
            .build();
    }

    // Custom processors
    public static class ContactCreatedProcessor implements StreamNodeProcessor<EnrichedContactEvent, ContactCreatedResult> {

        @Override
        public ContactCreatedResult process(ProcessorContext context, EnrichedContactEvent event) {
            try {
                // Business logic for contact creation
                ContactCreatedResult result = processContactCreation(event);

                // Update state store
                context.getStateStore("contact-state").put(event.getContactId(), event);

                // Schedule future processing if needed
                context.schedule(Duration.ofHours(24), 
                               () -> scheduleContactFollowUp(event.getContactId()));

                return result;

            } catch (Exception e) {
                log.error("Error processing contact creation: {}", e.getMessage(), e);
                throw new StreamProcessingException("Contact creation failed", e);
            }
        }

        private ContactCreatedResult processContactCreation(EnrichedContactEvent event) {
            // Implementation details...
            return ContactCreatedResult.success(event.getContactId());
        }

        private void scheduleContactFollowUp(String contactId) {
            // Implementation details...
        }
    }
}

4. Schema Registry

Manages event schemas and evolution:

@Component
public class SchemaRegistry {
    private final SchemaStorage schemaStorage;
    private final Map<String, Schema> schemaCache = new ConcurrentHashMap<>();
    private final SchemaCompatibilityChecker compatibilityChecker;

    public Schema registerSchema(String subject, SchemaDefinition definition) {
        validateSchemaDefinition(definition);

        // Check compatibility with existing schemas
        List<Schema> existingSchemas = getSchemaHistory(subject);
        if (!existingSchemas.isEmpty()) {
            Schema latestSchema = existingSchemas.get(existingSchemas.size() - 1);
            if (!compatibilityChecker.isCompatible(latestSchema, definition)) {
                throw new SchemaCompatibilityException(
                    "Schema is not compatible with existing version");
            }
        }

        // Generate schema version
        int version = existingSchemas.size() + 1;

        Schema schema = Schema.builder()
            .subject(subject)
            .version(version)
            .definition(definition)
            .registeredAt(Instant.now())
            .build();

        // Store schema
        schemaStorage.store(schema);

        // Update cache
        schemaCache.put(getCacheKey(subject, version), schema);
        schemaCache.put(getCacheKey(subject, "latest"), schema);

        log.info("Registered schema for subject {} version {}", subject, version);
        return schema;
    }

    public Optional<Schema> getSchema(String subject, int version) {
        String cacheKey = getCacheKey(subject, version);
        Schema cached = schemaCache.get(cacheKey);

        if (cached != null) {
            return Optional.of(cached);
        }

        Optional<Schema> schema = schemaStorage.getSchema(subject, version);
        schema.ifPresent(s -> schemaCache.put(cacheKey, s));

        return schema;
    }

    public Optional<Schema> getLatestSchema(String subject) {
        String cacheKey = getCacheKey(subject, "latest");
        Schema cached = schemaCache.get(cacheKey);

        if (cached != null) {
            return Optional.of(cached);
        }

        Optional<Schema> schema = schemaStorage.getLatestSchema(subject);
        schema.ifPresent(s -> schemaCache.put(cacheKey, s));

        return schema;
    }

    public List<Schema> getSchemaHistory(String subject) {
        return schemaStorage.getSchemaHistory(subject);
    }

    public boolean isCompatible(String subject, SchemaDefinition newDefinition) {
        Optional<Schema> latestSchema = getLatestSchema(subject);
        if (!latestSchema.isPresent()) {
            return true; // No existing schema, always compatible
        }

        return compatibilityChecker.isCompatible(latestSchema.get(), newDefinition);
    }

    public SchemaEvolutionPlan planEvolution(String subject, SchemaDefinition targetDefinition) {
        List<Schema> history = getSchemaHistory(subject);

        return SchemaEvolutionPlan.builder()
            .subject(subject)
            .currentVersion(history.isEmpty() ? 0 : history.get(history.size() - 1).getVersion())
            .targetDefinition(targetDefinition)
            .evolutionSteps(calculateEvolutionSteps(history, targetDefinition))
            .breakingChanges(identifyBreakingChanges(history, targetDefinition))
            .migrationRequired(requiresMigration(history, targetDefinition))
            .build();
    }

    private void validateSchemaDefinition(SchemaDefinition definition) {
        if (definition == null || definition.getSchema() == null) {
            throw new IllegalArgumentException("Schema definition cannot be null");
        }

        try {
            // Validate schema syntax (JSON Schema, Avro, etc.)
            definition.validate();
        } catch (Exception e) {
            throw new SchemaValidationException("Invalid schema definition", e);
        }
    }

    private String getCacheKey(String subject, Object version) {
        return subject + ":" + version.toString();
    }

    private List<EvolutionStep> calculateEvolutionSteps(List<Schema> history, 
                                                       SchemaDefinition target) {
        // Implementation for calculating evolution steps
        return Collections.emptyList();
    }

    private List<BreakingChange> identifyBreakingChanges(List<Schema> history, 
                                                        SchemaDefinition target) {
        // Implementation for identifying breaking changes
        return Collections.emptyList();
    }

    private boolean requiresMigration(List<Schema> history, SchemaDefinition target) {
        // Implementation for determining if migration is required
        return false;
    }
}

Configuration Parameters

Essential Settings

Parameter Description Typical Values
Partition Count Number of partitions per stream 3-100
Retention Period How long to keep events 7d-365d
Batch Size Events per producer batch 100-10000
Consumer Lag Threshold Maximum acceptable consumer lag 1000-100000
Processing Guarantee Delivery semantics at-least-once/exactly-once
Compression Type Event compression algorithm gzip/lz4/snappy

Example Configuration

# Stream Platform Configuration
stream.platform=kafka
stream.bootstrap-servers=localhost:9092
stream.schema-registry-url=http://localhost:8081

# Producer Configuration
stream.producer.batch-size=16384
stream.producer.linger-ms=10
stream.producer.compression-type=lz4
stream.producer.acks=all
stream.producer.retries=3
stream.producer.max-in-flight-requests=1

# Consumer Configuration
stream.consumer.fetch-min-bytes=1024
stream.consumer.fetch-max-wait-ms=500
stream.consumer.max-poll-records=500
stream.consumer.session-timeout-ms=30000
stream.consumer.heartbeat-interval-ms=3000
stream.consumer.auto-offset-reset=earliest

# Stream Processing Configuration
stream.processing.guarantee=exactly-once
stream.processing.commit-interval=10s
stream.processing.cache-max-bytes=10MB
stream.processing.num-stream-threads=4

# Schema Registry Configuration
stream.schema.compatibility-level=BACKWARD
stream.schema.cache-size=1000
stream.schema.cache-expiry=1h

Implementation Examples

1. Apache Kafka Streams Implementation

@Configuration
@EnableKafka
public class KafkaStreamsConfiguration {

    @Bean
    public KafkaStreamsConfiguration kafkaStreamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "contact-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public KafkaStreams contactEventStream(KafkaStreamsConfiguration config) {
        StreamsBuilder builder = new StreamsBuilder();

        // Contact events stream
        KStream<String, ContactEvent> contactEvents = builder
            .stream("contact.events", 
                   Consumed.with(Serdes.String(), new ContactEventSerde()));

        // Filter and transform
        KStream<String, EnrichedContactEvent> enrichedEvents = contactEvents
            .filter((key, event) -> event != null && event.getContactId() != null)
            .mapValues(this::enrichContactEvent);

        // Branch by event type
        Map<String, KStream<String, EnrichedContactEvent>> branches = enrichedEvents
            .split(Named.as("contact-"))
            .branch((key, event) -> event.getEventType() == EventType.CREATED,
                   Branched.as("created"))
            .branch((key, event) -> event.getEventType() == EventType.UPDATED,
                   Branched.as("updated"))
            .defaultBranch(Branched.as("other"));

        // Process created contacts
        branches.get("contact-created")
            .process(() -> new ContactCreatedProcessor(),
                    Named.as("process-created"));

        // Aggregate updated contacts
        branches.get("contact-updated")
            .groupBy((key, event) -> event.getContactId(),
                    Grouped.with(Serdes.String(), new EnrichedContactEventSerde()))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .aggregate(
                ContactUpdateSummary::new,
                (key, event, aggregate) -> aggregate.addUpdate(event),
                Named.as("aggregate-updates"),
                Materialized.<String, ContactUpdateSummary, WindowStore<Bytes, byte[]>>as("updates-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new ContactUpdateSummarySerde()))
            .toStream()
            .map((windowedKey, summary) -> 
                 KeyValue.pair(windowedKey.key(), summary.complete()))
            .to("contact.update.summaries",
               Produced.with(Serdes.String(), new ContactUpdateSummarySerde()));

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, config.asProperties());

        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        return streams;
    }

    private EnrichedContactEvent enrichContactEvent(ContactEvent event) {
        // Enrichment logic
        return EnrichedContactEvent.builder()
            .baseEvent(event)
            .enrichedAt(Instant.now())
            .build();
    }
}

@Component
public class KafkaContactEventProducer {
    private final KafkaTemplate<String, ContactEvent> kafkaTemplate;

    @EventListener
    @Async
    public void handleContactEvent(ContactEvent event) {
        String partitionKey = event.getContactId();

        kafkaTemplate.send("contact.events", partitionKey, event)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to publish contact event: {}", ex.getMessage(), ex);
                } else {
                    log.debug("Published contact event: partition={}, offset={}", 
                             result.getRecordMetadata().partition(),
                             result.getRecordMetadata().offset());
                }
            });
    }
}

2. Amazon Kinesis Implementation

@Configuration
public class KinesisConfiguration {

    @Bean
    public AmazonKinesis kinesisClient() {
        return AmazonKinesisClientBuilder.standard()
            .withRegion(Regions.EU_WEST_1)
            .build();
    }

    @Bean
    public KinesisProducerConfiguration kinesisProducerConfig() {
        KinesisProducerConfiguration config = new KinesisProducerConfiguration();
        config.setRegion("eu-west-1");
        config.setMaxConnections(24);
        config.setRequestTimeout(60000);
        config.setRecordMaxBufferedTime(15000);
        config.setAggregationEnabled(true);
        config.setCompressionFormat("gzip");
        return config;
    }

    @Bean
    public KinesisProducer kinesisProducer(KinesisProducerConfiguration config) {
        return new KinesisProducer(config);
    }
}

@Service
public class KinesisContactEventProducer {
    private final KinesisProducer kinesisProducer;
    private final ObjectMapper objectMapper;

    @Value("${aws.kinesis.contact-stream}")
    private String streamName;

    public CompletableFuture<UserRecordResult> publishContactEvent(ContactEvent event) {
        try {
            String partitionKey = event.getContactId();
            String eventJson = objectMapper.writeValueAsString(event);
            ByteBuffer data = ByteBuffer.wrap(eventJson.getBytes(StandardCharsets.UTF_8));

            UserRecord record = UserRecord.builder()
                .streamName(streamName)
                .partitionKey(partitionKey)
                .data(data)
                .explicitHashKey(null) // Let Kinesis handle hash key
                .build();

            CompletableFuture<UserRecordResult> future = kinesisProducer.addUserRecord(record);

            future.whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to publish to Kinesis: {}", ex.getMessage(), ex);
                } else {
                    log.debug("Published to Kinesis: shard={}, sequence={}", 
                             result.getShardId(), result.getSequenceNumber());
                }
            });

            return future;

        } catch (Exception e) {
            log.error("Error preparing Kinesis record: {}", e.getMessage(), e);
            CompletableFuture<UserRecordResult> failedFuture = new CompletableFuture<>();
            failedFuture.completeExceptionally(e);
            return failedFuture;
        }
    }
}

@Component
public class KinesisContactEventConsumer {
    private final AmazonKinesis kinesisClient;
    private final ObjectMapper objectMapper;
    private final ContactEventHandler eventHandler;

    @Value("${aws.kinesis.contact-stream}")
    private String streamName;

    @PostConstruct
    public void startConsumer() {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // Start multiple workers for different shards
        for (int i = 0; i < 2; i++) {
            executor.submit(this::consumeEvents);
        }
    }

    private void consumeEvents() {
        String shardIterator = getShardIterator();

        while (true) {
            try {
                GetRecordsRequest request = GetRecordsRequest.builder()
                    .shardIterator(shardIterator)
                    .limit(100)
                    .build();

                GetRecordsResult result = kinesisClient.getRecords(request);

                for (Record record : result.getRecords()) {
                    processRecord(record);
                }

                shardIterator = result.getNextShardIterator();

                if (result.getRecords().isEmpty()) {
                    Thread.sleep(1000); // Wait before polling again
                }

            } catch (Exception e) {
                log.error("Error consuming from Kinesis: {}", e.getMessage(), e);
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    private void processRecord(Record record) {
        try {
            String eventJson = StandardCharsets.UTF_8.decode(record.getData()).toString();
            ContactEvent event = objectMapper.readValue(eventJson, ContactEvent.class);

            eventHandler.handleContactEvent(event);

            log.debug("Processed Kinesis record: sequence={}", record.getSequenceNumber());

        } catch (Exception e) {
            log.error("Error processing Kinesis record: {}", e.getMessage(), e);
        }
    }

    private String getShardIterator() {
        // Implementation to get shard iterator
        return null;
    }
}

3. Apache Pulsar Implementation

@Configuration
public class PulsarConfiguration {

    @Bean
    public PulsarClient pulsarClient() {
        try {
            return PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
        } catch (PulsarClientException e) {
            throw new RuntimeException("Failed to create Pulsar client", e);
        }
    }

    @Bean
    public Producer<ContactEvent> contactEventProducer(PulsarClient client) {
        try {
            return client.newProducer(Schema.JSON(ContactEvent.class))
                .topic("contact-events")
                .batchingMaxMessages(100)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .compressionType(CompressionType.LZ4)
                .create();
        } catch (PulsarClientException e) {
            throw new RuntimeException("Failed to create Pulsar producer", e);
        }
    }

    @Bean
    public Consumer<ContactEvent> contactEventConsumer(PulsarClient client) {
        try {
            return client.newConsumer(Schema.JSON(ContactEvent.class))
                .topic("contact-events")
                .subscriptionName("contact-processing")
                .subscriptionType(SubscriptionType.Shared)
                .receiverQueueSize(1000)
                .subscribe();
        } catch (PulsarClientException e) {
            throw new RuntimeException("Failed to create Pulsar consumer", e);
        }
    }
}

@Service
public class PulsarContactEventProducer {
    private final Producer<ContactEvent> producer;

    public CompletableFuture<MessageId> publishContactEvent(ContactEvent event) {
        return producer.newMessage()
            .key(event.getContactId())
            .property("eventType", event.getEventType().name())
            .eventTime(event.getTimestamp().toEpochMilli())
            .value(event)
            .sendAsync()
            .whenComplete((messageId, ex) -> {
                if (ex != null) {
                    log.error("Failed to publish to Pulsar: {}", ex.getMessage(), ex);
                } else {
                    log.debug("Published to Pulsar: messageId={}", messageId);
                }
            });
    }
}

@Component
public class PulsarContactEventConsumer {
    private final Consumer<ContactEvent> consumer;
    private final ContactEventHandler eventHandler;

    @PostConstruct
    public void startConsumer() {
        CompletableFuture.runAsync(this::consumeEvents);
    }

    private void consumeEvents() {
        while (true) {
            try {
                Messages<ContactEvent> messages = consumer.batchReceive();

                for (Message<ContactEvent> message : messages) {
                    processMessage(message);
                }

                consumer.acknowledge(messages);

            } catch (Exception e) {
                log.error("Error consuming from Pulsar: {}", e.getMessage(), e);
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    private void processMessage(Message<ContactEvent> message) {
        try {
            ContactEvent event = message.getValue();
            eventHandler.handleContactEvent(event);

            log.debug("Processed Pulsar message: messageId={}", message.getMessageId());

        } catch (Exception e) {
            log.error("Error processing Pulsar message: {}", e.getMessage(), e);
        }
    }
}

Best Practices

1. Event Design and Schema Management

public class EventDesignBestPractices {

    // Use CloudEvents specification for standard event structure
    public class CloudEventWrapper<T> {
        private String specversion = "1.0";
        private String type;
        private String source;
        private String id;
        private Instant time;
        private String datacontenttype = "application/json";
        private T data;
        private String subject;
        private Map<String, String> extensionAttributes;

        public static <T> CloudEventWrapper<T> create(T data, String source, String type) {
            return CloudEventWrapper.<T>builder()
                .id(UUID.randomUUID().toString())
                .source(source)
                .type(type)
                .time(Instant.now())
                .data(data)
                .build();
        }
    }

    // Design events for schema evolution
    @SchemaEvolution(compatibility = BACKWARD)
    public class ContactEventV2 {
        @Required
        private String eventId;

        @Required 
        private String contactId;

        @Required
        private EventType eventType;

        @Required
        private Instant timestamp;

        // New fields with defaults for backward compatibility
        @Since("2.0")
        @Default("unknown")
        private String source = "unknown";

        @Since("2.0") 
        private Map<String, String> metadata = new HashMap<>();

        // Deprecated fields marked for future removal
        @Deprecated(since = "2.0", forRemoval = "3.0")
        private String legacyField;
    }

    // Use semantic versioning for schemas
    public class SchemaVersioning {

        public Schema evolveSchema(Schema currentSchema, SchemaChange change) {
            SchemaVersion currentVersion = currentSchema.getVersion();
            SchemaVersion newVersion;

            if (change.isBreaking()) {
                newVersion = currentVersion.nextMajor(); // 1.0 -> 2.0
            } else if (change.isFeatureAddition()) {
                newVersion = currentVersion.nextMinor(); // 1.0 -> 1.1
            } else {
                newVersion = currentVersion.nextPatch(); // 1.0.0 -> 1.0.1
            }

            return currentSchema.evolve(newVersion, change);
        }
    }
}

2. Stream Partitioning and Ordering

@Component
public class StreamPartitioningStrategy {

    // Partition by entity ID for ordered processing
    public String getPartitionKey(ContactEvent event) {
        return event.getContactId(); // All events for same contact go to same partition
    }

    // Custom partitioner for specific business logic
    public class ContactEventPartitioner implements StreamPartitioner<String, ContactEvent> {

        @Override
        public Integer partition(String topic, String key, ContactEvent value, int numPartitions) {
            // Partition by contact region for geographic distribution
            String region = extractRegion(value.getContactId());
            return Math.abs(region.hashCode()) % numPartitions;
        }

        private String extractRegion(String contactId) {
            // Extract region from contact ID format: REGION-XXXXXXXXXX
            return contactId.split("-")[0];
        }
    }

    // Handle out-of-order events with event timestamps
    @Component
    public class OutOfOrderEventHandler {
        private final Map<String, Long> lastProcessedTimestamp = new ConcurrentHashMap<>();

        public boolean isOutOfOrder(ContactEvent event) {
            String contactId = event.getContactId();
            long eventTimestamp = event.getTimestamp().toEpochMilli();

            Long lastTimestamp = lastProcessedTimestamp.get(contactId);
            if (lastTimestamp == null || eventTimestamp > lastTimestamp) {
                lastProcessedTimestamp.put(contactId, eventTimestamp);
                return false;
            }

            return true;
        }

        public void handleOutOfOrderEvent(ContactEvent event) {
            // Buffer out-of-order events for later processing
            outOfOrderBuffer.add(event);

            // Trigger reprocessing if buffer size threshold reached
            if (outOfOrderBuffer.size() > 100) {
                reprocessBufferedEvents();
            }
        }
    }
}

3. Stream Processing Patterns

@Component
public class StreamProcessingPatterns {

    // Windowed aggregations for time-based analytics
    public void configureWindowedAggregation(StreamsBuilder builder) {
        builder.stream("contact.events", 
                      Consumed.with(Serdes.String(), new ContactEventSerde()))
            .groupBy((key, event) -> event.getEventType(),
                    Grouped.with(Serdes.String(), new ContactEventSerde()))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofSeconds(30)))
            .count(Named.as("event-counts"))
            .toStream()
            .map((windowedKey, count) -> KeyValue.pair(
                windowedKey.key() + "@" + windowedKey.window().start(),
                new EventCountSummary(windowedKey.key(), count, windowedKey.window())
            ))
            .to("contact.event.counts");
    }

    // Stream joins for event correlation
    public void configureStreamJoins(StreamsBuilder builder) {
        KStream<String, ContactEvent> contactEvents = 
            builder.stream("contact.events");
        KStream<String, UserEvent> userEvents = 
            builder.stream("user.events");

        // Join streams within a time window
        contactEvents.join(
            userEvents,
            (contact, user) -> new CorrelatedEvent(contact, user),
            JoinWindows.of(Duration.ofMinutes(5)),
            StreamJoined.with(
                Serdes.String(),
                new ContactEventSerde(),
                new UserEventSerde()
            )
        ).to("correlated.events");
    }

    // Error handling and dead letter topics
    public void configureErrorHandling(StreamsBuilder builder) {
        builder.stream("contact.events")
            .process(() -> new ContactEventProcessor(),
                    Named.as("process-contacts"))
            .to((recordContext, key, value, topicNameExtractor) -> {
                if (value instanceof ProcessingError) {
                    return "contact.events.errors";
                } else {
                    return "contact.events.processed";
                }
            });
    }
}

Common Pitfalls

1. Hot Partitions

Problem: Uneven distribution of events leading to processing bottlenecks
Solution: Use effective partitioning strategies and monitor partition metrics

2. Schema Compatibility Issues

Problem: Breaking schema changes causing consumer failures
Solution: Implement schema registry with compatibility checking

3. Consumer Lag Accumulation

Problem: Consumers falling behind producers causing delays
Solution: Monitor lag metrics and implement auto-scaling

4. Memory Leaks in State Stores

Problem: Unbounded state growth in stream processing applications
Solution: Configure retention policies and compact state stores

5. Exactly-Once Processing Overhead

Problem: Performance degradation with exactly-once guarantees
Solution: Use at-least-once when appropriate and implement idempotent processing

Integration in Distributed Systems

Event Sourcing Implementation

@Service
public class EventSourcedContactService {

    public void createContact(CreateContactCommand command) {
        ContactCreatedEvent event = ContactCreatedEvent.builder()
            .contactId(command.getContactId())
            .personalData(command.getPersonalData())
            .occurredAt(Instant.now())
            .build();

        eventStreamProducer.publish("contact.events", event);
    }
}

CQRS with Event Streaming

@Service
public class ContactQueryService {

    @StreamListener("contact.events")
    public void updateContactProjection(ContactEvent event) {
        ContactProjection projection = contactProjectionRepository
            .findByContactId(event.getContactId())
            .orElse(new ContactProjection(event.getContactId()));

        projection.apply(event);
        contactProjectionRepository.save(projection);
    }
}

Conclusion

Event Streaming is essential for building modern, responsive distributed systems. It provides:

When properly implemented with appropriate partitioning, schema management, and monitoring, event streaming enables robust, scalable architectures that can process massive data volumes while maintaining consistency and enabling real-time business intelligence.

References

← Back to All Patterns