Skip to content

Dağıtık İzleme ve Korelasyon Kimlikleri (Correlation IDs)

Mikroservis mimarisinde isteklerin servisler arasında takibi ve korelasyon yönetimi için gelişmiş teknikler.

Sistem Mimarisi

Korelasyon ID Yaşam Döngüsü

Korelasyon Kimliği (Correlation ID) Uygulaması

MDC (Mapped Diagnostic Context) Kullanımı

MDC, thread-local storage kullanarak context bilgilerini yönetir:

java
/**
 * Servlet filter that establishes the logging context (MDC) for every incoming HTTP request.
 *
 * <p>For each request it resolves a correlation ID (taken from the {@code X-Correlation-ID}
 * header when usable, generated otherwise), creates a fresh request ID, stores both together
 * with user, session, client and endpoint details in the MDC, echoes the two IDs back as
 * response headers, and clears the MDC once the rest of the chain has finished.
 */
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class CorrelationIdFilter implements Filter {

    private static final String CORRELATION_ID_HEADER = "X-Correlation-ID";
    private static final String CORRELATION_ID_MDC_KEY = "correlationId";
    private static final String USER_ID_HEADER = "X-User-ID";
    private static final String SESSION_ID_HEADER = "X-Session-ID";
    private static final String REQUEST_ID_HEADER = "X-Request-ID";

    /** Upper bound for a caller-supplied correlation ID; longer values are replaced. */
    private static final int MAX_CORRELATION_ID_LENGTH = 128;

    /**
     * Populates the MDC, sets the response headers, runs the remaining filter chain and
     * finally clears the MDC.
     *
     * @param request  the incoming request; cast to {@link HttpServletRequest}
     * @param response the outgoing response; cast to {@link HttpServletResponse}
     * @param chain    the remaining filter chain
     * @throws IOException      propagated from the chain
     * @throws ServletException propagated from the chain
     */
    @Override
    public void doFilter(ServletRequest request, ServletResponse response,
                        FilterChain chain) throws IOException, ServletException {

        HttpServletRequest httpRequest = (HttpServletRequest) request;
        HttpServletResponse httpResponse = (HttpServletResponse) response;

        // Take the correlation ID from the header, or create one
        String correlationId = extractOrGenerateCorrelationId(httpRequest);
        String userId = httpRequest.getHeader(USER_ID_HEADER);
        String sessionId = httpRequest.getHeader(SESSION_ID_HEADER);
        String requestId = UUID.randomUUID().toString();
        String userAgent = httpRequest.getHeader("User-Agent");

        try {
            // Add the context information to the MDC
            MDC.put(CORRELATION_ID_MDC_KEY, correlationId);
            MDC.put("userId", userId != null ? userId : "anonymous");
            MDC.put("sessionId", sessionId != null ? sessionId : "none");
            MDC.put("requestId", requestId);
            // SLF4J only allows a null MDC value when the logging backend supports it,
            // so the key is simply left unset when the client sent no User-Agent.
            if (userAgent != null) {
                MDC.put("userAgent", userAgent);
            }
            MDC.put("ipAddress", getClientIpAddress(httpRequest));
            MDC.put("endpoint", httpRequest.getRequestURI());
            MDC.put("method", httpRequest.getMethod());

            // Expose the IDs to the caller through response headers
            httpResponse.setHeader(CORRELATION_ID_HEADER, correlationId);
            httpResponse.setHeader(REQUEST_ID_HEADER, requestId);

            chain.doFilter(request, response);

        } finally {
            // Clean up so the values do not leak into the next request served by this thread
            MDC.clear();
        }
    }

    /**
     * Returns the trimmed {@code X-Correlation-ID} header value when it is usable,
     * otherwise a newly generated ID.
     */
    private String extractOrGenerateCorrelationId(HttpServletRequest request) {
        String headerValue = request.getHeader(CORRELATION_ID_HEADER);
        if (headerValue != null) {
            String candidate = headerValue.trim();
            if (isUsableCorrelationId(candidate)) {
                return candidate;
            }
        }
        return generateCorrelationId();
    }

    /**
     * The header is caller-controlled and ends up in log lines and in a response header,
     * so empty, oversized, or control-character-bearing values are rejected.
     */
    private boolean isUsableCorrelationId(String candidate) {
        if (candidate.isEmpty() || candidate.length() > MAX_CORRELATION_ID_LENGTH) {
            return false;
        }
        for (int i = 0; i < candidate.length(); i++) {
            if (Character.isISOControl(candidate.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    /** Builds an ID of the form {@code <epoch millis>-<first 8 characters of a random UUID>}. */
    private String generateCorrelationId() {
        // Timestamp + Random = Unique ID
        return System.currentTimeMillis() + "-" + UUID.randomUUID().toString().substring(0, 8);
    }

    /**
     * Resolves the client address: first non-blank entry of {@code X-Forwarded-For}, then
     * {@code X-Real-IP}, then the remote address of the connection.
     *
     * <p>NOTE(review): both headers can be set by any client; confirm the service is only
     * reachable through a proxy that overwrites them before trusting this value.
     */
    private String getClientIpAddress(HttpServletRequest request) {
        String xForwardedFor = request.getHeader("X-Forwarded-For");
        if (xForwardedFor != null) {
            // split() drops trailing empty strings, so a value such as "," yields an empty
            // array; iterating avoids indexing element 0 blindly.
            for (String hop : xForwardedFor.split(",")) {
                String candidate = hop.trim();
                if (!candidate.isEmpty()) {
                    return candidate;
                }
            }
        }

        String xRealIp = request.getHeader("X-Real-IP");
        if (xRealIp != null && !xRealIp.isEmpty()) {
            return xRealIp;
        }

        return request.getRemoteAddr();
    }
}

Context Propagation Service

java
/**
 * Reads and writes the correlation context held in the MDC and runs actions under a
 * temporarily replaced context.
 */
@Service
public class ContextPropagationService {

    private static final String CORRELATION_ID_KEY = "correlationId";
    private static final String USER_ID_KEY = "userId";
    private static final String SESSION_ID_KEY = "sessionId";
    private static final String TRACE_ID_KEY = "traceId";
    private static final String SPAN_ID_KEY = "spanId";

    /**
     * Builds a {@link CorrelationContext} from the current MDC values.
     *
     * @return a context whose fields are {@code null} for keys absent from the MDC
     */
    public CorrelationContext getCurrentContext() {
        return CorrelationContext.builder()
                .correlationId(MDC.get(CORRELATION_ID_KEY))
                .userId(MDC.get(USER_ID_KEY))
                .sessionId(MDC.get(SESSION_ID_KEY))
                .traceId(MDC.get(TRACE_ID_KEY))
                .spanId(MDC.get(SPAN_ID_KEY))
                .ipAddress(MDC.get("ipAddress"))
                .userAgent(MDC.get("userAgent"))
                .endpoint(MDC.get("endpoint"))
                .method(MDC.get("method"))
                .build();
    }

    /**
     * Copies the non-null identifier fields (correlation, user, session, trace, span) of
     * {@code context} into the MDC. Keys whose field is {@code null} are left untouched.
     *
     * @param context the context to apply; a {@code null} context is ignored
     */
    public void setContext(CorrelationContext context) {
        if (context == null) {
            return;
        }
        if (context.getCorrelationId() != null) {
            MDC.put(CORRELATION_ID_KEY, context.getCorrelationId());
        }
        if (context.getUserId() != null) {
            MDC.put(USER_ID_KEY, context.getUserId());
        }
        if (context.getSessionId() != null) {
            MDC.put(SESSION_ID_KEY, context.getSessionId());
        }
        if (context.getTraceId() != null) {
            MDC.put(TRACE_ID_KEY, context.getTraceId());
        }
        if (context.getSpanId() != null) {
            MDC.put(SPAN_ID_KEY, context.getSpanId());
        }
    }

    /**
     * Runs {@code action} with {@code context} applied to the MDC and afterwards restores
     * the MDC to exactly the state it had before the call.
     *
     * @param context the context to apply for the duration of the action
     * @param action  the work to run
     * @param <T>     the result type of the action
     * @return whatever {@code action} returns
     */
    public <T> T executeWithContext(CorrelationContext context, Supplier<T> action) {
        // A full snapshot is taken because setContext() only writes non-null fields:
        // restoring through it would leave keys added by the temporary context behind.
        java.util.Map<String, String> previousMdc = MDC.getCopyOfContextMap();
        try {
            setContext(context);
            return action.get();
        } finally {
            if (previousMdc != null) {
                MDC.setContextMap(previousMdc);
            } else {
                MDC.clear();
            }
        }
    }

    /**
     * {@link Runnable} variant of {@link #executeWithContext(CorrelationContext, Supplier)}.
     *
     * @param context the context to apply for the duration of the action
     * @param action  the work to run
     */
    public void executeWithContext(CorrelationContext context, Runnable action) {
        executeWithContext(context, () -> {
            action.run();
            return null;
        });
    }
}

/**
 * Value holder for the correlation data attached to one unit of work.
 * Accessors and the builder are generated by Lombok.
 */
@Data
@Builder
public class CorrelationContext {
    private String correlationId;
    private String userId;
    private String sessionId;
    private String traceId;
    private String spanId;
    private String ipAddress;
    private String userAgent;
    private String endpoint;
    private String method;
    private Instant timestamp;

    /**
     * Renders the identifier fields as outbound header name/value pairs.
     *
     * @return a new mutable map containing one entry per non-null identifier
     *         (correlation, user, session, trace and span ID)
     */
    public Map<String, String> toHeaders() {
        Map<String, String> result = new HashMap<>();
        addIfPresent(result, "X-Correlation-ID", correlationId);
        addIfPresent(result, "X-User-ID", userId);
        addIfPresent(result, "X-Session-ID", sessionId);
        addIfPresent(result, "X-Trace-ID", traceId);
        addIfPresent(result, "X-Span-ID", spanId);
        return result;
    }

    /** Stores {@code value} under {@code name} unless the value is {@code null}. */
    private static void addIfPresent(Map<String, String> target, String name, String value) {
        if (value == null) {
            return;
        }
        target.put(name, value);
    }
}

Servisler Arası Korelasyon

HTTP İstemci Araçları için Kesici (Interceptor)

java
/**
 * {@link ClientHttpRequestInterceptor} that copies the current correlation context and
 * some call metadata onto every outgoing HTTP request.
 */
@Component
public class CorrelationHttpClientInterceptor implements ClientHttpRequestInterceptor {

    private final ContextPropagationService contextService;

    public CorrelationHttpClientInterceptor(ContextPropagationService contextService) {
        this.contextService = contextService;
    }

    /**
     * Adds the correlation headers plus {@code X-Calling-Service}, {@code X-Call-Timestamp}
     * and {@code X-Call-Depth} to the request and then executes it.
     *
     * @param request   the outgoing request
     * @param body      the request body
     * @param execution the remaining interceptor chain
     * @return the response produced by the chain
     * @throws IOException propagated from the chain
     */
    @Override
    public ClientHttpResponse intercept(
            HttpRequest request,
            byte[] body,
            ClientHttpRequestExecution execution) throws IOException {

        // Fetch the current context
        CorrelationContext context = contextService.getCurrentContext();

        // Copy the context values into the headers
        HttpHeaders headers = request.getHeaders();
        context.toHeaders().forEach(headers::set);

        // Add service-to-service call metadata
        headers.set("X-Calling-Service", getCurrentServiceName());
        headers.set("X-Call-Timestamp", Instant.now().toString());
        headers.set("X-Call-Depth", String.valueOf(getCurrentCallDepth() + 1));

        return execution.execute(request, body);
    }

    /**
     * Reads the service name from the {@code spring.application.name} system property.
     *
     * <p>NOTE(review): this only sees JVM system properties; a name configured solely in
     * application.yml/properties would yield "unknown" here. Confirm how it is supplied.
     */
    private String getCurrentServiceName() {
        return System.getProperty("spring.application.name", "unknown");
    }

    /**
     * Returns the call depth stored under the {@code callDepth} MDC key, or 0 when the key
     * is absent or does not hold an integer.
     */
    private int getCurrentCallDepth() {
        String depth = MDC.get("callDepth");
        if (depth == null) {
            return 0;
        }
        try {
            return Integer.parseInt(depth.trim());
        } catch (NumberFormatException e) {
            // A malformed diagnostic value must not abort the outbound business call.
            return 0;
        }
    }
}

Feign İstemci Araçları için Kesici (Interceptor)

java
/**
 * Feign {@link RequestInterceptor} that copies the current correlation context and some
 * client metadata onto every outgoing Feign request.
 */
@Component
public class CorrelationFeignInterceptor implements RequestInterceptor {

    private final ContextPropagationService contextService;

    public CorrelationFeignInterceptor(ContextPropagationService contextService) {
        this.contextService = contextService;
    }

    /**
     * Adds the correlation headers, the client type/service/instance headers and a
     * request start timestamp (epoch milliseconds) to the template.
     *
     * @param template the request being built
     */
    @Override
    public void apply(RequestTemplate template) {
        CorrelationContext context = contextService.getCurrentContext();

        // Add the context headers
        context.toHeaders().forEach(template::header);

        // Feign-specific metadata
        template.header("X-Client-Type", "feign");
        template.header("X-Client-Service", getCurrentServiceName());
        template.header("X-Client-Instance", getInstanceId());

        // Request timing
        template.header("X-Request-Start-Time", String.valueOf(System.currentTimeMillis()));
    }

    /**
     * Reads the service name from the {@code spring.application.name} system property,
     * falling back to "unknown".
     */
    private String getCurrentServiceName() {
        return System.getProperty("spring.application.name", "unknown");
    }

    /**
     * Returns the {@code instance.id} system property, or the local host name when the
     * property is not set, or "unknown" when the host name cannot be resolved.
     */
    private String getInstanceId() {
        String configured = System.getProperty("instance.id");
        if (configured != null) {
            // Skip the host lookup entirely when an explicit ID is configured.
            return configured;
        }
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (java.net.UnknownHostException e) {
            // getLocalHost() declares this checked exception; a header value is not worth
            // failing the request for.
            return "unknown";
        }
    }
}

Mesaj Kuyruğu Korelasyonu

java
/**
 * Sends Kafka messages with the current correlation context and producer metadata
 * attached as record headers.
 */
@Component
public class CorrelationKafkaProducer {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final ContextPropagationService contextService;

    public CorrelationKafkaProducer(KafkaTemplate<String, Object> kafkaTemplate,
                                   ContextPropagationService contextService) {
        this.kafkaTemplate = kafkaTemplate;
        this.contextService = contextService;
    }

    /**
     * Wraps {@code message} in a record for {@code topic}, adds the correlation headers
     * plus {@code X-Producer-Service}, {@code X-Message-Timestamp} and {@code X-Message-Id},
     * and hands the record to the {@link KafkaTemplate}. All header values are UTF-8 bytes.
     *
     * @param topic   the destination topic
     * @param message the record value
     */
    public void sendMessage(String topic, Object message) {
        CorrelationContext context = contextService.getCurrentContext();

        // Copy the context values into the message headers
        ProducerRecord<String, Object> record = new ProducerRecord<>(topic, message);
        context.toHeaders().forEach((key, value) ->
                record.headers().add(key, value.getBytes(StandardCharsets.UTF_8)));

        // Producer metadata
        // Explicit UTF-8 here as well: the no-arg getBytes() depends on the platform charset.
        record.headers().add("X-Producer-Service",
                getCurrentServiceName().getBytes(StandardCharsets.UTF_8));
        record.headers().add("X-Message-Timestamp",
                Instant.now().toString().getBytes(StandardCharsets.UTF_8));
        record.headers().add("X-Message-Id",
                UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));

        // NOTE(review): the result of send() is not inspected here, so a failed send is not
        // reported by this method — confirm that is intended.
        kafkaTemplate.send(record);
    }

    /**
     * Reads the service name from the {@code spring.application.name} system property,
     * falling back to "unknown".
     */
    private String getCurrentServiceName() {
        return System.getProperty("spring.application.name", "unknown");
    }
}

@KafkaListener(topics = "order-events")
public class CorrelationKafkaConsumer {
    
    private final ContextPropagationService contextService;
    
    public CorrelationKafkaConsumer(ContextPropagationService contextService) {
        this.contextService = contextService;
    }
    
    @KafkaListener(topics = "order-events")
    public void handleOrderEvent(@Payload OrderEvent event, 
                                @Header Map<String, Object> headers) {
        
        // Header'lardan context'i reconstruct et
        CorrelationContext context = reconstructContextFromHeaders(headers);
        
        // Context'i set et ve işlemi yap
        contextService.executeWithContext(context, () -> {
            processOrderEvent(event);
        });
    }
    
    private CorrelationContext reconstructContextFromHeaders(Map<String, Object> headers) {
        return CorrelationContext.builder()
                .correlationId(getStringHeader(headers, "X-Correlation-ID"))
                .userId(getStringHeader(headers, "X-User-ID"))
                .sessionId(getStringHeader(headers, "X-Session-ID"))
                .traceId(getStringHeader(headers, "X-Trace-ID"))
                .spanId(getStringHeader(headers, "X-Span-ID"))
                .build();
    }
    
    private String getStringHeader(Map<String, Object> headers, String key) {
        Object value = headers.get(key);
        if (value instanceof byte[]) {
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        return value != null ? value.toString() : null;
    }
    
    private void processOrderEvent(OrderEvent event) {
        // İş mantığı burada
        log.info("Processing order event: {}", event);
    }
}

Gelişmiş Korelasyon Desenleri

Kullanıcı Yolculuğu (User Journey) Takibi

java
/**
 * Records user actions and conversion events in Redis lists, tagged with the current
 * correlation context.
 */
@Service
@Slf4j
public class UserJourneyTracker {

    private final RedisTemplate<String, Object> redisTemplate;
    private final ContextPropagationService contextService;

    public UserJourneyTracker(RedisTemplate<String, Object> redisTemplate,
                             ContextPropagationService contextService) {
        this.redisTemplate = redisTemplate;
        this.contextService = contextService;
    }

    /**
     * Appends a {@link UserAction} to the list {@code user:journey:<sessionId>} and sets
     * that key to expire after 24 hours.
     *
     * <p>NOTE(review): sessions without a session ID all share one key (built from the
     * literal context value), and the push and expire are two separate Redis calls —
     * confirm both are acceptable.
     *
     * @param action   name of the action performed
     * @param metadata additional data stored with the action
     */
    public void trackUserAction(String action, Map<String, Object> metadata) {
        CorrelationContext context = contextService.getCurrentContext();

        UserAction userAction = UserAction.builder()
                .sessionId(context.getSessionId())
                .userId(context.getUserId())
                .correlationId(context.getCorrelationId())
                .action(action)
                .metadata(metadata)
                .timestamp(Instant.now())
                .endpoint(context.getEndpoint())
                .userAgent(context.getUserAgent())
                .ipAddress(context.getIpAddress())
                .build();

        // Store the journey entry in Redis
        String journeyKey = "user:journey:" + context.getSessionId();
        redisTemplate.opsForList().rightPush(journeyKey, userAction);
        redisTemplate.expire(journeyKey, Duration.ofHours(24));

        log.info("User action tracked: sessionId={}, action={}, userId={}",
                context.getSessionId(), action, context.getUserId());
    }

    /**
     * Returns every action recorded for the session, in list order.
     *
     * @param sessionId the session whose journey is requested
     * @return the recorded actions; an empty list when Redis returns no list
     */
    public List<UserAction> getUserJourney(String sessionId) {
        String journeyKey = "user:journey:" + sessionId;
        List<Object> actions = redisTemplate.opsForList().range(journeyKey, 0, -1);

        // range() is documented to return null when used in a pipeline or transaction.
        if (actions == null) {
            return new ArrayList<>();
        }

        return actions.stream()
                .map(UserAction.class::cast)
                .collect(Collectors.toList());
    }

    /**
     * Appends a {@link ConversionEvent} to the list {@code conversion:events:<today>} and
     * sets that key to expire after 90 days.
     *
     * @param conversionType kind of conversion
     * @param value          value associated with the conversion
     */
    public void trackConversion(String conversionType, double value) {
        CorrelationContext context = contextService.getCurrentContext();

        ConversionEvent conversion = ConversionEvent.builder()
                .sessionId(context.getSessionId())
                .userId(context.getUserId())
                .correlationId(context.getCorrelationId())
                .conversionType(conversionType)
                .value(value)
                .timestamp(Instant.now())
                .build();

        // Store the conversion event in a separate, per-day list
        // NOTE(review): LocalDate.now() uses the JVM default time zone — confirm that all
        // instances agree on it, otherwise the day buckets differ between nodes.
        String conversionKey = "conversion:events:" + LocalDate.now().toString();
        redisTemplate.opsForList().rightPush(conversionKey, conversion);
        redisTemplate.expire(conversionKey, Duration.ofDays(90));

        log.info("Conversion tracked: userId={}, type={}, value={}",
                context.getUserId(), conversionType, value);
    }
}

/**
 * One recorded user action, as built by {@code UserJourneyTracker.trackUserAction}.
 * Accessors and the builder are generated by Lombok.
 */
@Data
@Builder
public class UserAction {
    // Identifiers copied from the correlation context at tracking time
    private String sessionId;
    private String userId;
    private String correlationId;
    // Name of the action and caller-supplied extra data
    private String action;
    private Map<String, Object> metadata;
    // Moment the action was recorded
    private Instant timestamp;
    // Request details copied from the correlation context
    private String endpoint;
    private String userAgent;
    private String ipAddress;
}

/**
 * One recorded conversion, as built by {@code UserJourneyTracker.trackConversion}.
 * Accessors and the builder are generated by Lombok.
 */
@Data
@Builder
public class ConversionEvent {
    // Identifiers copied from the correlation context at tracking time
    private String sessionId;
    private String userId;
    private String correlationId;
    // Caller-supplied kind of conversion
    private String conversionType;
    // NOTE(review): if this is a monetary amount, double can lose precision — confirm
    private double value;
    // Moment the conversion was recorded
    private Instant timestamp;
}

Hata Korelasyonu

java
/**
 * Logs failures together with the current correlation context, counts them in a
 * Micrometer counter and raises an alert log line for errors classified as critical.
 */
@Component
@Slf4j
public class ErrorCorrelationService {

    /** Limits how far the cause chain is followed, guarding against cyclic causes. */
    private static final int MAX_CAUSE_DEPTH = 16;

    private final ContextPropagationService contextService;
    private final MeterRegistry meterRegistry;

    public ErrorCorrelationService(ContextPropagationService contextService,
                                  MeterRegistry meterRegistry) {
        this.contextService = contextService;
        this.meterRegistry = meterRegistry;
    }

    /**
     * Logs {@code exception} with correlation data, increments the {@code errors.total}
     * counter and sends an alert when the error is classified as critical.
     *
     * @param exception the failure to record
     * @param operation name of the operation that failed
     */
    public void logError(Exception exception, String operation) {
        CorrelationContext context = contextService.getCurrentContext();

        ErrorEvent errorEvent = ErrorEvent.builder()
                .correlationId(context.getCorrelationId())
                .userId(context.getUserId())
                .sessionId(context.getSessionId())
                .operation(operation)
                .errorType(exception.getClass().getSimpleName())
                .errorMessage(exception.getMessage())
                .stackTrace(getStackTrace(exception))
                .timestamp(Instant.now())
                .endpoint(context.getEndpoint())
                .method(context.getMethod())
                .ipAddress(context.getIpAddress())
                .userAgent(context.getUserAgent())
                .build();

        // Structured error logging
        MDC.put("errorType", errorEvent.getErrorType());
        MDC.put("operation", operation);
        try {
            log.error("Operation failed: operation={}, error={}, correlationId={}",
                    operation, exception.getMessage(), context.getCorrelationId(), exception);
        } finally {
            // These keys describe this one error only; leaving them set would tag every
            // later log line of the request with a stale error type.
            MDC.remove("errorType");
            MDC.remove("operation");
        }

        // Error metrics
        // Tag values must be non-null; the endpoint is absent outside an HTTP request.
        String endpointTag = context.getEndpoint() != null ? context.getEndpoint() : "unknown";
        meterRegistry.counter("errors.total",
                Tags.of(
                    Tag.of("error.type", errorEvent.getErrorType()),
                    Tag.of("operation", operation),
                    Tag.of("endpoint", endpointTag)
                )).increment();

        // Alert on critical errors
        if (isCriticalError(exception)) {
            sendCriticalErrorAlert(errorEvent);
        }
    }

    /** Renders the full stack trace of {@code exception} as a string. */
    private String getStackTrace(Exception exception) {
        StringWriter sw = new StringWriter();
        PrintWriter pw = new PrintWriter(sw);
        exception.printStackTrace(pw);
        return sw.toString();
    }

    /**
     * Classifies a failure as critical when it, or one of its causes, is an
     * {@link OutOfMemoryError}, a {@link StackOverflowError}, a {@link SecurityException},
     * or carries a message containing "database connection".
     *
     * <p>The chain is inspected through a {@link Throwable} reference: an {@link Exception}
     * can never itself be an {@link Error}, so the error types can only show up as causes.
     */
    private boolean isCriticalError(Exception exception) {
        Throwable current = exception;
        for (int depth = 0; current != null && depth < MAX_CAUSE_DEPTH; depth++) {
            if (current instanceof OutOfMemoryError
                    || current instanceof StackOverflowError
                    || current instanceof SecurityException) {
                return true;
            }
            String message = current.getMessage();
            if (message != null && message.contains("database connection")) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }

    private void sendCriticalErrorAlert(ErrorEvent errorEvent) {
        // Alert mechanism implementation
        log.error("CRITICAL ERROR ALERT: {}", errorEvent);
    }
}

/**
 * Snapshot of one failure, as built by {@code ErrorCorrelationService.logError}.
 * Accessors and the builder are generated by Lombok.
 */
@Data
@Builder
public class ErrorEvent {
    // Identifiers copied from the correlation context
    private String correlationId;
    private String userId;
    private String sessionId;
    // Caller-supplied name of the operation that failed
    private String operation;
    // Simple class name, message and rendered stack trace of the exception
    private String errorType;
    private String errorMessage;
    private String stackTrace;
    // Moment the error was recorded
    private Instant timestamp;
    // Request details copied from the correlation context
    private String endpoint;
    private String method;
    private String ipAddress;
    private String userAgent;
}

Üretim (Production) Ortamında Dikkat Edilmesi Gerekenler

Örnekleme Stratejisi

java
@Configuration
public class TracingSamplingConfiguration {
    
    @Bean
    public ProbabilityBasedSampler probabilitySampler() {
        // Production'da düşük sampling rate kullan
        return ProbabilityBasedSampler.create(0.1f); // %10
    }
    
    @Bean
    public RateLimitingSampler rateLimitingSampler() {
        // Saniyede maksimum 100 trace
        return RateLimitingSampler.create(100);
    }
    
    @Bean
    public CustomSampler customSampler() {
        return new CustomSampler();
    }
    
    public static class CustomSampler implements Sampler {
        
        @Override
        public SamplingResult shouldSample(Context parentContext, String traceId, 
                                         String name, SpanKind spanKind, 
                                         Attributes attributes, List<Link> parentLinks) {
            
            // Kritik endpoint'ler için %100 sampling
            String endpoint = attributes.get(AttributeKey.stringKey("http.target"));
            if (endpoint != null && (endpoint.contains("/payment") || endpoint.contains("/order"))) {
                return SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE);
            }
            
            // Health check endpoint'leri için sampling yapma
            if (endpoint != null && endpoint.contains("/actuator/health")) {
                return SamplingResult.create(SamplingDecision.NOT_RECORD);
            }
            
            // Diğer endpoint'ler için %5 sampling
            return Math.random() < 0.05 ? 
                    SamplingResult.create(SamplingDecision.RECORD_AND_SAMPLE) :
                    SamplingResult.create(SamplingDecision.NOT_RECORD);
        }
    }
}

Performans Etkisi İzleme

java
/**
 * Publishes rough metrics about the cost of tracing: an estimated per-span overhead timer,
 * a warning for long spans and a periodically refreshed memory gauge.
 */
@Component
@Slf4j
public class TracingPerformanceMonitor {

    private final MeterRegistry meterRegistry;
    private final Timer tracingOverheadTimer;

    // The registry keeps only a weak reference to the number behind a gauge, so the value
    // holder is kept in a field and updated in place rather than re-registered.
    private final java.util.concurrent.atomic.AtomicLong tracingMemoryUsage =
            new java.util.concurrent.atomic.AtomicLong();

    public TracingPerformanceMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.tracingOverheadTimer = Timer.builder("tracing.overhead")
                .description("Time spent on tracing operations")
                .register(meterRegistry);
        // Register the gauge once; reportTracingMetrics() only updates the backing value.
        meterRegistry.gauge("tracing.memory.usage", tracingMemoryUsage);
    }

    /**
     * Records an estimated overhead (1% of the span duration) for the finished span and
     * warns when the span ran for more than 5000 ms.
     *
     * @param event the span-end event carrying start/end times in epoch nanoseconds
     */
    @EventListener
    public void handleSpanEnd(SpanEndEvent event) {
        // Measure the span duration
        long spanDuration = event.getEndEpochNanos() - event.getStartEpochNanos();

        // Approximate the tracing overhead
        long estimatedOverhead = spanDuration / 100; // assumes 1% overhead

        tracingOverheadTimer.record(estimatedOverhead, TimeUnit.NANOSECONDS);

        // Warn about long-running spans
        if (TimeUnit.NANOSECONDS.toMillis(spanDuration) > 5000) {
            log.warn("Long-running span detected: name={}, duration={}ms",
                    event.getSpanName(), TimeUnit.NANOSECONDS.toMillis(spanDuration));
        }
    }

    /** Logs the mean estimated overhead and refreshes the memory gauge once a minute. */
    @Scheduled(fixedRate = 60000) // every minute
    public void reportTracingMetrics() {
        double averageOverhead = tracingOverheadTimer.mean(TimeUnit.MICROSECONDS);
        log.info("Tracing performance - Average overhead: {}μs", averageOverhead);

        // Memory usage for tracing
        tracingMemoryUsage.set(getTracingMemoryUsage());
    }

    /**
     * Returns the JVM's currently used heap (total minus free memory). This is a
     * whole-process figure, not one specific to tracing.
     */
    private long getTracingMemoryUsage() {
        // Tracing-related memory usage calculation
        return Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
    }
}

Bu Türkçe dağıtık izleme dokümantasyonu; korelasyon ID yönetimini, servisler arası bağlam aktarımını (cross-service propagation) ve üretim ortamında dikkat edilmesi gerekenleri kapsamlı bir şekilde ele alır.

Eren Demir tarafından oluşturulmuştur.