Publish-Subscribe Pattern

Overview

The Publish-Subscribe (Pub-Sub) pattern is a fundamental messaging paradigm that enables one-to-many communication through asynchronous message distribution. It establishes a decoupled communication model where message producers (publishers) send messages to topics or channels without knowledge of message consumers (subscribers), while subscribers express interest in specific message types without awareness of publishers. This pattern forms the foundation for scalable, event-driven architectures.

Theoretical Foundation

The Publish-Subscribe pattern is grounded in the Observer pattern and in event-driven architecture principles. Applied at the messaging level, it enforces loose coupling between system components: message delivery is driven by subscription matching and content filtering rather than explicit addressing, so publishers and subscribers never reference each other directly.

Core Principles

1. Topic-Based Message Distribution

Messages are published to logical channels (topics) rather than specific destinations, enabling dynamic subscriber registration and flexible message routing based on content classification.

2. Temporal and Spatial Decoupling

Publishers and subscribers operate independently in both time and space - they don't need to be active simultaneously, nor do they need knowledge of each other's location or existence.
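Temporal decoupling can be illustrated with a per-topic buffer: the publisher returns immediately, and a subscriber that attaches later still receives everything published while it was offline. A minimal sketch (the BufferedTopic class and its methods are invented for illustration, not part of any broker API):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Temporal decoupling: the broker buffers messages per topic, so a
// subscriber that attaches later can still drain what was published
// while it was offline.
public class BufferedTopic {
    private final Queue<String> buffer = new ArrayDeque<>();

    public void publish(String message) {
        buffer.add(message); // publisher returns immediately; no subscriber needs to be active
    }

    // A (late-arriving) subscriber pulls everything buffered so far.
    public List<String> drain() {
        List<String> delivered = new ArrayList<>();
        String m;
        while ((m = buffer.poll()) != null) {
            delivered.add(m);
        }
        return delivered;
    }
}
```

Real brokers add retention limits and persistence on top of this idea, but the publisher-side contract is the same: publish and move on.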

3. Content-Based Filtering

Subscribers receive only messages matching their subscription criteria, enabling fine-grained message filtering and reducing unnecessary message processing overhead.

4. Dynamic Subscription Management

Subscribers can dynamically register and unregister interest in topics at runtime, enabling adaptive system behavior and efficient resource utilization.

Why Publish-Subscribe is Essential in Integration Architecture

1. Event-Driven Integration

Modern distributed systems require responsive, real-time integration:

- Business event propagation across multiple interested systems
- Data synchronization between distributed databases and caches
- Real-time notifications for user interfaces and external systems
- Audit trail distribution for compliance and monitoring systems

2. System Decoupling

Pub-Sub enables architectural flexibility and maintainability:

- Service independence allowing systems to evolve independently
- Dynamic service discovery through topic-based communication
- Reduced integration complexity by eliminating point-to-point connections
- Simplified testing through isolation of publishers and subscribers

3. Scalability and Performance

The pattern supports horizontal scaling and performance optimization:

- Fan-out messaging distributing load across multiple consumers
- Parallel processing enabling concurrent message handling
- Load balancing through consumer group partitioning
- Elastic scaling by adding or removing subscribers dynamically
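Fan-out and load balancing combine: each subscribing group receives every message, while within a group each message goes to exactly one member. A common way to pick that member is to hash the message key, sketched below (a simplified illustration of the idea, not any specific broker's partition-assignment protocol):

```java
public class ConsumerGroupRouting {
    // Within one consumer group, a message key maps to exactly one member,
    // so the group as a whole sees every message but each member handles a share.
    public static int memberFor(String messageKey, int groupSize) {
        if (groupSize <= 0) {
            throw new IllegalArgumentException("consumer group must not be empty");
        }
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(messageKey.hashCode(), groupSize);
    }
}
```

Because the mapping is deterministic, all messages with the same key reach the same member, which preserves per-key ordering while spreading load across the group.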

4. Business Agility

Pub-Sub supports rapid business adaptation and feature development:

- New subscriber integration without modifying existing publishers
- A/B testing through selective message routing to test subscribers
- Feature rollout using gradual subscription activation
- Business process flexibility through configurable event flows

Benefits in Integration Contexts

1. Architectural Flexibility

2. Operational Excellence

3. Development Productivity

4. Business Intelligence

Integration Architecture Applications

1. Microservices Event Choreography

Coordinates distributed business processes through event flows:

- Order processing workflows triggering inventory, payment, and shipping events
- User account lifecycle management propagating registration, verification, and deletion events
- Data consistency maintenance through eventual consistency event patterns
- Service dependency management via event-driven state synchronization

2. Enterprise Application Integration (EAI)

Enables integration of heterogeneous enterprise systems:

- ERP system integration broadcasting financial and operational events
- CRM data synchronization sharing customer events across systems
- Master data management propagating reference data changes
- Legacy system modernization through event-driven API exposure

3. IoT and Real-Time Systems

Supports massive-scale device integration and real-time processing:

- Sensor data distribution from IoT devices to multiple processing systems
- Device lifecycle management through device status and configuration events
- Real-time alerting based on sensor threshold breaches
- Telemetry data aggregation for analytics and monitoring systems

4. Multi-Channel User Experience

Coordinates user interactions across different channels and devices:

- User activity synchronization across web, mobile, and desktop applications
- Notification delivery to multiple communication channels (email, SMS, push)
- Personalization engines consuming user behavior events
- Content management distributing updates across multiple presentation layers

How Publish-Subscribe Pattern Works

The Publish-Subscribe pattern operates through a message broker that manages topics and handles message distribution to subscribers:

Basic Pub-Sub Flow

Publisher A ────┐                    ┌──→ Subscriber X
                │                    │
Publisher B ────┼─→ [Topic: Orders] ─┼──→ Subscriber Y
                │                    │
Publisher C ────┘                    └──→ Subscriber Z

Publisher D ────┐                    ┌──→ Subscriber P
                │                    │
Publisher E ────┼─→ [Topic: Events] ─┼──→ Subscriber Q
                │                    │
Publisher F ────┘                    └──→ Subscriber R
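The flow above can be sketched as a toy in-memory broker: any number of publishers write to a named topic, and every subscriber registered on that topic receives each message. This is an illustration of the fan-out, not the production-grade components shown later (the InMemoryBroker class and its methods are invented for this sketch):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Toy in-memory broker: publishers send to a topic name; every
// subscriber registered on that topic receives the message.
public class InMemoryBroker {
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    // Subscribers register interest in a topic without knowing the publishers.
    public void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(handler);
    }

    // Publishers send to a topic without knowing who (if anyone) is listening.
    // Returns the fan-out count: how many subscribers received the message.
    public int publish(String topic, String message) {
        List<Consumer<String>> handlers = subscribers.getOrDefault(topic, List.of());
        handlers.forEach(h -> h.accept(message));
        return handlers.size();
    }
}
```

With Subscriber X, Y, and Z registered on the Orders topic, a single publish from any of Publisher A, B, or C is delivered three times, matching the diagram.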

Message Broker Architecture

┌──────────────────────────────────────────────────────────┐
│                  Message Broker                          │
├──────────────────────────────────────────────────────────┤
│  Topic Manager               │  Subscription Manager     │
│  ┌─────────────┐             │  ┌──────────────────────┐ │
│  │ Topic A     │             │  │ Active Subscriptions │ │
│  │ Topic B     │             │  │ - Subscriber 1       │ │
│  │ Topic C     │             │  │ - Subscriber 2       │ │
│  └─────────────┘             │  │ - Subscriber 3       │ │
│                              │  └──────────────────────┘ │
├──────────────────────────────────────────────────────────┤
│  Message Routing Engine                                  │
│  ┌─────────────┐  ┌──────────────┐  ┌────────────────┐   │
│  │ Filter      │  │ Transform    │  │ Delivery       │   │
│  │ Engine      │  │ Engine       │  │ Manager        │   │
│  └─────────────┘  └──────────────┘  └────────────────┘   │
├──────────────────────────────────────────────────────────┤
│  Persistence & Recovery                                  │
│  ┌─────────────┐  ┌──────────────┐  ┌────────────────┐   │
│  │ Message     │  │ Offset       │  │ Dead Letter    │   │
│  │ Storage     │  │ Management   │  │ Queue          │   │
│  └─────────────┘  └──────────────┘  └────────────────┘   │
└──────────────────────────────────────────────────────────┘

Subscription Types and Message Delivery

1. Topic Subscription

Subscriber registers for: Topic = "order.created"
Message matches if: topic == "order.created"

┌─────────────────────────────────────┐
│ Subscription Registration           │
├─────────────────────────────────────┤
│ Subscriber ID: service-001          │
│ Topic: order.created                │
│ Filter: null                        │
│ Delivery Mode: At-Least-Once        │
│ Consumer Group: order-processors    │
└─────────────────────────────────────┘

2. Pattern-Based Subscription

Subscriber registers for: Pattern = "order.*"
Message matches if: topic matches pattern

┌─────────────────────────────────────┐
│ Pattern Subscription                │
├─────────────────────────────────────┤
│ Subscriber ID: audit-service        │
│ Pattern: order.*                    │
│ Matches:                            │
│ - order.created                     │
│ - order.updated                     │
│ - order.cancelled                   │
└─────────────────────────────────────┘
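A wildcard such as order.* is typically translated into a regular expression before matching. A sketch of that translation (the escaping scheme here is an assumption; real brokers each define their own wildcard syntax and segment semantics):

```java
import java.util.regex.Pattern;

public class TopicWildcard {
    // Convert a simple wildcard ("order.*") to a regex: escape the literal
    // dots used as topic separators, then let '*' match exactly one segment.
    public static Pattern compile(String wildcard) {
        String regex = wildcard
            .replace(".", "\\.")     // topic separators are literal dots
            .replace("*", "[^.]+");  // '*' matches one segment (no dots)
        return Pattern.compile(regex);
    }

    public static boolean matches(String wildcard, String topic) {
        return compile(wildcard).matcher(topic).matches();
    }
}
```

Under this scheme order.* matches order.created and order.cancelled but not payment.created, and not the deeper order.item.added.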

3. Content-Based Subscription

Subscriber registers for: Topic + Content Filter
Message matches if: topic == "order.created" AND order.amount > 1000

┌─────────────────────────────────────┐
│ Content Filter Subscription         │
├─────────────────────────────────────┤
│ Subscriber ID: risk-analysis        │
│ Topic: order.created                │
│ Filter: amount > 1000 AND           │
│         category != 'internal'      │
│ Delivery: Push                      │
└─────────────────────────────────────┘
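The filter in the box above (amount > 1000 AND category != 'internal') can be modeled as a predicate over message attributes. A sketch assuming attributes arrive as a simple map rather than a parsed filter expression (the ContentFilters class and method name are invented for illustration):

```java
import java.util.Map;
import java.util.function.Predicate;

public class ContentFilters {
    // Equivalent of the boxed filter: amount > 1000 AND category != 'internal'
    public static Predicate<Map<String, ?>> highValueExternalOrders() {
        Predicate<Map<String, ?>> highValue =
            attrs -> attrs.get("amount") instanceof Number n && n.doubleValue() > 1000;
        Predicate<Map<String, ?>> external =
            attrs -> !"internal".equals(attrs.get("category"));
        return highValue.and(external); // both clauses must hold
    }
}
```

Composing filters from Predicate instances keeps AND/OR combinations cheap to build, and the broker evaluates them per message before delivery so subscribers never see non-matching traffic.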

Key Components

1. Topic Manager

Manages topic creation, configuration, and lifecycle:

@Component
public class TopicManager {
    private final Map<String, Topic> topics = new ConcurrentHashMap<>();
    private final TopicConfigurationService configService;
    private final MetricRegistry metricRegistry;

    public Topic createTopic(TopicConfiguration config) {
        validateTopicConfiguration(config);

        Topic topic = Topic.builder()
            .name(config.getName())
            .partitions(config.getPartitions())
            .replicationFactor(config.getReplicationFactor())
            .retentionPolicy(config.getRetentionPolicy())
            .compressionType(config.getCompressionType())
            .maxMessageSize(config.getMaxMessageSize())
            .build();

        topics.put(config.getName(), topic);

        // Create metrics for the topic
        metricRegistry.counter("topic.messages.published", "topic", topic.getName());
        metricRegistry.counter("topic.messages.consumed", "topic", topic.getName());
        metricRegistry.gauge("topic.subscribers.count", "topic", topic.getName(), 
                           () -> getSubscriberCount(topic.getName()));

        log.info("Created topic: {} with {} partitions", topic.getName(), topic.getPartitions());
        return topic;
    }

    public Optional<Topic> getTopic(String topicName) {
        return Optional.ofNullable(topics.get(topicName));
    }

    public List<Topic> getTopicsByPattern(String pattern) {
        Pattern compiledPattern = Pattern.compile(pattern);
        return topics.values().stream()
            .filter(topic -> compiledPattern.matcher(topic.getName()).matches())
            .collect(Collectors.toList());
    }

    private void validateTopicConfiguration(TopicConfiguration config) {
        if (config.getName() == null || config.getName().trim().isEmpty()) {
            throw new IllegalArgumentException("Topic name cannot be null or empty");
        }

        if (config.getName().contains("..") || config.getName().startsWith(".")) {
            throw new IllegalArgumentException("Invalid topic name format");
        }

        if (config.getPartitions() <= 0) {
            throw new IllegalArgumentException("Partition count must be positive");
        }

        if (config.getReplicationFactor() <= 0) {
            throw new IllegalArgumentException("Replication factor must be positive");
        }
    }
}

2. Subscription Manager

Manages subscriber registrations and message routing:

@Component
public class SubscriptionManager {
    private final Map<String, Set<Subscription>> topicSubscriptions = new ConcurrentHashMap<>();
    private final Map<String, Subscription> activeSubscriptions = new ConcurrentHashMap<>();
    private final MessageMatcher messageMatcher;

    public Subscription subscribe(SubscriptionRequest request) {
        validateSubscriptionRequest(request);

        Subscription subscription = Subscription.builder()
            .id(generateSubscriptionId())
            .subscriberId(request.getSubscriberId())
            .topicPattern(request.getTopicPattern())
            .contentFilter(request.getContentFilter())
            .deliveryMode(request.getDeliveryMode())
            .consumerGroup(request.getConsumerGroup())
            .createdAt(Instant.now())
            .build();

        // Register subscription for all matching topics
        if (request.isPatternBased()) {
            registerPatternSubscription(subscription);
        } else {
            registerTopicSubscription(subscription, request.getTopic());
        }

        activeSubscriptions.put(subscription.getId(), subscription);

        log.info("Created subscription {} for subscriber {} on topic/pattern: {}", 
                 subscription.getId(), 
                 subscription.getSubscriberId(), 
                 subscription.getTopicPattern());

        return subscription;
    }

    public void unsubscribe(String subscriptionId) {
        Subscription subscription = activeSubscriptions.remove(subscriptionId);

        if (subscription != null) {
            // Remove from topic-specific subscriptions
            topicSubscriptions.values().forEach(subscriptions -> 
                subscriptions.removeIf(s -> s.getId().equals(subscriptionId)));

            log.info("Removed subscription {} for subscriber {}", 
                     subscriptionId, 
                     subscription.getSubscriberId());
        }
    }

    public Set<Subscription> getMatchingSubscriptions(String topic, Message message) {
        Set<Subscription> matchingSubscriptions = new HashSet<>();

        // Get direct topic subscriptions
        Set<Subscription> topicSubs = topicSubscriptions.getOrDefault(topic, Collections.emptySet());

        for (Subscription subscription : topicSubs) {
            if (messageMatcher.matches(subscription, topic, message)) {
                matchingSubscriptions.add(subscription);
            }
        }

        return matchingSubscriptions;
    }

    private void registerTopicSubscription(Subscription subscription, String topic) {
        // ConcurrentHashMap.newKeySet() is a static factory; it must not be
        // called on an instance
        topicSubscriptions.computeIfAbsent(topic, k -> ConcurrentHashMap.newKeySet())
                          .add(subscription);
    }

    private void registerPatternSubscription(Subscription subscription) {
        // For pattern subscriptions, check against all existing topics
        topicSubscriptions.keySet().stream()
            .filter(topic -> matchesPattern(topic, subscription.getTopicPattern()))
            .forEach(topic -> registerTopicSubscription(subscription, topic));
    }

    private boolean matchesPattern(String topic, String pattern) {
        try {
            return Pattern.compile(pattern).matcher(topic).matches();
        } catch (PatternSyntaxException e) {
            log.warn("Invalid topic pattern: {}", pattern, e);
            return false;
        }
    }
}

3. Message Publisher

Handles message publishing and distribution:

@Component
public class MessagePublisher {
    private final TopicManager topicManager;
    private final SubscriptionManager subscriptionManager;
    private final MessageDistributor messageDistributor;
    private final MessagePersistence messagePersistence;
    private final MetricRegistry metricRegistry;

    public PublishResult publish(String topicName, Message message) {
        return publish(topicName, message, PublishOptions.defaultOptions());
    }

    public PublishResult publish(String topicName, Message message, PublishOptions options) {
        validatePublishRequest(topicName, message);

        Optional<Topic> topicOpt = topicManager.getTopic(topicName);
        if (!topicOpt.isPresent() && !options.isAutoCreateTopic()) {
            throw new TopicNotFoundException("Topic not found: " + topicName);
        }

        Topic topic = topicOpt.orElseGet(() -> createTopicIfAllowed(topicName, options));

        // Enrich message with metadata
        Message enrichedMessage = enrichMessage(message, topicName, options);

        try {
            // Persist message if required
            if (topic.isPersistent()) {
                messagePersistence.store(topicName, enrichedMessage);
            }

            // Get matching subscriptions
            Set<Subscription> subscriptions = subscriptionManager.getMatchingSubscriptions(
                topicName, enrichedMessage);

            // Distribute message to subscribers
            PublishResult result = messageDistributor.distribute(
                topic, enrichedMessage, subscriptions, options);

            // Record metrics
            recordPublishMetrics(topicName, enrichedMessage, result);

            log.debug("Published message {} to topic {} - delivered to {} subscribers", 
                     enrichedMessage.getId(), topicName, result.getDeliveredCount());

            return result;

        } catch (Exception e) {
            log.error("Failed to publish message to topic {}: {}", topicName, e.getMessage(), e);
            throw new PublishException("Failed to publish message", e);
        }
    }

    public CompletableFuture<PublishResult> publishAsync(String topicName, 
                                                       Message message, 
                                                       PublishOptions options) {
        return CompletableFuture.supplyAsync(() -> publish(topicName, message, options));
    }

    private Message enrichMessage(Message message, String topicName, PublishOptions options) {
        return message.toBuilder()
            .id(message.getId() != null ? message.getId() : UUID.randomUUID().toString())
            .topic(topicName)
            .timestamp(message.getTimestamp() != null ? message.getTimestamp() : Instant.now())
            .publisher(options.getPublisherId())
            .correlationId(options.getCorrelationId())
            .build();
    }

    private void recordPublishMetrics(String topicName, Message message, PublishResult result) {
        metricRegistry.counter("messages.published", "topic", topicName).increment();
        metricRegistry.counter("messages.delivered", "topic", topicName)
                     .increment(result.getDeliveredCount());
        metricRegistry.histogram("message.size", "topic", topicName)
                     .update(message.getPayload().length);
    }
}

4. Message Subscriber

Handles subscription management and message consumption:

// Not annotated with @Component: Spring cannot instantiate an abstract class;
// concrete subclasses are registered as components instead.
public abstract class MessageSubscriber {
    private final SubscriptionManager subscriptionManager;
    private final MessageDeserializer messageDeserializer;
    private Subscription subscription;

    public void start() {
        SubscriptionRequest request = createSubscriptionRequest();
        this.subscription = subscriptionManager.subscribe(request);

        log.info("Started subscriber {} with subscription {}", 
                 getSubscriberId(), subscription.getId());
    }

    public void stop() {
        if (subscription != null) {
            subscriptionManager.unsubscribe(subscription.getId());
            subscription = null;
            log.info("Stopped subscriber {}", getSubscriberId());
        }
    }

    public void handleMessage(String topic, Message message) {
        try {
            // Deserialize message payload
            Object payload = messageDeserializer.deserialize(
                message.getPayload(), getExpectedPayloadType());

            // Process message
            MessageContext context = MessageContext.builder()
                .topic(topic)
                .message(message)
                .payload(payload)
                .subscription(subscription)
                .receivedAt(Instant.now())
                .build();

            processMessage(context);

            // Acknowledge message processing
            acknowledgeMessage(message);

        } catch (Exception e) {
            log.error("Error processing message {} from topic {}: {}", 
                     message.getId(), topic, e.getMessage(), e);

            handleProcessingError(topic, message, e);
        }
    }

    protected abstract void processMessage(MessageContext context);
    protected abstract String getSubscriberId();
    protected abstract Class<?> getExpectedPayloadType();
    protected abstract SubscriptionRequest createSubscriptionRequest();

    protected void acknowledgeMessage(Message message) {
        // Default implementation - can be overridden for specific acknowledgment strategies
        log.debug("Acknowledged message: {}", message.getId());
    }

    protected void handleProcessingError(String topic, Message message, Exception error) {
        // Default error handling - can be overridden for custom error strategies
        log.error("Message processing failed for message {} on topic {}", 
                 message.getId(), topic);
    }
}

// Example concrete subscriber implementation
@Component
public class OrderEventSubscriber extends MessageSubscriber {
    private final OrderService orderService;

    @Override
    protected void processMessage(MessageContext context) {
        OrderEvent orderEvent = (OrderEvent) context.getPayload();

        switch (orderEvent.getType()) {
            case CREATED:
                orderService.handleOrderCreated(orderEvent);
                break;
            case UPDATED:
                orderService.handleOrderUpdated(orderEvent);
                break;
            case CANCELLED:
                orderService.handleOrderCancelled(orderEvent);
                break;
            default:
                log.warn("Unknown order event type: {}", orderEvent.getType());
        }
    }

    @Override
    protected String getSubscriberId() {
        return "order-event-subscriber";
    }

    @Override
    protected Class<?> getExpectedPayloadType() {
        return OrderEvent.class;
    }

    @Override
    protected SubscriptionRequest createSubscriptionRequest() {
        return SubscriptionRequest.builder()
            .subscriberId(getSubscriberId())
            .topicPattern("order.*")
            .deliveryMode(DeliveryMode.AT_LEAST_ONCE)
            .consumerGroup("order-processors")
            .contentFilter("eventType IN ['CREATED', 'UPDATED', 'CANCELLED']")
            .build();
    }
}

Configuration Parameters

Essential Settings

Parameter            Description                        Typical Values
-------------------  ---------------------------------  --------------
Topic Partitions     Number of partitions per topic     1-100
Replication Factor   Number of message replicas         1-5
Message Retention    How long to keep messages          1h-30d
Subscriber Timeout   Max time for message processing    30s-300s
Batch Size           Messages per batch delivery        1-1000
Buffer Size          Publisher message buffer size      1MB-100MB

Example Configuration

# Topic Configuration
pub-sub.topic.default-partitions=10
pub-sub.topic.default-replication-factor=3
pub-sub.topic.default-retention=24h
pub-sub.topic.auto-create=false
pub-sub.topic.max-message-size=1MB

# Publisher Configuration
pub-sub.publisher.batch-size=100
pub-sub.publisher.linger-ms=10
pub-sub.publisher.buffer-memory=32MB
pub-sub.publisher.compression-type=lz4
pub-sub.publisher.retries=3
pub-sub.publisher.retry-delay=1s

# Subscriber Configuration
pub-sub.subscriber.fetch-min-bytes=1KB
pub-sub.subscriber.fetch-max-wait=500ms
pub-sub.subscriber.max-poll-records=500
pub-sub.subscriber.session-timeout=30s
pub-sub.subscriber.heartbeat-interval=3s

# Consumer Group Configuration
pub-sub.consumer-group.rebalance-timeout=60s
pub-sub.consumer-group.offset-retention=7d
pub-sub.consumer-group.auto-commit=true
pub-sub.consumer-group.auto-commit-interval=5s

Implementation Examples

1. Apache Kafka Publish-Subscribe Implementation

@Configuration
public class KafkaPublishSubscribeConfiguration {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "contact-service-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

@Service
public class ContactEventPublisher {
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void publishContactCreated(ContactCreatedEvent event) {
        kafkaTemplate.send("contact.events", "contact.created", event)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to publish contact created event: {}", ex.getMessage(), ex);
                } else {
                    log.info("Contact created event published successfully: {}", 
                             event.getContactId());
                }
            });
    }

    public void publishContactUpdated(ContactUpdatedEvent event) {
        kafkaTemplate.send("contact.events", "contact.updated", event);
    }

    public void publishContactDeleted(ContactDeletedEvent event) {
        kafkaTemplate.send("contact.events", "contact.deleted", event);
    }
}

@Component
public class ContactEventSubscriber {
    private final ContactEventHandler contactEventHandler;
    private final AuditService auditService;

    @KafkaListener(topics = "contact.events", groupId = "notification-service")
    public void handleContactEvent(@Payload ContactEvent event,
                                   @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                   @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
                                   @Header(KafkaHeaders.OFFSET) long offset) {

        log.info("Received contact event: {} from topic: {}, partition: {}, offset: {}", 
                 event.getEventType(), topic, partition, offset);

        try {
            contactEventHandler.handleEvent(event);
        } catch (Exception e) {
            log.error("Error processing contact event: {}", e.getMessage(), e);
            throw e; // Will trigger retry or DLQ based on configuration
        }
    }

    // topics and topicPattern are mutually exclusive on @KafkaListener;
    // the pattern alone covers all contact.* topics
    @KafkaListener(topicPattern = "contact\\..*", groupId = "audit-service")
    public void handleContactAuditEvent(@Payload ContactEvent event) {
        auditService.recordContactEvent(event);
    }
}

2. RabbitMQ Topic Exchange Implementation

@Configuration
public class RabbitMQPubSubConfiguration {

    @Bean
    public TopicExchange contactExchange() {
        return new TopicExchange("contact.exchange", true, false);
    }

    @Bean
    public Queue contactNotificationQueue() {
        return QueueBuilder.durable("contact.notifications").build();
    }

    @Bean
    public Queue contactAuditQueue() {
        return QueueBuilder.durable("contact.audit").build();
    }

    @Bean
    public Binding contactCreatedBinding() {
        return BindingBuilder.bind(contactNotificationQueue())
                           .to(contactExchange())
                           .with("contact.created");
    }

    @Bean
    public Binding contactUpdatedBinding() {
        return BindingBuilder.bind(contactNotificationQueue())
                           .to(contactExchange())
                           .with("contact.updated");
    }

    @Bean
    public Binding allContactEventsAuditBinding() {
        return BindingBuilder.bind(contactAuditQueue())
                           .to(contactExchange())
                           .with("contact.*");
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }
}

@Service
public class RabbitMQContactEventPublisher {
    private final RabbitTemplate rabbitTemplate;
    private final String exchangeName = "contact.exchange";

    public void publishEvent(String routingKey, ContactEvent event) {
        try {
            rabbitTemplate.convertAndSend(exchangeName, routingKey, event, message -> {
                message.getMessageProperties().setHeader("eventId", event.getEventId());
                message.getMessageProperties().setHeader("eventType", event.getEventType());
                message.getMessageProperties().setTimestamp(Date.from(event.getTimestamp()));
                return message;
            });

            log.info("Published event {} with routing key: {}", event.getEventId(), routingKey);
        } catch (Exception e) {
            log.error("Failed to publish event {}: {}", event.getEventId(), e.getMessage(), e);
            throw new EventPublishException("Failed to publish event", e);
        }
    }
}

@Component
public class ContactNotificationSubscriber {
    private final NotificationService notificationService;

    @RabbitListener(queues = "contact.notifications")
    public void handleContactEvent(ContactEvent event) {
        log.info("Processing notification for contact event: {}", event.getEventType());

        switch (event.getEventType()) {
            case CREATED:
                notificationService.sendWelcomeNotification(event.getContactId());
                break;
            case UPDATED:
                notificationService.sendUpdateNotification(event.getContactId());
                break;
            case DELETED:
                notificationService.sendGoodbyeNotification(event.getContactId());
                break;
            default:
                log.warn("Unhandled contact event type: {}", event.getEventType());
        }
    }
}

3. Redis Pub-Sub Implementation

@Configuration
public class RedisPublishSubscribeConfiguration {

    @Bean
    public RedisConnectionFactory connectionFactory() {
        // Host and port are supplied via RedisStandaloneConfiguration;
        // the setHostName/setPort setters are deprecated
        return new LettuceConnectionFactory(
            new RedisStandaloneConfiguration("localhost", 6379));
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory());
        template.setDefaultSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
        return template;
    }

    @Bean
    public RedisMessageListenerContainer messageListenerContainer() {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        return container;
    }
}

@Service
public class RedisEventPublisher {
    private final RedisTemplate<String, Object> redisTemplate;

    public void publishContactEvent(String channel, ContactEvent event) {
        try {
            redisTemplate.convertAndSend(channel, event);
            log.info("Published event {} to channel: {}", event.getEventId(), channel);
        } catch (Exception e) {
            log.error("Failed to publish event to Redis: {}", e.getMessage(), e);
            throw new EventPublishException("Failed to publish to Redis", e);
        }
    }
}

@Component
public class RedisEventSubscriber implements MessageListener {
    private final ObjectMapper objectMapper;
    private final ContactEventHandler eventHandler;
    private final RedisMessageListenerContainer messageListenerContainer;

    @PostConstruct
    public void subscribeToChannels() {
        messageListenerContainer.addMessageListener(this, 
            Arrays.asList(
                new PatternTopic("contact.*"),
                new ChannelTopic("system.alerts")
            ));
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            // Decode explicitly rather than relying on the platform default charset
            String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
            String body = new String(message.getBody(), StandardCharsets.UTF_8);

            ContactEvent event = objectMapper.readValue(body, ContactEvent.class);

            log.info("Received event {} from channel: {}", event.getEventId(), channel);
            eventHandler.handleEvent(event);

        } catch (Exception e) {
            log.error("Error processing Redis message: {}", e.getMessage(), e);
        }
    }
}

4. Event-Driven Microservices Integration

@Service
public class ContactServiceEventIntegration {
    private final ContactEventPublisher eventPublisher;
    private final ContactService contactService;

    @EventListener
    @Async
    public void handleContactCreated(ContactCreatedEvent event) {
        // Publish to multiple subscribers
        eventPublisher.publishContactCreated(event);

        // Trigger related events
        UserAccountCreationEvent accountEvent = UserAccountCreationEvent.builder()
            .contactId(event.getContactId())
            .email(event.getEmail())
            .firstName(event.getFirstName())
            .lastName(event.getLastName())
            .build();

        eventPublisher.publishUserAccountCreation(accountEvent);
    }

    @KafkaListener(topics = "user.account.created")
    public void handleUserAccountCreated(UserAccountCreatedEvent event) {
        // Update contact with user account information
        contactService.linkUserAccount(event.getContactId(), event.getUserId());

        // Publish contact linked event
        ContactLinkedEvent linkedEvent = ContactLinkedEvent.builder()
            .contactId(event.getContactId())
            .userId(event.getUserId())
            .linkedAt(Instant.now())
            .build();

        eventPublisher.publishContactLinked(linkedEvent);
    }
}

Best Practices

1. Message Schema Design

public class ContactEventSchema {

    // Use versioned schemas for backward compatibility
    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "version")
    @JsonSubTypes({
        @JsonSubTypes.Type(value = ContactEventV1.class, name = "v1"),
        @JsonSubTypes.Type(value = ContactEventV2.class, name = "v2")
    })
    public abstract class ContactEvent {
        public abstract String getEventId();
        public abstract String getEventType();
        public abstract Instant getTimestamp();
        public abstract String getVersion();
    }

    // Include event metadata for traceability
    public class ContactEventEnvelope {
        private String eventId;
        private String eventType;
        private String source;
        private String specVersion;
        private Instant timestamp;
        private String correlationId;
        private String causationId;
        private ContactEvent data;

        // CloudEvents specification compliance
        public static ContactEventEnvelope createCloudEvent(ContactEvent event, String source) {
            return ContactEventEnvelope.builder()
                .eventId(UUID.randomUUID().toString())
                .eventType(event.getEventType())
                .source(source)
                .specVersion("1.0")
                .timestamp(Instant.now())
                .data(event)
                .build();
        }
    }
}

2. Subscription Management Best Practices

@Component
public class SubscriptionBestPractices {

    // Use consumer groups for load balancing
    public void configureConsumerGroups() {
        SubscriptionRequest request = SubscriptionRequest.builder()
            .subscriberId("notification-service-1")
            .topic("contact.events")
            .consumerGroup("notification-processors") // Multiple instances share load
            .maxConcurrency(5)
            .batchSize(10)
            .build();
    }

    // Implement graceful shutdown for subscribers
    @PreDestroy
    public void shutdown() {
        log.info("Shutting down subscribers gracefully...");

        // Stop accepting new messages
        stopAcceptingMessages();

        // Wait for in-flight messages to complete
        waitForInflightMessages(Duration.ofSeconds(30));

        // Unsubscribe from topics
        unsubscribeFromAllTopics();

        log.info("Subscriber shutdown completed");
    }

    // Use dead letter queues for failed message handling
    @RetryableTopic(
        attempts = "3",
        backoff = @Backoff(delay = 1000, multiplier = 2),
        dltStrategy = DltStrategy.FAIL_ON_ERROR
    )
    @KafkaListener(topics = "contact.events")
    public void handleContactEvent(ContactEvent event) {
        processEvent(event);
    }

    @DltHandler
    public void handleDlt(ContactEvent event, @Header("kafka_dlt-exception-message") String exception) {
        log.error("Message sent to DLT after retries: {}, exception: {}", 
                 event.getEventId(), exception);
        deadLetterService.handleFailedEvent(event, exception);
    }
}

3. Message Ordering and Partitioning

@Component
public class MessageOrderingStrategy {

    // Use message keys for ordered processing within partitions
    public void publishOrderedEvent(ContactEvent event) {
        String messageKey = event.getContactId(); // Events for same contact go to same partition

        kafkaTemplate.send("contact.events", messageKey, event);
    }

    // Implement custom partitioner for specific ordering requirements
    public class ContactPartitioner implements Partitioner {

        @Override
        public int partition(String topic, Object key, byte[] keyBytes, 
                           Object value, byte[] valueBytes, Cluster cluster) {

            if (key instanceof String) {
                String contactId = (String) key;
                // Consistent hashing based on contact ID; mask the sign bit
                // rather than Math.abs(), which stays negative for Integer.MIN_VALUE
                return (contactId.hashCode() & Integer.MAX_VALUE) % cluster.partitionCountForTopic(topic);
            }

            // Fallback: spread keyless messages across partitions randomly
            return ThreadLocalRandom.current().nextInt(cluster.partitionCountForTopic(topic));
        }
    }

    // Handle out-of-order messages with sequence numbers
    @Component
    public class OrderedMessageProcessor {
        private final Map<String, Long> lastProcessedSequence = new ConcurrentHashMap<>();

        public void processMessage(ContactEvent event) {
            String contactId = event.getContactId();
            long eventSequence = event.getSequenceNumber();

            Long lastSequence = lastProcessedSequence.get(contactId);

            if (lastSequence == null || eventSequence == lastSequence + 1) {
                // Process in-order message
                doProcessMessage(event);
                lastProcessedSequence.put(contactId, eventSequence);
            } else if (eventSequence > lastSequence + 1) {
                // Out-of-order message - buffer for later processing
                bufferOutOfOrderMessage(event);
            } else {
                // Duplicate or old message - ignore
                log.warn("Ignoring duplicate/old message for contact {}: sequence {}, last processed: {}", 
                         contactId, eventSequence, lastSequence);
            }
        }
    }
}
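The `bufferOutOfOrderMessage` step above is only named, not shown. A self-contained resequencer sketch illustrates one way to park gapped events and drain them once the missing sequence arrives (the `Resequencer` class and its method names are illustrative, not from a specific library):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Resequencer sketch: out-of-order events are buffered by sequence number
// and delivered only when a contiguous run starting at the next expected
// sequence is available. Old/duplicate sequences are dropped.
class Resequencer {
    private long nextExpected = 1;
    private final Map<Long, String> buffer = new HashMap<>();
    private final List<String> delivered = new ArrayList<>();

    public void accept(long sequence, String payload) {
        if (sequence < nextExpected) {
            return; // duplicate or old message - drop
        }
        buffer.put(sequence, payload);
        // Drain any contiguous run starting at the expected sequence
        while (buffer.containsKey(nextExpected)) {
            delivered.add(buffer.remove(nextExpected));
            nextExpected++;
        }
    }

    public List<String> delivered() {
        return delivered;
    }
}
```

A production version would also bound the buffer and time out permanently missing sequences rather than waiting forever.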

Common Pitfalls

1. Message Duplication

Problem: Subscribers receiving the same message multiple times
Solution: Implement idempotent message processing and duplicate detection
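A minimal, framework-free sketch of the idempotent-consumer side (the in-memory ID set and class name are illustrative; a production system would persist seen IDs, for example in Redis with a TTL):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Idempotent-consumer sketch: remembers processed event IDs so that
// redelivered messages are detected and skipped.
class IdempotentConsumer {
    private final Set<String> processedEventIds = ConcurrentHashMap.newKeySet();

    /** Runs the handler once per event ID; returns false for duplicates. */
    public boolean process(String eventId, Runnable handler) {
        // add() is atomic: only the first caller for a given ID wins
        if (!processedEventIds.add(eventId)) {
            return false; // duplicate delivery - skip
        }
        handler.run();
        return true;
    }
}
```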

2. Topic Proliferation

Problem: Creating too many fine-grained topics leading to operational complexity
Solution: Use hierarchical topic naming and content-based filtering
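One way to keep topic counts down is a small hierarchical namespace with segment wildcards, so subscribers filter within a coarse topic family instead of demanding a new topic per event type. A sketch (the matcher and its one-segment `*` rule are illustrative, deliberately simpler than Redis or MQTT pattern syntax):

```java
// Hierarchical topic matching sketch: a dot-separated pattern where "*"
// matches exactly one segment, e.g. "contact.*" matches "contact.created"
// but not "order.created" or "contact.created.v2".
class TopicMatcher {
    public static boolean matches(String pattern, String topic) {
        String[] p = pattern.split("\\.");
        String[] t = topic.split("\\.");
        if (p.length != t.length) {
            return false;
        }
        for (int i = 0; i < p.length; i++) {
            if (!p[i].equals("*") && !p[i].equals(t[i])) {
                return false;
            }
        }
        return true;
    }
}
```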

3. Unbounded Message Growth

Problem: Topics growing indefinitely without retention policies
Solution: Configure appropriate retention policies and monitoring

4. Subscriber Lag

Problem: Slow subscribers causing message backlog
Solution: Implement consumer lag monitoring and auto-scaling

5. Poison Messages

Problem: Malformed messages causing subscriber failures
Solution: Use schema validation and dead letter queues
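The guard can be sketched without any broker: validate before dispatch, and quarantine anything that fails validation or throws, instead of letting one bad payload crash the subscriber loop (the class name and in-memory dead-letter list stand in for a real DLQ topic):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Poison-message guard sketch: messages that fail validation or whose
// handler throws are moved to a dead-letter list rather than retried
// forever or allowed to kill the consumer.
class PoisonMessageGuard {
    private final List<String> deadLetters = new ArrayList<>();

    public void dispatch(String rawMessage, Consumer<String> handler) {
        try {
            if (rawMessage == null || rawMessage.isBlank()) {
                throw new IllegalArgumentException("empty message");
            }
            handler.accept(rawMessage);
        } catch (Exception e) {
            deadLetters.add(rawMessage); // quarantine instead of crashing
        }
    }

    public List<String> deadLetters() {
        return deadLetters;
    }
}
```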

Integration in Distributed Systems

Event-Driven Microservices

@Service
public class DistributedEventOrchestration {

    @EventListener
    public void handleOrderCreated(OrderCreatedEvent event) {
        // Fan-out to multiple services
        inventoryEventPublisher.publishInventoryReservation(event);
        paymentEventPublisher.publishPaymentRequest(event);
        shippingEventPublisher.publishShippingRequest(event);
    }
}

Multi-Tenant Event Streaming

@Service
public class MultiTenantEventPublisher {

    public void publishTenantEvent(String tenantId, TenantEvent event) {
        String topicName = "tenant." + tenantId + ".events";
        eventPublisher.publish(topicName, event);
    }
}
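Since the tenant ID is interpolated directly into the topic name, it is worth validating it so a malformed ID cannot escape the `tenant.<id>.events` naming scheme. A sketch (the allowed-character rule is an assumption, not from the publisher above):

```java
// Per-tenant topic derivation sketch: reject tenant IDs containing dots
// or other characters that would break the "tenant.<id>.events" hierarchy.
class TenantTopics {
    public static String topicFor(String tenantId) {
        if (tenantId == null || !tenantId.matches("[a-zA-Z0-9-]+")) {
            throw new IllegalArgumentException("invalid tenant id: " + tenantId);
        }
        return "tenant." + tenantId + ".events";
    }
}
```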

Conclusion

The Publish-Subscribe pattern is fundamental to building scalable, loosely coupled distributed systems. It provides temporal and spatial decoupling between publishers and subscribers, content-based filtering, dynamic subscription management, and a natural foundation for event-driven integration.

When properly implemented with appropriate topic design, subscription management, and monitoring, the Publish-Subscribe pattern enables robust, scalable integration architectures that can evolve and adapt to changing business requirements.
