Correlation ID Pattern
Overview
The Correlation ID pattern is a distributed tracing mechanism for tracking requests, messages, and transactions across system and service boundaries. A unique identifier travels with each request through the entire processing pipeline, which supports monitoring, debugging, and auditing in distributed architectures. The pattern is essential for maintaining observability in complex, multi-service environments.
Theoretical Foundation
The Correlation ID pattern is grounded in distributed systems observability theory and causal ordering principles. It implements the concept of "request correlation" where related operations are linked through a shared identifier that maintains causal relationships across service boundaries. The pattern embodies "end-to-end traceability" enabling comprehensive understanding of distributed transaction flows.
Core Principles
1. Request Flow Traceability
Every request entering the system receives a unique correlation identifier that accompanies all related processing activities, enabling complete visibility into distributed transaction execution.
2. Service Boundary Preservation
Correlation IDs are propagated across all service boundaries, maintaining the logical connection between related operations regardless of the underlying communication mechanisms or technology stacks.
3. Hierarchical Context Tracking
The pattern supports hierarchical correlation where parent-child relationships between operations are maintained, enabling detailed understanding of complex processing workflows.
4. Non-Intrusive Implementation
Correlation IDs are implemented as metadata that doesn't interfere with business logic while providing comprehensive tracing capabilities across the entire system.
Why Correlation ID is Essential in Integration Architecture
1. Distributed Debugging and Troubleshooting
Modern distributed systems require effective debugging capabilities:
- Cross-service error tracking enabling identification of failure points in complex workflows
- Performance bottleneck identification across multiple service calls
- Request flow reconstruction for understanding system behavior
- Root cause analysis in multi-tier architectures
2. Operational Excellence
Correlation IDs enable superior operational capabilities:
- End-to-end monitoring providing complete visibility into system operations
- Service dependency analysis through request flow tracking
- Capacity planning based on actual request patterns and service utilization
- SLA compliance monitoring with detailed performance tracking
3. Business Process Visibility
Understanding business operations across distributed systems:
- Business transaction tracking from initiation to completion
- Audit trail maintenance for compliance and governance requirements
- Customer journey analysis across multiple touchpoints
- Process optimization based on actual execution patterns
4. Development and Testing Support
Enhanced development and testing capabilities:
- Integration testing with comprehensive request tracking
- Performance testing with detailed metrics correlation
- System integration validation ensuring correct cross-service communication
- Debugging support in development and staging environments
Benefits in Integration Contexts
1. Enhanced Observability
- Comprehensive request tracking across all system components
- Service interaction visibility showing actual communication patterns
- Performance correlation linking metrics across related operations
- Error propagation tracking showing failure paths through the system
2. Improved Development Productivity
- Simplified debugging with clear request flow visualization
- Faster issue resolution through detailed transaction tracing
- Better system understanding enabling more effective development
- Quality assurance through comprehensive testing support
3. Operational Efficiency
- Proactive monitoring with early problem detection
- Reduced mean time to resolution (MTTR) through better diagnostics
- Capacity optimization based on actual usage patterns
- Service level management with detailed performance tracking
4. Compliance and Governance
- Audit trail completeness for regulatory compliance
- Data lineage tracking showing information flow paths
- Access pattern monitoring for security analysis
- Regulatory reporting with detailed transaction histories
Integration Architecture Applications
1. Microservices Request Tracing
Correlation IDs enable comprehensive microservices observability:
- Service-to-service call tracking showing complete request paths
- API gateway integration with automatic correlation ID injection
- Service mesh observability providing detailed traffic analysis
- Cross-cutting concern correlation linking security, logging, and monitoring
2. Enterprise Service Bus (ESB) Integration
ESB architectures benefit from correlation-based tracking:
- Message routing visibility showing transformation and routing paths
- Protocol bridging tracking across different communication standards
- Enterprise application integration with end-to-end visibility
- B2B transaction tracking across organizational boundaries
3. Event-Driven Architecture Correlation
Event processing systems use correlation for flow tracking:
- Event chain tracking showing cause-and-effect relationships
- Saga pattern implementation with distributed transaction correlation
- Event sourcing support linking events to originating transactions
- Stream processing correlation maintaining relationships in data pipelines
4. Legacy System Integration
Modern integration with legacy systems requires correlation:
- Mainframe transaction tracking bridging modern and legacy systems
- Batch processing correlation linking real-time and batch operations
- System migration support tracking operations across old and new systems
- Hybrid architecture observability spanning multiple technology generations
How Correlation ID Pattern Works
The Correlation ID pattern operates through identifier propagation across all system boundaries and processing stages:
Basic Correlation Flow
Client Request ──┐
│ [Generate/Extract Correlation ID]
▼
┌─────────────────────────────────────────────────────────┐
│ Service A (ID: CORR-123) │
│ ├── Log: "Processing request CORR-123" │
│ ├── Call Service B ──────────────┐ │
│ └── Log: "Completed request CORR-123" │
└─────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Service B (ID: CORR-123) │
│ ├── Log: "Received request CORR-123 from Service A" │
│ ├── Database Query ──────────────┐ │
│ ├── Call Service C ──────────────┼─────────────┐ │
│ └── Log: "Response sent for CORR-123" │
└─────────────────────────────────────────────────────────┘
│ │
▼ ▼
┌─────────────────────────────────┐ ┌─────────────────────────────────┐
│ Database (ID: CORR-123) │ │ Service C (ID: CORR-123) │
│ └── Log: "Query for CORR-123" │ │ ├── Log: "Processing CORR-123" │
└─────────────────────────────────┘ │ └── Log: "Completed CORR-123" │
└─────────────────────────────────┘
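The flow in the diagram can be sketched in plain Java. This is a minimal illustration, assuming each service is an ordinary method and the centralized log is an in-memory list; the class and method names (CorrelationFlowSketch, handleRequest, serviceA and so on) are hypothetical and do not come from the components described later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical sketch: one correlation ID accompanies a request through three "services". */
final class CorrelationFlowSketch {
    // Stands in for a centralized log store; every entry is prefixed with the correlation ID.
    static final List<String> LOG = new ArrayList<>();

    /** Entry point: reuse the caller's ID if one was supplied, otherwise generate one. */
    static String handleRequest(String incomingId) {
        String id = (incomingId == null || incomingId.isBlank())
                ? "CORR-" + UUID.randomUUID()
                : incomingId;
        serviceA(id);
        return id;
    }

    static void serviceA(String id) {
        LOG.add("[" + id + "] Service A: processing request");
        serviceB(id); // the ID is passed along with the call
        LOG.add("[" + id + "] Service A: completed request");
    }

    static void serviceB(String id) {
        LOG.add("[" + id + "] Service B: received request from Service A");
        serviceC(id);
        LOG.add("[" + id + "] Service B: response sent");
    }

    static void serviceC(String id) {
        LOG.add("[" + id + "] Service C: processing");
    }

    public static void main(String[] args) {
        handleRequest("CORR-123");
        LOG.forEach(System.out::println);
    }
}
```

Filtering the log on a single ID then yields the full path of one request, which is the property the rest of the pattern builds on.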
Correlation ID Lifecycle
┌─────────────────────────────────────────────────────────────┐
│ Correlation ID Lifecycle │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. [Request Initiated] ──▶ 2. [ID Generated/Extracted] ──▶ │
│ │ │
│ ▼ │
│ 8. [Request Completed] ◀── 7. [Response Aggregated] ◀── │
│ │ │
│ ▼ │
│ 3. [Context Propagation] ──▶ │
│ │ │
│ ▼ │
│ 4. [Service Processing] ──▶ │
│ │ │
│ ▼ │
│ 5. [Cross-Service Calls] ──▶ │
│ │ │
│ ▼ │
│ 6. [Distributed Logging] │
│ │
│ Context Preservation Features: │
│ • Thread-local storage │
│ • HTTP header propagation │
│ • Message header inclusion │
│ • MDC (Mapped Diagnostic Context) integration │
└─────────────────────────────────────────────────────────────┘
Hierarchical Correlation Structure
Root Request: CORR-ROOT-123
├── Service A: CORR-ROOT-123.A.001
│ ├── Database Query: CORR-ROOT-123.A.001.DB.001
│ └── Cache Lookup: CORR-ROOT-123.A.001.CACHE.001
├── Service B: CORR-ROOT-123.B.001
│ ├── External API: CORR-ROOT-123.B.001.EXT.001
│ └── Message Queue: CORR-ROOT-123.B.001.MQ.001
└── Service C: CORR-ROOT-123.C.001
├── Async Processing: CORR-ROOT-123.C.001.ASYNC.001
└── Notification: CORR-ROOT-123.C.001.NOTIF.001
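A hierarchical ID of this shape can be taken apart with a few string operations. The sketch below assumes, as in the tree above, that every level appends two dot-separated segments (an operation label and a sequence number) and that the root ID itself contains no dots; HierarchicalIdSketch and its methods are hypothetical names used only for illustration.

```java
/** Hypothetical helper for dot-separated hierarchical correlation IDs. */
final class HierarchicalIdSketch {

    /** The root is everything before the first dot. */
    static String rootOf(String id) {
        int firstDot = id.indexOf('.');
        return firstDot < 0 ? id : id.substring(0, firstDot);
    }

    /** Each level adds ".<OPERATION>.<SEQUENCE>", so the parent drops the last two segments. */
    static String parentOf(String id) {
        int lastDot = id.lastIndexOf('.');
        if (lastDot <= 0) {
            return null; // a root ID has no parent
        }
        int operationDot = id.lastIndexOf('.', lastDot - 1);
        return operationDot <= 0 ? null : id.substring(0, operationDot);
    }

    /** Depth 0 is the root; every two additional segments add one level. */
    static int depthOf(String id) {
        int dots = 0;
        for (int i = 0; i < id.length(); i++) {
            if (id.charAt(i) == '.') {
                dots++;
            }
        }
        return dots / 2;
    }

    public static void main(String[] args) {
        String id = "CORR-ROOT-123.A.001.DB.001";
        System.out.println(rootOf(id));
        System.out.println(parentOf(id));
        System.out.println(depthOf(id));
    }
}
```

Keeping the parent derivable from the ID alone means a log line is enough to reconstruct the call tree, at the cost of IDs that grow with nesting depth.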
Key Components
1. Correlation Context Manager
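Manages the correlation ID lifecycle and propagation. The listings in this section omit imports and constructors: final fields are assumed to be constructor-injected (for example through Lombok's @RequiredArgsConstructor), and log is assumed to be an SLF4J logger.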
@Component
public class CorrelationContextManager {
private final ThreadLocal<CorrelationContext> contextHolder = new ThreadLocal<>();
private final CorrelationIdGenerator idGenerator;
private final ContextPropagationStrategy propagationStrategy;
public CorrelationContext initializeContext(String correlationId) {
if (correlationId == null || correlationId.trim().isEmpty()) {
correlationId = idGenerator.generate();
}
CorrelationContext context = CorrelationContext.builder()
.correlationId(correlationId)
.rootId(extractRootId(correlationId))
.parentId(extractParentId(correlationId))
.initiatedAt(Instant.now())
.threadId(Thread.currentThread().getId())
.threadName(Thread.currentThread().getName())
.build();
setContext(context);
updateMDC(context);
return context;
}
public CorrelationContext createChildContext(String operation) {
CorrelationContext parentContext = getContext();
if (parentContext == null) {
throw new IllegalStateException("No parent correlation context found");
}
String childId = idGenerator.generateChild(parentContext.getCorrelationId(), operation);
CorrelationContext childContext = CorrelationContext.builder()
.correlationId(childId)
.rootId(parentContext.getRootId())
.parentId(parentContext.getCorrelationId())
.operation(operation)
.initiatedAt(Instant.now())
.threadId(Thread.currentThread().getId())
.threadName(Thread.currentThread().getName())
.build();
setContext(childContext);
updateMDC(childContext);
return childContext;
}
public CorrelationContext getContext() {
return contextHolder.get();
}
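public void setContext(CorrelationContext context) {
contextHolder.set(context);
// Keep the logging MDC in sync when a context is installed directly,
// for example one extracted from request or message headers
if (context != null) {
updateMDC(context);
}
}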
public void clearContext() {
contextHolder.remove();
MDC.clear();
}
public String getCurrentCorrelationId() {
CorrelationContext context = getContext();
return context != null ? context.getCorrelationId() : null;
}
public void propagateToHeaders(Map<String, String> headers) {
CorrelationContext context = getContext();
if (context != null) {
propagationStrategy.addToHeaders(context, headers);
}
}
public void propagateToMessage(MessageProperties messageProperties) {
CorrelationContext context = getContext();
if (context != null) {
propagationStrategy.addToMessage(context, messageProperties);
}
}
public CorrelationContext extractFromHeaders(Map<String, String> headers) {
return propagationStrategy.extractFromHeaders(headers);
}
public CorrelationContext extractFromMessage(MessageProperties messageProperties) {
return propagationStrategy.extractFromMessage(messageProperties);
}
private void updateMDC(CorrelationContext context) {
MDC.put("correlationId", context.getCorrelationId());
MDC.put("rootId", context.getRootId());
if (context.getParentId() != null) {
MDC.put("parentId", context.getParentId());
}
if (context.getOperation() != null) {
MDC.put("operation", context.getOperation());
}
}
private String extractRootId(String correlationId) {
// Extract root ID from hierarchical correlation ID
if (correlationId.contains(".")) {
return correlationId.split("\\.")[0];
}
return correlationId;
}
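private String extractParentId(String correlationId) {
// Each child level appends ".<OPERATION>.<SEQUENCE>" (see CorrelationIdGenerator.generateChild),
// so the parent ID is obtained by dropping the last two segments, not just the last one.
int lastDotIndex = correlationId.lastIndexOf('.');
if (lastDotIndex > 0) {
int operationDotIndex = correlationId.lastIndexOf('.', lastDotIndex - 1);
if (operationDotIndex > 0) {
return correlationId.substring(0, operationDotIndex);
}
}
return null;
}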
}
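One consequence of the manager above is that createChildContext replaces the current context, so the caller is responsible for restoring the parent afterwards. A possible way to make that automatic is a scope object used with try-with-resources. The sketch below is a simplified, framework-free illustration that stores only the ID string; CorrelationScopeSketch, open, openChild and Scope are hypothetical names, not part of the classes in this document.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: correlation scopes that restore the previous ID when closed. */
final class CorrelationScopeSketch {
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();
    private static final AtomicLong SEQUENCE = new AtomicLong();

    static String current() {
        return CURRENT.get();
    }

    /** Starts a scope for the given ID. */
    static Scope open(String correlationId) {
        return new Scope(correlationId);
    }

    /** Starts a child scope under the current ID; closing it restores the parent. */
    static Scope openChild(String operation) {
        String parent = CURRENT.get();
        if (parent == null) {
            throw new IllegalStateException("No parent correlation context found");
        }
        return new Scope(String.format("%s.%s.%03d", parent, operation, SEQUENCE.incrementAndGet()));
    }

    static final class Scope implements AutoCloseable {
        // Field initializers run before the constructor body, so this is the ID
        // that was active before the scope was opened.
        private final String previous = CURRENT.get();
        final String id;

        private Scope(String id) {
            this.id = id;
            CURRENT.set(id);
        }

        @Override
        public void close() {
            if (previous == null) {
                CURRENT.remove(); // avoid leaking IDs into pooled threads
            } else {
                CURRENT.set(previous);
            }
        }
    }

    public static void main(String[] args) {
        try (Scope request = open("CORR-123")) {
            try (Scope db = openChild("DB")) {
                System.out.println("inside child: " + current());
            }
            System.out.println("after child:  " + current());
        }
    }
}
```

With this shape, two operations started one after the other become siblings under the request ID rather than the second being nested under the first.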
2. Correlation ID Generator
Generates unique correlation identifiers:
@Component
public class CorrelationIdGenerator {
private final String instanceId;
private final AtomicLong sequenceNumber = new AtomicLong(0);
private final Clock clock;
public CorrelationIdGenerator() {
this.instanceId = generateInstanceId();
this.clock = Clock.systemUTC();
}
public String generate() {
return generate(CorrelationIdFormat.HIERARCHICAL);
}
public String generate(CorrelationIdFormat format) {
switch (format) {
case UUID:
return generateUUID();
case TIMESTAMP_BASED:
return generateTimestampBased();
case HIERARCHICAL:
return generateHierarchical();
default:
return generateDefault();
}
}
public String generateChild(String parentId, String operation) {
long childSequence = sequenceNumber.incrementAndGet();
return String.format("%s.%s.%03d", parentId, sanitizeOperation(operation), childSequence);
}
public String generateSpanId(String correlationId) {
return String.format("%s.SPAN.%d", correlationId, System.nanoTime());
}
private String generateUUID() {
return UUID.randomUUID().toString();
}
private String generateTimestampBased() {
long timestamp = clock.millis();
long sequence = sequenceNumber.incrementAndGet();
return String.format("%s-%d-%06d", instanceId, timestamp, sequence);
}
private String generateHierarchical() {
long timestamp = clock.millis();
long sequence = sequenceNumber.incrementAndGet();
return String.format("CORR-%s-%d-%06d", instanceId, timestamp, sequence);
}
private String generateDefault() {
return generateHierarchical();
}
private String generateInstanceId() {
try {
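String hostName = InetAddress.getLocalHost().getHostName();
// Strip dots and other separators so a fully qualified host name cannot be
// mistaken for hierarchy levels when the ID is parsed later
String normalized = hostName.replaceAll("[^A-Za-z0-9]", "");
return normalized.substring(0, Math.min(normalized.length(), 8)).toUpperCase();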
} catch (UnknownHostException e) {
return "UNKNOWN" + ThreadLocalRandom.current().nextInt(1000, 9999);
}
}
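private String sanitizeOperation(String operation) {
if (operation == null || operation.trim().isEmpty()) {
return "OP";
}
// Truncate using the sanitized length: stripping characters can leave the string shorter
// than the original (for example "DB-SAVE" becomes "DBSAVE"), and substring() would
// otherwise throw StringIndexOutOfBoundsException.
String sanitized = operation.replaceAll("[^A-Za-z0-9]", "");
if (sanitized.isEmpty()) {
return "OP";
}
return sanitized.substring(0, Math.min(sanitized.length(), 10)).toUpperCase();
}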
public boolean isValidCorrelationId(String correlationId) {
if (correlationId == null || correlationId.trim().isEmpty()) {
return false;
}
// Validate format and length constraints
if (correlationId.length() > 255) {
return false;
}
// Check for security concerns
if (containsSecurityRisk(correlationId)) {
return false;
}
return true;
}
private boolean containsSecurityRisk(String correlationId) {
// Use an allow-list rather than a substring deny-list: matching words such as "sql" or
// "select" rejects legitimate IDs (for example one built from a host named MYSQL01)
// and still misses other payloads. Letters, digits, '.', '_' and '-' cover every
// format this generator produces.
return !correlationId.matches("[A-Za-z0-9._-]+");
}
public enum CorrelationIdFormat {
UUID, TIMESTAMP_BASED, HIERARCHICAL
}
}
3. Context Propagation Strategy
Handles correlation context propagation across different communication channels:
@Component
public class ContextPropagationStrategy {
public static final String CORRELATION_ID_HEADER = "X-Correlation-ID";
public static final String ROOT_ID_HEADER = "X-Root-ID";
public static final String PARENT_ID_HEADER = "X-Parent-ID";
public static final String TRACE_ID_HEADER = "X-Trace-ID";
public static final String SPAN_ID_HEADER = "X-Span-ID";
public void addToHeaders(CorrelationContext context, Map<String, String> headers) {
headers.put(CORRELATION_ID_HEADER, context.getCorrelationId());
headers.put(ROOT_ID_HEADER, context.getRootId());
if (context.getParentId() != null) {
headers.put(PARENT_ID_HEADER, context.getParentId());
}
if (context.getTraceId() != null) {
headers.put(TRACE_ID_HEADER, context.getTraceId());
}
if (context.getSpanId() != null) {
headers.put(SPAN_ID_HEADER, context.getSpanId());
}
}
public void addToHttpHeaders(CorrelationContext context, HttpHeaders httpHeaders) {
httpHeaders.set(CORRELATION_ID_HEADER, context.getCorrelationId());
httpHeaders.set(ROOT_ID_HEADER, context.getRootId());
if (context.getParentId() != null) {
httpHeaders.set(PARENT_ID_HEADER, context.getParentId());
}
}
public void addToMessage(CorrelationContext context, MessageProperties messageProperties) {
messageProperties.setHeader(CORRELATION_ID_HEADER, context.getCorrelationId());
messageProperties.setHeader(ROOT_ID_HEADER, context.getRootId());
if (context.getParentId() != null) {
messageProperties.setHeader(PARENT_ID_HEADER, context.getParentId());
}
// Set standard message correlation ID
messageProperties.setCorrelationId(context.getCorrelationId());
}
public CorrelationContext extractFromHeaders(Map<String, String> headers) {
String correlationId = headers.get(CORRELATION_ID_HEADER);
if (correlationId == null || correlationId.trim().isEmpty()) {
return null;
}
return CorrelationContext.builder()
.correlationId(correlationId)
.rootId(headers.getOrDefault(ROOT_ID_HEADER, correlationId))
.parentId(headers.get(PARENT_ID_HEADER))
.traceId(headers.get(TRACE_ID_HEADER))
.spanId(headers.get(SPAN_ID_HEADER))
.extractedAt(Instant.now())
.build();
}
public CorrelationContext extractFromHttpHeaders(HttpHeaders httpHeaders) {
String correlationId = httpHeaders.getFirst(CORRELATION_ID_HEADER);
if (correlationId == null || correlationId.trim().isEmpty()) {
return null;
}
return CorrelationContext.builder()
.correlationId(correlationId)
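// Fall back to the correlation ID when no root header is present, as extractFromHeaders does
.rootId(Optional.ofNullable(httpHeaders.getFirst(ROOT_ID_HEADER)).orElse(correlationId))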
.parentId(httpHeaders.getFirst(PARENT_ID_HEADER))
.extractedAt(Instant.now())
.build();
}
public CorrelationContext extractFromMessage(MessageProperties messageProperties) {
String correlationId = (String) messageProperties.getHeaders().get(CORRELATION_ID_HEADER);
// Fallback to standard message correlation ID
if (correlationId == null) {
correlationId = messageProperties.getCorrelationId();
}
if (correlationId == null || correlationId.trim().isEmpty()) {
return null;
}
return CorrelationContext.builder()
.correlationId(correlationId)
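// Fall back to the correlation ID when no root header is present, as extractFromHeaders does
.rootId(Optional.ofNullable((String) messageProperties.getHeaders().get(ROOT_ID_HEADER)).orElse(correlationId))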
.parentId((String) messageProperties.getHeaders().get(PARENT_ID_HEADER))
.extractedAt(Instant.now())
.build();
}
public void propagateToRestTemplate(CorrelationContext context, RestTemplate restTemplate) {
// Append rather than replace, so interceptors registered elsewhere are kept
restTemplate.getInterceptors().add(new CorrelationIdInterceptor(context));
}
public void propagateToWebClient(CorrelationContext context, WebClient.Builder webClientBuilder) {
webClientBuilder.filter(new CorrelationIdWebClientFilter(context));
}
// HTTP Client Interceptor
public static class CorrelationIdInterceptor implements ClientHttpRequestInterceptor {
private final CorrelationContext context;
public CorrelationIdInterceptor(CorrelationContext context) {
this.context = context;
}
@Override
public ClientHttpResponse intercept(
HttpRequest request,
byte[] body,
ClientHttpRequestExecution execution) throws IOException {
HttpHeaders headers = request.getHeaders();
headers.set(CORRELATION_ID_HEADER, context.getCorrelationId());
headers.set(ROOT_ID_HEADER, context.getRootId());
if (context.getParentId() != null) {
headers.set(PARENT_ID_HEADER, context.getParentId());
}
return execution.execute(request, body);
}
}
// WebClient Filter
public static class CorrelationIdWebClientFilter implements ExchangeFilterFunction {
private final CorrelationContext context;
public CorrelationIdWebClientFilter(CorrelationContext context) {
this.context = context;
}
@Override
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
ClientRequest.Builder builder = ClientRequest.from(request)
.header(CORRELATION_ID_HEADER, context.getCorrelationId())
.header(ROOT_ID_HEADER, context.getRootId());
// Skip the parent header for root requests instead of sending a null value
if (context.getParentId() != null) {
builder.header(PARENT_ID_HEADER, context.getParentId());
}
return next.exchange(builder.build());
}
}
}
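The same propagation idea applies outside Spring. As a minimal sketch, assuming the JDK's built-in java.net.http client (Java 11+) and the header names used above, an outbound request can be given the correlation headers before it is sent. OutboundHeaderSketch, withCorrelation and the example URL are hypothetical; nothing is sent over the network here.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical sketch: attach correlation headers to an outbound JDK HttpRequest. */
final class OutboundHeaderSketch {
    static final String CORRELATION_ID_HEADER = "X-Correlation-ID";
    static final String PARENT_ID_HEADER = "X-Parent-ID";

    static HttpRequest withCorrelation(URI uri, String correlationId, String parentId) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(uri)
                .GET()
                .header(CORRELATION_ID_HEADER, correlationId);
        // Root requests have no parent, so the header is simply omitted.
        if (parentId != null) {
            builder.header(PARENT_ID_HEADER, parentId);
        }
        return builder.build();
    }

    public static void main(String[] args) {
        HttpRequest request = withCorrelation(
                URI.create("http://localhost:8080/users"), // placeholder URL
                "CORR-123.USERSVC.001",
                "CORR-123");
        System.out.println(request.headers().map());
    }
}
```

Omitting absent values, rather than sending empty or null headers, keeps the receiving side's fallback logic simple: a missing parent header means the caller was the root.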
4. Correlation Interceptors and Filters
Automatic correlation handling for different frameworks:
// Spring MVC Interceptor
@Component
public class CorrelationIdHandlerInterceptor implements HandlerInterceptor {
private final CorrelationContextManager contextManager;
private final ContextPropagationStrategy propagationStrategy;
@Override
public boolean preHandle(HttpServletRequest request,
HttpServletResponse response,
Object handler) throws Exception {
// Extract correlation context from request headers
Map<String, String> headers = extractHeaders(request);
CorrelationContext context = propagationStrategy.extractFromHeaders(headers);
if (context == null) {
// Generate new correlation ID for new requests
context = contextManager.initializeContext(null);
} else {
// Use existing correlation context
contextManager.setContext(context);
}
// Add correlation ID to response headers for debugging
response.setHeader(ContextPropagationStrategy.CORRELATION_ID_HEADER,
context.getCorrelationId());
return true;
}
@Override
public void afterCompletion(HttpServletRequest request,
HttpServletResponse response,
Object handler,
Exception ex) throws Exception {
contextManager.clearContext();
}
private Map<String, String> extractHeaders(HttpServletRequest request) {
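// HTTP header names are case-insensitive and containers may report them in lower case,
// so use a case-insensitive map for the later X-Correlation-ID lookup
Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);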
Enumeration<String> headerNames = request.getHeaderNames();
while (headerNames.hasMoreElements()) {
String headerName = headerNames.nextElement();
String headerValue = request.getHeader(headerName);
headers.put(headerName, headerValue);
}
return headers;
}
}
// Message Listener Aspect for Automatic Correlation Handling
@Aspect
@Component
public class MessageCorrelationAspect {
private final CorrelationContextManager contextManager;
private final ContextPropagationStrategy propagationStrategy;
@Around("@annotation(org.springframework.jms.annotation.JmsListener) || " +
"@annotation(org.springframework.kafka.annotation.KafkaListener) || " +
"@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")
public Object handleMessageCorrelation(ProceedingJoinPoint joinPoint) throws Throwable {
// Extract correlation context from message
CorrelationContext context = extractCorrelationFromMessage(joinPoint);
if (context == null) {
context = contextManager.initializeContext(null);
} else {
contextManager.setContext(context);
}
try {
return joinPoint.proceed();
} finally {
contextManager.clearContext();
}
}
private CorrelationContext extractCorrelationFromMessage(ProceedingJoinPoint joinPoint) {
for (Object arg : joinPoint.getArgs()) {
if (arg instanceof Message) {
Message<?> message = (Message<?>) arg;
MessageHeaders headers = message.getHeaders();
String correlationId = (String) headers.get(ContextPropagationStrategy.CORRELATION_ID_HEADER);
if (correlationId != null) {
return CorrelationContext.builder()
.correlationId(correlationId)
.rootId((String) headers.get(ContextPropagationStrategy.ROOT_ID_HEADER))
.parentId((String) headers.get(ContextPropagationStrategy.PARENT_ID_HEADER))
.extractedAt(Instant.now())
.build();
}
}
}
return null;
}
}
// Servlet Filter for Web Applications
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class CorrelationIdFilter implements Filter {
private final CorrelationContextManager contextManager;
private final ContextPropagationStrategy propagationStrategy;
@Override
public void doFilter(ServletRequest request,
ServletResponse response,
FilterChain chain) throws IOException, ServletException {
if (request instanceof HttpServletRequest && response instanceof HttpServletResponse) {
HttpServletRequest httpRequest = (HttpServletRequest) request;
HttpServletResponse httpResponse = (HttpServletResponse) response;
// Extract or generate correlation ID
String correlationId = httpRequest.getHeader(ContextPropagationStrategy.CORRELATION_ID_HEADER);
CorrelationContext context = contextManager.initializeContext(correlationId);
// Add to response headers
httpResponse.setHeader(ContextPropagationStrategy.CORRELATION_ID_HEADER,
context.getCorrelationId());
try {
chain.doFilter(request, response);
} finally {
contextManager.clearContext();
}
} else {
chain.doFilter(request, response);
}
}
}
Configuration Parameters
Essential Settings
| Parameter | Description | Typical Values |
|---|---|---|
| ID Format | Format for correlation ID generation | UUID/Timestamp/Hierarchical |
| Header Names | HTTP header names for correlation | X-Correlation-ID |
| Context Timeout | Max time to keep context in cache | 1h-24h |
| Propagation Mode | Automatic vs manual propagation | auto/manual |
| Validation Enabled | Enable correlation ID validation | true/false |
| Hierarchy Depth | Maximum depth for hierarchical IDs | 5-20 |
Example Configuration
# Correlation ID Configuration
correlation.id.format=HIERARCHICAL
correlation.id.header-name=X-Correlation-ID
correlation.id.root-header-name=X-Root-ID
correlation.id.parent-header-name=X-Parent-ID
# Context Management
correlation.context.timeout=1h
correlation.context.cleanup-interval=5m
correlation.context.max-cache-size=10000
# Propagation Settings
correlation.propagation.auto-http=true
correlation.propagation.auto-messaging=true
correlation.propagation.auto-database=false
# Validation
correlation.validation.enabled=true
correlation.validation.max-length=255
correlation.validation.allowed-chars=[A-Za-z0-9._-]
# Logging Integration
correlation.logging.mdc-enabled=true
correlation.logging.include-in-response=true
correlation.logging.trace-requests=true
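How these keys are bound depends on the framework in use. As a framework-free sketch, a few of them could be read with java.util.Properties; the loader class below is hypothetical, and it assumes that shorthand durations only use the s, m and h suffixes and that allowed-chars is a single regular-expression character class.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Locale;
import java.util.Properties;
import java.util.regex.Pattern;

/** Hypothetical loader for a few of the correlation properties. */
final class CorrelationConfigSketch {
    final Duration contextTimeout;
    final int maxLength;
    final Pattern allowedId;

    CorrelationConfigSketch(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        contextTimeout = parseShorthand(props.getProperty("correlation.context.timeout", "1h"));
        maxLength = Integer.parseInt(props.getProperty("correlation.validation.max-length", "255"));
        // The configured value is one character class; "+" turns it into a whole-ID pattern.
        allowedId = Pattern.compile(
                props.getProperty("correlation.validation.allowed-chars", "[A-Za-z0-9._-]") + "+");
    }

    /** Converts "1h", "5m" or "30s" to a Duration by rewriting it as ISO-8601, e.g. "PT1H". */
    static Duration parseShorthand(String value) {
        return Duration.parse("PT" + value.trim().toUpperCase(Locale.ROOT));
    }

    /** Applies the configured length limit and character allow-list. */
    boolean isValid(String correlationId) {
        return correlationId != null
                && correlationId.length() <= maxLength
                && allowedId.matcher(correlationId).matches();
    }

    public static void main(String[] args) {
        CorrelationConfigSketch config = new CorrelationConfigSketch(
                "correlation.context.timeout=1h\n"
                + "correlation.validation.max-length=255\n"
                + "correlation.validation.allowed-chars=[A-Za-z0-9._-]\n");
        System.out.println(config.contextTimeout);
        System.out.println(config.isValid("CORR-HOST01-1700000000000-000001"));
    }
}
```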
Implementation Examples
1. Spring Boot REST API Implementation
@RestController
@RequestMapping("/api/contacts")
public class ContactController {
private final ContactService contactService;
private final CorrelationContextManager contextManager;
@PostMapping
public ResponseEntity<ContactResponse> createContact(@RequestBody CreateContactRequest request) {
String correlationId = contextManager.getCurrentCorrelationId();
log.info("Creating contact request received - correlationId: {}", correlationId);
ContactResponse response = contactService.createContact(request);
return ResponseEntity.ok()
.header("X-Correlation-ID", correlationId)
.body(response);
}
@GetMapping("/{id}")
public ResponseEntity<ContactResponse> getContact(@PathVariable String id) {
String correlationId = contextManager.getCurrentCorrelationId();
log.info("Get contact request for ID: {} - correlationId: {}", id, correlationId);
ContactResponse response = contactService.getContact(id);
return ResponseEntity.ok()
.header("X-Correlation-ID", correlationId)
.body(response);
}
}
@Service
public class ContactService {
private final ContactRepository contactRepository;
private final UserServiceClient userServiceClient;
private final CorrelationContextManager contextManager;
public ContactResponse createContact(CreateContactRequest request) {
String correlationId = contextManager.getCurrentCorrelationId();
log.info("Processing contact creation - correlationId: {}", correlationId);
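// Keep a reference to the request-level context so both operations below become its direct children
CorrelationContext requestContext = contextManager.getContext();
// Create child context for database operation
CorrelationContext dbContext = contextManager.createChildContext("DB-SAVE");
Contact contact = Contact.builder()
.id(UUID.randomUUID().toString())
.firstName(request.getFirstName())
.lastName(request.getLastName())
.email(request.getEmail())
.build();
contact = contactRepository.save(contact);
log.info("Contact saved to database - correlationId: {}", dbContext.getCorrelationId());
// Restore the request-level context; createChildContext() replaces the current context,
// so without this step USER-SVC would be nested under DB-SAVE
contextManager.setContext(requestContext);
// Create child context for user service call
CorrelationContext userServiceContext = contextManager.createChildContext("USER-SVC");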
// Call external user service with correlation propagation
UserResponse userResponse = userServiceClient.createUser(
CreateUserRequest.builder()
.contactId(contact.getId())
.email(contact.getEmail())
.build());
log.info("User service called - correlationId: {}", userServiceContext.getCorrelationId());
return ContactResponse.builder()
.id(contact.getId())
.firstName(contact.getFirstName())
.lastName(contact.getLastName())
.email(contact.getEmail())
.userId(userResponse.getUserId())
.build();
}
}
@FeignClient(name = "user-service", configuration = UserServiceClientConfiguration.class)
public interface UserServiceClient {
@PostMapping("/users")
UserResponse createUser(@RequestBody CreateUserRequest request);
}
@Configuration
public class UserServiceClientConfiguration {
private final CorrelationContextManager contextManager;
@Bean
public RequestInterceptor correlationIdRequestInterceptor() {
return requestTemplate -> {
String correlationId = contextManager.getCurrentCorrelationId();
if (correlationId != null) {
requestTemplate.header("X-Correlation-ID", correlationId);
}
};
}
}
2. Message Processing with Correlation
@Component
public class ContactMessageProcessor {
private final ContactService contactService;
private final CorrelationContextManager contextManager;
// @RabbitListener is placed on the handler method; at class level it would
// additionally require @RabbitHandler on the method.
@RabbitListener(queues = "contact.processing")
public void processContactMessage(ContactMessage message,
@Headers Map<String, Object> headers) {
// Extract correlation ID from message headers
String correlationId = (String) headers.get("X-Correlation-ID");
CorrelationContext context = contextManager.initializeContext(correlationId);
try {
log.info("Processing contact message - correlationId: {}, messageId: {}",
context.getCorrelationId(), message.getMessageId());
// Process message with correlation context
contactService.processContactMessage(message);
log.info("Contact message processed successfully - correlationId: {}",
context.getCorrelationId());
} catch (Exception e) {
log.error("Error processing contact message - correlationId: {}, error: {}",
context.getCorrelationId(), e.getMessage(), e);
throw e;
} finally {
contextManager.clearContext();
}
}
}
@Service
public class ContactMessagePublisher {
private final RabbitTemplate rabbitTemplate;
private final CorrelationContextManager contextManager;
public void publishContactEvent(ContactEvent event) {
String correlationId = contextManager.getCurrentCorrelationId();
rabbitTemplate.convertAndSend("contact.events", event, message -> {
message.getMessageProperties().setHeader("X-Correlation-ID", correlationId);
message.getMessageProperties().setCorrelationId(correlationId);
return message;
});
log.info("Contact event published - correlationId: {}, eventType: {}",
correlationId, event.getEventType());
}
}
3. Database Integration with Correlation
@Repository
public class ContactRepository {
private final JdbcTemplate jdbcTemplate;
private final CorrelationContextManager contextManager;
public Contact save(Contact contact) {
String correlationId = contextManager.getCurrentCorrelationId();
String sql = """
INSERT INTO contacts (id, first_name, last_name, email, correlation_id, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""";
jdbcTemplate.update(sql,
contact.getId(),
contact.getFirstName(),
contact.getLastName(),
contact.getEmail(),
correlationId,
Instant.now());
log.info("Contact saved with correlationId: {}", correlationId);
return contact;
}
public Optional<Contact> findById(String id) {
String correlationId = contextManager.getCurrentCorrelationId();
String sql = "SELECT * FROM contacts WHERE id = ?";
List<Contact> contacts = jdbcTemplate.query(sql,
(rs, rowNum) -> Contact.builder()
.id(rs.getString("id"))
.firstName(rs.getString("first_name"))
.lastName(rs.getString("last_name"))
.email(rs.getString("email"))
.build(),
id);
log.info("Database query executed - correlationId: {}, found: {}",
correlationId, !contacts.isEmpty());
return contacts.stream().findFirst();
}
}
// Custom Hibernate Interceptor for automatic correlation logging
// (Interceptor is a Hibernate type and is not part of the JPA specification)
@Component
public class CorrelationJpaInterceptor implements Interceptor {
private final CorrelationContextManager contextManager;
@Override
public boolean onLoad(Object entity, Serializable id, Object[] state, String[] propertyNames, Type[] types) {
String correlationId = contextManager.getCurrentCorrelationId();
log.debug("JPA Entity loaded - correlationId: {}, entity: {}, id: {}",
correlationId, entity.getClass().getSimpleName(), id);
return false;
}
@Override
public boolean onSave(Object entity, Serializable id, Object[] state, String[] propertyNames, Type[] types) {
String correlationId = contextManager.getCurrentCorrelationId();
log.debug("JPA Entity saved - correlationId: {}, entity: {}, id: {}",
correlationId, entity.getClass().getSimpleName(), id);
return false;
}
}
4. Async Processing with Correlation Propagation
@Service
public class AsyncContactProcessor {
private final ContactAnalyticsService analyticsService;
private final NotificationService notificationService;
private final CorrelationContextManager contextManager;
@Async("taskExecutor")
public CompletableFuture<Void> processContactAsync(String contactId) {
// @Async already runs this method on the "taskExecutor" pool, whose TaskDecorator
// copies the caller's MDC to the worker thread. An extra CompletableFuture.runAsync
// would hop to the common ForkJoinPool, where that context is typically not available.
CorrelationContext asyncContext = contextManager.createChildContext("ASYNC-PROC");
try {
log.info("Starting async contact processing - correlationId: {}, contactId: {}",
asyncContext.getCorrelationId(), contactId);
// Perform analytics processing
analyticsService.processContactAnalytics(contactId);
// Send notifications
notificationService.sendWelcomeNotification(contactId);
log.info("Async contact processing completed - correlationId: {}",
asyncContext.getCorrelationId());
return CompletableFuture.completedFuture(null);
} finally {
// Always clear the context so the pooled thread does not carry it into the next task
contextManager.clearContext();
}
}
}
@Configuration
@EnableAsync
public class AsyncConfiguration {
@Bean(name = "taskExecutor")
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("async-");
// Set custom task decorator for correlation propagation
executor.setTaskDecorator(new CorrelationAwareTaskDecorator());
executor.initialize();
return executor;
}
public static class CorrelationAwareTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// Capture correlation context from current thread
Map<String, String> context = MDC.getCopyOfContextMap();
return () -> {
try {
// Set correlation context in async thread
if (context != null) {
MDC.setContextMap(context);
}
runnable.run();
} finally {
MDC.clear();
}
};
}
}
}
Best Practices
1. Correlation ID Generation and Validation
public class CorrelationIdBestPractices {
// Use structured format for better debugging
public class StructuredCorrelationIdGenerator {
public String generateStructuredId(String service, String operation) {
return String.format("%s-%s-%s-%d",
service.toUpperCase(),
operation.toUpperCase(),
UUID.randomUUID().toString().substring(0, 8),
System.currentTimeMillis());
}
// Example: CONTACT-CREATE-a1b2c3d4-1640995200000 (UUID hex digits are lowercase)
public CorrelationIdParts parseStructuredId(String correlationId) {
// Assumes service and operation names contain no hyphens; any other shape is rejected below
String[] parts = correlationId.split("-");
if (parts.length == 4) {
return CorrelationIdParts.builder()
.service(parts[0])
.operation(parts[1])
.uniqueId(parts[2])
.timestamp(Long.parseLong(parts[3]))
.build();
}
throw new IllegalArgumentException("Invalid correlation ID format");
}
}
// Implement size and content validation
public boolean validateCorrelationId(String correlationId) {
if (correlationId == null || correlationId.trim().isEmpty()) {
return false;
}
// Check length constraints
if (correlationId.length() > 255) {
log.warn("Correlation ID too long: {}", correlationId.length());
return false;
}
// Check for security risks
if (containsSecurityRisks(correlationId)) {
log.warn("Security risk detected in correlation ID");
return false;
}
// Check format compliance
if (!matchesAllowedPattern(correlationId)) {
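// Log only the length: the rejected value is untrusted input and could forge log entries
log.warn("Correlation ID format violation (length {})", correlationId.length());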
return false;
}
return true;
}
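private boolean containsSecurityRisks(String correlationId) {
// Reject markup and quoting characters. A substring check for "script" is omitted on
// purpose: it would reject legitimate IDs such as SUBSCRIPTION-CREATE-..., and the
// allow-list in matchesAllowedPattern already excludes the characters needed to form a tag.
return correlationId.contains("<") ||
correlationId.contains(">") ||
correlationId.contains("'") ||
correlationId.contains("\"") ||
correlationId.contains("&");
}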
private boolean matchesAllowedPattern(String correlationId) {
return correlationId.matches("^[A-Za-z0-9._-]+$");
}
}
2. Performance Optimization
@Component
public class CorrelationPerformanceOptimization {
// Use thread-local caching to avoid repeated context creation
private final ThreadLocal<CorrelationContextCache> contextCache =
ThreadLocal.withInitial(CorrelationContextCache::new);
public CorrelationContext getOrCreateContext(String correlationId) {
CorrelationContextCache cache = contextCache.get();
CorrelationContext context = cache.get(correlationId);
if (context == null) {
context = createNewContext(correlationId);
cache.put(correlationId, context);
}
return context;
}
// Optimize MDC operations
public void updateMDCEfficiently(CorrelationContext context) {
Map<String, String> mdcMap = new HashMap<>();
mdcMap.put("correlationId", context.getCorrelationId());
mdcMap.put("rootId", context.getRootId());
if (context.getParentId() != null) {
mdcMap.put("parentId", context.getParentId());
}
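// Batch update MDC. Note that setContextMap replaces the whole map, so any
// unrelated MDC keys already set on this thread are dropped.
MDC.setContextMap(mdcMap);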
}
// Minimize header copying overhead
public Map<String, String> extractCorrelationHeaders(HttpHeaders headers) {
Map<String, String> correlationHeaders = new HashMap<>();
String correlationId = headers.getFirst("X-Correlation-ID");
if (correlationId != null) {
correlationHeaders.put("X-Correlation-ID", correlationId);
String rootId = headers.getFirst("X-Root-ID");
if (rootId != null) {
correlationHeaders.put("X-Root-ID", rootId);
}
String parentId = headers.getFirst("X-Parent-ID");
if (parentId != null) {
correlationHeaders.put("X-Parent-ID", parentId);
}
}
return correlationHeaders;
}
// Context cache implementation
private static class CorrelationContextCache {
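// Access-ordered LinkedHashMap used as a small LRU cache (at most 100 entries per thread)
private final Map<String, CorrelationContext> cache =
new LinkedHashMap<String, CorrelationContext>(16, 0.75f, true) {
@Override
protected boolean removeEldestEntry(Map.Entry<String, CorrelationContext> eldest) {
return size() > 100;
}
};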
public CorrelationContext get(String correlationId) {
return cache.get(correlationId);
}
public void put(String correlationId, CorrelationContext context) {
cache.put(correlationId, context);
}
}
}
3. Monitoring and Alerting
@Component
public class CorrelationMonitoring {
private final MeterRegistry meterRegistry;
private final AlertingService alertingService;
@EventListener
public void handleCorrelationEvent(CorrelationEvent event) {
recordCorrelationMetrics(event);
checkCorrelationAlerts(event);
}
private void recordCorrelationMetrics(CorrelationEvent event) {
String service = event.getServiceName();
// Record correlation propagation
meterRegistry.counter("correlation.propagated",
"service", service,
"type", event.getEventType().name())
.increment();
// Record context creation time
meterRegistry.timer("correlation.context.creation", "service", service)
.record(event.getContextCreationTime());
// Record hierarchy depth
meterRegistry.summary("correlation.hierarchy.depth", "service", service)
.record(event.getHierarchyDepth());
}
private void checkCorrelationAlerts(CorrelationEvent event) {
// Alert on missing correlation IDs
if (event.getEventType() == CorrelationEventType.MISSING_CORRELATION_ID) {
alertingService.sendAlert(AlertLevel.WARNING,
"Missing correlation ID in service: " + event.getServiceName());
}
// Alert on correlation hierarchy too deep
if (event.getHierarchyDepth() > 10) {
alertingService.sendAlert(AlertLevel.WARNING,
"Deep correlation hierarchy detected: " + event.getHierarchyDepth());
}
// Alert on correlation propagation failures
if (event.getEventType() == CorrelationEventType.PROPAGATION_FAILED) {
alertingService.sendAlert(AlertLevel.CRITICAL,
"Correlation propagation failed in service: " + event.getServiceName());
}
}
@PostConstruct
public void registerCorrelationGauges() {
// Micrometer samples gauges on demand and keeps only a weak reference to the observed
// object. Registering a freshly boxed number on a schedule would leave each gauge tied
// to its first value (or NaN once that value is collected), so bind each gauge once to
// this bean and a supplier method instead.
// Active correlation contexts
meterRegistry.gauge("correlation.contexts.active", this,
CorrelationMonitoring::getActiveContextsCount);
// Correlation cache hit ratio
meterRegistry.gauge("correlation.cache.hit.ratio", this,
CorrelationMonitoring::getCorrelationCacheHitRatio);
// Orphaned correlations (no activity in the last hour)
meterRegistry.gauge("correlation.orphaned", this,
CorrelationMonitoring::getOrphanedCorrelationsCount);
}
}
Common Pitfalls
1. Context Leakage
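Problem: Correlation context is not cleaned up when a request completes, which leaks memory and, on pooled threads, lets a stale ID bleed into unrelated requests.
Solution: Clear the context in a finally block or rely on an automatic cleanup mechanism.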
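One way to make cleanup automatic is a try-with-resources scope. The sketch below is a minimal illustration, assuming a plain thread-local holder; CorrelationScope is a hypothetical class that stands in for this document's CorrelationContextManager, not part of any library.

```java
import java.util.Optional;

/** Hypothetical scope object: binds a correlation ID to the current thread until closed. */
final class CorrelationScope implements AutoCloseable {
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();
    private final String previous;

    private CorrelationScope(String previous) {
        this.previous = previous;
    }

    /** Binds the ID to the current thread and remembers what was bound before. */
    static CorrelationScope open(String correlationId) {
        String previous = CURRENT.get();
        CURRENT.set(correlationId);
        return new CorrelationScope(previous);
    }

    /** Returns the ID bound to the current thread, if any. */
    static Optional<String> current() {
        return Optional.ofNullable(CURRENT.get());
    }

    /** Restores the previous binding, or removes the entry so pooled threads start clean. */
    @Override
    public void close() {
        if (previous == null) {
            CURRENT.remove();
        } else {
            CURRENT.set(previous);
        }
    }
}
```

Wrapping request handling in `try (CorrelationScope scope = CorrelationScope.open(id)) { ... }` means the binding is released even when the handler throws, and nested scopes restore the outer ID on exit.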
2. Header Size Explosion
Problem: Too many correlation headers, or identifiers that grow with every hop, push requests past HTTP header size limits.
Solution: Use compact correlation formats and cap the hierarchy depth.
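A possible shape for a compact format is to send a fixed set of headers (current, root, and immediate parent) instead of the full ancestor chain. The sketch below assumes the X-Correlation-ID, X-Root-ID, and X-Parent-ID header names used in the performance section; the X-Correlation-Depth header, the CompactCorrelationHeaders class, and the cap of 10 (mirroring the alert threshold in the monitoring example) are illustrative assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical helper: builds a bounded set of outbound correlation headers. */
final class CompactCorrelationHeaders {
    static final int MAX_DEPTH = 10; // assumed cap, mirrors the monitoring alert threshold

    /**
     * Builds headers for a child call. The header count stays constant however deep
     * the call chain is; once MAX_DEPTH is reached, no further levels are created
     * and the current ID is forwarded as-is.
     */
    static Map<String, String> forChildCall(String rootId, String currentId, int currentDepth) {
        boolean atLimit = currentDepth >= MAX_DEPTH;
        String childId = atLimit ? currentId : UUID.randomUUID().toString();
        int childDepth = atLimit ? currentDepth : currentDepth + 1;

        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("X-Correlation-ID", childId);
        headers.put("X-Root-ID", rootId);
        if (!atLimit) {
            // Only the immediate parent is sent, never the whole ancestor chain
            headers.put("X-Parent-ID", currentId);
        }
        headers.put("X-Correlation-Depth", Integer.toString(childDepth));
        return headers;
    }
}
```

Flattening at the cap, rather than throwing, keeps tracing from interfering with business logic when a call chain is unexpectedly deep.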
3. Performance Overhead
Problem: Excessive correlation processing on the request path degrades application performance.
Solution: Optimize context operations and cache contexts where creating them is expensive.
4. Correlation ID Collisions
Problem: Non-unique correlation IDs merge unrelated requests in logs and traces.
Solution: Generate IDs from full UUIDs or add instance-specific prefixes.
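Both options can be sketched in a few lines. The structured generator shown earlier keeps only the first 8 hex characters of a UUID (32 bits of randomness), so the example below keeps the full UUID for the random variant. CorrelationIds and the instance ID value are hypothetical; the prefixed variant is unique only if each running process is given a distinct instanceId.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical ID factory illustrating two ways to avoid collisions. */
final class CorrelationIds {
    private static final AtomicLong SEQUENCE = new AtomicLong();

    /** Full random UUID: 122 random bits, no truncation. */
    static String random() {
        return UUID.randomUUID().toString();
    }

    /**
     * Instance-prefixed ID: the per-process sequence guarantees uniqueness within
     * one process, and the timestamp separates IDs issued after a restart.
     */
    static String prefixed(String instanceId) {
        return instanceId
                + "-" + Long.toString(System.currentTimeMillis(), 36)
                + "-" + Long.toString(SEQUENCE.incrementAndGet(), 36);
    }
}
```

The prefixed form is shorter and sortable by time, while the random form needs no coordination about instance names.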
5. Missing Propagation
Problem: Correlation IDs are lost at service boundaries such as outbound HTTP calls, message queues, and thread handoffs.
Solution: Propagate the ID at every boundary and monitor for requests that arrive without one.
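For outbound HTTP, one approach is to route every request through a single helper that attaches the header, so no call site can forget it. This is a minimal sketch using the JDK's java.net.http API; OutboundCorrelation is a hypothetical helper, and generating a fresh ID when none is bound is an assumed fallback policy.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.UUID;

/** Hypothetical helper: every outbound request is built here so the header is never omitted. */
final class OutboundCorrelation {
    static final String HEADER = "X-Correlation-ID";

    /** Returns a request builder with the correlation header set, generating an ID if none is bound. */
    static HttpRequest.Builder request(URI uri, String currentCorrelationId) {
        String id = (currentCorrelationId == null || currentCorrelationId.isBlank())
                ? UUID.randomUUID().toString() // assumed fallback: start a new trace
                : currentCorrelationId;
        return HttpRequest.newBuilder(uri).header(HEADER, id);
    }
}
```

A caller would write something like `OutboundCorrelation.request(uri, contextManager.getCurrentCorrelationId()).GET().build()`; the same idea applies to message headers on queues.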
Integration in Distributed Systems
Microservices Correlation
@Service
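public class DistributedOrderService {
// Client type names are assumed for illustration; each client is expected to
// attach the current correlation ID to its outbound request.
private final CorrelationContextManager contextManager;
private final InventoryClient inventoryClient;
private final PaymentClient paymentClient;
private final ShippingClient shippingClient;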
public OrderResponse processOrder(OrderRequest request) {
String correlationId = contextManager.getCurrentCorrelationId();
// Call inventory service
InventoryResponse inventory = inventoryClient.reserveInventory(request.getItems());
// Call payment service
PaymentResponse payment = paymentClient.processPayment(request.getPayment());
// Call shipping service
ShippingResponse shipping = shippingClient.scheduleShipping(request.getShipping());
return OrderResponse.builder()
.correlationId(correlationId)
.orderStatus("COMPLETED")
.build();
}
}
Event Sourcing Integration
@Service
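public class EventSourcedService {
// EventStore is an assumed type name for the event persistence component
private final CorrelationContextManager contextManager;
private final EventStore eventStore;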
public void handleCommand(Command command) {
String correlationId = contextManager.getCurrentCorrelationId();
Event event = Event.builder()
.correlationId(correlationId)
.causationId(command.getId())
.aggregateId(command.getAggregateId())
.eventType(command.getEventType())
.eventData(command.getData())
.build();
eventStore.append(event);
}
}
Conclusion
The Correlation ID pattern is essential for observability in distributed systems. It provides:
- End-to-End Traceability: Complete visibility into distributed request flows
- Enhanced Debugging: Simplified troubleshooting across service boundaries
- Operational Excellence: Comprehensive monitoring and alerting capabilities
- Audit Compliance: Complete transaction history for regulatory requirements
When properly implemented with automatic propagation, performance optimization, and comprehensive monitoring, the Correlation ID pattern enables robust observability that scales with system complexity while maintaining minimal performance overhead.