Stream Processing
Stream processing is an architectural approach in which data is processed continuously, as an unbounded stream, in real time. Unlike traditional batch processing, which operates on bounded datasets at scheduled intervals, a stream processor handles each record as it arrives and produces results immediately.
Core Concepts
Event Stream
An event stream is an unbounded sequence of events ordered by timestamp. A simple JPA entity for persisting individual stream events looks like this:
@Entity
@Table(name = "event_streams")
public class EventStream {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "event_id", unique = true, nullable = false)
private String eventId;
@Column(name = "event_type", nullable = false)
private String eventType;
@Column(name = "event_data", columnDefinition = "TEXT")
private String eventData;
@Column(name = "timestamp", nullable = false)
private Instant timestamp;
@Column(name = "partition_key")
private String partitionKey;
@Column(name = "stream_name", nullable = false)
private String streamName;
// Constructors, getters, setters
public EventStream() {}
public EventStream(String eventId, String eventType, String eventData,
String partitionKey, String streamName) {
this.eventId = eventId;
this.eventType = eventType;
this.eventData = eventData;
this.partitionKey = partitionKey;
this.streamName = streamName;
this.timestamp = Instant.now();
}
// Getters and setters...
}
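The producer and stream jobs in the following sections publish and consume an OrderEvent payload that is not shown elsewhere in this section. A minimal sketch, assuming only the fields the later examples actually read (event id, event type, customer id, amount, event timestamp):

public class OrderEvent {
    private String eventId;
    private String eventType;   // e.g. ORDER_CONFIRMED
    private String customerId;
    private BigDecimal amount;
    private Instant timestamp;

    public OrderEvent() {}

    public String getEventId() { return eventId; }
    public String getEventType() { return eventType; }
    public String getCustomerId() { return customerId; }
    public BigDecimal getAmount() { return amount; }
    public Instant getTimestamp() { return timestamp; }
    // Setters omitted for brevity
}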
Stream Producer
Stream producer implementation with Kafka:
@Service
@Slf4j
public class StreamProducerService {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final MeterRegistry meterRegistry;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Timer publishTimer;
    private final Counter publishSuccessCounter;
    private final Counter publishErrorCounter;
public StreamProducerService(KafkaTemplate<String, String> kafkaTemplate,
MeterRegistry meterRegistry) {
this.kafkaTemplate = kafkaTemplate;
this.meterRegistry = meterRegistry;
this.publishTimer = Timer.builder("stream.publish.duration")
.description("Time taken to publish stream events")
.register(meterRegistry);
this.publishSuccessCounter = Counter.builder("stream.publish.success")
.description("Number of successful stream publishes")
.register(meterRegistry);
this.publishErrorCounter = Counter.builder("stream.publish.error")
.description("Number of failed stream publishes")
.register(meterRegistry);
}
    @Async
    public CompletableFuture<Void> publishEvent(String streamName,
                                                String partitionKey,
                                                Object eventData) {
        // Time the whole publish path, including serialization and the broker acknowledgement
        Timer.Sample sample = Timer.start(meterRegistry);
        try {
            String eventJson = objectMapper.writeValueAsString(eventData);
            return kafkaTemplate.send(streamName, partitionKey, eventJson)
                .completable()
                .<Void>handle((result, failure) -> {
                    sample.stop(publishTimer);
                    if (failure == null) {
                        publishSuccessCounter.increment();
                        log.info("Event published successfully to stream: {} with key: {}",
                                streamName, partitionKey);
                    } else {
                        publishErrorCounter.increment();
                        log.error("Failed to publish event to stream: {} with key: {}",
                                streamName, partitionKey, failure);
                    }
                    return null;
                });
        } catch (Exception e) {
            sample.stop(publishTimer);
            publishErrorCounter.increment();
            log.error("Error serializing event data", e);
            throw new StreamProcessingException("Failed to serialize event", e);
        }
    }
public void publishOrderEvent(OrderEvent orderEvent) {
String partitionKey = orderEvent.getCustomerId();
publishEvent("order-events", partitionKey, orderEvent);
}
public void publishUserEvent(UserEvent userEvent) {
String partitionKey = userEvent.getUserId();
publishEvent("user-events", partitionKey, userEvent);
}
}
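The service above assumes a KafkaTemplate<String, String> bean is already available. A minimal configuration sketch; the broker address and the reliability settings (acks=all, idempotence) are assumptions to adapt to your environment:

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Durability settings: wait for all in-sync replicas and avoid duplicates on retry
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}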
Apache Flink Implementation
Flink Stream Job
Stream processing job with Apache Flink:
@Component
@Slf4j
public class OrderStreamProcessor {
    private final StreamExecutionEnvironment env;
    private final KafkaSource<String> kafkaSource;
    private final OrderAnalyticsService analyticsService;
    // Static so the parse/map functions used in Flink operators stay serializable
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public OrderStreamProcessor(OrderAnalyticsService analyticsService) {
this.analyticsService = analyticsService;
this.env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint configuration
env.enableCheckpointing(60000); // 1 minute
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(600000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Kafka source configuration
this.kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("order-events")
.setGroupId("flink-order-processor")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
}
@PostConstruct
public void startProcessing() {
try {
            DataStream<String> orderStream = env
                .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Order Events");
            // Parse and filter order events, then assign event-time timestamps and watermarks;
            // without watermarks the event-time windows below would never fire
            DataStream<OrderEvent> parsedOrderStream = orderStream
                .map(OrderStreamProcessor::parseOrderEvent)
                .filter(Objects::nonNull)
                .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                        .withTimestampAssigner((event, recordTs) -> event.getTimestamp().toEpochMilli()));
// Process confirmed orders
DataStream<OrderEvent> confirmedOrders = parsedOrderStream
.filter(order -> "ORDER_CONFIRMED".equals(order.getEventType()));
// Windowed aggregations - Order count per customer per hour
DataStream<CustomerOrderStats> customerStats = confirmedOrders
.keyBy(OrderEvent::getCustomerId)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(new OrderCountAggregator());
// Revenue calculation per hour
DataStream<RevenueStats> revenueStats = confirmedOrders
.windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(new RevenueAggregator());
// Fraud detection - More than 5 orders per minute
DataStream<FraudAlert> fraudAlerts = confirmedOrders
.keyBy(OrderEvent::getCustomerId)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new FraudDetectionAggregator())
.filter(stats -> stats.getOrderCount() > 5)
                .map(OrderStreamProcessor::createFraudAlert);
// Sink configurations
customerStats.addSink(new CustomerStatsSink(analyticsService));
revenueStats.addSink(new RevenueStatsSink(analyticsService));
fraudAlerts.addSink(new FraudAlertSink());
            // executeAsync submits the job without blocking application startup
            env.executeAsync("Order Stream Processing Job");
} catch (Exception e) {
log.error("Failed to start Flink stream processing", e);
throw new StreamProcessingException("Stream processing startup failed", e);
}
}
    private static OrderEvent parseOrderEvent(String json) {
        try {
            return OBJECT_MAPPER.readValue(json, OrderEvent.class);
} catch (Exception e) {
log.warn("Failed to parse order event: {}", json, e);
return null;
}
}
    private static FraudAlert createFraudAlert(CustomerOrderStats stats) {
return new FraudAlert(
stats.getCustomerId(),
"HIGH_ORDER_FREQUENCY",
String.format("Customer placed %d orders in 1 minute", stats.getOrderCount()),
Instant.now()
);
}
}
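The windowed aggregations above produce CustomerOrderStats values, but the class itself is not defined in this section. A minimal sketch, assuming only the members the aggregators and sinks use; RevenueStats would look analogous, just without the customer id:

public class CustomerOrderStats {
    private String customerId;
    private long orderCount;
    private BigDecimal totalRevenue = BigDecimal.ZERO;
    private Instant lastUpdated;

    public void incrementOrderCount() { this.orderCount++; }

    public void addRevenue(BigDecimal amount) {
        if (amount != null) {
            this.totalRevenue = this.totalRevenue.add(amount);
        }
    }

    public String getCustomerId() { return customerId; }
    public void setCustomerId(String customerId) { this.customerId = customerId; }
    public long getOrderCount() { return orderCount; }
    public void setOrderCount(long orderCount) { this.orderCount = orderCount; }
    public BigDecimal getTotalRevenue() { return totalRevenue; }
    public void setTotalRevenue(BigDecimal totalRevenue) { this.totalRevenue = totalRevenue; }
    public Instant getLastUpdated() { return lastUpdated; }
    public void setLastUpdated(Instant lastUpdated) { this.lastUpdated = lastUpdated; }
}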
Custom Aggregators
Custom aggregator classes for Flink:
public class OrderCountAggregator implements AggregateFunction<OrderEvent, CustomerOrderStats, CustomerOrderStats> {
@Override
public CustomerOrderStats createAccumulator() {
return new CustomerOrderStats();
}
@Override
public CustomerOrderStats add(OrderEvent orderEvent, CustomerOrderStats accumulator) {
accumulator.setCustomerId(orderEvent.getCustomerId());
accumulator.incrementOrderCount();
accumulator.addRevenue(orderEvent.getAmount());
accumulator.setLastUpdated(Instant.now());
return accumulator;
}
@Override
public CustomerOrderStats getResult(CustomerOrderStats accumulator) {
return accumulator;
}
@Override
public CustomerOrderStats merge(CustomerOrderStats a, CustomerOrderStats b) {
CustomerOrderStats merged = new CustomerOrderStats();
merged.setCustomerId(a.getCustomerId());
merged.setOrderCount(a.getOrderCount() + b.getOrderCount());
merged.setTotalRevenue(a.getTotalRevenue().add(b.getTotalRevenue()));
merged.setLastUpdated(Instant.now());
return merged;
}
}
public class RevenueAggregator implements AggregateFunction<OrderEvent, RevenueStats, RevenueStats> {
@Override
public RevenueStats createAccumulator() {
return new RevenueStats();
}
@Override
public RevenueStats add(OrderEvent orderEvent, RevenueStats accumulator) {
accumulator.addRevenue(orderEvent.getAmount());
accumulator.incrementOrderCount();
accumulator.setLastUpdated(Instant.now());
return accumulator;
}
@Override
public RevenueStats getResult(RevenueStats accumulator) {
return accumulator;
}
@Override
public RevenueStats merge(RevenueStats a, RevenueStats b) {
RevenueStats merged = new RevenueStats();
merged.setTotalRevenue(a.getTotalRevenue().add(b.getTotalRevenue()));
merged.setOrderCount(a.getOrderCount() + b.getOrderCount());
merged.setLastUpdated(Instant.now());
return merged;
}
}
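The fraud-detection pipeline in the Flink job also references a FraudDetectionAggregator that is not shown. A minimal sketch, assuming it reuses CustomerOrderStats as its accumulator so the downstream filter on getOrderCount() works:

public class FraudDetectionAggregator implements AggregateFunction<OrderEvent, CustomerOrderStats, CustomerOrderStats> {
    @Override
    public CustomerOrderStats createAccumulator() {
        return new CustomerOrderStats();
    }
    @Override
    public CustomerOrderStats add(OrderEvent orderEvent, CustomerOrderStats accumulator) {
        // Only the per-window order count matters for the frequency check
        accumulator.setCustomerId(orderEvent.getCustomerId());
        accumulator.incrementOrderCount();
        accumulator.setLastUpdated(Instant.now());
        return accumulator;
    }
    @Override
    public CustomerOrderStats getResult(CustomerOrderStats accumulator) {
        return accumulator;
    }
    @Override
    public CustomerOrderStats merge(CustomerOrderStats a, CustomerOrderStats b) {
        CustomerOrderStats merged = new CustomerOrderStats();
        merged.setCustomerId(a.getCustomerId());
        merged.setOrderCount(a.getOrderCount() + b.getOrderCount());
        merged.setLastUpdated(Instant.now());
        return merged;
    }
}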
Kafka Streams Implementation
Kafka Streams Application
Lightweight stream processing with Kafka Streams:
@Service
@Slf4j
public class KafkaStreamsOrderProcessor {
    private final StreamsBuilder streamsBuilder;
    private final KafkaStreams kafkaStreams;
    private final OrderAnalyticsService analyticsService;
    private final ObjectMapper objectMapper = new ObjectMapper();
public KafkaStreamsOrderProcessor(OrderAnalyticsService analyticsService) {
this.analyticsService = analyticsService;
this.streamsBuilder = new StreamsBuilder();
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-stream-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
buildTopology();
this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
}
private void buildTopology() {
// Order events stream
KStream<String, String> orderEvents = streamsBuilder.stream("order-events");
// Parse JSON events
KStream<String, OrderEvent> parsedOrders = orderEvents
.mapValues(this::parseOrderEvent)
.filter((key, value) -> value != null);
// Filter confirmed orders
KStream<String, OrderEvent> confirmedOrders = parsedOrders
.filter((key, order) -> "ORDER_CONFIRMED".equals(order.getEventType()));
// Customer order aggregation
        KTable<Windowed<String>, CustomerOrderStats> customerStats = confirmedOrders
            .groupBy((key, order) -> order.getCustomerId(),
                     Grouped.with(Serdes.String(), orderEventJsonSerde()))
.windowedBy(TimeWindows.of(Duration.ofHours(1)))
.aggregate(
CustomerOrderStats::new,
(customerId, order, stats) -> {
stats.setCustomerId(customerId);
stats.incrementOrderCount();
stats.addRevenue(order.getAmount());
stats.setLastUpdated(Instant.now());
return stats;
},
Materialized.with(Serdes.String(), customerStatsJsonSerde())
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
// Global revenue aggregation
        KTable<Windowed<String>, RevenueStats> revenueStats = confirmedOrders
            .groupBy((key, order) -> "global",
                     Grouped.with(Serdes.String(), orderEventJsonSerde()))
.windowedBy(TimeWindows.of(Duration.ofHours(1)))
.aggregate(
RevenueStats::new,
(globalKey, order, stats) -> {
stats.addRevenue(order.getAmount());
stats.incrementOrderCount();
stats.setLastUpdated(Instant.now());
return stats;
},
Materialized.with(Serdes.String(), revenueStatsJsonSerde())
);
// Fraud detection
        KStream<String, FraudAlert> fraudAlerts = confirmedOrders
            .groupBy((key, order) -> order.getCustomerId(),
                     Grouped.with(Serdes.String(), orderEventJsonSerde()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.toStream()
.filter((windowedKey, count) -> count > 5)
.map((windowedKey, count) -> KeyValue.pair(
windowedKey.key(),
new FraudAlert(
windowedKey.key(),
"HIGH_ORDER_FREQUENCY",
String.format("Customer placed %d orders in 1 minute", count),
Instant.now()
)
));
// Output streams
customerStats.toStream().foreach(this::processCustomerStats);
revenueStats.toStream().foreach(this::processRevenueStats);
fraudAlerts.foreach(this::processFraudAlert);
}
@PostConstruct
public void start() {
        kafkaStreams.setUncaughtExceptionHandler(exception -> {
            log.error("Uncaught exception in Kafka Streams thread", exception);
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
        });
kafkaStreams.start();
log.info("Kafka Streams application started");
}
@PreDestroy
public void stop() {
kafkaStreams.close(Duration.ofSeconds(30));
log.info("Kafka Streams application stopped");
}
private OrderEvent parseOrderEvent(String json) {
try {
return objectMapper.readValue(json, OrderEvent.class);
} catch (Exception e) {
log.warn("Failed to parse order event: {}", json, e);
return null;
}
}
private void processCustomerStats(Windowed<String> windowedKey, CustomerOrderStats stats) {
analyticsService.updateCustomerStats(stats);
}
private void processRevenueStats(Windowed<String> windowedKey, RevenueStats stats) {
analyticsService.updateRevenueStats(stats);
}
private void processFraudAlert(String customerId, FraudAlert alert) {
log.warn("Fraud alert for customer {}: {}", customerId, alert.getMessage());
// Send to fraud detection service
}
private Serde<CustomerOrderStats> customerStatsJsonSerde() {
return Serdes.serdeFrom(
new JsonSerializer<>(),
new JsonDeserializer<>(CustomerOrderStats.class)
);
}
    private Serde<RevenueStats> revenueStatsJsonSerde() {
        return Serdes.serdeFrom(
            new JsonSerializer<>(),
            new JsonDeserializer<>(RevenueStats.class)
        );
    }
    private Serde<OrderEvent> orderEventJsonSerde() {
        return Serdes.serdeFrom(
            new JsonSerializer<>(),
            new JsonDeserializer<>(OrderEvent.class)
        );
    }
}
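A topology like this can be unit-tested without a broker using TopologyTestDriver from kafka-streams-test-utils. A minimal, self-contained sketch (JUnit 5 assumed); the tiny routing topology below is a stand-in for the service's real buildTopology():

class OrderTopologyTest {

    @Test
    void confirmedOrderIsRoutedToConfirmedTopic() {
        // Stand-in topology: route confirmed orders to a dedicated topic
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("order-events")
               .filter((key, value) -> value.contains("\"ORDER_CONFIRMED\""))
               .to("confirmed-orders");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "order-events", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> output = driver.createOutputTopic(
                    "confirmed-orders", new StringDeserializer(), new StringDeserializer());

            input.pipeInput("order-1", "{\"eventType\":\"ORDER_CONFIRMED\",\"customerId\":\"c-1\"}");

            assertEquals(1, output.getQueueSize());
        }
    }
}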
Windowing and Time-Based Processing
Time Windows
Time-based windowing operations:
@Component
@Slf4j
public class WindowedAnalyticsProcessor {
    private final StreamsBuilder streamsBuilder;

    public WindowedAnalyticsProcessor(StreamsBuilder streamsBuilder) {
        this.streamsBuilder = streamsBuilder;
    }

    public void configureTimeWindows() {
        // The OrderEvent JSON serde (and the orderAnalyticsJsonSerde used below) are assumed
        // to be defined as in the previous example
        KStream<String, OrderEvent> orderStream = streamsBuilder.stream("order-events",
            Consumed.with(Serdes.String(), orderEventJsonSerde()));
// Tumbling Window - 1 hour intervals
KTable<Windowed<String>, Long> hourlyOrderCounts = orderStream
.groupBy((key, order) -> order.getCustomerId())
.windowedBy(TimeWindows.of(Duration.ofHours(1)))
.count();
// Hopping Window - 1 hour window sliding every 15 minutes
KTable<Windowed<String>, Long> rollingOrderCounts = orderStream
.groupBy((key, order) -> order.getCustomerId())
.windowedBy(TimeWindows.of(Duration.ofHours(1)).advanceBy(Duration.ofMinutes(15)))
.count();
// Session Window - Activity-based windows
KTable<Windowed<String>, Long> sessionOrderCounts = orderStream
.groupBy((key, order) -> order.getCustomerId())
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)))
.count();
// Custom Window - Custom time intervals
KTable<Windowed<String>, OrderAnalytics> customAnalytics = orderStream
.groupBy((key, order) -> order.getCustomerId())
.windowedBy(TimeWindows.of(Duration.ofHours(6)).grace(Duration.ofMinutes(10)))
.aggregate(
OrderAnalytics::new,
(customerId, order, analytics) -> {
analytics.addOrder(order);
analytics.calculateMetrics();
return analytics;
},
Materialized.with(Serdes.String(), orderAnalyticsJsonSerde())
);
}
}
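When windowed results are emitted or logged, the Windowed<String> key carries the window bounds alongside the original key. A small sketch that could sit in the same class (an @Slf4j logger is assumed), showing how to unwrap the hourly counts from above:

    private void logHourlyCounts(KTable<Windowed<String>, Long> hourlyOrderCounts) {
        hourlyOrderCounts
            .toStream()
            .foreach((windowedKey, count) -> {
                // Windowed<String> wraps the customer id plus the window start/end timestamps
                String customerId = windowedKey.key();
                Instant windowStart = Instant.ofEpochMilli(windowedKey.window().start());
                Instant windowEnd = Instant.ofEpochMilli(windowedKey.window().end());
                log.info("Customer {} placed {} orders between {} and {}",
                        customerId, count, windowStart, windowEnd);
            });
    }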
State Management
Stateful Processing
State management and fault tolerance:
@Service
public class StatefulOrderProcessor {
private final KafkaStreams streams;
private ReadOnlyKeyValueStore<String, CustomerOrderStats> customerStatsStore;
@PostConstruct
public void initializeStores() {
        streams.start();
        // The store only becomes queryable once the application reaches RUNNING state;
        // production code should wait for that state or retry on InvalidStateStoreException
customerStatsStore = streams.store(
StoreQueryParameters.fromNameAndType(
"customer-stats-store",
QueryableStoreTypes.keyValueStore()
)
);
}
public CustomerOrderStats getCustomerStats(String customerId) {
return customerStatsStore.get(customerId);
}
    public void configureStatefulProcessing(StreamsBuilder builder) {
        // An OrderEvent JSON serde (as in the earlier examples) is assumed for the source topic
        KStream<String, OrderEvent> orderStream = builder.stream("order-events",
            Consumed.with(Serdes.String(), orderEventJsonSerde()));
        // Stateful aggregation backed by a named, queryable state store
        KTable<String, CustomerOrderStats> customerStats = orderStream
            .groupBy((key, order) -> order.getCustomerId())
.aggregate(
CustomerOrderStats::new,
(customerId, order, stats) -> {
// Stateful logic
stats.updateWithOrder(order);
stats.calculateRunningAverages();
stats.updateTrends();
return stats;
},
Materialized.<String, CustomerOrderStats, KeyValueStore<Bytes, byte[]>>as("customer-stats-store")
.withKeySerde(Serdes.String())
.withValueSerde(customerStatsJsonSerde())
.withCachingEnabled()
.withLoggingEnabled(Map.of(
"cleanup.policy", "compact",
"segment.ms", "86400000" // 1 day
))
);
// Join with enrichment data
KTable<String, CustomerProfile> customerProfiles = builder.table("customer-profiles");
KTable<String, EnrichedCustomerStats> enrichedStats = customerStats
.join(customerProfiles,
(stats, profile) -> new EnrichedCustomerStats(stats, profile),
Materialized.with(Serdes.String(), enrichedStatsJsonSerde())
);
}
}
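Because the aggregate is materialized in a named store, other components can serve it directly through interactive queries. A minimal sketch of a read-only REST endpoint; the path and response shape are illustrative assumptions:

@RestController
@RequestMapping("/api/customers")
public class CustomerStatsController {

    private final StatefulOrderProcessor statefulOrderProcessor;

    public CustomerStatsController(StatefulOrderProcessor statefulOrderProcessor) {
        this.statefulOrderProcessor = statefulOrderProcessor;
    }

    @GetMapping("/{customerId}/stats")
    public ResponseEntity<CustomerOrderStats> getStats(@PathVariable String customerId) {
        // Reads directly from the local Kafka Streams state store, no database round-trip
        CustomerOrderStats stats = statefulOrderProcessor.getCustomerStats(customerId);
        return stats != null ? ResponseEntity.ok(stats) : ResponseEntity.notFound().build();
    }
}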
Performance and Monitoring
Stream Metrics
Stream processing performance metrics:
@Component
@Slf4j
public class StreamMetricsCollector {
    private final MeterRegistry meterRegistry;
    private final KafkaStreams kafkaStreams;
    // Holds the latest state-store entry count so the gauge registered below stays current
    private final AtomicLong stateStoreSize = new AtomicLong();
public StreamMetricsCollector(MeterRegistry meterRegistry, KafkaStreams kafkaStreams) {
this.meterRegistry = meterRegistry;
this.kafkaStreams = kafkaStreams;
configureMetrics();
}
    private void configureMetrics() {
        // Kafka Streams application state as a numeric gauge (ordinal of the State enum)
        Gauge.builder("stream.state", kafkaStreams, streams -> streams.state().ordinal())
            .description("Kafka Streams application state")
            .register(meterRegistry);
        // Custom processing metrics
        Timer.builder("stream.processing.latency")
            .description("Stream processing latency")
            .register(meterRegistry);
        Counter.builder("stream.events.processed")
            .description("Number of events processed")
            .register(meterRegistry);
        Counter.builder("stream.events.failed")
            .description("Number of failed events")
            .register(meterRegistry);
        Gauge.builder("stream.lag.records", this::calculateConsumerLag)
            .description("Consumer lag in records")
            .register(meterRegistry);
        Gauge.builder("stream.store.size", stateStoreSize, AtomicLong::get)
            .description("Number of entries in state store")
            .register(meterRegistry);
    }
    @Scheduled(fixedRate = 30000) // Every 30 seconds
    public void collectCustomMetrics() {
        try {
            // Mirror selected Kafka Streams topology metrics into Micrometer
            kafkaStreams.metrics().entrySet().stream()
                .filter(entry -> entry.getKey().name().contains("process-rate"))
                .forEach(entry -> {
                    String metricName = String.format("stream.%s",
                        entry.getKey().name().replace("-", "."));
                    Gauge.builder(metricName, entry.getValue(),
                            metric -> ((Number) metric.metricValue()).doubleValue())
                        .description(entry.getKey().description())
                        .register(meterRegistry);
                });
            // Refresh the state-store size gauge by counting entries
            ReadOnlyKeyValueStore<String, CustomerOrderStats> store = kafkaStreams.store(
                StoreQueryParameters.fromNameAndType(
                    "customer-stats-store",
                    QueryableStoreTypes.keyValueStore()
                )
            );
            long storeSize = 0;
            try (KeyValueIterator<String, CustomerOrderStats> iterator = store.all()) {
                while (iterator.hasNext()) {
                    iterator.next();
                    storeSize++;
                }
            }
            stateStoreSize.set(storeSize);
        } catch (Exception e) {
            log.error("Failed to collect stream metrics", e);
        }
    }
private double calculateConsumerLag() {
// Implement consumer lag calculation
return 0.0;
}
}
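calculateConsumerLag() is left as a stub above. One possible approach, shown here as an assumption rather than the only option, is to read the consumer-level records-lag-max metric that Kafka Streams already exposes through metrics():

    // Hypothetical implementation: sums the maximum record lag reported by each consumer client
    private double calculateConsumerLagFromMetrics() {
        return kafkaStreams.metrics().entrySet().stream()
                .filter(entry -> "records-lag-max".equals(entry.getKey().name()))
                .map(entry -> entry.getValue().metricValue())
                .filter(value -> value instanceof Number)
                .mapToDouble(value -> ((Number) value).doubleValue())
                .filter(lag -> !Double.isNaN(lag)) // NaN means "no data yet"
                .sum();
    }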
Error Handling and Resilience
Error Recovery
Error management and recovery strategies:
@Service
@Slf4j
public class StreamErrorHandler {
    private final DeadLetterQueueService deadLetterService;
    private final AlertService alertService;
    private final ObjectMapper objectMapper = new ObjectMapper();
public StreamErrorHandler(DeadLetterQueueService deadLetterService,
AlertService alertService) {
this.deadLetterService = deadLetterService;
this.alertService = alertService;
}
public void configureErrorHandling(StreamsBuilder builder) {
KStream<String, String> orderEvents = builder.stream("order-events");
// Branching for error handling
KStream<String, String>[] branches = orderEvents.branch(
(key, value) -> isValidEvent(value), // Valid events
(key, value) -> true // Invalid events (catch-all)
);
KStream<String, String> validEvents = branches[0];
KStream<String, String> invalidEvents = branches[1];
// Process valid events
validEvents
.mapValues(this::parseEvent)
.filter((key, event) -> event != null)
.foreach(this::processEvent);
        // Handle invalid events: serialize the error record to JSON so the String serde applies
        invalidEvents
            .mapValues(this::createErrorRecord)
            .mapValues(this::toJson)
            .to("dead-letter-queue");
        // Retry mechanism for failed processing; the retry-count store must be registered
        // on the builder and connected to the transformer by name
        builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("retry-counts"),
            Serdes.String(), Serdes.Integer()));
        validEvents
            .transform(() -> new RetryTransformer<>(), "retry-counts")
            .filter((key, result) -> !result.isSuccess())
            .mapValues(result -> result.getFailedEvent())
            .to("retry-queue");
}
private boolean isValidEvent(String eventJson) {
try {
JsonNode node = objectMapper.readTree(eventJson);
return node.has("eventType") &&
node.has("timestamp") &&
node.has("data");
} catch (Exception e) {
return false;
}
}
private OrderEvent parseEvent(String eventJson) {
try {
return objectMapper.readValue(eventJson, OrderEvent.class);
} catch (Exception e) {
log.warn("Failed to parse event: {}", eventJson, e);
deadLetterService.sendToDeadLetter(eventJson, e.getMessage());
return null;
}
}
private void processEvent(String key, OrderEvent event) {
try {
// Process event logic
log.debug("Processing event: {}", event.getEventId());
} catch (Exception e) {
log.error("Failed to process event: {}", event.getEventId(), e);
alertService.sendAlert("Stream processing failed", e.getMessage());
throw e; // Re-throw to trigger retry
}
}
    private ErrorRecord createErrorRecord(String invalidEvent) {
        return new ErrorRecord(
            UUID.randomUUID().toString(),
            invalidEvent,
            "INVALID_FORMAT",
            Instant.now()
        );
    }
    private String toJson(ErrorRecord errorRecord) {
        try {
            return objectMapper.writeValueAsString(errorRecord);
        } catch (Exception e) {
            // Fall back to toString() so the record still reaches the dead letter queue
            return errorRecord.toString();
        }
    }
}
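The error handler delegates bad payloads to a DeadLetterQueueService that is not shown. A minimal sketch, assuming it republishes the raw payload with the failure reason attached as a header; the topic name and header key are assumptions:

@Service
@Slf4j
public class DeadLetterQueueService {

    private static final String DLQ_TOPIC = "dead-letter-queue";

    private final KafkaTemplate<String, String> kafkaTemplate;

    public DeadLetterQueueService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendToDeadLetter(String payload, String errorReason) {
        String reason = errorReason != null ? errorReason : "unknown";
        ProducerRecord<String, String> record =
                new ProducerRecord<>(DLQ_TOPIC, UUID.randomUUID().toString(), payload);
        // Carry the failure reason as a header so DLQ consumers can triage without re-parsing
        record.headers().add("x-error-reason", reason.getBytes(StandardCharsets.UTF_8));
        kafkaTemplate.send(record);
        log.warn("Routed message to dead letter queue: {}", reason);
    }
}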
// Retry Transformer
public class RetryTransformer<K, V> implements Transformer<K, V, KeyValue<K, ProcessingResult<V>>> {
private ProcessorContext context;
private KeyValueStore<String, Integer> retryCountStore;
@Override
public void init(ProcessorContext context) {
this.context = context;
this.retryCountStore = (KeyValueStore<String, Integer>) context.getStateStore("retry-counts");
}
@Override
public KeyValue<K, ProcessingResult<V>> transform(K key, V value) {
String keyStr = key.toString();
Integer retryCount = retryCountStore.get(keyStr);
if (retryCount == null) {
retryCount = 0;
}
try {
// Process the value
ProcessingResult<V> result = processWithRetry(value);
if (result.isSuccess()) {
retryCountStore.delete(keyStr); // Clear retry count on success
        } else if (retryCount < 3) {
            retryCountStore.put(keyStr, retryCount + 1);
            // Schedule a retry with exponential backoff. Note: schedule() registers a recurring
            // punctuation; a production implementation should cancel it after it fires and
            // forward the retried result downstream via context.forward()
            context.schedule(Duration.ofSeconds((long) Math.pow(2, retryCount)),
                PunctuationType.WALL_CLOCK_TIME,
                timestamp -> transform(key, value));
        }
return KeyValue.pair(key, result);
} catch (Exception e) {
return KeyValue.pair(key, ProcessingResult.failure(value, e.getMessage()));
}
}
private ProcessingResult<V> processWithRetry(V value) {
// Implement retry logic
try {
// Process value
return ProcessingResult.success(value);
} catch (Exception e) {
return ProcessingResult.failure(value, e.getMessage());
}
}
@Override
public void close() {
// Cleanup resources
}
}
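The transformer wraps its output in a ProcessingResult that this section does not define. A minimal sketch, assuming only the members used above (isSuccess, getFailedEvent, and the success/failure factories):

public class ProcessingResult<V> {
    private final V value;
    private final boolean success;
    private final String errorMessage;

    private ProcessingResult(V value, boolean success, String errorMessage) {
        this.value = value;
        this.success = success;
        this.errorMessage = errorMessage;
    }

    public static <V> ProcessingResult<V> success(V value) {
        return new ProcessingResult<>(value, true, null);
    }

    public static <V> ProcessingResult<V> failure(V value, String errorMessage) {
        return new ProcessingResult<>(value, false, errorMessage);
    }

    public boolean isSuccess() { return success; }
    public V getFailedEvent() { return value; }
    public String getErrorMessage() { return errorMessage; }
}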
Batch vs Stream Processing Comparison
Batch Processing Example
Traditional batch processing approach:
@Service
@Slf4j
@RequiredArgsConstructor
public class BatchOrderProcessor {
    private final OrderRepository orderRepository;
    private final ReportService reportService;
@Scheduled(cron = "0 0 1 * * ?") // Daily at 01:00
public void processDailyOrders() {
LocalDate yesterday = LocalDate.now().minusDays(1);
LocalDateTime startOfDay = yesterday.atStartOfDay();
LocalDateTime endOfDay = yesterday.atTime(23, 59, 59);
log.info("Starting daily batch processing for date: {}", yesterday);
try {
// Retrieve all orders for the day
List<Order> dailyOrders = orderRepository
.findByCreatedAtBetween(startOfDay, endOfDay);
// Process in chunks
int chunkSize = 1000;
for (int i = 0; i < dailyOrders.size(); i += chunkSize) {
int endIndex = Math.min(i + chunkSize, dailyOrders.size());
List<Order> chunk = dailyOrders.subList(i, endIndex);
processBatch(chunk);
}
// Generate reports
generateDailyReports(dailyOrders, yesterday);
log.info("Completed daily batch processing for {} orders", dailyOrders.size());
} catch (Exception e) {
log.error("Failed to process daily batch", e);
throw new BatchProcessingException("Daily batch processing failed", e);
}
}
private void processBatch(List<Order> orders) {
orders.parallelStream().forEach(order -> {
try {
// Calculate metrics
OrderMetrics metrics = calculateOrderMetrics(order);
// Update aggregations
updateCustomerStats(order);
updateProductStats(order);
updateRevenueStats(order);
} catch (Exception e) {
log.error("Failed to process order: {}", order.getId(), e);
}
});
}
private void generateDailyReports(List<Order> orders, LocalDate date) {
// Revenue report
BigDecimal totalRevenue = orders.stream()
.map(Order::getAmount)
.reduce(BigDecimal.ZERO, BigDecimal::add);
// Customer report
Map<String, Long> customerOrderCounts = orders.stream()
.collect(Collectors.groupingBy(
Order::getCustomerId,
Collectors.counting()
));
// Product report
Map<String, Long> productSales = orders.stream()
.flatMap(order -> order.getItems().stream())
.collect(Collectors.groupingBy(
OrderItem::getProductId,
Collectors.summingLong(OrderItem::getQuantity)
));
reportService.generateDailyReport(date, totalRevenue, customerOrderCounts, productSales);
}
}
Stream vs Batch Comparison
| Feature | Stream Processing | Batch Processing |
| --- | --- | --- |
| Latency | Low (milliseconds to seconds) | High (minutes to hours) |
| Throughput | Medium to high | Very high |
| Data Model | Unbounded (infinite) stream | Fixed dataset |
| Processing Time | Real-time, continuous | Periodic |
| Resource Usage | Continuous | Periodic |
| Fault Tolerance | Checkpoint/recovery | Restart from beginning |
| Use Cases | Real-time analytics, fraud detection | ETL, reporting |
Stream processing is critical for modern applications because it provides:
- Real-time insights: immediate, data-driven decision-making
- A proactive posture: problems are detected early, as events occur
- Better customer experience: fast responses and timely personalization
- Business continuity: data is processed continuously rather than in delayed batches
These examples demonstrate real-time data processing with Apache Flink and Kafka Streams, covering windowing, state management, monitoring, and error handling. Stream processing is a fundamental building block of modern data architectures.