Request-Reply Pattern

Overview

The Request-Reply pattern is a fundamental messaging pattern that enables synchronous-style communication in distributed systems through asynchronous message exchange. It provides a structured approach for implementing point-to-point communication where a client sends a request message and expects to receive a corresponding reply message, effectively bridging the gap between synchronous and asynchronous communication paradigms.

Theoretical Foundation

The Request-Reply pattern is rooted in distributed systems communication theory and message-oriented middleware principles. It addresses the fundamental challenge of implementing request-response semantics in loosely coupled, distributed architectures. The pattern embodies the principle of "temporal decoupling with logical coupling": it keeps the familiar request-response interaction model while letting components run and be deployed on independent lifecycles.

Core Principles

1. Message-Based Request-Response Semantics

The pattern transforms traditional synchronous method calls into asynchronous message exchanges while preserving the request-response interaction model that developers expect.

2. Correlation Management

Each request message is associated with a unique correlation identifier that links the eventual response back to the original request, enabling proper request-reply matching in concurrent environments.
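The matching step described above can be sketched as a map from correlation ID to a pending future. This is a minimal illustration rather than a production design; the class name CorrelationRegistry and its methods are assumptions made for this example, and payloads are plain strings.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: tracks in-flight requests by correlation ID.
class CorrelationRegistry {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers a new request and returns the correlation ID assigned to it.
    String register(CompletableFuture<String> future) {
        String correlationId = UUID.randomUUID().toString();
        pending.put(correlationId, future);
        return correlationId;
    }

    // Completes the matching request; returns false for unknown or duplicate replies.
    boolean complete(String correlationId, String replyPayload) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(replyPayload);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        CorrelationRegistry registry = new CorrelationRegistry();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        String firstId = registry.register(first);
        String secondId = registry.register(second);

        // Replies arrive in the opposite order; the IDs still route them correctly.
        registry.complete(secondId, "reply-B");
        registry.complete(firstId, "reply-A");

        System.out.println(first.join() + " " + second.join());
    }
}
```

Because each entry is removed when its reply is matched, a late duplicate reply finds nothing to complete and can be logged and dropped.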

3. Temporal Decoupling

Unlike direct API calls, request-reply messaging allows the requestor and responder to operate independently in time, with messages persisted in queues until processing is possible.
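A small sketch can make the timing independence concrete. Here an in-memory BlockingQueue stands in for a durable broker queue (an assumption; a real broker would also survive process restarts), and the responder thread starts only after the request has already been enqueued. The class and queue names are illustrative.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class TemporalDecouplingDemo {
    // Returns the reply produced by a responder that starts after the request was queued.
    static String run() {
        BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();
        BlockingQueue<String> replyQueue = new LinkedBlockingQueue<>();

        // The requester enqueues while no responder is running yet.
        requestQueue.add("ping");

        // The responder comes online later and picks up the waiting request.
        Thread responder = new Thread(() -> {
            try {
                String request = requestQueue.take();
                replyQueue.put("pong:" + request);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        responder.start();

        try {
            return replyQueue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```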

4. Location Transparency

The pattern abstracts the physical location and deployment details of the responding service, enabling location independence and dynamic service discovery.

Why Request-Reply is Essential in Integration Architecture

1. Legacy System Integration

Many enterprise systems are designed around synchronous communication patterns:

- Database-centric applications expecting immediate responses to queries
- Mainframe systems operating on transaction-response cycles
- ERP systems requiring confirmation of business operations
- Synchronous API consumers needing familiar interaction patterns

2. Service Boundary Management

Request-reply enables clean service boundaries while maintaining familiar semantics:

- Service encapsulation with well-defined request-response contracts
- Interface standardization across different technology stacks
- Protocol abstraction hiding underlying messaging complexity from business logic
- Service versioning through message schema evolution

3. Transaction Context Preservation

The pattern maintains transactional context across service boundaries:

- Business process continuity where operations depend on immediate results
- Data consistency requirements needing confirmation before proceeding
- Error handling propagation from service providers back to consumers
- Audit trail maintenance linking requests with their corresponding responses

4. Performance Optimization

Request-reply enables various performance optimization strategies:

- Connection pooling and resource sharing across multiple requests
- Batch processing optimization while maintaining individual response semantics
- Caching strategies at the messaging layer for frequently requested data
- Load balancing across multiple service instances transparently

Benefits in Integration Contexts

1. Familiar Programming Model

2. Enhanced Reliability

3. Scalability and Flexibility

4. Monitoring and Governance

Integration Architecture Applications

1. Microservices Communication

Request-reply serves as the backbone for inter-service communication:

- Service-to-service API calls with messaging-based transport
- Command-query separation with dedicated request-reply channels
- Service mesh integration providing transparent request-reply semantics
- Cross-cutting concern handling like authentication and authorization

2. Enterprise Service Bus (ESB) Integration

ESB architectures leverage request-reply for service mediation:

- Protocol bridging between different communication standards
- Message transformation and enrichment during request-reply flows
- Service orchestration coordinating multiple request-reply interactions
- Enterprise application integration connecting disparate systems

3. Event-Driven Architecture Enhancement

Request-reply complements purely asynchronous event patterns:

- Synchronous queries in predominantly asynchronous systems
- State inquiry services providing current system state on demand
- Command acknowledgment confirming successful event processing
- Reference data lookup during asynchronous event processing

4. API Gateway Implementation

API gateways use request-reply for backend service integration:

- Backend service aggregation collecting data from multiple services
- Protocol translation between HTTP and messaging systems
- Request routing to appropriate backend services based on content
- Response composition assembling unified responses from multiple services

How Request-Reply Pattern Works

The Request-Reply pattern operates through a coordinated message exchange involving request queues, reply queues, and correlation mechanisms:

Basic Request-Reply Flow

Client Application                    Service Provider
       │                                     │
       ├──1. Send Request Message──────────→ │
       │   (with CorrelationId &             │
       │    ReplyTo queue address)           │
       │                                     │
       ├──2. Wait for Reply──────┐           │
       │                         │           ├──3. Process Request
       │                         │           │
       │                         │           ├──4. Send Reply Message──┐
       │                         │           │   (with CorrelationId)  │
       │                         │           │                         │
       │   ┌─────────────────────┘           │                         │
       ├──5. Receive Reply←───────────────────────────────────────────┘
       │   (match CorrelationId)
       │
       ├──6. Process Response
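The six steps in the diagram can be traced in a self-contained sketch. It assumes an in-memory map of named queues in place of a real broker; the Message class, the queue names, and the upper-casing "service" are all hypothetical stand-ins chosen for illustration.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BasicFlowDemo {
    // Simplified message: the two headers from the diagram plus a string payload.
    static final class Message {
        final String correlationId;
        final String replyTo;
        final String payload;

        Message(String correlationId, String replyTo, String payload) {
            this.correlationId = correlationId;
            this.replyTo = replyTo;
            this.payload = payload;
        }
    }

    // Stand-in for a broker: named in-memory queues.
    static final Map<String, BlockingQueue<Message>> QUEUES = new ConcurrentHashMap<>();

    static BlockingQueue<Message> queue(String name) {
        return QUEUES.computeIfAbsent(name, n -> new LinkedBlockingQueue<>());
    }

    // Steps 3-4: process one request and reply to its ReplyTo address.
    static void serveOne() throws InterruptedException {
        Message request = queue("service.requests").take();
        String result = request.payload.toUpperCase();
        queue(request.replyTo).put(new Message(request.correlationId, null, result));
    }

    // Steps 1, 2, 5, 6: send, wait, match the correlation ID, return the payload.
    static String call(String payload) {
        String correlationId = UUID.randomUUID().toString();
        String replyTo = "reply." + correlationId;
        Thread service = new Thread(() -> {
            try {
                serveOne();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        service.start();
        try {
            queue("service.requests").put(new Message(correlationId, replyTo, payload));
            Message reply = queue(replyTo).poll(5, TimeUnit.SECONDS);
            if (reply == null || !correlationId.equals(reply.correlationId)) {
                throw new IllegalStateException("No matching reply for " + correlationId);
            }
            return reply.payload;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            QUEUES.remove(replyTo); // drop the per-request reply queue
        }
    }

    public static void main(String[] args) {
        System.out.println(call("hello"));
    }
}
```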

Message Flow Components

1. Request Message Structure

┌─────────────────────────────────────┐
│ Request Message                     │
├─────────────────────────────────────┤
│ Headers:                            │
│  - CorrelationId: uuid-1234         │
│  - ReplyTo: reply.queue.name        │
│  - MessageType: REQUEST             │
│  - Timestamp: 2025-12-30T10:00:00Z  │
├─────────────────────────────────────┤
│ Payload:                            │
│  - Request data (JSON/XML/Binary)   │
│  - Operation parameters             │
│  - Security context                 │
└─────────────────────────────────────┘

2. Reply Message Structure

┌─────────────────────────────────────┐
│ Reply Message                       │
├─────────────────────────────────────┤
│ Headers:                            │
│  - CorrelationId: uuid-1234         │
│  - MessageType: REPLY               │
│  - Status: SUCCESS|ERROR            │
│  - Timestamp: 2025-12-30T10:00:05Z  │
├─────────────────────────────────────┤
│ Payload:                            │
│  - Response data                    │
│  - Error information (if failed)    │
│  - Processing metadata              │
└─────────────────────────────────────┘

Request-Reply Interaction Types

1. Synchronous Request-Reply

Client ──Request──→ [Queue] ──→ Service
   ↑                              ↓
   └──Reply←──[Reply Queue] ←──Reply

2. Asynchronous Request-Reply

Client ──Request──→ [Queue] ──→ Service
   │                              ↓
   │         Later...              ↓
   └──Poll/Callback←──[Reply Queue] ←──Reply

3. Temporary Reply Queue

Client ──Request──→ [Queue] ──→ Service
   ↑      (ReplyTo: temp.queue.123)  ↓
   └──Reply←──[temp.queue.123] ←──Reply
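From the caller's side, the difference between the synchronous and asynchronous styles is mostly how the pending reply is consumed. The sketch below assumes a hypothetical send method that returns a CompletableFuture completed on another thread, as a reply listener would do; it does not involve a real queue.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

class InteractionStylesDemo {
    // Hypothetical transport: completes the future on another thread.
    static CompletableFuture<String> send(String request) {
        return CompletableFuture.supplyAsync(() -> "reply-to-" + request);
    }

    // Synchronous style: block until the reply arrives or the timeout elapses.
    static String callBlocking(String request) {
        try {
            return send(request).get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    // Asynchronous style: register a callback and return immediately.
    static CompletableFuture<Void> callWithCallback(String request, AtomicReference<String> sink) {
        return send(request).thenAccept(sink::set);
    }

    public static void main(String[] args) {
        System.out.println(callBlocking("a"));
        AtomicReference<String> sink = new AtomicReference<>();
        callWithCallback("b", sink).join(); // joined here only so the demo can print the result
        System.out.println(sink.get());
    }
}
```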

Key Components

1. Request-Reply Manager

Coordinates request-reply interactions and manages correlation:

public class RequestReplyManager {
    private final MessageTemplate messageTemplate;
    private final CorrelationManager correlationManager;
    private final ReplyTimeoutManager replyTimeout; // the Reply Timeout Manager component
    private final Map<String, PendingRequest> pendingRequests = new ConcurrentHashMap<>();

    public <T> CompletableFuture<T> sendRequest(String destination,
                                              Object request,
                                              Class<T> responseType,
                                              Duration timeout) {

        String correlationId = correlationManager.generateCorrelationId();
        String replyTo = getReplyToQueue(); // helper (not shown) returning this client's reply queue name

        RequestMessage requestMessage = RequestMessage.builder()
            .correlationId(correlationId)
            .replyTo(replyTo)
            .payload(request)
            .timestamp(Instant.now())
            .build();

        CompletableFuture<T> responseFuture = new CompletableFuture<>();

        PendingRequest pendingRequest = PendingRequest.builder()
            .correlationId(correlationId)
            .requestTime(Instant.now())
            .timeout(timeout)
            .responseFuture(responseFuture)
            .responseType(responseType)
            .build();

        pendingRequests.put(correlationId, pendingRequest);

        // Schedule timeout
        replyTimeout.scheduleTimeout(correlationId, timeout, () -> {
            PendingRequest pending = pendingRequests.remove(correlationId);
            if (pending != null) {
                pending.getResponseFuture().completeExceptionally(
                    new RequestReplyTimeoutException("Request timed out: " + correlationId)
                );
            }
        });

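        // Send request; if the send itself fails, release the pending entry and its timer
        try {
            messageTemplate.send(destination, requestMessage);
        } catch (RuntimeException e) {
            pendingRequests.remove(correlationId);
            replyTimeout.cancelTimeout(correlationId);
            responseFuture.completeExceptionally(e);
        }

        return responseFuture;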
    }

    @MessageHandler("reply.queue")
    public void handleReply(ReplyMessage reply) {
        String correlationId = reply.getCorrelationId();
        PendingRequest pendingRequest = pendingRequests.remove(correlationId);

        if (pendingRequest == null) {
            log.warn("Received reply for unknown correlation ID: {}", correlationId);
            return;
        }

        replyTimeout.cancelTimeout(correlationId);

        try {
            if (reply.isSuccess()) {
                Object response = deserializeResponse(
                    reply.getPayload(), 
                    pendingRequest.getResponseType()
                );
                pendingRequest.getResponseFuture().complete(response);
            } else {
                Exception exception = createExceptionFromReply(reply);
                pendingRequest.getResponseFuture().completeExceptionally(exception);
            }
        } catch (Exception e) {
            pendingRequest.getResponseFuture().completeExceptionally(e);
        }
    }
}

2. Correlation Manager

Manages correlation IDs and request tracking:

public class CorrelationManager {
    private final String instanceId;
    private final AtomicLong sequenceNumber = new AtomicLong(0);

    public String generateCorrelationId() {
        return String.format("%s-%d-%d", 
            instanceId, 
            System.currentTimeMillis(), 
            sequenceNumber.incrementAndGet()
        );
    }

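    public boolean isValidCorrelationId(String correlationId) {
        if (correlationId == null || !correlationId.startsWith(instanceId + "-")) {
            return false;
        }
        // Strip the instance prefix first: instanceId may itself contain '-'
        String[] parts = correlationId.substring(instanceId.length() + 1).split("-");
        return parts.length == 2;
    }

    public CorrelationMetadata parseCorrelationId(String correlationId) {
        // Split from the right so hyphens inside instanceId do not break parsing
        int seqSep = correlationId.lastIndexOf('-');
        int tsSep = correlationId.lastIndexOf('-', seqSep - 1);
        if (tsSep <= 0) {
            throw new IllegalArgumentException("Invalid correlation ID format");
        }

        return CorrelationMetadata.builder()
            .instanceId(correlationId.substring(0, tsSep))
            .timestamp(Long.parseLong(correlationId.substring(tsSep + 1, seqSep)))
            .sequenceNumber(Long.parseLong(correlationId.substring(seqSep + 1)))
            .build();
    }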
}

3. Request-Reply Service Handler

Processes incoming requests and generates replies:

@Component
public class RequestReplyServiceHandler {
    private final MessageTemplate messageTemplate;
    private final RequestProcessor requestProcessor;

    @MessageHandler("service.request.queue")
    public void handleRequest(RequestMessage request) {
        String correlationId = request.getCorrelationId();
        String replyTo = request.getReplyTo();

        ReplyMessage reply;

        try {
            // Process the request
            Object result = requestProcessor.process(request.getPayload());

            reply = ReplyMessage.builder()
                .correlationId(correlationId)
                .status(ReplyStatus.SUCCESS)
                .payload(result)
                .timestamp(Instant.now())
                .build();

        } catch (Exception e) {
            log.error("Error processing request {}: {}", correlationId, e.getMessage(), e);

            reply = ReplyMessage.builder()
                .correlationId(correlationId)
                .status(ReplyStatus.ERROR)
                .errorCode(determineErrorCode(e))
                .errorMessage(e.getMessage())
                .timestamp(Instant.now())
                .build();
        }

        // Send reply
        if (replyTo != null) {
            messageTemplate.send(replyTo, reply);
        } else {
            log.warn("No reply-to address specified for request: {}", correlationId);
        }
    }

    private String determineErrorCode(Exception e) {
        if (e instanceof ValidationException) {
            return "VALIDATION_ERROR";
        } else if (e instanceof BusinessException) {
            return "BUSINESS_ERROR";
        } else if (e instanceof TimeoutException) {
            return "TIMEOUT_ERROR";
        } else {
            return "INTERNAL_ERROR";
        }
    }
}

4. Reply Timeout Manager

Manages request timeouts and cleanup:

@Component
public class ReplyTimeoutManager {
    private final ScheduledExecutorService timeoutExecutor;
    private final Map<String, ScheduledFuture<?>> timeoutTasks = new ConcurrentHashMap<>();

    public void scheduleTimeout(String correlationId, Duration timeout, Runnable timeoutAction) {
        ScheduledFuture<?> timeoutTask = timeoutExecutor.schedule(
            timeoutAction,
            timeout.toMillis(),
            TimeUnit.MILLISECONDS
        );

        timeoutTasks.put(correlationId, timeoutTask);
    }

    public boolean cancelTimeout(String correlationId) {
        ScheduledFuture<?> timeoutTask = timeoutTasks.remove(correlationId);
        if (timeoutTask != null) {
            return timeoutTask.cancel(false);
        }
        return false;
    }

    @Scheduled(fixedRate = 60000) // Clean up every minute
    public void cleanupCompletedTasks() {
        timeoutTasks.entrySet().removeIf(entry -> 
            entry.getValue().isDone() || entry.getValue().isCancelled()
        );
    }
}

Configuration Parameters

Essential Settings

| Parameter               | Description                           | Typical Values       |
|-------------------------|---------------------------------------|----------------------|
| Request Timeout         | Maximum time to wait for reply        | 30s-300s             |
| Reply Queue TTL         | Time-to-live for reply messages       | 1h-24h               |
| Max Concurrent Requests | Maximum pending requests per client   | 100-10000            |
| Correlation ID Format   | Format for generating correlation IDs | UUID/Timestamp-based |
| Reply Queue Type        | Temporary vs permanent reply queues   | temp/permanent       |

Example Configuration

# Request-Reply Configuration
request-reply.default-timeout=60s
request-reply.max-concurrent-requests=1000
request-reply.correlation-format=uuid
request-reply.reply-queue-ttl=1h

# Queue Configuration
request-reply.request-queue=service.requests
request-reply.reply-queue=service.replies
request-reply.dead-letter-queue=service.requests.dlq

# Performance Tuning
request-reply.connection-pool-size=20
request-reply.consumer-concurrency=10
request-reply.retry-attempts=3
request-reply.retry-delay=1s
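One way such properties could be read into typed settings is sketched below. The RequestReplySettings class is hypothetical, and the duration syntax ("60s", "1h") is parsed by hand on the assumption that no framework conversion is available; java.time.Duration.parse itself expects ISO-8601 input such as PT60S.

```java
import java.io.IOException;
import java.io.StringReader;
import java.time.Duration;
import java.util.Properties;

// Hypothetical typed view over a subset of the request-reply properties.
class RequestReplySettings {
    final Duration defaultTimeout;
    final int maxConcurrentRequests;
    final Duration replyQueueTtl;

    RequestReplySettings(Properties props) {
        defaultTimeout = parseDuration(props.getProperty("request-reply.default-timeout", "60s"));
        maxConcurrentRequests = Integer.parseInt(
            props.getProperty("request-reply.max-concurrent-requests", "1000"));
        replyQueueTtl = parseDuration(props.getProperty("request-reply.reply-queue-ttl", "1h"));
        if (maxConcurrentRequests <= 0) {
            throw new IllegalArgumentException("max-concurrent-requests must be positive");
        }
    }

    // Accepts "<number><unit>" where unit is s, m, or h, for example "60s" or "1h".
    static Duration parseDuration(String text) {
        String value = text.trim();
        if (value.length() < 2) {
            throw new IllegalArgumentException("Unsupported duration: " + text);
        }
        long amount = Long.parseLong(value.substring(0, value.length() - 1));
        switch (value.charAt(value.length() - 1)) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            case 'h': return Duration.ofHours(amount);
            default: throw new IllegalArgumentException("Unsupported duration: " + text);
        }
    }

    // Loads settings from properties-format text; missing keys fall back to the defaults above.
    static RequestReplySettings fromText(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
        return new RequestReplySettings(props);
    }
}
```

Validating values at load time means a malformed timeout fails at startup rather than on the first request.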

Implementation Examples

1. Spring AMQP Request-Reply Implementation

@Configuration
public class RequestReplyConfiguration {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setReplyTimeout(60000);
        template.setReplyAddress("amq.rabbitmq.reply-to");
        template.setCorrelationKey("correlation-id");
        return template;
    }

    @Bean
    public Queue requestQueue() {
        return QueueBuilder.durable("contact.service.requests")
            .deadLetterExchange("dlx")
            .deadLetterRoutingKey("contact.service.requests.dlq")
            .build();
    }
}

@Service
public class ContactRequestReplyService {
    private final RabbitTemplate rabbitTemplate;

    public ContactResponse getContact(ContactRequest request) {
        return (ContactResponse) rabbitTemplate.convertSendAndReceive(
            "contact.service.requests", 
            request
        );
    }

    public CompletableFuture<ContactResponse> getContactAsync(ContactRequest request) {
        return CompletableFuture.supplyAsync(() -> 
            (ContactResponse) rabbitTemplate.convertSendAndReceive(
                "contact.service.requests", 
                request
            )
        );
    }
}

@RabbitListener(queues = "contact.service.requests")
@Component
public class ContactRequestHandler {
    private final ContactService contactService;

    @RabbitHandler
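    public ContactResponse handleContactRequest(ContactRequest request,
                                              // Header name matches setCorrelationKey("correlation-id") above;
                                              // optional so a missing header does not fail delivery
                                              @Header(name = "correlation-id", required = false) String correlationId) {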
        try {
            return contactService.processContactRequest(request);
        } catch (Exception e) {
            throw new AmqpRejectAndDontRequeueException("Failed to process contact request", e);
        }
    }
}

2. Apache Kafka Request-Reply Implementation

@Configuration
public class KafkaRequestReplyConfiguration {

    @Bean
    public ReplyingKafkaTemplate<String, ContactRequest, ContactResponse> replyingKafkaTemplate(
            ProducerFactory<String, ContactRequest> producerFactory,
            KafkaMessageListenerContainer<String, ContactResponse> replyContainer) {

        return new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
    }

    @Bean
    public KafkaMessageListenerContainer<String, ContactResponse> replyContainer(
            ConsumerFactory<String, ContactResponse> consumerFactory) {

        ContainerProperties properties = new ContainerProperties("contact.service.replies");
        return new KafkaMessageListenerContainer<>(consumerFactory, properties);
    }
}

@Service
public class KafkaContactRequestReplyService {
    private final ReplyingKafkaTemplate<String, ContactRequest, ContactResponse> replyingTemplate;

    public ContactResponse getContact(ContactRequest request)
            throws ExecutionException, InterruptedException, TimeoutException {
        ProducerRecord<String, ContactRequest> producerRecord = 
            new ProducerRecord<>("contact.service.requests", request);

        RequestReplyFuture<String, ContactRequest, ContactResponse> future = 
            replyingTemplate.sendAndReceive(producerRecord);

        ConsumerRecord<String, ContactResponse> responseRecord = future.get(30, TimeUnit.SECONDS);
        return responseRecord.value();
    }
}

@Component
public class KafkaContactRequestHandler {
    private final ContactService contactService;

    // Method-level annotations: a class-level @KafkaListener would also need
    // @KafkaHandler on the method for it to receive records.
    @KafkaListener(topics = "contact.service.requests")
    @SendTo("contact.service.replies")
    public ContactResponse handleContactRequest(@Payload ContactRequest request,
                                              // Spring Kafka carries these two headers as raw bytes
                                              @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTopic,
                                              @Header(KafkaHeaders.CORRELATION_ID) byte[] correlationId) {

        return contactService.processContactRequest(request);
    }
}

3. Custom HTTP-to-Message Bridge

@RestController
public class HttpToMessageBridgeController {
    private final RequestReplyManager requestReplyManager;

    @PostMapping("/contacts")
    public CompletableFuture<ResponseEntity<ContactResponse>> createContact(
            @RequestBody ContactRequest request) {

        return requestReplyManager.sendRequest(
            "contact.service.create", 
            request, 
            ContactResponse.class,
            Duration.ofSeconds(30)
        ).thenApply(response -> ResponseEntity.ok(response))
         .exceptionally(throwable -> {
             if (throwable.getCause() instanceof RequestReplyTimeoutException) {
                 // 504: the backend did not reply in time (408 would blame the HTTP client)
                 return ResponseEntity.status(HttpStatus.GATEWAY_TIMEOUT).build();
             } else {
                 return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
             }
         });
    }

    @GetMapping("/contacts/{id}")
    public DeferredResult<ResponseEntity<ContactResponse>> getContact(@PathVariable String id) {
        DeferredResult<ResponseEntity<ContactResponse>> deferredResult = 
            new DeferredResult<>(30000L);

        ContactRequest request = new ContactRequest(id);

        requestReplyManager.sendRequest(
            "contact.service.get", 
            request, 
            ContactResponse.class,
            Duration.ofSeconds(30)
        ).whenComplete((response, throwable) -> {
            if (throwable != null) {
                deferredResult.setErrorResult(
                    ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build()
                );
            } else {
                deferredResult.setResult(ResponseEntity.ok(response));
            }
        });

        return deferredResult;
    }
}

4. Request-Reply with Circuit Breaker Integration

@Component
public class ResilientRequestReplyService {
    private final RequestReplyManager requestReplyManager;
    // No CircuitBreaker field is needed: the annotations below look up the
    // "contact-service" instance by name.

    @CircuitBreaker(name = "contact-service")
    @TimeLimiter(name = "contact-service")
    @Retry(name = "contact-service")
    public CompletableFuture<ContactResponse> getContactWithResilience(ContactRequest request) {

        return requestReplyManager.sendRequest(
            "contact.service.requests",
            request,
            ContactResponse.class,
            Duration.ofSeconds(30)
        );
    }

    public ContactResponse getContactWithFallback(ContactRequest request) {
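        try {
            // Caution: calling the annotated method on `this` bypasses Spring's AOP proxy,
            // so the resilience annotations do not apply here; invoke it via an injected bean.
            return getContactWithResilience(request).get(30, TimeUnit.SECONDS);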
        } catch (Exception e) {
            log.warn("Request-reply failed, using fallback: {}", e.getMessage());
            return createFallbackResponse(request);
        }
    }

    private ContactResponse createFallbackResponse(ContactRequest request) {
        return ContactResponse.builder()
            .id(request.getId())
            .status("UNAVAILABLE")
            .message("Service temporarily unavailable")
            .fallbackUsed(true)
            .build();
    }
}

Best Practices

1. Correlation ID Management

public class CorrelationIdBestPractices {

    // Use structured correlation IDs for better traceability
    public String generateStructuredCorrelationId(String serviceId, String operationType) {
        return String.format("%s-%s-%s-%d", 
            serviceId,
            operationType,
            UUID.randomUUID().toString().substring(0, 8),
            System.currentTimeMillis()
        );
    }

    // Propagate correlation IDs across service boundaries
    @MessageHandler
    public void handleRequest(RequestMessage request, 
                            @Header("X-Correlation-ID") String correlationId) {

        // Set in thread context for logging
        MDC.put("correlationId", correlationId);

        try {
            processRequest(request);
        } finally {
            MDC.remove("correlationId");
        }
    }

    // Validate correlation ID format for security
    public boolean isValidCorrelationId(String correlationId) {
        return correlationId != null &&
               correlationId.matches("^[a-zA-Z0-9\\-_]{8,64}$") &&
               !correlationId.contains("..") &&
               !correlationId.startsWith("-") &&
               !correlationId.endsWith("-");
    }
}

2. Request-Reply Monitoring

@Component
public class RequestReplyMonitoring {
    private final MeterRegistry meterRegistry;

    public RequestReplyMonitoring(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    public void recordRequest(String destination, Duration duration, boolean successful) {
        // Micrometer meters are immutable once registered, so tags are supplied when
        // the meter is looked up in the registry (metric names here are illustrative)
        meterRegistry.counter("request.reply.requests",
                "destination", destination,
                "status", successful ? "success" : "failure")
            .increment();

        meterRegistry.timer("request.reply.duration", "destination", destination)
            .record(duration);
    }

    public void recordTimeout(String destination) {
        meterRegistry.counter("request.reply.timeouts", "destination", destination).increment();
    }

    @EventListener
    public void handleRequestReplyEvent(RequestReplyEvent event) {
        recordRequest(
            event.getDestination(),
            event.getDuration(),
            event.isSuccessful()
        );

        if (event.isTimedOut()) {
            recordTimeout(event.getDestination());
        }
    }
}

3. Reply Queue Management

@Component
public class ReplyQueueManager {
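    // Broker administration facade used below to create and delete queues (hypothetical type)
    private final MessageQueueManager messageQueueManager;
    private final Map<String, String> temporaryQueues = new ConcurrentHashMap<>();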

    public String createTemporaryReplyQueue(String requestId) {
        String queueName = "reply.temp." + requestId;

        // Create temporary queue with TTL
        QueueSpecification spec = QueueSpecification.builder()
            .name(queueName)
            .temporary(true)
            .ttl(Duration.ofHours(1))
            .autoDelete(true)
            .build();

        messageQueueManager.createQueue(spec);
        temporaryQueues.put(requestId, queueName);

        return queueName;
    }

    public void cleanupReplyQueue(String requestId) {
        String queueName = temporaryQueues.remove(requestId);
        if (queueName != null) {
            try {
                messageQueueManager.deleteQueue(queueName);
            } catch (Exception e) {
                log.warn("Failed to cleanup reply queue {}: {}", queueName, e.getMessage());
            }
        }
    }

    @Scheduled(fixedRate = 300000) // Every 5 minutes
    public void cleanupExpiredQueues() {
        temporaryQueues.entrySet().removeIf(entry -> {
            try {
                return !messageQueueManager.queueExists(entry.getValue());
            } catch (Exception e) {
                return true; // Remove if we can't check existence
            }
        });
    }
}

Common Pitfalls

1. Correlation ID Collisions

Problem: Using simple sequential or timestamp-based IDs that can collide across instances
Solution: Use UUIDs or include instance identifiers in correlation ID generation
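The collision risk can be shown with two generator instances that observe the same clock tick. The sketch below is illustrative only: NaiveGenerator is a hypothetical timestamp-plus-sequence scheme with no instance identifier, and the fixed timestamp simulates two hosts reading the same millisecond.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

class CollisionDemo {
    // Naive scheme: timestamp plus a per-instance sequence, no instance identifier.
    static class NaiveGenerator {
        private final AtomicLong sequence = new AtomicLong();

        String next(long nowMillis) {
            return nowMillis + "-" + sequence.incrementAndGet();
        }
    }

    // Counts IDs from instance B that instance A has already produced.
    static int naiveCollisions(int perInstance) {
        NaiveGenerator a = new NaiveGenerator();
        NaiveGenerator b = new NaiveGenerator();
        long sameMillisecond = 1_700_000_000_000L; // both instances see the same clock tick
        Set<String> seen = new HashSet<>();
        for (int i = 0; i < perInstance; i++) {
            seen.add(a.next(sameMillisecond));
        }
        int collisions = 0;
        for (int i = 0; i < perInstance; i++) {
            if (!seen.add(b.next(sameMillisecond))) {
                collisions++;
            }
        }
        return collisions;
    }

    // Counts repeated values among freshly generated random UUIDs.
    static int uuidCollisions(int count) {
        Set<String> seen = new HashSet<>();
        int collisions = 0;
        for (int i = 0; i < count; i++) {
            if (!seen.add(UUID.randomUUID().toString())) {
                collisions++;
            }
        }
        return collisions;
    }

    public static void main(String[] args) {
        System.out.println("naive collisions: " + naiveCollisions(100));
        System.out.println("uuid collisions: " + uuidCollisions(100));
    }
}
```

Under these assumptions every ID from the second naive instance duplicates one from the first, since both sequences start at 1.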

2. Reply Queue Memory Leaks

Problem: Temporary reply queues not cleaned up after request completion
Solution: Implement automatic cleanup with TTL and monitoring for queue growth

3. Timeout Handling

Problem: Not properly handling timeout scenarios leading to resource leaks
Solution: Always implement timeout cleanup and ensure futures are completed properly
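One possible way to tie cleanup to every completion path is to attach it to the future itself. The sketch below assumes Java 9 or later for CompletableFuture.orTimeout; the class, the static PENDING map, and the method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

class TimeoutCleanupDemo {
    static final Map<String, CompletableFuture<String>> PENDING = new ConcurrentHashMap<>();

    // Registers a request; the entry is removed whether the future completes
    // with a reply, with an error, or through the timeout.
    static CompletableFuture<String> track(String correlationId, long timeoutMillis) {
        CompletableFuture<String> future = new CompletableFuture<>();
        PENDING.put(correlationId, future);
        // The returned stage completes only after the cleanup action has run.
        return future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                     .whenComplete((reply, error) -> PENDING.remove(correlationId));
    }

    // Called by the reply listener; returns false if the request is no longer pending.
    static boolean reply(String correlationId, String payload) {
        CompletableFuture<String> future = PENDING.get(correlationId);
        return future != null && future.complete(payload);
    }
}
```

Callers wait on the stage returned by track, so by the time they observe a result or a timeout the pending entry has already been removed.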

4. Message Ordering Issues

Problem: Assuming message ordering in request-reply scenarios
Solution: Design for out-of-order message handling and use explicit sequencing when needed

5. Exception Propagation

Problem: Not properly converting and transmitting exception information in replies
Solution: Standardize error response formats and exception serialization
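A standardized error format might look like the following sketch: the responder maps exceptions to stable error codes, and the requester turns a failed reply back into a typed exception. The Reply envelope, RemoteServiceException, and the two-way code mapping are assumptions for illustration, reusing error code names from the service handler listing.

```java
class ErrorReplyDemo {
    // Hypothetical reply envelope: payload on success, code and message on failure.
    static final class Reply {
        final boolean success;
        final String payload;
        final String errorCode;
        final String errorMessage;

        private Reply(boolean success, String payload, String errorCode, String errorMessage) {
            this.success = success;
            this.payload = payload;
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
        }

        static Reply ok(String payload) {
            return new Reply(true, payload, null, null);
        }

        static Reply error(String code, String message) {
            return new Reply(false, null, code, message);
        }
    }

    // Exception the requester raises when the service reports a failure.
    static class RemoteServiceException extends RuntimeException {
        final String errorCode;

        RemoteServiceException(String errorCode, String message) {
            super(errorCode + ": " + message);
            this.errorCode = errorCode;
        }
    }

    // Responder side: map exceptions to stable codes instead of serializing stack traces.
    static Reply toReply(Exception e) {
        String code = (e instanceof IllegalArgumentException) ? "VALIDATION_ERROR" : "INTERNAL_ERROR";
        return Reply.error(code, String.valueOf(e.getMessage()));
    }

    // Requester side: return the payload or rethrow as a typed exception.
    static String unwrap(Reply reply) {
        if (reply.success) {
            return reply.payload;
        }
        throw new RemoteServiceException(reply.errorCode, reply.errorMessage);
    }
}
```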

Integration in Distributed Systems

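In distributed integration scenarios, the Request-Reply pattern supports the use cases below. The @RequestReply annotation in these listings is illustrative rather than part of a standard library.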

Microservices Integration

@Service
public class DistributedContactService {

    @RequestReply(destination = "user.service", timeout = "30s")
    public UserData getUserData(String userId) {
        return UserRequest.builder().userId(userId).build();
    }

    @RequestReply(destination = "profile.service", timeout = "15s")
    public ProfileData getProfileData(String profileId) {
        return ProfileRequest.builder().profileId(profileId).build();
    }
}

Legacy System Integration

@Component
public class LegacySystemBridge {

    @RequestReply(destination = "mainframe.bridge", timeout = "120s")
    public LegacyResponse queryMainframe(LegacyQuery query) {
        return transformToLegacyFormat(query);
    }
}

Event-Driven Query Services

@Service
public class QueryService {

    @RequestReply(destination = "data.query.service")
    public QueryResponse executeQuery(QueryRequest query) {
        return query;
    }
}

Conclusion

The Request-Reply pattern is essential for building distributed systems that require synchronous-style interactions while maintaining the benefits of message-oriented architectures.

When properly implemented with appropriate correlation management, timeout handling, and monitoring, the Request-Reply pattern serves as a crucial bridge between synchronous and asynchronous communication paradigms in distributed architectures.
