Skip to content

3.4 Backpressure Control

Overview

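Backpressure control is the set of mechanisms a system uses to safely handle more requests than it has capacity to serve. By slowing down, queueing, or rejecting excess work, it keeps the system from crashing during high-traffic periods while preserving performance and reliability for the requests that are accepted.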

Rate Limiting

Token Bucket Algorithm

java
/**
 * Redis-backed token-bucket rate limiter.
 *
 * <p>Each bucket is a Redis hash with two fields: {@code tokens} (tokens currently
 * available) and {@code last_refill} (the time, in seconds, up to which refills have
 * been credited). The whole check-and-update runs as a single Lua script, so Redis
 * executes it atomically per key.
 */
@Component
public class TokenBucketRateLimiter {

    /** Length of the refill window in seconds; {@code refillRate} tokens are credited per window. */
    private static final int REFILL_INTERVAL_SECONDS = 60;

    /**
     * KEYS[1] = bucket key; ARGV[1] = capacity; ARGV[2] = tokens per interval;
     * ARGV[3] = interval in seconds. Returns 1 when a token was consumed, 0 otherwise.
     */
    private static final String TOKEN_BUCKET_SCRIPT = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local tokens = tonumber(ARGV[2])
        local interval = tonumber(ARGV[3])

        local now = tonumber(redis.call('time')[1])
        local bucket = redis.call('hmget', key, 'tokens', 'last_refill')
        local tokens_count = tonumber(bucket[1]) or capacity
        local last_refill = tonumber(bucket[2]) or now

        local elapsed = math.max(0, now - last_refill)
        local tokens_to_add = math.floor(elapsed * tokens / interval)

        -- Advance last_refill only by the time that was actually converted into
        -- tokens. Resetting it to 'now' on every call discards the fractional
        -- progress, so a client polling faster than one token-period would never
        -- see a refill.
        if tokens_to_add > 0 then
            local filled = tokens_count + tokens_to_add
            if filled >= capacity then
                tokens_count = capacity
                last_refill = now
            else
                tokens_count = filled
                last_refill = last_refill + tokens_to_add * interval / tokens
            end
        end

        local allowed = 0
        if tokens_count >= 1 then
            tokens_count = tokens_count - 1
            allowed = 1
        end

        -- Keep the key at least as long as a full refill takes; otherwise a
        -- drained bucket could expire and come back with full capacity.
        local ttl = interval * 2
        if tokens > 0 then
            ttl = math.max(ttl, math.ceil(capacity * interval / tokens))
        end

        redis.call('hmset', key, 'tokens', tokens_count, 'last_refill', last_refill)
        redis.call('expire', key, ttl)
        return allowed
        """;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Tries to consume one token from the bucket identified by {@code key}.
     *
     * @param key        Redis key of the bucket
     * @param capacity   maximum number of tokens the bucket can hold; a bucket that does
     *                   not exist yet starts full
     * @param refillRate tokens credited per 60-second window
     * @return {@code true} if a token was available and has been consumed,
     *         {@code false} if the bucket is empty (or Redis returned no result)
     */
    public boolean isAllowed(String key, int capacity, int refillRate) {
        // The low-level eval call takes raw bytes: the key followed by the arguments.
        // Encoding them here (rather than casting a List<Object> of Integers to
        // String[]) avoids an ArrayStoreException and keeps the values readable by
        // tonumber() inside the script.
        byte[] script = utf8(TOKEN_BUCKET_SCRIPT);
        byte[] bucketKey = utf8(key);
        byte[] capacityArg = utf8(Integer.toString(capacity));
        byte[] refillArg = utf8(Integer.toString(refillRate));
        byte[] intervalArg = utf8(Integer.toString(REFILL_INTERVAL_SECONDS));

        Long result = redisTemplate.execute(
            (RedisCallback<Long>) connection ->
                connection.eval(
                    script,
                    ReturnType.INTEGER,
                    1,
                    bucketKey,
                    capacityArg,
                    refillArg,
                    intervalArg
                )
        );

        return result != null && result == 1L;
    }

    /** Encodes a string as UTF-8 bytes, independent of the platform default charset. */
    private static byte[] utf8(String value) {
        return value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}

API Gateway Rate Limiting

java
/**
 * REST controller whose {@code /api/data} endpoint is throttled per client through
 * {@link TokenBucketRateLimiter}.
 */
@RestController
@RequestMapping("/api")
public class RateLimitedController {

    @Autowired
    private TokenBucketRateLimiter rateLimiter;

    /**
     * Serves the data endpoint when the caller still has rate-limit budget; otherwise
     * answers with HTTP 429 and rate-limit headers.
     *
     * @param request incoming request, used only to identify the client
     * @return 200 with the result of {@code processData()} (not defined in this
     *         snippet), or 429 with an error body
     */
    @GetMapping("/data")
    public ResponseEntity<?> getData(HttpServletRequest request) {
        String bucketKey = "rate_limit:" + getClientId(request);

        // 100 requests per minute per client
        boolean permitted = rateLimiter.isAllowed(bucketKey, 100, 100);
        if (permitted) {
            return ResponseEntity.ok(processData());
        }

        HttpHeaders throttleHeaders = new HttpHeaders();
        throttleHeaders.add("X-RateLimit-Limit", "100");
        throttleHeaders.add("X-RateLimit-Remaining", "0");
        throttleHeaders.add("Retry-After", "60");

        return ResponseEntity
            .status(HttpStatus.TOO_MANY_REQUESTS)
            .headers(throttleHeaders)
            .body(Map.of("error", "Rate limit exceeded"));
    }

    /**
     * Picks the identifier the rate limit is keyed on: the {@code X-API-Key} header
     * when present, otherwise the remote address of the request.
     */
    private String getClientId(HttpServletRequest request) {
        String apiKey = request.getHeader("X-API-Key");
        return apiKey != null ? apiKey : request.getRemoteAddr();
    }
}

Queue Management

Message Queue Backpressure

java
/**
 * RabbitMQ configuration that applies backpressure on both sides of the queue:
 * publisher confirms/returns on the sending template, and a bounded prefetch with a
 * capped consumer count on the listener containers.
 *
 * <p>NOTE(review): {@code log} is not declared in this snippet; it is presumably
 * provided elsewhere (for example by a logging annotation) - confirm.
 */
@Configuration
@EnableRabbit // Spring AMQP's listener-enabling annotation; "@EnableRabbitMQ" does not exist
public class QueueBackpressureConfig {

    /**
     * Builds the publishing template with confirm and return callbacks.
     *
     * <p>NOTE(review): confirm callbacks are only delivered when publisher confirms are
     * enabled on the {@code ConnectionFactory}; that setting is outside this class.
     *
     * @param connectionFactory broker connection factory
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);

        // Without the mandatory flag the broker silently drops unroutable messages
        // and the return callback below is never invoked.
        template.setMandatory(true);

        // Publisher confirms callback
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack) {
                log.error("Message rejected: {}", cause);
                // Retry logic or dead letter queue
            }
        });

        // Return callback for unroutable messages.
        // NOTE(review): newer Spring AMQP versions replace this with
        // setReturnsCallback(ReturnedMessage); switch when upgrading.
        template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.error("Message returned: {} - {}", replyCode, replyText);
        });

        return template;
    }

    /**
     * Builds the listener container factory with consumer-side limits.
     *
     * @param connectionFactory broker connection factory
     * @return factory whose containers prefetch at most 10 messages per consumer and
     *         scale between 2 and 10 concurrent consumers
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {

        SimpleRabbitListenerContainerFactory factory =
            new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);

        // Backpressure control
        factory.setPrefetchCount(10); // Max 10 unacknowledged messages per consumer
        factory.setConcurrentConsumers(2);
        factory.setMaxConcurrentConsumers(10);

        // Error handling
        factory.setErrorHandler(new ConditionalRejectingErrorHandler(
            new ConditionalRejectingErrorHandler.DefaultExceptionStrategy())
        );

        return factory;
    }
}

Async Processing with Backpressure

java
/**
 * Processes work asynchronously while bounding the amount of work in flight.
 *
 * <p>A permit is taken <em>before</em> a task is handed to the worker pool, so the
 * semaphore limits queued plus running tasks. Acquiring it inside the pooled task
 * (as a 20-thread pool with 100 permits would) can never exhaust the permits, and the
 * pool's unbounded queue would then grow without any request being rejected.
 */
@Service
public class BackpressureAwareProcessor {

    private final Semaphore semaphore;
    private final ExecutorService executorService;

    public BackpressureAwareProcessor() {
        this.semaphore = new Semaphore(100); // Max 100 tasks queued or running
        this.executorService = Executors.newFixedThreadPool(20);
    }

    /**
     * Submits {@code data} for processing if capacity is available.
     *
     * <p>Waits up to one second for a permit. All failures are reported through the
     * returned future, never thrown synchronously (except when the executor itself
     * refuses the task).
     *
     * @param data payload to process
     * @return a future completing with the processed value; it completes exceptionally
     *         with a {@link BackpressureException} cause when no permit became
     *         available, or a {@link RuntimeException} cause when the wait was
     *         interrupted
     */
    @Async
    public CompletableFuture<String> processWithBackpressure(String data) {
        try {
            if (!semaphore.tryAcquire(1, TimeUnit.SECONDS)) {
                return failed(new BackpressureException("System overloaded"));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return failed(new RuntimeException(e));
        }

        CompletableFuture<String> result;
        try {
            result = CompletableFuture.supplyAsync(() -> processData(data), executorService);
        } catch (RuntimeException e) {
            // The executor refused the task, so it will never run: hand the permit back.
            semaphore.release();
            throw e;
        }

        // Tie the permit to the lifetime of the future. This also covers a future that
        // is cancelled before its task starts, in which case the supplier never runs.
        result.whenComplete((value, error) -> semaphore.release());
        return result;
    }

    /**
     * Creates an already-failed future. The cause is wrapped in a
     * {@code CompletionException}, which is what callbacks on a failed
     * {@code supplyAsync} future observe as well.
     */
    private static <T> CompletableFuture<T> failed(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(new java.util.concurrent.CompletionException(cause));
        return future;
    }

    private String processData(String data) {
        // Simulate processing time
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Processed: " + data;
    }
}

/**
 * Signals that work was refused because the system is at capacity.
 * The annotation maps it to HTTP 503 (Service Unavailable).
 */
@ResponseStatus(HttpStatus.SERVICE_UNAVAILABLE)
public class BackpressureException extends RuntimeException {

    /**
     * @param message description of why the work was refused
     */
    public BackpressureException(final String message) {
        super(message);
    }
}

Circuit Breaking

Resilience4j Integration

java
@Configuration
public class CircuitBreakerConfig {
    
    @Bean
    public CircuitBreaker databaseCircuitBreaker() {
        return CircuitBreaker.ofDefaults("database");
    }
    
    @Bean
    public CircuitBreakerRegistry circuitBreakerRegistry() {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
            .failureRateThreshold(50) // 50% failure rate
            .waitDurationInOpenState(Duration.ofSeconds(30))
            .slidingWindowSize(10)
            .minimumNumberOfCalls(5)
            .slowCallRateThreshold(50)
            .slowCallDurationThreshold(Duration.ofSeconds(2))
            .build();
            
        return CircuitBreakerRegistry.of(config);
    }
}

/**
 * Client for an external service, protected by a circuit breaker, a time limiter and
 * retries.
 *
 * <p>The call is asynchronous, so it is decorated as a {@code CompletionStage}.
 * Decorating a plain {@code Supplier} that merely <em>returns</em> a future would let
 * the breaker and retry observe only the (always successful) creation of the future,
 * not the outcome of the call; a supplier decorator also offers no time limiter.
 */
@Service
public class ExternalServiceClient {

    private final CircuitBreaker circuitBreaker;
    private final TimeLimiter timeLimiter;
    private final Retry retry;

    /** Schedules time-limiter timeouts and delayed retry attempts. */
    private final java.util.concurrent.ScheduledExecutorService scheduler;

    public ExternalServiceClient(CircuitBreakerRegistry registry) {
        this.circuitBreaker = registry.circuitBreaker("external-service");
        this.timeLimiter = TimeLimiter.ofDefaults();
        this.retry = Retry.ofDefaults("external-service");
        // Daemon thread so the scheduler never keeps the JVM alive on shutdown.
        this.scheduler = java.util.concurrent.Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "external-service-resilience");
            thread.setDaemon(true);
            return thread;
        });
    }

    /**
     * Calls the external service with circuit breaking, a timeout and retries applied
     * to the asynchronous result.
     *
     * @param data request payload
     * @return future completing with the service response, or exceptionally when the
     *         call fails after the decorators have been applied
     */
    public CompletableFuture<String> callExternalService(String data) {
        return Decorators.<String>ofCompletionStage(() -> invokeExternalService(data))
            .withCircuitBreaker(circuitBreaker)
            .withTimeLimiter(timeLimiter, scheduler)
            .withRetry(retry, scheduler)
            .decorate()
            .get()
            .toCompletableFuture();
    }

    /** Simulated external call that fails for roughly 30% of invocations. */
    private CompletableFuture<String> invokeExternalService(String data) {
        return CompletableFuture.supplyAsync(() -> {
            // External service call simulation
            if (Math.random() > 0.7) {
                throw new RuntimeException("Service unavailable");
            }
            return "Response for: " + data;
        });
    }
}

Connection Pooling

Database Connection Management

java
/**
 * Database access configuration: a bounded HikariCP connection pool and a
 * {@code JdbcTemplate} with a query timeout.
 */
@Configuration
public class DatabasePoolConfig {

    /**
     * Creates the primary data source backed by a HikariCP pool of at most 20
     * connections.
     */
    @Bean
    @Primary
    public DataSource dataSource() {
        HikariConfig poolSettings = new HikariConfig();

        // Connection target and credentials
        poolSettings.setJdbcUrl("jdbc:postgresql://localhost:5432/mydb");
        poolSettings.setUsername("user");
        poolSettings.setPassword("password");

        // Pool sizing and connection lifecycle (all durations in milliseconds)
        poolSettings.setMaximumPoolSize(20);
        poolSettings.setMinimumIdle(5);
        poolSettings.setConnectionTimeout(30_000);  // wait at most 30 s for a connection
        poolSettings.setIdleTimeout(600_000);       // 10 min
        poolSettings.setMaxLifetime(1_800_000);     // 30 min

        // Backpressure: leak-detection threshold of 1 min
        poolSettings.setLeakDetectionThreshold(60_000);

        return new HikariDataSource(poolSettings);
    }

    /**
     * Creates a {@code JdbcTemplate} on the given data source with a 30-second query
     * timeout.
     */
    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        JdbcTemplate jdbc = new JdbcTemplate(dataSource);
        jdbc.setQueryTimeout(30); // seconds
        return jdbc;
    }
}

HTTP Client Pool Management

java
/**
 * Outbound HTTP configuration: a pooled Apache HttpClient behind a
 * {@code RestTemplate}, with an error handler that turns HTTP 429 into a
 * {@link BackpressureException}.
 */
@Configuration
public class HttpClientConfig {

    /**
     * Builds the {@code RestTemplate} with connection-pool limits, timeouts and the
     * backpressure-aware error handler.
     */
    @Bean
    public RestTemplate restTemplate() {
        // Pooled client: 100 connections overall, 20 per route, 30 s time-to-live.
        CloseableHttpClient pooledClient = HttpClients.custom()
            .setMaxConnTotal(100)
            .setMaxConnPerRoute(20)
            .setConnectionTimeToLive(30, TimeUnit.SECONDS)
            .build();

        HttpComponentsClientHttpRequestFactory requestFactory =
            new HttpComponentsClientHttpRequestFactory();
        requestFactory.setConnectTimeout(5000);  // 5 seconds
        requestFactory.setReadTimeout(10000);    // 10 seconds
        requestFactory.setHttpClient(pooledClient);

        RestTemplate template = new RestTemplate(requestFactory);
        template.setErrorHandler(new BackpressureErrorHandler());
        return template;
    }

    /** Treats every 4xx/5xx response as an error and singles out HTTP 429. */
    private static class BackpressureErrorHandler implements ResponseErrorHandler {

        @Override
        public boolean hasError(ClientHttpResponse response) throws IOException {
            return inSeries(response, HttpStatus.Series.CLIENT_ERROR)
                || inSeries(response, HttpStatus.Series.SERVER_ERROR);
        }

        @Override
        public void handleError(ClientHttpResponse response) throws IOException {
            boolean throttled = response.getStatusCode() == HttpStatus.TOO_MANY_REQUESTS;
            if (throttled) {
                throw new BackpressureException("Rate limit exceeded");
            }
            throw new RestClientException("HTTP error: " +
                response.getStatusCode());
        }

        /** Reports whether the response status belongs to the given series. */
        private static boolean inSeries(ClientHttpResponse response, HttpStatus.Series series)
                throws IOException {
            return response.getStatusCode().series() == series;
        }
    }
}

Monitoring and Alerting

Backpressure Metrics

java
/**
 * Micrometer metrics for backpressure: a per-reason rejection counter, a queue-size
 * gauge and a processing-time timer.
 */
@Component
public class BackpressureMetrics {

    private static final String REJECTED_REQUESTS = "requests.rejected";

    private final MeterRegistry meterRegistry;
    private final Gauge queueSize;
    private final Timer processingTime;

    /**
     * Registers the gauge and timer. Rejection counters are registered lazily, one per
     * reason, in {@link #recordRejection(String)}.
     *
     * @param meterRegistry registry all meters are registered with
     */
    public BackpressureMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;

        // A gauge is bound to the object it samples at builder creation time.
        this.queueSize = Gauge.builder("queue.size", this, BackpressureMetrics::getCurrentQueueSize)
            .description("Current queue size")
            .register(meterRegistry);

        this.processingTime = Timer.builder("processing.time")
            .description("Request processing time")
            .register(meterRegistry);
    }

    /**
     * Counts one rejected request under the given reason.
     *
     * <p>Tags are part of a meter's identity, so they cannot be passed to
     * {@code increment()}; the counter carrying the tag is looked up (or registered on
     * first use) and then incremented.
     *
     * @param reason why the request was rejected; {@code null} is recorded as
     *               {@code "unknown"}
     */
    public void recordRejection(String reason) {
        String reasonTag = (reason != null) ? reason : "unknown";
        Counter.builder(REJECTED_REQUESTS)
            .description("Number of rejected requests due to backpressure")
            .tag("reason", reasonTag)
            .register(meterRegistry)
            .increment();
    }

    /**
     * Starts a timing sample against this component's registry.
     *
     * @return the started sample
     */
    public Timer.Sample startTimer() {
        return Timer.start(meterRegistry);
    }

    private double getCurrentQueueSize() {
        // Implementation to get current queue size
        return 0.0;
    }
}

Health Check Integration

java
/**
 * Health indicator that reports whether the system still has capacity.
 *
 * <p>The capacity check and the permit count are placeholders in this snippet: they
 * return fixed values and do not yet consult the injected collaborators.
 */
@Component
public class BackpressureHealthIndicator implements HealthIndicator {

    private final BackpressureAwareProcessor processor;
    private final TokenBucketRateLimiter rateLimiter;

    public BackpressureHealthIndicator(
            BackpressureAwareProcessor processor,
            TokenBucketRateLimiter rateLimiter) {
        this.processor = processor;
        this.rateLimiter = rateLimiter;
    }

    /**
     * Reports UP with the number of available permits when capacity is available,
     * DOWN when the system is at capacity, and DOWN with the exception attached when
     * the check itself fails.
     *
     * @return the resulting health
     */
    @Override
    public Health health() {
        Health.Builder builder = new Health.Builder();

        try {
            // Check system capacity
            boolean hasCapacity = checkSystemCapacity();

            if (hasCapacity) {
                builder.up()
                    .withDetail("status", "System has available capacity")
                    .withDetail("available_permits", getAvailablePermits());
            } else {
                builder.down()
                    .withDetail("status", "System at capacity")
                    .withDetail("available_permits", 0);
            }

        } catch (Exception e) {
            // down(Throwable) records the exception itself. Passing e.getMessage() to
            // withDetail would throw when the message is null, because detail values
            // must not be null, and the health check would fail instead of reporting.
            builder.down(e);
        }

        return builder.build();
    }

    private boolean checkSystemCapacity() {
        // Implementation to check system capacity
        return true;
    }

    private int getAvailablePermits() {
        // Implementation to get available permits
        return 50;
    }
}

Taken together, these techniques (rate limiting, bounded queues, circuit breaking, connection pooling, and monitoring) help a system keep operating safely under overload and limit performance degradation.

Created by Eren Demir.