Publish-Subscribe Pattern
Overview
The Publish-Subscribe (Pub-Sub) pattern is a fundamental messaging paradigm that enables one-to-many communication through asynchronous message distribution. It establishes a decoupled communication model where message producers (publishers) send messages to topics or channels without knowledge of message consumers (subscribers), while subscribers express interest in specific message types without awareness of publishers. This pattern forms the foundation for scalable, event-driven architectures.
Theoretical Foundation
The Publish-Subscribe pattern is closely related to the Observer pattern and to event-driven architecture principles. In the spirit of the "Law of Demeter" from object-oriented design, it limits what each component needs to know about the others, but it does so at the messaging level: publishers and subscribers know only the broker and the topic, never each other. Message delivery is decided by subscription matching (on topic, pattern, or content) rather than by explicit addressing.
Core Principles
1. Topic-Based Message Distribution
Messages are published to logical channels (topics) rather than specific destinations, enabling dynamic subscriber registration and flexible message routing based on content classification.
2. Temporal and Spatial Decoupling
Publishers and subscribers operate independently in both time and space - they don't need to be active simultaneously, nor do they need knowledge of each other's location or existence.
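Temporal decoupling can be illustrated with a retained log and per-subscriber offsets. The sketch below is a minimal in-memory illustration, not a broker implementation; `RetainedTopicLog`, `append`, and `poll` are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical retained log for one topic; messages stay available after publication. */
class RetainedTopicLog {
    private final List<String> messages = new ArrayList<>();
    // subscriber id -> index of the next message that subscriber has not yet read
    private final Map<String, Integer> offsets = new HashMap<>();

    /** Publisher side: append without knowing whether any subscriber is running. */
    synchronized void append(String message) {
        messages.add(message);
    }

    /** Subscriber side: return everything not yet seen and advance the offset. */
    synchronized List<String> poll(String subscriberId) {
        int from = offsets.getOrDefault(subscriberId, 0);
        List<String> batch = new ArrayList<>(messages.subList(from, messages.size()));
        offsets.put(subscriberId, messages.size());
        return batch;
    }
}
```

A subscriber that first polls after several messages were appended still receives all of them, which is the "not active simultaneously" property described above. A real broker would also bound retention and persist offsets.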
3. Content-Based Filtering
Subscribers receive only messages matching their subscription criteria, enabling fine-grained message filtering and reducing unnecessary message processing overhead.
4. Dynamic Subscription Management
Subscribers can dynamically register and unregister interest in topics at runtime, enabling adaptive system behavior and efficient resource utilization.
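Topic-based distribution and runtime subscription management can be sketched in a few lines. This is a simplified in-memory illustration under the assumption of synchronous delivery and string payloads; `InMemoryBroker` and its methods are hypothetical and do not correspond to any library API.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical in-memory broker: publishers and subscribers share only topic names. */
class InMemoryBroker {
    // topic name -> handlers currently subscribed to that topic
    private final Map<String, List<Consumer<String>>> handlers = new ConcurrentHashMap<>();

    /** Registers a handler at runtime and returns a Runnable that cancels the subscription. */
    Runnable subscribe(String topic, Consumer<String> handler) {
        List<Consumer<String>> list =
                handlers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>());
        list.add(handler);
        return () -> list.remove(handler);
    }

    /** Delivers the message to every current subscriber of the topic; returns the delivery count. */
    int publish(String topic, String message) {
        int delivered = 0;
        for (Consumer<String> handler : handlers.getOrDefault(topic, List.of())) {
            handler.accept(message);
            delivered++;
        }
        return delivered;
    }
}
```

The publisher never sees a subscriber reference, and a subscription can be added or cancelled while publishing continues.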
Why Publish-Subscribe is Essential in Integration Architecture
1. Event-Driven Integration
Modern distributed systems require responsive, real-time integration:
- Business event propagation across multiple interested systems
- Data synchronization between distributed databases and caches
- Real-time notifications for user interfaces and external systems
- Audit trail distribution for compliance and monitoring systems
2. System Decoupling
Pub-Sub enables architectural flexibility and maintainability:
- Service independence allowing systems to evolve independently
- Dynamic service discovery through topic-based communication
- Reduced integration complexity by eliminating point-to-point connections
- Simplified testing through isolation of publishers and subscribers
3. Scalability and Performance
The pattern supports horizontal scaling and performance optimization:
- Fan-out messaging distributing load across multiple consumers
- Parallel processing enabling concurrent message handling
- Load balancing through consumer group partitioning
- Elastic scaling by adding or removing subscribers dynamically
4. Business Agility
Pub-Sub supports rapid business adaptation and feature development:
- New subscriber integration without modifying existing publishers
- A/B testing through selective message routing to test subscribers
- Feature rollout using gradual subscription activation
- Business process flexibility through configurable event flows
Benefits in Integration Contexts
1. Architectural Flexibility
- Loose coupling between event producers and consumers
- Dynamic composition of business processes through subscription management
- Service mesh integration with topic-based service discovery
- Multi-tenant architectures through topic segmentation and access control
2. Operational Excellence
- Fault isolation preventing subscriber failures from affecting publishers
- Monitoring simplification through centralized topic metrics
- Capacity planning based on topic volume and subscriber performance
- Deployment flexibility enabling independent service updates
3. Development Productivity
- Parallel development of publishers and subscribers
- Simplified testing through message replay and subscription mocking
- Code reusability through standardized topic interfaces
- Rapid prototyping by subscribing to existing event streams
4. Business Intelligence
- Real-time analytics through event stream processing
- Business activity monitoring via comprehensive event capture
- Compliance reporting through audit event subscription
- Performance optimization through behavioral event analysis
Integration Architecture Applications
1. Microservices Event Choreography
Coordinates distributed business processes through event flows:
- Order processing workflows triggering inventory, payment, and shipping events
- User account lifecycle management propagating registration, verification, and deletion events
- Data consistency maintenance through eventual consistency event patterns
- Service dependency management via event-driven state synchronization
2. Enterprise Application Integration (EAI)
Enables integration of heterogeneous enterprise systems:
- ERP system integration broadcasting financial and operational events
- CRM data synchronization sharing customer events across systems
- Master data management propagating reference data changes
- Legacy system modernization through event-driven API exposure
3. IoT and Real-Time Systems
Supports massive-scale device integration and real-time processing:
- Sensor data distribution from IoT devices to multiple processing systems
- Device lifecycle management through device status and configuration events
- Real-time alerting based on sensor threshold breaches
- Telemetry data aggregation for analytics and monitoring systems
4. Multi-Channel User Experience
Coordinates user interactions across different channels and devices:
- User activity synchronization across web, mobile, and desktop applications
- Notification delivery to multiple communication channels (email, SMS, push)
- Personalization engines consuming user behavior events
- Content management distributing updates across multiple presentation layers
How Publish-Subscribe Pattern Works
The Publish-Subscribe pattern operates through a message broker that manages topics and handles message distribution to subscribers:
Basic Pub-Sub Flow
Publisher A ────┐                    ┌──→ Subscriber X
                │                    │
Publisher B ────┼─→ [Topic: Orders] ─┼──→ Subscriber Y
                │                    │
Publisher C ────┘                    └──→ Subscriber Z

Publisher D ────┐                    ┌──→ Subscriber P
                │                    │
Publisher E ────┼─→ [Topic: Events] ─┼──→ Subscriber Q
                │                    │
Publisher F ────┘                    └──→ Subscriber R
Message Broker Architecture
┌─────────────────────────────────────────────────────────┐
│ Message Broker │
├─────────────────────────────────────────────────────────┤
│ Topic Manager │ Subscription Manager │
│ ┌─────────────┐ │ ┌──────────────────────┐ │
│ │ Topic A │ │ │ Active Subscriptions │ │
│ │ Topic B │ │ │ - Subscriber 1 │ │
│ │ Topic C │ │ │ - Subscriber 2 │ │
│ └─────────────┘ │ │ - Subscriber 3 │ │
│ │ └──────────────────────┘ │
├─────────────────────────────────────────────────────────┤
│ Message Routing Engine │
│ ┌─────────────┐ ┌──────────────┐ ┌────────────────┐ │
│ │ Filter │ │ Transform │ │ Delivery │ │
│ │ Engine │ │ Engine │ │ Manager │ │
│ └─────────────┘ └──────────────┘ └────────────────┘ │
├─────────────────────────────────────────────────────────┤
│ Persistence & Recovery │
│ ┌─────────────┐ ┌──────────────┐ ┌────────────────┐ │
│ │ Message │ │ Offset │ │ Dead Letter │ │
│ │ Storage │ │ Management │ │ Queue │ │
│ └─────────────┘ └──────────────┘ └────────────────┘ │
└─────────────────────────────────────────────────────────┘
Subscription Types and Message Delivery
1. Topic Subscription
Subscriber registers for: Topic = "order.created"
Message matches if: topic == "order.created"
┌─────────────────────────────────────┐
│ Subscription Registration │
├─────────────────────────────────────┤
│ Subscriber ID: service-001 │
│ Topic: order.created │
│ Filter: null │
│ Delivery Mode: At-Least-Once │
│ Consumer Group: order-processors │
└─────────────────────────────────────┘
2. Pattern-Based Subscription
Subscriber registers for: Pattern = "order.*"
Message matches if: topic matches pattern
┌─────────────────────────────────────┐
│ Pattern Subscription │
├─────────────────────────────────────┤
│ Subscriber ID: audit-service │
│ Pattern: order.* │
│ Matches: │
│ - order.created │
│ - order.updated │
│ - order.cancelled │
└─────────────────────────────────────┘
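One way to evaluate a wildcard such as `order.*` is to translate it into a regular expression segment by segment. The sketch below assumes that `*` stands for exactly one dot-free segment (similar to the single-word wildcard in AMQP topic exchanges); other brokers define wildcards differently, and `TopicPatterns` is a hypothetical helper.

```java
import java.util.regex.Pattern;

/** Hypothetical helper that matches dot-separated topics against wildcard patterns. */
class TopicPatterns {
    /** Converts e.g. "order.*" to a regex; '*' matches one segment, everything else is literal. */
    static Pattern compile(String wildcard) {
        StringBuilder regex = new StringBuilder();
        for (String segment : wildcard.split("\\.", -1)) {
            if (regex.length() > 0) {
                regex.append("\\."); // literal dot between segments
            }
            regex.append(segment.equals("*") ? "[^.]+" : Pattern.quote(segment));
        }
        return Pattern.compile(regex.toString());
    }

    static boolean matches(String wildcard, String topic) {
        return compile(wildcard).matcher(topic).matches();
    }
}
```

Quoting the literal segments matters: if `order.*` were compiled directly as a regular expression, the unescaped `.` would also match topics such as `orderXcreated`.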
3. Content-Based Subscription
Subscriber registers for: Topic + Content Filter
Message matches if: topic == "order.created" AND order.amount > 1000
┌─────────────────────────────────────┐
│ Content Filter Subscription │
├─────────────────────────────────────┤
│ Subscriber ID: risk-analysis │
│ Topic: order.created │
│ Filter: amount > 1000 AND │
│ category != 'internal' │
│ Delivery: Push │
└─────────────────────────────────────┘
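The content filter shown above can be expressed as a composed predicate over the message payload. This sketch assumes the payload has already been parsed into a `Map<String, Object>`; `ContentFilters` and `shouldDeliver` are hypothetical names, and a real broker would typically parse the filter expression rather than hard-code it.

```java
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical content-filter helpers for map-shaped payloads. */
class ContentFilters {
    /** Encodes the example filter: amount > 1000 AND category != 'internal'. */
    static Predicate<Map<String, Object>> highValueExternalOrders() {
        Predicate<Map<String, Object>> highValue = payload -> {
            Object amount = payload.get("amount");
            return amount instanceof Number && ((Number) amount).doubleValue() > 1000;
        };
        Predicate<Map<String, Object>> notInternal =
                payload -> !"internal".equals(payload.get("category"));
        return highValue.and(notInternal);
    }

    /** A message is delivered only if both the topic and the content filter match. */
    static boolean shouldDeliver(String subscribedTopic,
                                 Predicate<Map<String, Object>> filter,
                                 String topic,
                                 Map<String, Object> payload) {
        return subscribedTopic.equals(topic) && filter.test(payload);
    }
}
```

Messages without a numeric `amount` field are rejected by this sketch; whether missing fields should match is a policy decision for the filter language.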
Key Components
1. Topic Manager
Manages topic creation, configuration, and lifecycle:
@Component
public class TopicManager {
private final Map<String, Topic> topics = new ConcurrentHashMap<>();
private final TopicConfigurationService configService;
private final MetricRegistry metricRegistry;
public Topic createTopic(TopicConfiguration config) {
validateTopicConfiguration(config);
Topic topic = Topic.builder()
.name(config.getName())
.partitions(config.getPartitions())
.replicationFactor(config.getReplicationFactor())
.retentionPolicy(config.getRetentionPolicy())
.compressionType(config.getCompressionType())
.maxMessageSize(config.getMaxMessageSize())
.build();
topics.put(config.getName(), topic);
// Create metrics for the topic
metricRegistry.counter("topic.messages.published", "topic", topic.getName());
metricRegistry.counter("topic.messages.consumed", "topic", topic.getName());
metricRegistry.gauge("topic.subscribers.count", "topic", topic.getName(),
() -> getSubscriberCount(topic.getName()));
log.info("Created topic: {} with {} partitions", topic.getName(), topic.getPartitions());
return topic;
}
public Optional<Topic> getTopic(String topicName) {
return Optional.ofNullable(topics.get(topicName));
}
public List<Topic> getTopicsByPattern(String pattern) {
Pattern compiledPattern = Pattern.compile(pattern);
return topics.values().stream()
.filter(topic -> compiledPattern.matcher(topic.getName()).matches())
.collect(Collectors.toList());
}
private void validateTopicConfiguration(TopicConfiguration config) {
if (config.getName() == null || config.getName().trim().isEmpty()) {
throw new IllegalArgumentException("Topic name cannot be null or empty");
}
if (config.getName().contains("..") || config.getName().startsWith(".")) {
throw new IllegalArgumentException("Invalid topic name format");
}
if (config.getPartitions() <= 0) {
throw new IllegalArgumentException("Partition count must be positive");
}
if (config.getReplicationFactor() <= 0) {
throw new IllegalArgumentException("Replication factor must be positive");
}
}
}
2. Subscription Manager
Manages subscriber registrations and message routing:
@Component
public class SubscriptionManager {
private final Map<String, Set<Subscription>> topicSubscriptions = new ConcurrentHashMap<>();
private final Map<String, Subscription> activeSubscriptions = new ConcurrentHashMap<>();
private final MessageMatcher messageMatcher;
public Subscription subscribe(SubscriptionRequest request) {
validateSubscriptionRequest(request);
Subscription subscription = Subscription.builder()
.id(generateSubscriptionId())
.subscriberId(request.getSubscriberId())
.topicPattern(request.getTopicPattern())
.contentFilter(request.getContentFilter())
.deliveryMode(request.getDeliveryMode())
.consumerGroup(request.getConsumerGroup())
.createdAt(Instant.now())
.build();
// Register subscription for all matching topics
if (request.isPatternBased()) {
registerPatternSubscription(subscription);
} else {
registerTopicSubscription(subscription, request.getTopic());
}
activeSubscriptions.put(subscription.getId(), subscription);
log.info("Created subscription {} for subscriber {} on topic/pattern: {}",
subscription.getId(),
subscription.getSubscriberId(),
subscription.getTopicPattern());
return subscription;
}
public void unsubscribe(String subscriptionId) {
Subscription subscription = activeSubscriptions.remove(subscriptionId);
if (subscription != null) {
// Remove from topic-specific subscriptions
topicSubscriptions.values().forEach(subscriptions ->
subscriptions.removeIf(s -> s.getId().equals(subscriptionId)));
log.info("Removed subscription {} for subscriber {}",
subscriptionId,
subscription.getSubscriberId());
}
}
public Set<Subscription> getMatchingSubscriptions(String topic, Message message) {
Set<Subscription> matchingSubscriptions = new HashSet<>();
// Get direct topic subscriptions
Set<Subscription> topicSubs = topicSubscriptions.getOrDefault(topic, Collections.emptySet());
for (Subscription subscription : topicSubs) {
if (messageMatcher.matches(subscription, topic, message)) {
matchingSubscriptions.add(subscription);
}
}
return matchingSubscriptions;
}
private void registerTopicSubscription(Subscription subscription, String topic) {
// newKeySet() is a static factory; call it on the class to get a concurrent Set
topicSubscriptions.computeIfAbsent(topic, k -> ConcurrentHashMap.newKeySet())
.add(subscription);
}
private void registerPatternSubscription(Subscription subscription) {
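// For pattern subscriptions, check against the topics known at registration time.
// Limitation: only topics that already have at least one subscription are considered,
// and topics that appear later are not matched retroactively.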
topicSubscriptions.keySet().stream()
.filter(topic -> matchesPattern(topic, subscription.getTopicPattern()))
.forEach(topic -> registerTopicSubscription(subscription, topic));
}
private boolean matchesPattern(String topic, String pattern) {
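try {
// The pattern is interpreted as a Java regular expression, so in "order.*" the '.'
// matches any character; escape it ("order\\..*") when a literal dot is intended
return Pattern.compile(pattern).matcher(topic).matches();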
} catch (PatternSyntaxException e) {
log.warn("Invalid topic pattern: {}", pattern, e);
return false;
}
}
}
3. Message Publisher
Handles message publishing and distribution:
@Component
public class MessagePublisher {
private final TopicManager topicManager;
private final SubscriptionManager subscriptionManager;
private final MessageDistributor messageDistributor;
private final MessagePersistence messagePersistence;
private final MetricRegistry metricRegistry; // used by recordPublishMetrics
public PublishResult publish(String topicName, Message message) {
return publish(topicName, message, PublishOptions.defaultOptions());
}
public PublishResult publish(String topicName, Message message, PublishOptions options) {
validatePublishRequest(topicName, message);
Optional<Topic> topicOpt = topicManager.getTopic(topicName);
if (!topicOpt.isPresent() && !options.isAutoCreateTopic()) {
throw new TopicNotFoundException("Topic not found: " + topicName);
}
Topic topic = topicOpt.orElseGet(() -> createTopicIfAllowed(topicName, options));
// Enrich message with metadata
Message enrichedMessage = enrichMessage(message, topicName, options);
try {
// Persist message if required
if (topic.isPersistent()) {
messagePersistence.store(topicName, enrichedMessage);
}
// Get matching subscriptions
Set<Subscription> subscriptions = subscriptionManager.getMatchingSubscriptions(
topicName, enrichedMessage);
// Distribute message to subscribers
PublishResult result = messageDistributor.distribute(
topic, enrichedMessage, subscriptions, options);
// Record metrics
recordPublishMetrics(topicName, enrichedMessage, result);
log.debug("Published message {} to topic {} - delivered to {} subscribers",
enrichedMessage.getId(), topicName, result.getDeliveredCount());
return result;
} catch (Exception e) {
log.error("Failed to publish message to topic {}: {}", topicName, e.getMessage(), e);
throw new PublishException("Failed to publish message", e);
}
}
public CompletableFuture<PublishResult> publishAsync(String topicName,
Message message,
PublishOptions options) {
return CompletableFuture.supplyAsync(() -> publish(topicName, message, options));
}
private Message enrichMessage(Message message, String topicName, PublishOptions options) {
return message.toBuilder()
.id(message.getId() != null ? message.getId() : UUID.randomUUID().toString())
.topic(topicName)
.timestamp(message.getTimestamp() != null ? message.getTimestamp() : Instant.now())
.publisher(options.getPublisherId())
.correlationId(options.getCorrelationId())
.build();
}
private void recordPublishMetrics(String topicName, Message message, PublishResult result) {
metricRegistry.counter("messages.published", "topic", topicName).increment();
metricRegistry.counter("messages.delivered", "topic", topicName)
.increment(result.getDeliveredCount());
metricRegistry.histogram("message.size", "topic", topicName)
.update(message.getPayload().length);
}
}
4. Message Subscriber
Handles subscription management and message consumption:
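// Abstract base class: Spring's component scan skips abstract classes, so only
// concrete subclasses (such as OrderEventSubscriber below) carry @Component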
public abstract class MessageSubscriber {
private final SubscriptionManager subscriptionManager;
private final MessageDeserializer messageDeserializer;
private Subscription subscription;
public void start() {
SubscriptionRequest request = createSubscriptionRequest();
this.subscription = subscriptionManager.subscribe(request);
log.info("Started subscriber {} with subscription {}",
getSubscriberId(), subscription.getId());
}
public void stop() {
if (subscription != null) {
subscriptionManager.unsubscribe(subscription.getId());
subscription = null;
log.info("Stopped subscriber {}", getSubscriberId());
}
}
public void handleMessage(String topic, Message message) {
try {
// Deserialize message payload
Object payload = messageDeserializer.deserialize(
message.getPayload(), getExpectedPayloadType());
// Process message
MessageContext context = MessageContext.builder()
.topic(topic)
.message(message)
.payload(payload)
.subscription(subscription)
.receivedAt(Instant.now())
.build();
processMessage(context);
// Acknowledge message processing
acknowledgeMessage(message);
} catch (Exception e) {
log.error("Error processing message {} from topic {}: {}",
message.getId(), topic, e.getMessage(), e);
handleProcessingError(topic, message, e);
}
}
protected abstract void processMessage(MessageContext context);
protected abstract String getSubscriberId();
protected abstract Class<?> getExpectedPayloadType();
protected abstract SubscriptionRequest createSubscriptionRequest();
protected void acknowledgeMessage(Message message) {
// Default implementation - can be overridden for specific acknowledgment strategies
log.debug("Acknowledged message: {}", message.getId());
}
protected void handleProcessingError(String topic, Message message, Exception error) {
// Default error handling - can be overridden for custom error strategies
log.error("Message processing failed for message {} on topic {}",
message.getId(), topic);
}
}
// Example concrete subscriber implementation
@Component
public class OrderEventSubscriber extends MessageSubscriber {
private final OrderService orderService;
@Override
protected void processMessage(MessageContext context) {
OrderEvent orderEvent = (OrderEvent) context.getPayload();
switch (orderEvent.getType()) {
case CREATED:
orderService.handleOrderCreated(orderEvent);
break;
case UPDATED:
orderService.handleOrderUpdated(orderEvent);
break;
case CANCELLED:
orderService.handleOrderCancelled(orderEvent);
break;
default:
log.warn("Unknown order event type: {}", orderEvent.getType());
}
}
@Override
protected String getSubscriberId() {
return "order-event-subscriber";
}
@Override
protected Class<?> getExpectedPayloadType() {
return OrderEvent.class;
}
@Override
protected SubscriptionRequest createSubscriptionRequest() {
return SubscriptionRequest.builder()
.subscriberId(getSubscriberId())
.topicPattern("order.*") // wildcard, so register it as a pattern rather than an exact topic
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
.consumerGroup("order-processors")
.contentFilter("eventType IN ['CREATED', 'UPDATED', 'CANCELLED']")
.build();
}
}
Configuration Parameters
Essential Settings
| Parameter | Description | Typical Values |
|---|---|---|
| Topic Partitions | Number of partitions per topic | 1-100 |
| Replication Factor | Number of message replicas | 1-5 |
| Message Retention | How long to keep messages | 1h-30d |
| Subscriber Timeout | Max time for message processing | 30s-300s |
| Batch Size | Messages per batch delivery | 1-1000 |
| Buffer Size | Publisher message buffer size | 1MB-100MB |
Example Configuration
# Topic Configuration
pub-sub.topic.default-partitions=10
pub-sub.topic.default-replication-factor=3
pub-sub.topic.default-retention=24h
pub-sub.topic.auto-create=false
pub-sub.topic.max-message-size=1MB
# Publisher Configuration
pub-sub.publisher.batch-size=100
pub-sub.publisher.linger-ms=10
pub-sub.publisher.buffer-memory=32MB
pub-sub.publisher.compression-type=lz4
pub-sub.publisher.retries=3
pub-sub.publisher.retry-delay=1s
# Subscriber Configuration
pub-sub.subscriber.fetch-min-bytes=1KB
pub-sub.subscriber.fetch-max-wait=500ms
pub-sub.subscriber.max-poll-records=500
pub-sub.subscriber.session-timeout=30s
pub-sub.subscriber.heartbeat-interval=3s
# Consumer Group Configuration
pub-sub.consumer-group.rebalance-timeout=60s
pub-sub.consumer-group.offset-retention=7d
pub-sub.consumer-group.auto-commit=true
pub-sub.consumer-group.auto-commit-interval=5s
Implementation Examples
1. Apache Kafka Publish-Subscribe Implementation
@Configuration
public class KafkaPublishSubscribeConfiguration {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
configProps.put(ProducerConfig.LINGER_MS_CONFIG, 10);
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "contact-service-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
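// "*" trusts every package for deserialization; restrict this to the packages
// that contain your event classes outside of local development
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");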
return new DefaultKafkaConsumerFactory<>(props);
}
}
@Service
public class ContactEventPublisher {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void publishContactCreated(ContactCreatedEvent event) {
kafkaTemplate.send("contact.events", "contact.created", event)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed to publish contact created event: {}", ex.getMessage(), ex);
} else {
log.info("Contact created event published successfully: {}",
event.getContactId());
}
});
}
public void publishContactUpdated(ContactUpdatedEvent event) {
kafkaTemplate.send("contact.events", "contact.updated", event);
}
public void publishContactDeleted(ContactDeletedEvent event) {
kafkaTemplate.send("contact.events", "contact.deleted", event);
}
}
@Component
public class ContactEventSubscriber {
private final ContactEventHandler contactEventHandler;
private final AuditService auditService; // used by handleContactAuditEvent
@KafkaListener(topics = "contact.events", groupId = "notification-service")
public void handleContactEvent(@Payload ContactEvent event,
@Header("kafka_receivedTopic") String topic,
@Header("kafka_receivedPartition") int partition,
@Header("kafka_offset") long offset) {
log.info("Received contact event: {} from topic: {}, partition: {}, offset: {}",
event.getEventType(), topic, partition, offset);
try {
contactEventHandler.handleEvent(event);
} catch (Exception e) {
log.error("Error processing contact event: {}", e.getMessage(), e);
throw e; // Will trigger retry or DLQ based on configuration
}
}
// topics and topicPattern are mutually exclusive on @KafkaListener; use the pattern alone
@KafkaListener(groupId = "audit-service",
topicPattern = "contact\\..*")
public void handleContactAuditEvent(@Payload ContactEvent event) {
auditService.recordContactEvent(event);
}
}
2. RabbitMQ Topic Exchange Implementation
@Configuration
public class RabbitMQPubSubConfiguration {
@Bean
public TopicExchange contactExchange() {
return new TopicExchange("contact.exchange", true, false);
}
@Bean
public Queue contactNotificationQueue() {
return QueueBuilder.durable("contact.notifications").build();
}
@Bean
public Queue contactAuditQueue() {
return QueueBuilder.durable("contact.audit").build();
}
@Bean
public Binding contactCreatedBinding() {
return BindingBuilder.bind(contactNotificationQueue())
.to(contactExchange())
.with("contact.created");
}
@Bean
public Binding contactUpdatedBinding() {
return BindingBuilder.bind(contactNotificationQueue())
.to(contactExchange())
.with("contact.updated");
}
@Bean
public Binding allContactEventsAuditBinding() {
return BindingBuilder.bind(contactAuditQueue())
.to(contactExchange())
.with("contact.*");
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
}
@Service
public class RabbitMQContactEventPublisher {
private final RabbitTemplate rabbitTemplate;
private final String exchangeName = "contact.exchange";
public void publishEvent(String routingKey, ContactEvent event) {
try {
rabbitTemplate.convertAndSend(exchangeName, routingKey, event, message -> {
message.getMessageProperties().setHeader("eventId", event.getEventId());
message.getMessageProperties().setHeader("eventType", event.getEventType());
message.getMessageProperties().setTimestamp(Date.from(event.getTimestamp()));
return message;
});
log.info("Published event {} with routing key: {}", event.getEventId(), routingKey);
} catch (Exception e) {
log.error("Failed to publish event {}: {}", event.getEventId(), e.getMessage(), e);
throw new EventPublishException("Failed to publish event", e);
}
}
}
@RabbitListener(queues = "contact.notifications")
@Component
public class ContactNotificationSubscriber {
private final NotificationService notificationService;
// A class-level @RabbitListener dispatches only to methods marked @RabbitHandler
@RabbitHandler
public void handleContactEvent(ContactEvent event) {
log.info("Processing notification for contact event: {}", event.getEventType());
switch (event.getEventType()) {
case CREATED:
notificationService.sendWelcomeNotification(event.getContactId());
break;
case UPDATED:
notificationService.sendUpdateNotification(event.getContactId());
break;
case DELETED:
notificationService.sendGoodbyeNotification(event.getContactId());
break;
}
}
}
3. Redis Pub-Sub Implementation
@Configuration
public class RedisPublishSubscribeConfiguration {
@Bean
public RedisConnectionFactory connectionFactory() {
// Pass host and port to the constructor; the setHostName/setPort setters are deprecated
return new LettuceConnectionFactory("localhost", 6379);
}
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory());
template.setDefaultSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
return template;
}
@Bean
public RedisMessageListenerContainer messageListenerContainer() {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
return container;
}
}
@Service
public class RedisEventPublisher {
private final RedisTemplate<String, Object> redisTemplate;
public void publishContactEvent(String channel, ContactEvent event) {
try {
redisTemplate.convertAndSend(channel, event);
log.info("Published event {} to channel: {}", event.getEventId(), channel);
} catch (Exception e) {
log.error("Failed to publish event to Redis: {}", e.getMessage(), e);
throw new EventPublishException("Failed to publish to Redis", e);
}
}
}
@Component
public class RedisEventSubscriber implements MessageListener {
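private final ObjectMapper objectMapper;
private final ContactEventHandler eventHandler;
private final RedisMessageListenerContainer messageListenerContainer; // used in subscribeToChannels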
@PostConstruct
public void subscribeToChannels() {
messageListenerContainer.addMessageListener(this,
Arrays.asList(
new PatternTopic("contact.*"),
new ChannelTopic("system.alerts")
));
}
@Override
public void onMessage(Message message, byte[] pattern) {
try {
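// Decode explicitly as UTF-8 rather than relying on the platform default charset
String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
String body = new String(message.getBody(), StandardCharsets.UTF_8);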
ContactEvent event = objectMapper.readValue(body, ContactEvent.class);
log.info("Received event {} from channel: {}", event.getEventId(), channel);
eventHandler.handleEvent(event);
} catch (Exception e) {
log.error("Error processing Redis message: {}", e.getMessage(), e);
}
}
}
4. Event-Driven Microservices Integration
@Service
public class ContactServiceEventIntegration {
private final ContactEventPublisher eventPublisher;
private final ContactService contactService; // used by handleUserAccountCreated
@EventListener
@Async
public void handleContactCreated(ContactCreatedEvent event) {
// Publish to multiple subscribers
eventPublisher.publishContactCreated(event);
// Trigger related events
UserAccountCreationEvent accountEvent = UserAccountCreationEvent.builder()
.contactId(event.getContactId())
.email(event.getEmail())
.firstName(event.getFirstName())
.lastName(event.getLastName())
.build();
eventPublisher.publishUserAccountCreation(accountEvent);
}
@KafkaListener(topics = "user.account.created")
public void handleUserAccountCreated(UserAccountCreatedEvent event) {
// Update contact with user account information
contactService.linkUserAccount(event.getContactId(), event.getUserId());
// Publish contact linked event
ContactLinkedEvent linkedEvent = ContactLinkedEvent.builder()
.contactId(event.getContactId())
.userId(event.getUserId())
.linkedAt(Instant.now())
.build();
eventPublisher.publishContactLinked(linkedEvent);
}
}
Best Practices
1. Message Schema Design
public class ContactEventSchema {
// Use versioned schemas for backward compatibility
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "version")
@JsonSubTypes({
@JsonSubTypes.Type(value = ContactEventV1.class, name = "v1"),
@JsonSubTypes.Type(value = ContactEventV2.class, name = "v2")
})
// Nested types are static so Jackson can instantiate subtypes without an outer instance
public abstract static class ContactEvent {
public abstract String getEventId();
public abstract String getEventType();
public abstract Instant getTimestamp();
public abstract String getVersion();
}
// Include event metadata for traceability
public static class ContactEventEnvelope {
private String eventId;
private String eventType;
private String source;
private String specVersion;
private Instant timestamp;
private String correlationId;
private String causationId;
private ContactEvent data;
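// Attribute set modeled on CloudEvents 1.0 (id, type, source, specversion, time);
// the field names used here differ from the names in the specification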
public static ContactEventEnvelope createCloudEvent(ContactEvent event, String source) {
return ContactEventEnvelope.builder()
.eventId(UUID.randomUUID().toString())
.eventType(event.getEventType())
.source(source)
.specVersion("1.0")
.timestamp(Instant.now())
.data(event)
.build();
}
}
}
2. Subscription Management Best Practices
@Component
public class SubscriptionBestPractices {
// Use consumer groups for load balancing
public void configureConsumerGroups() {
SubscriptionRequest request = SubscriptionRequest.builder()
.subscriberId("notification-service-1")
.topic("contact.events")
.consumerGroup("notification-processors") // Multiple instances share load
.maxConcurrency(5)
.batchSize(10)
.build();
}
// Implement graceful shutdown for subscribers
@PreDestroy
public void shutdown() {
log.info("Shutting down subscribers gracefully...");
// Stop accepting new messages
stopAcceptingMessages();
// Wait for in-flight messages to complete
waitForInflightMessages(Duration.ofSeconds(30));
// Unsubscribe from topics
unsubscribeFromAllTopics();
log.info("Subscriber shutdown completed");
}
// Use dead letter queues for failed message handling
@RetryableTopic(
attempts = "3",
backoff = @Backoff(delay = 1000, multiplier = 2),
dltStrategy = DltStrategy.FAIL_ON_ERROR
)
@KafkaListener(topics = "contact.events")
public void handleContactEvent(ContactEvent event) {
processEvent(event);
}
@DltHandler
public void handleDlt(ContactEvent event, @Header("kafka_dlt-exception-message") String exception) {
log.error("Message sent to DLT after retries: {}, exception: {}",
event.getEventId(), exception);
deadLetterService.handleFailedEvent(event, exception);
}
}
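The `shutdown()` method above calls `stopAcceptingMessages` and `waitForInflightMessages`, which the source leaves undefined. One possible shape for them, offered as a sketch rather than the author's implementation, is a counter of in-flight messages that shutdown can wait on. The class name `InFlightTracker` and all of its methods are hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of the original text): tracks messages that
// are currently being processed so that shutdown can wait for them.
final class InFlightTracker {
    private final Object lock = new Object();
    private int inFlight = 0;
    private boolean accepting = true;

    /** Returns false once shutdown has started; the caller should skip the message. */
    boolean tryBegin() {
        synchronized (lock) {
            if (!accepting) {
                return false;
            }
            inFlight++;
            return true;
        }
    }

    /** Call from a finally block after each message that passed tryBegin(). */
    void end() {
        synchronized (lock) {
            inFlight--;
            if (inFlight == 0) {
                lock.notifyAll();
            }
        }
    }

    /** Stops admitting new messages; messages already begun may finish. */
    void stopAccepting() {
        synchronized (lock) {
            accepting = false;
        }
    }

    /** Waits until nothing is in flight. Returns false on timeout or interruption. */
    boolean awaitIdle(Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        synchronized (lock) {
            try {
                while (inFlight > 0) {
                    long remaining = deadline - System.nanoTime();
                    if (remaining <= 0) {
                        return false;
                    }
                    TimeUnit.NANOSECONDS.timedWait(lock, remaining);
                }
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```

A listener would wrap its work in `if (tracker.tryBegin()) { try { ... } finally { tracker.end(); } }`, and shutdown would call `stopAccepting()` followed by `awaitIdle(Duration.ofSeconds(30))` before unsubscribing.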
3. Message Ordering and Partitioning
@Component
public class MessageOrderingStrategy {
// Use message keys for ordered processing within partitions
public void publishOrderedEvent(ContactEvent event) {
String messageKey = event.getContactId(); // Events for same contact go to same partition
kafkaTemplate.send("contact.events", messageKey, event);
}
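// Implement a custom partitioner for specific ordering requirements
public class ContactPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
int numPartitions = cluster.partitionCountForTopic(topic);
if (key instanceof String) {
String contactId = (String) key;
// Hash the contact ID modulo the partition count. This is plain modulo hashing,
// not consistent hashing: changing the partition count remaps keys.
// Math.floorMod stays non-negative even for Integer.MIN_VALUE, where Math.abs does not.
return Math.floorMod(contactId.hashCode(), numPartitions);
}
// No usable key: fall back to a random partition
return ThreadLocalRandom.current().nextInt(numPartitions);
}
// Partitioner also requires configure() and close(); nothing to do for either here
@Override
public void configure(Map<String, ?> configs) { }
@Override
public void close() { }
}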
// Handle out-of-order messages with sequence numbers
@Component
public class OrderedMessageProcessor {
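// Last processed sequence number per contact. The get-then-put below is not atomic,
// so this assumes events for one contact are handled by one thread at a time, as is
// normally the case when they share a Kafka partition within one consumer group.
private final Map<String, Long> lastProcessedSequence = new ConcurrentHashMap<>();
public void processMessage(ContactEvent event) {
String contactId = event.getContactId();
long eventSequence = event.getSequenceNumber();
Long lastSequence = lastProcessedSequence.get(contactId);
if (lastSequence == null || eventSequence == lastSequence + 1) {
// In-order message, or the first one seen for this contact (whatever its sequence number)
doProcessMessage(event);
lastProcessedSequence.put(contactId, eventSequence);
} else if (eventSequence > lastSequence + 1) {
// Gap detected: buffer the message until the missing sequence numbers arrive
bufferOutOfOrderMessage(event);
} else {
// Sequence number already processed: duplicate or stale message, ignore it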
log.warn("Ignoring duplicate/old message for contact {}: sequence {}, last processed: {}",
contactId, eventSequence, lastSequence);
}
}
}
}
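`bufferOutOfOrderMessage` is left undefined above, and nothing shown replays buffered events once the gap closes. The following is a minimal sketch of one way to do both for a single key. The class `ReorderBuffer` is hypothetical, and it assumes that sequence numbers increase by exactly 1 from a known starting value and that only one thread calls `accept`; neither assumption comes from the original text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical reorder buffer for one key (for example, one contact ID).
final class ReorderBuffer<T> {
    // Messages that arrived ahead of their turn, ordered by sequence number.
    private final TreeMap<Long, T> pending = new TreeMap<>();
    private long nextExpected;

    ReorderBuffer(long firstSequence) {
        this.nextExpected = firstSequence;
    }

    /**
     * Offers one message and returns every message that is now deliverable,
     * in sequence order. Stale sequences and duplicates yield an empty list.
     */
    List<T> accept(long sequence, T message) {
        List<T> ready = new ArrayList<>();
        if (sequence < nextExpected) {
            return ready; // already delivered: drop
        }
        pending.putIfAbsent(sequence, message); // keep the first copy of a duplicate
        // Drain the contiguous run that starts at nextExpected.
        while (!pending.isEmpty() && pending.firstKey() == nextExpected) {
            ready.add(pending.pollFirstEntry().getValue());
            nextExpected++;
        }
        return ready;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The buffer grows without bound if a missing sequence number never arrives, so a production version would likely cap its size or give up on a gap after a timeout.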
Common Pitfalls
1. Message Duplication
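Problem: Subscribers receive the same message more than once, typically because at-least-once delivery redelivers after a missed acknowledgement or a consumer rebalance.
Solution: Make message processing idempotent and detect duplicates, for example by tracking the IDs of events already processed.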
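As an illustration of duplicate detection, the sketch below wraps a handler and skips any event whose ID it has already processed, remembering a bounded number of recent IDs. `IdempotentHandler` and its parameters are hypothetical names, and the in-memory cache is an assumption made to keep the example self-contained.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical wrapper that makes a handler safe against redelivered events.
final class IdempotentHandler<T> {
    private final Set<String> processedIds;
    private final Consumer<T> delegate;

    IdempotentHandler(final int capacity, Consumer<T> delegate) {
        this.delegate = delegate;
        // Insertion-ordered map that evicts its oldest entry once capacity is exceeded.
        this.processedIds = Collections.newSetFromMap(
            new LinkedHashMap<String, Boolean>() {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > capacity;
                }
            });
    }

    /** Returns true if the event was processed, false if it was skipped as a duplicate. */
    synchronized boolean handle(String eventId, T event) {
        if (processedIds.contains(eventId)) {
            return false;
        }
        delegate.accept(event);
        // Record the ID only after success, so a failed event can be retried.
        processedIds.add(eventId);
        return true;
    }
}
```

An in-memory cache is lost on restart and is not shared between instances, so it only narrows the duplicate window. Stronger guarantees need a durable store, or handlers whose effects are naturally idempotent.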
2. Topic Proliferation
Problem: Creating too many fine-grained topics increases operational complexity.
Solution: Use hierarchical topic naming and content-based filtering so that related events can share a topic.
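Hierarchical names pay off when a subscriber can express interest with one pattern instead of a list of topics. The sketch below matches dot-separated topic names against such a pattern. The syntax is an assumption borrowed loosely from AMQP topic exchanges and simplified: `*` matches exactly one segment, and `#` is honoured only as the final segment, where it matches zero or more remaining segments. `TopicPattern` is a hypothetical name.

```java
// Hypothetical matcher for hierarchical topic names such as "contact.eu.created".
final class TopicPattern {
    private TopicPattern() {
    }

    static boolean matches(String pattern, String topic) {
        String[] p = pattern.split("\\.");
        String[] t = topic.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals("#")) {
                // Simplification: '#' is valid only as the last pattern segment.
                return i == p.length - 1;
            }
            if (i >= t.length) {
                return false; // pattern is longer than the topic
            }
            if (!p[i].equals("*") && !p[i].equals(t[i])) {
                return false; // literal segment mismatch
            }
        }
        // Without a trailing '#', the segment counts must agree.
        return p.length == t.length;
    }
}
```

With this scheme, a single subscription to `contact.#` would replace separate subscriptions to `contact.eu.created`, `contact.eu.updated`, and so on.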
3. Unbounded Message Growth
Problem: Topics grow without bound when no retention policy is configured.
Solution: Configure time-based or size-based retention and monitor storage usage.
4. Subscriber Lag
Problem: Slow subscribers fall behind publishers, so the message backlog keeps growing.
Solution: Monitor consumer lag and scale out consumers automatically when it exceeds a threshold.
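Consumer lag is commonly measured per partition as the latest offset in the log minus the offset the consumer group has committed. A minimal sketch of that arithmetic and of a scale-out decision follows. The class `ConsumerLag`, the threshold parameter, and the treatment of a partition with no committed offset as starting from 0 are all assumptions made for illustration.

```java
import java.util.Map;

// Hypothetical lag calculation over per-partition offsets.
final class ConsumerLag {
    private ConsumerLag() {
    }

    /** Sums (end offset - committed offset) across all partitions. */
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committedOffsets) {
        long total = 0L;
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            // Simplification: no committed offset is treated as offset 0.
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            total += Math.max(0L, entry.getValue() - committed);
        }
        return total;
    }

    /**
     * Scaling out helps only while there are fewer consumers than partitions;
     * in a Kafka consumer group, extra consumers beyond that sit idle.
     */
    static boolean shouldScaleOut(long lag, long threshold, int consumers, int partitions) {
        return lag > threshold && consumers < partitions;
    }
}
```

In practice the offsets would come from the broker's admin or metrics interface, and the decision would usually be based on lag sustained over a time window rather than on a single sample.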
5. Poison Messages
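Problem: Malformed messages fail processing every time they are delivered and can block the messages queued behind them.
Solution: Validate messages against a schema and route those that cannot be processed to a dead letter queue.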
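The distinction that matters for poison messages is that retrying an invalid message cannot help, whereas a transient failure may succeed on a later attempt. The broker-agnostic sketch below sends invalid messages straight to a dead letter sink and retries only failures that occur during handling. `PoisonMessageGuard` is a hypothetical name, the validator stands in for real schema validation, and the in-memory list stands in for a real dead letter queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical guard that keeps poison messages from blocking a subscriber.
final class PoisonMessageGuard<T> {
    private final Predicate<T> validator;
    private final Consumer<T> handler;
    private final int maxAttempts;
    private final List<T> deadLetters = new ArrayList<>(); // stand-in for a real DLQ

    PoisonMessageGuard(Predicate<T> validator, Consumer<T> handler, int maxAttempts) {
        this.validator = validator;
        this.handler = handler;
        this.maxAttempts = maxAttempts;
    }

    /** Returns true if the message was handled, false if it was dead-lettered. */
    boolean process(T message) {
        if (!validator.test(message)) {
            deadLetters.add(message); // invalid: retrying cannot help
            return false;
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return true;
            } catch (RuntimeException e) {
                // Possibly transient: try again. A real consumer would back off here.
            }
        }
        deadLetters.add(message); // retries exhausted
        return false;
    }

    List<T> deadLetters() {
        return deadLetters;
    }
}
```

Either way the subscriber moves on to the next message, and the dead-lettered ones remain available for inspection or replay.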
Integration in Distributed Systems
Event-Driven Microservices
@Service
public class DistributedEventOrchestration {
@EventListener
public void handleOrderCreated(OrderCreatedEvent event) {
// Fan-out to multiple services
inventoryEventPublisher.publishInventoryReservation(event);
paymentEventPublisher.publishPaymentRequest(event);
shippingEventPublisher.publishShippingRequest(event);
}
}
Multi-Tenant Event Streaming
@Service
public class MultiTenantEventPublisher {
public void publishTenantEvent(String tenantId, TenantEvent event) {
String topicName = "tenant." + tenantId + ".events";
eventPublisher.publish(topicName, event);
}
}
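Because `publishTenantEvent` builds the topic name by concatenating `tenantId`, an unexpected ID can yield an invalid topic name or one that collides with another tenant's hierarchy. A small validation step, sketched below under the assumption that the broker is Kafka, guards against this. Kafka topic names may contain ASCII letters, digits, `.`, `_`, and `-`, and are limited to 249 characters. `TenantTopics` is a hypothetical name, and excluding `.` from tenant IDs is a design choice of this sketch.

```java
import java.util.regex.Pattern;

// Hypothetical helper that derives a per-tenant topic name safely.
final class TenantTopics {
    // Kafka limits topic names to 249 characters.
    private static final int MAX_TOPIC_LENGTH = 249;
    // '.' is excluded so that a tenant ID cannot introduce extra hierarchy levels.
    private static final Pattern TENANT_ID = Pattern.compile("[A-Za-z0-9_-]+");

    private TenantTopics() {
    }

    static String topicFor(String tenantId) {
        if (tenantId == null || !TENANT_ID.matcher(tenantId).matches()) {
            throw new IllegalArgumentException("Invalid tenant ID: " + tenantId);
        }
        String topic = "tenant." + tenantId + ".events";
        if (topic.length() > MAX_TOPIC_LENGTH) {
            throw new IllegalArgumentException("Topic name too long for tenant: " + tenantId);
        }
        return topic;
    }
}
```

A topic per tenant also multiplies the topic count, so for large tenant populations a shared topic keyed or filtered by tenant ID may be the better trade-off, in line with the topic proliferation pitfall above.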
Conclusion
The Publish-Subscribe pattern is fundamental to building scalable, loosely coupled distributed systems. It provides:
- Decoupled Communication: Publishers and subscribers operate independently
- Scalable Distribution: One-to-many message delivery with dynamic subscription management
- Event-Driven Architecture: Foundation for reactive, event-driven system design
- Integration Flexibility: Support for heterogeneous systems and protocols
With careful topic design, subscription management, and monitoring, the Publish-Subscribe pattern supports integration architectures that scale and can adapt as business requirements change.