Event Streaming Pattern
Overview
The Event Streaming pattern enables continuous, real-time processing of unbounded data streams through persistent, append-only logs of events. It provides a foundational architecture for building reactive, event-driven systems that process large volumes of data with low latency while providing durability and per-partition ordering guarantees. Event streaming shifts traditional batch processing toward continuous, real-time data flows.
Theoretical Foundation
Event Streaming is rooted in distributed systems theory, stream processing principles, and event sourcing concepts. It implements the "log-structured data" paradigm where events are stored in immutable, ordered sequences that can be replayed and processed by multiple consumers. The pattern embodies "stream processing semantics" where data flows continuously through processing pipelines rather than being processed in discrete batches.
Core Principles
1. Immutable Event Logs
Events are stored in append-only logs that preserve the complete history of state changes, enabling temporal queries, debugging, and audit capabilities while ensuring data integrity.
2. Stream-Table Duality
Every stream can be viewed as a changelog of a table, and every table can be viewed as a snapshot of a stream, enabling flexible data modeling and processing patterns.
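As a sketch in Kafka Streams terms (topic and type choices are illustrative), the same data can be read as a stream of changes or materialized as a table of current values:
StreamsBuilder builder = new StreamsBuilder();

// Stream view: every change to a contact, in arrival order
KStream<String, String> contactChanges =
    builder.stream("contact.events", Consumed.with(Serdes.String(), Serdes.String()));

// Table view: the latest value per contact key, continuously updated from the stream
KTable<String, String> latestContactState = contactChanges.toTable();

// And back again: every table update re-emitted as a change event
latestContactState.toStream().to("contact.changelog");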
3. Temporal Processing
Events carry temporal information enabling time-based operations like windowing, late data handling, and temporal joins across multiple streams.
4. Scalable Partitioning
Event streams are horizontally partitioned to enable parallel processing while maintaining ordering guarantees within partitions.
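A simplified sketch of key-based partition assignment (Kafka's default partitioner uses murmur2 hashing rather than hashCode, so treat this as illustrative only):
// Events with the same key always land on the same partition, preserving per-key
// ordering while different partitions are processed in parallel.
int partitionFor(String partitionKey, int numPartitions) {
    return (partitionKey.hashCode() & Integer.MAX_VALUE) % numPartitions;
}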
Why Event Streaming is Essential in Integration Architecture
1. Real-Time Business Intelligence
Modern businesses require immediate insights and reactions:
- Real-time analytics for operational decision making
- Live dashboards showing current business state
- Immediate alerting for critical business events
- Fraud detection requiring millisecond response times
2. Microservices Data Integration
Event streaming enables effective microservices architectures:
- Service state synchronization across distributed systems
- Event-driven communication between loosely coupled services
- Data consistency through event sourcing patterns
- Service independence while maintaining data coherence
3. IoT and Edge Computing
Supporting massive-scale sensor and device integration:
- High-throughput ingestion from millions of IoT devices
- Real-time stream processing at the edge
- Time series data management for sensor readings
- Anomaly detection in continuous data streams
4. Modern Data Architecture
Event streaming forms the backbone of modern data platforms:
- Data lake integration through streaming ingestion
- Real-time ETL replacing batch processing workflows
- Change data capture (CDC) from operational databases
- Multi-cloud data replication with low latency
Benefits in Integration Contexts
1. Temporal Flexibility
- Historical replay enabling reprocessing of past events for new use cases (see the replay sketch after this list)
- Point-in-time recovery for system state reconstruction
- A/B testing through parallel stream processing with different algorithms
- System migration by replaying events through new processing logic
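A minimal replay sketch using a plain Kafka consumer (topic, group id, and the reprocess step are assumptions): a dedicated consumer group rewinds to the earliest retained offset and feeds historical events through new processing logic.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "contact-replay");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of("contact.events"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            consumer.seekToBeginning(partitions); // rewind to the earliest retained offset
        }
    });
    while (true) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
            reprocess(record); // placeholder: feed historical events through the new logic
        }
    }
}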
2. Scalability and Performance
- Linear scalability through stream partitioning and parallel processing
- Low latency processing with millisecond-level event delivery
- High throughput supporting millions of events per second
- Elastic scaling adapting to varying data volumes automatically
3. Integration Simplification
- Unified data pipeline replacing multiple point-to-point integrations
- Schema evolution supporting gradual system updates
- Multi-consumer patterns enabling diverse downstream processing
- Replay capabilities for integration testing and debugging
4. Operational Excellence
- Monitoring and observability through stream metrics and processing topology visibility
- Fault tolerance with automatic recovery and exactly-once processing guarantees
- Data governance through schema registries and lineage tracking
- Capacity planning based on stream throughput and processing lag metrics
Integration Architecture Applications
1. Real-Time Data Pipelines
Event streaming powers modern data engineering:
- Change data capture from operational databases to analytical systems
- Real-time ETL transforming data as it flows through the system
- Data mesh architectures enabling domain-owned data products
- Stream-to-batch integration for hybrid processing scenarios
2. Event-Driven Microservices
Enables sophisticated microservices choreography:
- Saga pattern implementation for distributed transaction management
- CQRS (Command Query Responsibility Segregation) with event sourcing
- Service mesh integration with event-driven communication
- Polyglot persistence synchronized through event streams
3. Real-Time Analytics and ML
Supports advanced analytics and machine learning:
- Streaming analytics for real-time business metrics
- Online machine learning with continuous model updates
- Feature engineering on live data streams
- Real-time recommendations based on user behavior streams
4. Enterprise Application Integration
Modernizes enterprise integration patterns:
- Legacy system integration through event streaming adapters
- Multi-cloud data synchronization with low latency
- B2B integration with partner systems through standardized event formats
- Supply chain visibility through end-to-end event tracking
How Event Streaming Pattern Works
Event streaming operates through distributed, fault-tolerant logs that enable continuous data processing:
Stream Architecture Overview
┌──────────────────────────────────────────────────────────────────────┐
│                       Event Streaming Platform                        │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│   Producers                  Event Logs               Consumers      │
│  ┌──────────┐              ┌─────────────┐          ┌─────────────┐  │
│  │  App A   │─────────────▶│   Topic A   │─────────▶│  Analytics  │  │
│  │          │              │ [P0][P1][P2]│          │   Service   │  │
│  └──────────┘              └─────────────┘          └─────────────┘  │
│                                                                      │
│  ┌──────────┐              ┌─────────────┐          ┌─────────────┐  │
│  │  App B   │─────────────▶│   Topic B   │─────────▶│ ML Pipeline │  │
│  │          │              │  [P0][P1]   │          │             │  │
│  └──────────┘              └─────────────┘          └─────────────┘  │
│                                                                      │
│  ┌──────────┐              ┌─────────────┐          ┌─────────────┐  │
│  │ IoT Hub  │─────────────▶│   Topic C   │─────────▶│  Alerting   │  │
│  │          │              │ [P0][P1][P2]│          │   System    │  │
│  └──────────┘              └─────────────┘          └─────────────┘  │
└──────────────────────────────────────────────────────────────────────┘
Event Stream Lifecycle
┌───────────────────────────────────────────────────────────────┐
│                    Event Stream Lifecycle                      │
├───────────────────────────────────────────────────────────────┤
│                                                               │
│  1. [Event Production] ──▶ 2. [Stream Partitioning] ──▶       │
│  3. [Persistent Storage] ──▶ 4. [Consumer Subscription] ──▶   │
│  5. [Event Processing] ──▶ 6. [Consumer Commit]               │
│                                                               │
│  Stream Features:                                             │
│  • Append-only logs                                           │
│  • Configurable retention (time/size)                         │
│  • Multiple consumer groups                                   │
│  • Replay capability                                          │
│  • Fault tolerance through replication                        │
└───────────────────────────────────────────────────────────────┘
Stream Processing Topologies
1. Linear Processing Pipeline
Event Source ──▶ [Transform] ──▶ [Filter] ──▶ [Enrich] ──▶ Sink
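As a sketch, this linear pipeline maps naturally onto a Kafka Streams topology (topic names and the individual steps are illustrative):
StreamsBuilder builder = new StreamsBuilder();
builder.stream("contact.events", Consumed.with(Serdes.String(), Serdes.String()))
    .mapValues(value -> value == null ? "" : value.trim())          // Transform: normalize payload
    .filter((key, value) -> !value.isEmpty())                       // Filter: drop empty events
    .mapValues(value -> value + "|enrichedAt=" + Instant.now())     // Enrich: append metadata
    .to("contact.events.enriched");                                 // Sink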
2. Fan-out Processing
                          ┌──▶ [Analytics Pipeline]
Event Source ──▶ [Split] ─┼──▶ [ML Pipeline]
                          └──▶ [Alerting Pipeline]
3. Stream Joins
Stream A ──┐
           ├──▶ [Join] ──▶ Enriched Stream
Stream B ──┘
4. Aggregation Windows
Event Stream ──▶ [Windowing] ──▶ [Aggregate] ──▶ Result Stream
                 (5-min windows) (sum, count, avg)
Key Components
1. Stream Producer
Publishes events to streaming platforms:
@Component
public class EventStreamProducer {
private final StreamingPlatform streamingPlatform;
private final EventSerializer eventSerializer;
private final StreamMetrics streamMetrics;
private final SchemaRegistry schemaRegistry;
public StreamResult publishEvent(String streamName, Object event) {
return publishEvent(streamName, event, StreamOptions.defaultOptions());
}
public StreamResult publishEvent(String streamName, Object event, StreamOptions options) {
try {
// Validate event schema
validateEventSchema(event, streamName);
// Serialize event
byte[] serializedEvent = eventSerializer.serialize(event);
// Create stream record
StreamRecord record = StreamRecord.builder()
.streamName(streamName)
.eventId(generateEventId())
.timestamp(Instant.now())
.partitionKey(options.getPartitionKey())
.payload(serializedEvent)
.headers(buildHeaders(event, options))
.build();
// Publish to stream
StreamResult result = streamingPlatform.publish(record);
// Record metrics
streamMetrics.recordEventPublished(streamName, serializedEvent.length);
log.debug("Published event to stream {}: partition={}, offset={}",
streamName, result.getPartition(), result.getOffset());
return result;
} catch (Exception e) {
streamMetrics.recordPublishError(streamName);
log.error("Failed to publish event to stream {}: {}", streamName, e.getMessage(), e);
throw new StreamPublishException("Failed to publish event", e);
}
}
public CompletableFuture<StreamResult> publishEventAsync(String streamName,
Object event,
StreamOptions options) {
return CompletableFuture.supplyAsync(() -> publishEvent(streamName, event, options));
}
public StreamResult publishEventBatch(String streamName,
List<Object> events,
StreamOptions options) {
try {
List<StreamRecord> records = events.stream()
.map(event -> {
try {
validateEventSchema(event, streamName);
byte[] serializedEvent = eventSerializer.serialize(event);
return StreamRecord.builder()
.streamName(streamName)
.eventId(generateEventId())
.timestamp(Instant.now())
.partitionKey(options.getPartitionKey())
.payload(serializedEvent)
.headers(buildHeaders(event, options))
.build();
} catch (Exception e) {
throw new StreamPublishException("Failed to prepare event for batch", e);
}
})
.collect(Collectors.toList());
StreamResult result = streamingPlatform.publishBatch(records);
streamMetrics.recordBatchPublished(streamName, events.size());
return result;
} catch (Exception e) {
streamMetrics.recordBatchError(streamName);
throw new StreamPublishException("Failed to publish event batch", e);
}
}
private void validateEventSchema(Object event, String streamName) {
// SchemaRegistry.getLatestSchema returns Optional<Schema> (see the Schema Registry component below)
Optional<Schema> schema = schemaRegistry.getLatestSchema(streamName);
if (!schema.isPresent()) {
log.warn("No schema found for stream {}, skipping validation", streamName);
return;
}
if (!schema.get().isCompatible(event)) {
throw new SchemaValidationException(
"Event does not conform to schema for stream: " + streamName);
}
}
private Map<String, String> buildHeaders(Object event, StreamOptions options) {
Map<String, String> headers = new HashMap<>();
headers.put("eventType", event.getClass().getSimpleName());
headers.put("producerId", options.getProducerId());
headers.put("schemaVersion", getEventSchemaVersion(event));
if (options.getCorrelationId() != null) {
headers.put("correlationId", options.getCorrelationId());
}
if (options.getCausationId() != null) {
headers.put("causationId", options.getCausationId());
}
return headers;
}
private String generateEventId() {
return UUID.randomUUID().toString();
}
private String getEventSchemaVersion(Object event) {
SchemaVersion annotation = event.getClass().getAnnotation(SchemaVersion.class);
return annotation != null ? annotation.value() : "1.0";
}
}
2. Stream Consumer
Consumes and processes events from streams:
@Component
public abstract class EventStreamConsumer {
private final StreamingPlatform streamingPlatform;
private final EventDeserializer eventDeserializer;
private final StreamMetrics streamMetrics;
private volatile boolean running = false;
private StreamSubscription subscription;
private ExecutorService processingExecutor;
public void start() {
this.running = true;
this.processingExecutor = Executors.newFixedThreadPool(getProcessingConcurrency());
StreamSubscription.Builder subscriptionBuilder = StreamSubscription.builder()
.consumerId(getConsumerId())
.consumerGroup(getConsumerGroup())
.streamNames(getStreamNames())
.processingGuarantee(getProcessingGuarantee())
.offsetResetStrategy(getOffsetResetStrategy())
.maxPollRecords(getMaxPollRecords())
.sessionTimeout(getSessionTimeout());
this.subscription = streamingPlatform.subscribe(subscriptionBuilder.build());
// Start processing threads
int concurrency = getProcessingConcurrency();
for (int i = 0; i < concurrency; i++) {
processingExecutor.submit(this::processEvents);
}
log.info("Started stream consumer {} for streams: {}",
getConsumerId(), getStreamNames());
}
public void stop() {
running = false;
if (subscription != null) {
subscription.close();
}
processingExecutor.shutdown();
try {
if (!processingExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
processingExecutor.shutdownNow();
}
} catch (InterruptedException e) {
processingExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
log.info("Stopped stream consumer {}", getConsumerId());
}
private void processEvents() {
while (running) {
try {
List<StreamRecord> records = subscription.poll(getPollTimeout());
if (records.isEmpty()) {
continue;
}
// Process records in batch or individually based on configuration
if (supportsBatchProcessing()) {
processBatch(records);
} else {
for (StreamRecord record : records) {
processRecord(record);
}
}
// Commit offsets based on processing guarantee
if (getProcessingGuarantee() == ProcessingGuarantee.AT_LEAST_ONCE) {
subscription.commitSync();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("Error in stream processing loop: {}", e.getMessage(), e);
handleProcessingError(e);
}
}
}
private void processRecord(StreamRecord record) {
String streamName = record.getStreamName();
long startTime = System.nanoTime();
try {
// Deserialize event
Object event = eventDeserializer.deserialize(
record.getPayload(), getEventType(record));
// Create processing context
StreamProcessingContext context = StreamProcessingContext.builder()
.streamName(streamName)
.partition(record.getPartition())
.offset(record.getOffset())
.timestamp(record.getTimestamp())
.eventId(record.getEventId())
.headers(record.getHeaders())
.event(event)
.build();
// Process event
ProcessingResult result = handleEvent(context);
if (result.isSuccess()) {
streamMetrics.recordEventProcessed(streamName);
} else {
handleProcessingFailure(record, result.getError());
}
} catch (Exception e) {
log.error("Error processing event from stream {}: {}", streamName, e.getMessage(), e);
handleProcessingFailure(record, e);
} finally {
long duration = System.nanoTime() - startTime;
streamMetrics.recordProcessingLatency(streamName, Duration.ofNanos(duration));
}
}
private void processBatch(List<StreamRecord> records) {
try {
List<StreamProcessingContext> contexts = records.stream()
.map(record -> {
try {
Object event = eventDeserializer.deserialize(
record.getPayload(), getEventType(record));
return StreamProcessingContext.builder()
.streamName(record.getStreamName())
.partition(record.getPartition())
.offset(record.getOffset())
.timestamp(record.getTimestamp())
.eventId(record.getEventId())
.headers(record.getHeaders())
.event(event)
.build();
} catch (Exception e) {
throw new StreamProcessingException("Failed to prepare context for batch", e);
}
})
.collect(Collectors.toList());
ProcessingResult result = handleEventBatch(contexts);
if (result.isSuccess()) {
streamMetrics.recordBatchProcessed(records.get(0).getStreamName(), records.size());
} else {
// Handle batch failure - may need individual record processing
handleBatchProcessingFailure(records, result.getError());
}
} catch (Exception e) {
log.error("Error processing event batch: {}", e.getMessage(), e);
handleBatchProcessingFailure(records, e);
}
}
protected abstract ProcessingResult handleEvent(StreamProcessingContext context);
protected abstract String getConsumerId();
protected abstract String getConsumerGroup();
protected abstract List<String> getStreamNames();
protected abstract ProcessingGuarantee getProcessingGuarantee();
protected abstract OffsetResetStrategy getOffsetResetStrategy();
protected abstract int getMaxPollRecords();
protected abstract Duration getPollTimeout();
protected abstract Duration getSessionTimeout();
protected abstract int getProcessingConcurrency();
protected abstract Class<?> getEventType(StreamRecord record);
// Optional overrides
protected boolean supportsBatchProcessing() { return false; }
protected ProcessingResult handleEventBatch(List<StreamProcessingContext> contexts) {
throw new UnsupportedOperationException("Batch processing not supported");
}
protected void handleProcessingError(Exception error) {
// Default: sleep before retry
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
protected void handleProcessingFailure(StreamRecord record, Exception error) {
streamMetrics.recordProcessingError(record.getStreamName());
// Default: log error (can be overridden for DLQ, etc.)
log.error("Failed to process event {} from stream {}: {}",
record.getEventId(), record.getStreamName(), error.getMessage());
}
protected void handleBatchProcessingFailure(List<StreamRecord> records, Exception error) {
// Default: process records individually
for (StreamRecord record : records) {
try {
processRecord(record);
} catch (Exception e) {
handleProcessingFailure(record, e);
}
}
}
}
// Example concrete consumer implementation
@Component
public class ContactEventStreamConsumer extends EventStreamConsumer {
private final ContactEventHandler contactEventHandler;
@Override
protected ProcessingResult handleEvent(StreamProcessingContext context) {
ContactEvent contactEvent = (ContactEvent) context.getEvent();
try {
contactEventHandler.handleContactEvent(contactEvent, context);
return ProcessingResult.success();
} catch (ContactValidationException e) {
return ProcessingResult.failure("Validation error: " + e.getMessage());
} catch (ContactBusinessException e) {
return ProcessingResult.failure("Business logic error: " + e.getMessage());
}
}
@Override
protected String getConsumerId() {
return "contact-event-consumer";
}
@Override
protected String getConsumerGroup() {
return "contact-processing-group";
}
@Override
protected List<String> getStreamNames() {
return Arrays.asList("contact.events", "contact.lifecycle");
}
@Override
protected ProcessingGuarantee getProcessingGuarantee() {
return ProcessingGuarantee.AT_LEAST_ONCE;
}
@Override
protected OffsetResetStrategy getOffsetResetStrategy() {
return OffsetResetStrategy.EARLIEST;
}
@Override
protected int getMaxPollRecords() {
return 100;
}
@Override
protected Duration getPollTimeout() {
return Duration.ofSeconds(5);
}
@Override
protected Duration getSessionTimeout() {
return Duration.ofSeconds(30);
}
@Override
protected int getProcessingConcurrency() {
return 3;
}
@Override
protected Class<?> getEventType(StreamRecord record) {
String eventType = record.getHeaders().get("eventType");
return eventType != null ? getEventClassByType(eventType) : ContactEvent.class;
}
private Class<?> getEventClassByType(String eventType) {
switch (eventType) {
case "ContactCreated": return ContactCreatedEvent.class;
case "ContactUpdated": return ContactUpdatedEvent.class;
case "ContactDeleted": return ContactDeletedEvent.class;
default: return ContactEvent.class;
}
}
}
3. Stream Processor
Handles stream transformations and analytics:
@Component
public class StreamProcessor {
private final StreamingPlatform streamingPlatform;
private final StreamTopologyBuilder topologyBuilder;
public StreamTopology buildContactProcessingTopology() {
StreamTopologyBuilder builder = new StreamTopologyBuilder();
// Source stream
StreamNode<ContactEvent> contactEvents = builder
.source("contact-events-source", "contact.events", ContactEvent.class);
// Filter valid events
StreamNode<ContactEvent> validEvents = contactEvents
.filter("filter-valid", event -> isValidContactEvent(event));
// Transform and enrich
StreamNode<EnrichedContactEvent> enrichedEvents = validEvents
.map("enrich-contact", event -> enrichContactEvent(event));
// Branch processing
Map<String, StreamNode<EnrichedContactEvent>> branches = enrichedEvents
.branch("branch-by-type",
Predicate.named("created", e -> e.getEventType() == EventType.CREATED),
Predicate.named("updated", e -> e.getEventType() == EventType.UPDATED),
Predicate.named("deleted", e -> e.getEventType() == EventType.DELETED));
// Process created contacts
branches.get("created")
.process("process-created", new ContactCreatedProcessor())
.to("contact.processed.created");
// Process updated contacts with windowing
branches.get("updated")
.groupBy("group-by-contact", contact -> contact.getContactId())
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.aggregate("aggregate-updates",
() -> new ContactUpdateAggregate(),
(key, value, aggregate) -> aggregate.add(value),
Materialized.as("contact-updates-store"))
.toStream()
.map("flatten-aggregates", (windowedKey, aggregate) ->
aggregate.toContactUpdateSummary(windowedKey.key()))
.to("contact.update.summaries");
// Process deleted contacts
branches.get("deleted")
.process("process-deleted", new ContactDeletedProcessor())
.to("contact.processed.deleted");
return builder.build();
}
public void startProcessing(StreamTopology topology) {
StreamProcessor processor = streamingPlatform.createProcessor(topology);
processor.start();
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("Shutting down stream processor...");
processor.stop();
}));
}
private boolean isValidContactEvent(ContactEvent event) {
return event != null &&
event.getContactId() != null &&
event.getEventType() != null &&
event.getTimestamp() != null;
}
private EnrichedContactEvent enrichContactEvent(ContactEvent event) {
// Enrich with additional data (lookup services, etc.)
ContactProfile profile = lookupContactProfile(event.getContactId());
ContactMetadata metadata = generateContactMetadata(event);
return EnrichedContactEvent.builder()
.baseEvent(event)
.contactProfile(profile)
.metadata(metadata)
.enrichedAt(Instant.now())
.build();
}
// Custom processors
public static class ContactCreatedProcessor implements StreamNodeProcessor<EnrichedContactEvent, ContactCreatedResult> {
@Override
public ContactCreatedResult process(ProcessorContext context, EnrichedContactEvent event) {
try {
// Business logic for contact creation
ContactCreatedResult result = processContactCreation(event);
// Update state store
context.getStateStore("contact-state").put(event.getContactId(), event);
// Schedule future processing if needed
context.schedule(Duration.ofHours(24),
() -> scheduleContactFollowUp(event.getContactId()));
return result;
} catch (Exception e) {
log.error("Error processing contact creation: {}", e.getMessage(), e);
throw new StreamProcessingException("Contact creation failed", e);
}
}
private ContactCreatedResult processContactCreation(EnrichedContactEvent event) {
// Implementation details...
return ContactCreatedResult.success(event.getContactId());
}
private void scheduleContactFollowUp(String contactId) {
// Implementation details...
}
}
}
4. Schema Registry
Manages event schemas and evolution:
@Component
public class SchemaRegistry {
private final SchemaStorage schemaStorage;
private final Map<String, Schema> schemaCache = new ConcurrentHashMap<>();
private final SchemaCompatibilityChecker compatibilityChecker;
public Schema registerSchema(String subject, SchemaDefinition definition) {
validateSchemaDefinition(definition);
// Check compatibility with existing schemas
List<Schema> existingSchemas = getSchemaHistory(subject);
if (!existingSchemas.isEmpty()) {
Schema latestSchema = existingSchemas.get(existingSchemas.size() - 1);
if (!compatibilityChecker.isCompatible(latestSchema, definition)) {
throw new SchemaCompatibilityException(
"Schema is not compatible with existing version");
}
}
// Generate schema version
int version = existingSchemas.size() + 1;
Schema schema = Schema.builder()
.subject(subject)
.version(version)
.definition(definition)
.registeredAt(Instant.now())
.build();
// Store schema
schemaStorage.store(schema);
// Update cache
schemaCache.put(getCacheKey(subject, version), schema);
schemaCache.put(getCacheKey(subject, "latest"), schema);
log.info("Registered schema for subject {} version {}", subject, version);
return schema;
}
public Optional<Schema> getSchema(String subject, int version) {
String cacheKey = getCacheKey(subject, version);
Schema cached = schemaCache.get(cacheKey);
if (cached != null) {
return Optional.of(cached);
}
Optional<Schema> schema = schemaStorage.getSchema(subject, version);
schema.ifPresent(s -> schemaCache.put(cacheKey, s));
return schema;
}
public Optional<Schema> getLatestSchema(String subject) {
String cacheKey = getCacheKey(subject, "latest");
Schema cached = schemaCache.get(cacheKey);
if (cached != null) {
return Optional.of(cached);
}
Optional<Schema> schema = schemaStorage.getLatestSchema(subject);
schema.ifPresent(s -> schemaCache.put(cacheKey, s));
return schema;
}
public List<Schema> getSchemaHistory(String subject) {
return schemaStorage.getSchemaHistory(subject);
}
public boolean isCompatible(String subject, SchemaDefinition newDefinition) {
Optional<Schema> latestSchema = getLatestSchema(subject);
if (!latestSchema.isPresent()) {
return true; // No existing schema, always compatible
}
return compatibilityChecker.isCompatible(latestSchema.get(), newDefinition);
}
public SchemaEvolutionPlan planEvolution(String subject, SchemaDefinition targetDefinition) {
List<Schema> history = getSchemaHistory(subject);
return SchemaEvolutionPlan.builder()
.subject(subject)
.currentVersion(history.isEmpty() ? 0 : history.get(history.size() - 1).getVersion())
.targetDefinition(targetDefinition)
.evolutionSteps(calculateEvolutionSteps(history, targetDefinition))
.breakingChanges(identifyBreakingChanges(history, targetDefinition))
.migrationRequired(requiresMigration(history, targetDefinition))
.build();
}
private void validateSchemaDefinition(SchemaDefinition definition) {
if (definition == null || definition.getSchema() == null) {
throw new IllegalArgumentException("Schema definition cannot be null");
}
try {
// Validate schema syntax (JSON Schema, Avro, etc.)
definition.validate();
} catch (Exception e) {
throw new SchemaValidationException("Invalid schema definition", e);
}
}
private String getCacheKey(String subject, Object version) {
return subject + ":" + version.toString();
}
private List<EvolutionStep> calculateEvolutionSteps(List<Schema> history,
SchemaDefinition target) {
// Implementation for calculating evolution steps
return Collections.emptyList();
}
private List<BreakingChange> identifyBreakingChanges(List<Schema> history,
SchemaDefinition target) {
// Implementation for identifying breaking changes
return Collections.emptyList();
}
private boolean requiresMigration(List<Schema> history, SchemaDefinition target) {
// Implementation for determining if migration is required
return false;
}
}
Configuration Parameters
Essential Settings
| Parameter | Description | Typical Values |
|---|---|---|
| Partition Count | Number of partitions per stream | 3-100 |
| Retention Period | How long to keep events | 7d-365d |
| Batch Size | Events per producer batch | 100-10000 |
| Consumer Lag Threshold | Maximum acceptable consumer lag | 1000-100000 |
| Processing Guarantee | Delivery semantics | at-least-once/exactly-once |
| Compression Type | Event compression algorithm | gzip/lz4/snappy |
Example Configuration
# Stream Platform Configuration
stream.platform=kafka
stream.bootstrap-servers=localhost:9092
stream.schema-registry-url=http://localhost:8081
# Producer Configuration
stream.producer.batch-size=16384
stream.producer.linger-ms=10
stream.producer.compression-type=lz4
stream.producer.acks=all
stream.producer.retries=3
stream.producer.max-in-flight-requests=1
# Consumer Configuration
stream.consumer.fetch-min-bytes=1024
stream.consumer.fetch-max-wait-ms=500
stream.consumer.max-poll-records=500
stream.consumer.session-timeout-ms=30000
stream.consumer.heartbeat-interval-ms=3000
stream.consumer.auto-offset-reset=earliest
# Stream Processing Configuration
stream.processing.guarantee=exactly-once
stream.processing.commit-interval=10s
stream.processing.cache-max-bytes=10MB
stream.processing.num-stream-threads=4
# Schema Registry Configuration
stream.schema.compatibility-level=BACKWARD
stream.schema.cache-size=1000
stream.schema.cache-expiry=1h
Implementation Examples
1. Apache Kafka Streams Implementation
@Configuration
@EnableKafka
public class ContactKafkaStreamsConfig {
@Bean
public KafkaStreamsConfiguration kafkaStreamsConfig() {
// Spring for Apache Kafka's KafkaStreamsConfiguration expects a Map of settings
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "contact-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);
return new KafkaStreamsConfiguration(props);
}
@Bean
public KafkaStreams contactEventStream(KafkaStreamsConfiguration config) {
StreamsBuilder builder = new StreamsBuilder();
// Contact events stream
KStream<String, ContactEvent> contactEvents = builder
.stream("contact.events",
Consumed.with(Serdes.String(), new ContactEventSerde()));
// Filter and transform
KStream<String, EnrichedContactEvent> enrichedEvents = contactEvents
.filter((key, event) -> event != null && event.getContactId() != null)
.mapValues(this::enrichContactEvent);
// Branch by event type
Map<String, KStream<String, EnrichedContactEvent>> branches = enrichedEvents
.split(Named.as("contact-"))
.branch((key, event) -> event.getEventType() == EventType.CREATED,
Branched.as("created"))
.branch((key, event) -> event.getEventType() == EventType.UPDATED,
Branched.as("updated"))
.defaultBranch(Branched.as("other"));
// Process created contacts
branches.get("contact-created")
.process(() -> new ContactCreatedProcessor(),
Named.as("process-created"));
// Aggregate updated contacts
branches.get("contact-updated")
.groupBy((key, event) -> event.getContactId(),
Grouped.with(Serdes.String(), new EnrichedContactEventSerde()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.aggregate(
ContactUpdateSummary::new,
(key, event, aggregate) -> aggregate.addUpdate(event),
Named.as("aggregate-updates"),
Materialized.<String, ContactUpdateSummary, WindowStore<Bytes, byte[]>>as("updates-store")
.withKeySerde(Serdes.String())
.withValueSerde(new ContactUpdateSummarySerde()))
.toStream()
.map((windowedKey, summary) ->
KeyValue.pair(windowedKey.key(), summary.complete()))
.to("contact.update.summaries",
Produced.with(Serdes.String(), new ContactUpdateSummarySerde()));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, config.asProperties());
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
return streams;
}
private EnrichedContactEvent enrichContactEvent(ContactEvent event) {
// Enrichment logic
return EnrichedContactEvent.builder()
.baseEvent(event)
.enrichedAt(Instant.now())
.build();
}
}
@Component
public class KafkaContactEventProducer {
private final KafkaTemplate<String, ContactEvent> kafkaTemplate;
@EventListener
@Async
public void handleContactEvent(ContactEvent event) {
String partitionKey = event.getContactId();
kafkaTemplate.send("contact.events", partitionKey, event)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed to publish contact event: {}", ex.getMessage(), ex);
} else {
log.debug("Published contact event: partition={}, offset={}",
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
});
}
}
2. Amazon Kinesis Implementation
@Configuration
public class KinesisConfiguration {
@Bean
public AmazonKinesis kinesisClient() {
return AmazonKinesisClientBuilder.standard()
.withRegion(Regions.EU_WEST_1)
.build();
}
@Bean
public KinesisProducerConfiguration kinesisProducerConfig() {
KinesisProducerConfiguration config = new KinesisProducerConfiguration();
config.setRegion("eu-west-1");
config.setMaxConnections(24);
config.setRequestTimeout(60000);
config.setRecordMaxBufferedTime(15000);
config.setAggregationEnabled(true);
config.setCompressionFormat("gzip");
return config;
}
@Bean
public KinesisProducer kinesisProducer(KinesisProducerConfiguration config) {
return new KinesisProducer(config);
}
}
@Service
public class KinesisContactEventProducer {
private final KinesisProducer kinesisProducer;
private final ObjectMapper objectMapper;
@Value("${aws.kinesis.contact-stream}")
private String streamName;
public CompletableFuture<UserRecordResult> publishContactEvent(ContactEvent event) {
CompletableFuture<UserRecordResult> future = new CompletableFuture<>();
try {
String partitionKey = event.getContactId();
String eventJson = objectMapper.writeValueAsString(event);
ByteBuffer data = ByteBuffer.wrap(eventJson.getBytes(StandardCharsets.UTF_8));
// The KPL returns a Guava ListenableFuture; bridge it to a CompletableFuture
ListenableFuture<UserRecordResult> kplFuture =
kinesisProducer.addUserRecord(streamName, partitionKey, data);
Futures.addCallback(kplFuture, new FutureCallback<UserRecordResult>() {
@Override
public void onSuccess(UserRecordResult result) {
log.debug("Published to Kinesis: shard={}, sequence={}",
result.getShardId(), result.getSequenceNumber());
future.complete(result);
}
@Override
public void onFailure(Throwable ex) {
log.error("Failed to publish to Kinesis: {}", ex.getMessage(), ex);
future.completeExceptionally(ex);
}
}, MoreExecutors.directExecutor());
} catch (Exception e) {
log.error("Error preparing Kinesis record: {}", e.getMessage(), e);
future.completeExceptionally(e);
}
return future;
}
}
@Component
public class KinesisContactEventConsumer {
private final AmazonKinesis kinesisClient;
private final ObjectMapper objectMapper;
private final ContactEventHandler eventHandler;
@Value("${aws.kinesis.contact-stream}")
private String streamName;
@PostConstruct
public void startConsumer() {
ExecutorService executor = Executors.newFixedThreadPool(2);
// Start multiple workers for different shards
for (int i = 0; i < 2; i++) {
executor.submit(this::consumeEvents);
}
}
private void consumeEvents() {
String shardIterator = getShardIterator();
while (true) {
try {
GetRecordsRequest request = new GetRecordsRequest()
.withShardIterator(shardIterator)
.withLimit(100);
GetRecordsResult result = kinesisClient.getRecords(request);
for (Record record : result.getRecords()) {
processRecord(record);
}
shardIterator = result.getNextShardIterator();
if (result.getRecords().isEmpty()) {
Thread.sleep(1000); // Wait before polling again
}
} catch (Exception e) {
log.error("Error consuming from Kinesis: {}", e.getMessage(), e);
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
private void processRecord(Record record) {
try {
String eventJson = StandardCharsets.UTF_8.decode(record.getData()).toString();
ContactEvent event = objectMapper.readValue(eventJson, ContactEvent.class);
eventHandler.handleContactEvent(event);
log.debug("Processed Kinesis record: sequence={}", record.getSequenceNumber());
} catch (Exception e) {
log.error("Error processing Kinesis record: {}", e.getMessage(), e);
}
}
private String getShardIterator() {
// Implementation to get shard iterator
return null;
}
}
3. Apache Pulsar Implementation
@Configuration
public class PulsarConfiguration {
@Bean
public PulsarClient pulsarClient() {
try {
return PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
} catch (PulsarClientException e) {
throw new RuntimeException("Failed to create Pulsar client", e);
}
}
@Bean
public Producer<ContactEvent> contactEventProducer(PulsarClient client) {
try {
return client.newProducer(Schema.JSON(ContactEvent.class))
.topic("contact-events")
.batchingMaxMessages(100)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.compressionType(CompressionType.LZ4)
.create();
} catch (PulsarClientException e) {
throw new RuntimeException("Failed to create Pulsar producer", e);
}
}
@Bean
public Consumer<ContactEvent> contactEventConsumer(PulsarClient client) {
try {
return client.newConsumer(Schema.JSON(ContactEvent.class))
.topic("contact-events")
.subscriptionName("contact-processing")
.subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(1000)
.subscribe();
} catch (PulsarClientException e) {
throw new RuntimeException("Failed to create Pulsar consumer", e);
}
}
}
@Service
public class PulsarContactEventProducer {
private final Producer<ContactEvent> producer;
public CompletableFuture<MessageId> publishContactEvent(ContactEvent event) {
return producer.newMessage()
.key(event.getContactId())
.property("eventType", event.getEventType().name())
.eventTime(event.getTimestamp().toEpochMilli())
.value(event)
.sendAsync()
.whenComplete((messageId, ex) -> {
if (ex != null) {
log.error("Failed to publish to Pulsar: {}", ex.getMessage(), ex);
} else {
log.debug("Published to Pulsar: messageId={}", messageId);
}
});
}
}
@Component
public class PulsarContactEventConsumer {
private final Consumer<ContactEvent> consumer;
private final ContactEventHandler eventHandler;
@PostConstruct
public void startConsumer() {
CompletableFuture.runAsync(this::consumeEvents);
}
private void consumeEvents() {
while (true) {
try {
Messages<ContactEvent> messages = consumer.batchReceive();
for (Message<ContactEvent> message : messages) {
processMessage(message);
}
consumer.acknowledge(messages);
} catch (Exception e) {
log.error("Error consuming from Pulsar: {}", e.getMessage(), e);
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
private void processMessage(Message<ContactEvent> message) {
try {
ContactEvent event = message.getValue();
eventHandler.handleContactEvent(event);
log.debug("Processed Pulsar message: messageId={}", message.getMessageId());
} catch (Exception e) {
log.error("Error processing Pulsar message: {}", e.getMessage(), e);
}
}
}
Best Practices
1. Event Design and Schema Management
public class EventDesignBestPractices {
// Use CloudEvents specification for standard event structure
public class CloudEventWrapper<T> {
private String specversion = "1.0";
private String type;
private String source;
private String id;
private Instant time;
private String datacontenttype = "application/json";
private T data;
private String subject;
private Map<String, String> extensionAttributes;
public static <T> CloudEventWrapper<T> create(T data, String source, String type) {
return CloudEventWrapper.<T>builder()
.id(UUID.randomUUID().toString())
.source(source)
.type(type)
.time(Instant.now())
.data(data)
.build();
}
}
// Design events for schema evolution
@SchemaEvolution(compatibility = BACKWARD)
public class ContactEventV2 {
@Required
private String eventId;
@Required
private String contactId;
@Required
private EventType eventType;
@Required
private Instant timestamp;
// New fields with defaults for backward compatibility
@Since("2.0")
@Default("unknown")
private String source = "unknown";
@Since("2.0")
private Map<String, String> metadata = new HashMap<>();
// Deprecated fields marked for future removal
@Deprecated(since = "2.0", forRemoval = true) // scheduled for removal in 3.0
private String legacyField;
}
// Use semantic versioning for schemas
public class SchemaVersioning {
public Schema evolveSchema(Schema currentSchema, SchemaChange change) {
SchemaVersion currentVersion = currentSchema.getVersion();
SchemaVersion newVersion;
if (change.isBreaking()) {
newVersion = currentVersion.nextMajor(); // 1.0 -> 2.0
} else if (change.isFeatureAddition()) {
newVersion = currentVersion.nextMinor(); // 1.0 -> 1.1
} else {
newVersion = currentVersion.nextPatch(); // 1.0.0 -> 1.0.1
}
return currentSchema.evolve(newVersion, change);
}
}
}
2. Stream Partitioning and Ordering
@Component
public class StreamPartitioningStrategy {
// Partition by entity ID for ordered processing
public String getPartitionKey(ContactEvent event) {
return event.getContactId(); // All events for same contact go to same partition
}
// Custom partitioner for specific business logic
public class ContactEventPartitioner implements StreamPartitioner<String, ContactEvent> {
@Override
public Integer partition(String topic, String key, ContactEvent value, int numPartitions) {
// Partition by contact region for geographic distribution
String region = extractRegion(value.getContactId());
// Mask the sign bit so the result is always non-negative (Math.abs(Integer.MIN_VALUE) is negative)
return (region.hashCode() & Integer.MAX_VALUE) % numPartitions;
}
private String extractRegion(String contactId) {
// Extract region from contact ID format: REGION-XXXXXXXXXX
return contactId.split("-")[0];
}
}
// Handle out-of-order events with event timestamps
@Component
public class OutOfOrderEventHandler {
private final Map<String, Long> lastProcessedTimestamp = new ConcurrentHashMap<>();
private final Queue<ContactEvent> outOfOrderBuffer = new ConcurrentLinkedQueue<>();
public boolean isOutOfOrder(ContactEvent event) {
String contactId = event.getContactId();
long eventTimestamp = event.getTimestamp().toEpochMilli();
Long lastTimestamp = lastProcessedTimestamp.get(contactId);
if (lastTimestamp == null || eventTimestamp > lastTimestamp) {
lastProcessedTimestamp.put(contactId, eventTimestamp);
return false;
}
return true;
}
public void handleOutOfOrderEvent(ContactEvent event) {
// Buffer out-of-order events for later processing
outOfOrderBuffer.add(event);
// Trigger reprocessing if buffer size threshold reached
if (outOfOrderBuffer.size() > 100) {
reprocessBufferedEvents();
}
}
private void reprocessBufferedEvents() {
// Implementation detail: drain outOfOrderBuffer and replay events in timestamp order
}
}
}
3. Stream Processing Patterns
@Component
public class StreamProcessingPatterns {
// Windowed aggregations for time-based analytics
public void configureWindowedAggregation(StreamsBuilder builder) {
builder.stream("contact.events",
Consumed.with(Serdes.String(), new ContactEventSerde()))
.groupBy((key, event) -> event.getEventType().name(),
Grouped.with(Serdes.String(), new ContactEventSerde()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofSeconds(30)))
.count(Named.as("event-counts"))
.toStream()
.map((windowedKey, count) -> KeyValue.pair(
windowedKey.key() + "@" + windowedKey.window().start(),
new EventCountSummary(windowedKey.key(), count, windowedKey.window())
))
.to("contact.event.counts");
}
// Stream joins for event correlation
public void configureStreamJoins(StreamsBuilder builder) {
KStream<String, ContactEvent> contactEvents =
builder.stream("contact.events");
KStream<String, UserEvent> userEvents =
builder.stream("user.events");
// Join streams within a time window
contactEvents.join(
userEvents,
(contact, user) -> new CorrelatedEvent(contact, user),
JoinWindows.of(Duration.ofMinutes(5)),
StreamJoined.with(
Serdes.String(),
new ContactEventSerde(),
new UserEventSerde()
)
).to("correlated.events");
}
// Error handling and dead letter topics
public void configureErrorHandling(StreamsBuilder builder) {
builder.stream("contact.events")
.process(() -> new ContactEventProcessor(),
Named.as("process-contacts"))
.to((key, value, recordContext) -> {
if (value instanceof ProcessingError) {
return "contact.events.errors";
} else {
return "contact.events.processed";
}
});
}
}
Common Pitfalls
1. Hot Partitions
Problem: Uneven distribution of events leading to processing bottlenecks
Solution: Use effective partitioning strategies and monitor partition metrics
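One common mitigation is key salting, sketched below: a known hot key is spread across several sub-keys, at the cost of strict per-key ordering (the salt bucket count is an assumption).
// Sketch: spread a known hot key over N sub-keys ("salting"). Strict per-key
// ordering is lost for salted keys, so consumers must tolerate or merge reordering.
String saltedPartitionKey(String hotKey, int saltBuckets) {
    int salt = ThreadLocalRandom.current().nextInt(saltBuckets);
    return hotKey + "#" + salt;
}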
2. Schema Compatibility Issues
Problem: Breaking schema changes causing consumer failures
Solution: Implement schema registry with compatibility checking
3. Consumer Lag Accumulation
Problem: Consumers falling behind producers causing delays
Solution: Monitor lag metrics and implement auto-scaling
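Lag can be computed per partition as the difference between the log end offset and the group's committed offset. A sketch using the Kafka admin client (group id and bootstrap address are assumptions):
public void logConsumerLag() throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
        // Offsets the group has committed so far
        Map<TopicPartition, OffsetAndMetadata> committed = admin
                .listConsumerGroupOffsets("contact-processing-group")
                .partitionsToOffsetAndMetadata().get();
        // Current end of the log for each of those partitions
        Map<TopicPartition, OffsetSpec> latest = committed.keySet().stream()
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
                admin.listOffsets(latest).all().get();
        committed.forEach((tp, offset) ->
                log.info("Lag for {} = {}", tp, endOffsets.get(tp).offset() - offset.offset()));
    }
}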
4. Memory Leaks in State Stores
Problem: Unbounded state growth in stream processing applications
Solution: Configure retention policies and compact state stores
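In Kafka Streams, for example, windowed state can be bounded explicitly with a grace period and store retention (store name and durations below are assumptions):
// Sketch: bound windowed state with an explicit grace period and store retention
KStream<String, ContactEvent> events = builder.stream("contact.events");
events.groupByKey()
    .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(30)))
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("bounded-counts-store")
        .withRetention(Duration.ofHours(1))); // expired windows are purged after one hour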
5. Exactly-Once Processing Overhead
Problem: Performance degradation with exactly-once guarantees
Solution: Use at-least-once when appropriate and implement idempotent processing
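A sketch of idempotent handling on top of at-least-once delivery: processed event ids are remembered so redelivered events become no-ops (ProcessedEventRepository and ProcessedEvent are assumed, simple persistence types):
@Component
public class IdempotentContactEventHandler {
    private final ProcessedEventRepository processedEvents; // hypothetical store of handled event ids
    private final ContactEventHandler delegate;

    public IdempotentContactEventHandler(ProcessedEventRepository processedEvents,
                                         ContactEventHandler delegate) {
        this.processedEvents = processedEvents;
        this.delegate = delegate;
    }

    @Transactional
    public void handle(String eventId, ContactEvent event) {
        if (processedEvents.existsById(eventId)) {
            return; // duplicate delivery under at-least-once semantics: already applied
        }
        delegate.handleContactEvent(event);
        processedEvents.save(new ProcessedEvent(eventId, Instant.now()));
    }
}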
Integration in Distributed Systems
Event Sourcing Implementation
@Service
public class EventSourcedContactService {
public void createContact(CreateContactCommand command) {
ContactCreatedEvent event = ContactCreatedEvent.builder()
.contactId(command.getContactId())
.personalData(command.getPersonalData())
.occurredAt(Instant.now())
.build();
eventStreamProducer.publishEvent("contact.events", event);
}
}
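The same log also supports rebuilding current state by folding a contact's event history; a sketch (the event store lookup and the aggregate's apply method are assumptions):
public Contact rehydrateContact(String contactId) {
    Contact contact = new Contact(contactId);
    // Replay this contact's history in order; each event mutates the in-memory aggregate
    for (ContactEvent event : eventStore.readEvents("contact.events", contactId)) {
        contact.apply(event);
    }
    return contact;
}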
CQRS with Event Streaming
@Service
public class ContactQueryService {
@StreamListener("contact.events")
public void updateContactProjection(ContactEvent event) {
ContactProjection projection = contactProjectionRepository
.findByContactId(event.getContactId())
.orElse(new ContactProjection(event.getContactId()));
projection.apply(event);
contactProjectionRepository.save(projection);
}
}
Conclusion
Event Streaming is essential for building modern, responsive distributed systems. It provides:
- Real-Time Processing: Continuous data processing with low latency
- Scalable Architecture: Linear scalability through partitioning and parallel processing
- Temporal Flexibility: Historical replay and point-in-time recovery capabilities
- Integration Simplification: Unified data pipeline replacing complex point-to-point connections
When properly implemented with appropriate partitioning, schema management, and monitoring, event streaming enables robust, scalable architectures that can process massive data volumes while maintaining consistency and enabling real-time business intelligence.