3.4 Backpressure Kontrolü
Genel Bakış
Backpressure control, sistem kapasitesini aşan isteklerin güvenli bir şekilde yönetilmesi için kritik bir mekanizmadır. Yüksek trafikli durumlarda sistemin çökmesini önleyerek performans ve güvenilirliği korur.
Rate Limiting
Token Bucket Algoritması
java
@Component
public class TokenBucketRateLimiter {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public boolean isAllowed(String key, int capacity, int refillRate) {
String script = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local tokens = tonumber(ARGV[2])
local interval = tonumber(ARGV[3])
local bucket = redis.call('hmget', key, 'tokens', 'last_refill')
local tokens_count = tonumber(bucket[1]) or capacity
local last_refill = tonumber(bucket[2]) or redis.call('time')[1]
local now = redis.call('time')[1]
local elapsed = now - last_refill
local tokens_to_add = math.floor(elapsed * tokens / interval)
tokens_count = math.min(capacity, tokens_count + tokens_to_add)
if tokens_count >= 1 then
tokens_count = tokens_count - 1
redis.call('hmset', key, 'tokens', tokens_count, 'last_refill', now)
redis.call('expire', key, interval * 2)
return 1
else
redis.call('hmset', key, 'tokens', tokens_count, 'last_refill', now)
redis.call('expire', key, interval * 2)
return 0
end
""";
List<String> keys = Arrays.asList(key);
List<Object> args = Arrays.asList(capacity, refillRate, 60);
Long result = (Long) redisTemplate.execute(
(RedisCallback<Long>) connection ->
connection.eval(script.getBytes(),
ReturnType.INTEGER,
keys.size(),
keys.toArray(new String[0]),
args.toArray(new String[0])
)
);
return result != null && result == 1;
}
}
API Gateway Rate Limiting
java
@RestController
@RequestMapping("/api")
public class RateLimitedController {
@Autowired
private TokenBucketRateLimiter rateLimiter;
@GetMapping("/data")
public ResponseEntity<?> getData(HttpServletRequest request) {
String clientId = getClientId(request);
String key = "rate_limit:" + clientId;
// Her client için dakikada 100 istek
if (!rateLimiter.isAllowed(key, 100, 100)) {
HttpHeaders headers = new HttpHeaders();
headers.add("X-RateLimit-Limit", "100");
headers.add("X-RateLimit-Remaining", "0");
headers.add("Retry-After", "60");
return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS)
.headers(headers)
.body(Map.of("error", "Rate limit exceeded"));
}
return ResponseEntity.ok(processData());
}
private String getClientId(HttpServletRequest request) {
String apiKey = request.getHeader("X-API-Key");
if (apiKey != null) {
return apiKey;
}
return request.getRemoteAddr();
}
}
Queue Management
Message Queue Backpressure
java
@Configuration
@EnableRabbitMQ
public class QueueBackpressureConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
// Publisher confirms için callback
template.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
log.error("Message rejected: {}", cause);
// Retry logic veya dead letter queue
}
});
// Return callback for unroutable messages
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.error("Message returned: {} - {}", replyCode, replyText);
});
return template;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// Backpressure control
factory.setPrefetchCount(10); // Her consumer için max 10 mesaj
factory.setConcurrentConsumers(2);
factory.setMaxConcurrentConsumers(10);
// Error handling
factory.setErrorHandler(new ConditionalRejectingErrorHandler(
new ConditionalRejectingErrorHandler.DefaultExceptionStrategy())
);
return factory;
}
}
Async Processing with Backpressure
java
@Service
public class BackpressureAwareProcessor {
private final Semaphore semaphore;
private final ExecutorService executorService;
public BackpressureAwareProcessor() {
this.semaphore = new Semaphore(100); // Max 100 concurrent operations
this.executorService = Executors.newFixedThreadPool(20);
}
@Async
public CompletableFuture<String> processWithBackpressure(String data) {
return CompletableFuture.supplyAsync(() -> {
try {
// Semaphore ile concurrency control
if (!semaphore.tryAcquire(1, TimeUnit.SECONDS)) {
throw new BackpressureException("System overloaded");
}
try {
return processData(data);
} finally {
semaphore.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}, executorService);
}
private String processData(String data) {
// Simulate processing time
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Processed: " + data;
}
}
@ResponseStatus(HttpStatus.SERVICE_UNAVAILABLE)
public class BackpressureException extends RuntimeException {
public BackpressureException(String message) {
super(message);
}
}
Circuit Breaking
Resilience4j Integration
java
@Configuration
public class CircuitBreakerConfig {
@Bean
public CircuitBreaker databaseCircuitBreaker() {
return CircuitBreaker.ofDefaults("database");
}
@Bean
public CircuitBreakerRegistry circuitBreakerRegistry() {
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(50) // %50 failure rate
.waitDurationInOpenState(Duration.ofSeconds(30))
.slidingWindowSize(10)
.minimumNumberOfCalls(5)
.slowCallRateThreshold(50)
.slowCallDurationThreshold(Duration.ofSeconds(2))
.build();
return CircuitBreakerRegistry.of(config);
}
}
@Service
public class ExternalServiceClient {
private final CircuitBreaker circuitBreaker;
private final TimeLimiter timeLimiter;
private final Retry retry;
public ExternalServiceClient(CircuitBreakerRegistry registry) {
this.circuitBreaker = registry.circuitBreaker("external-service");
this.timeLimiter = TimeLimiter.ofDefaults();
this.retry = Retry.ofDefaults("external-service");
}
public CompletableFuture<String> callExternalService(String data) {
Supplier<CompletableFuture<String>> decoratedSupplier =
Decorators.ofSupplier(() -> CompletableFuture.supplyAsync(() -> {
// External service call simulation
if (Math.random() > 0.7) {
throw new RuntimeException("Service unavailable");
}
return "Response for: " + data;
}))
.withCircuitBreaker(circuitBreaker)
.withTimeLimiter(timeLimiter)
.withRetry(retry)
.decorate();
return decoratedSupplier.get();
}
}
Connection Pooling
Database Connection Management
java
@Configuration
public class DatabasePoolConfig {
@Bean
@Primary
public DataSource dataSource() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:postgresql://localhost:5432/mydb");
config.setUsername("user");
config.setPassword("password");
// Connection pool settings
config.setMaximumPoolSize(20);
config.setMinimumIdle(5);
config.setConnectionTimeout(30000); // 30 seconds
config.setIdleTimeout(600000); // 10 minutes
config.setMaxLifetime(1800000); // 30 minutes
// Backpressure settings
config.setLeakDetectionThreshold(60000); // 1 minute
return new HikariDataSource(config);
}
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
JdbcTemplate template = new JdbcTemplate(dataSource);
template.setQueryTimeout(30); // 30 seconds query timeout
return template;
}
}
HTTP Client Pool Management
java
@Configuration
public class HttpClientConfig {
@Bean
public RestTemplate restTemplate() {
HttpComponentsClientHttpRequestFactory factory =
new HttpComponentsClientHttpRequestFactory();
// Connection pool configuration
factory.setConnectTimeout(5000); // 5 seconds
factory.setReadTimeout(10000); // 10 seconds
CloseableHttpClient httpClient = HttpClients.custom()
.setMaxConnTotal(100) // Total max connections
.setMaxConnPerRoute(20) // Max connections per route
.setConnectionTimeToLive(30, TimeUnit.SECONDS)
.build();
factory.setHttpClient(httpClient);
RestTemplate restTemplate = new RestTemplate(factory);
// Error handling for backpressure
restTemplate.setErrorHandler(new ResponseErrorHandler() {
@Override
public boolean hasError(ClientHttpResponse response) throws IOException {
return response.getStatusCode().series() ==
HttpStatus.Series.CLIENT_ERROR ||
response.getStatusCode().series() ==
HttpStatus.Series.SERVER_ERROR;
}
@Override
public void handleError(ClientHttpResponse response) throws IOException {
if (response.getStatusCode() == HttpStatus.TOO_MANY_REQUESTS) {
throw new BackpressureException("Rate limit exceeded");
}
throw new RestClientException("HTTP error: " +
response.getStatusCode());
}
});
return restTemplate;
}
}
Monitoring ve Alerting
Backpressure Metrics
java
@Component
public class BackpressureMetrics {
private final MeterRegistry meterRegistry;
private final Counter rejectedRequests;
private final Gauge queueSize;
private final Timer processingTime;
public BackpressureMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.rejectedRequests = Counter.builder("requests.rejected")
.description("Number of rejected requests due to backpressure")
.register(meterRegistry);
this.queueSize = Gauge.builder("queue.size")
.description("Current queue size")
.register(meterRegistry, this, BackpressureMetrics::getCurrentQueueSize);
this.processingTime = Timer.builder("processing.time")
.description("Request processing time")
.register(meterRegistry);
}
public void recordRejection(String reason) {
rejectedRequests.increment(Tags.of("reason", reason));
}
public Timer.Sample startTimer() {
return Timer.start(meterRegistry);
}
private double getCurrentQueueSize() {
// Implementation to get current queue size
return 0.0;
}
}
Health Check Integration
java
@Component
public class BackpressureHealthIndicator implements HealthIndicator {
private final BackpressureAwareProcessor processor;
private final TokenBucketRateLimiter rateLimiter;
public BackpressureHealthIndicator(
BackpressureAwareProcessor processor,
TokenBucketRateLimiter rateLimiter) {
this.processor = processor;
this.rateLimiter = rateLimiter;
}
@Override
public Health health() {
Health.Builder builder = new Health.Builder();
try {
// Check system capacity
boolean hasCapacity = checkSystemCapacity();
if (hasCapacity) {
builder.up()
.withDetail("status", "System has available capacity")
.withDetail("available_permits", getAvailablePermits());
} else {
builder.down()
.withDetail("status", "System at capacity")
.withDetail("available_permits", 0);
}
} catch (Exception e) {
builder.down()
.withDetail("error", e.getMessage());
}
return builder.build();
}
private boolean checkSystemCapacity() {
// Implementation to check system capacity
return true;
}
private int getAvailablePermits() {
// Implementation to get available permits
return 50;
}
}
Bu backpressure control implementasyonu, sistemin aşırı yüklenme durumlarında güvenli bir şekilde çalışmasını sağlar ve performans degradasyonunu önler.