Skip to content

Event Sourcing

Event Sourcing is an architectural pattern where state changes are stored as a sequence of immutable events. Instead of storing just the current state, all changes (events) that led to the current state are persisted, enabling complete audit trails and temporal queries.

Event Sourcing Architecture

Event Store Data Model

Event Flow

Core Concepts

Event Store

The Event Store is the central repository for all domain events:

java
@Entity
@Table(name = "events")
public class EventEntity {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    
    @Column(name = "aggregate_id", nullable = false)
    private String aggregateId;
    
    @Column(name = "aggregate_type", nullable = false)
    private String aggregateType;
    
    @Column(name = "event_type", nullable = false)
    private String eventType;
    
    @Column(name = "event_data", columnDefinition = "TEXT")
    private String eventData;
    
    @Column(name = "event_version", nullable = false)
    private Long eventVersion;
    
    @Column(name = "timestamp", nullable = false)
    private Instant timestamp;
    
    @Column(name = "metadata", columnDefinition = "TEXT")
    private String metadata;
    
    @Version
    private Long version; // For optimistic locking
    
    // Constructors, getters, setters
    public EventEntity() {}
    
    public EventEntity(String aggregateId, String aggregateType, String eventType, 
                      String eventData, Long eventVersion, String metadata) {
        this.aggregateId = aggregateId;
        this.aggregateType = aggregateType;
        this.eventType = eventType;
        this.eventData = eventData;
        this.eventVersion = eventVersion;
        this.metadata = metadata;
        this.timestamp = Instant.now();
    }
    
    // Getters and setters...
}

Event Store Repository

Repository interface for event persistence:

java
@Repository
public interface EventStoreRepository extends JpaRepository<EventEntity, Long> {
    
    @Query("SELECT e FROM EventEntity e WHERE e.aggregateId = :aggregateId " +
           "ORDER BY e.eventVersion ASC")
    List<EventEntity> findByAggregateIdOrderByEventVersion(@Param("aggregateId") String aggregateId);
    
    @Query("SELECT e FROM EventEntity e WHERE e.aggregateId = :aggregateId " +
           "AND e.eventVersion > :fromVersion ORDER BY e.eventVersion ASC")
    List<EventEntity> findByAggregateIdAndEventVersionGreaterThan(
        @Param("aggregateId") String aggregateId, 
        @Param("fromVersion") Long fromVersion);
    
    @Query("SELECT e FROM EventEntity e WHERE e.timestamp >= :fromTimestamp " +
           "ORDER BY e.timestamp ASC")
    List<EventEntity> findEventsFromTimestamp(@Param("fromTimestamp") Instant fromTimestamp);
    
    @Query("SELECT e FROM EventEntity e WHERE e.aggregateType = :aggregateType " +
           "AND e.timestamp BETWEEN :startTime AND :endTime")
    List<EventEntity> findEventsByTypeAndTimeRange(
        @Param("aggregateType") String aggregateType,
        @Param("startTime") Instant startTime,
        @Param("endTime") Instant endTime);
    
    @Modifying
    @Query("DELETE FROM EventEntity e WHERE e.aggregateId = :aggregateId " +
           "AND e.eventVersion <= :version")
    int deleteEventsUpToVersion(@Param("aggregateId") String aggregateId, 
                               @Param("version") Long version);
}

Event Store Service

Service layer for event operations:

java
@Service
@Transactional
@Slf4j
public class EventStoreService {
    
    private final EventStoreRepository eventRepository;
    private final ObjectMapper objectMapper;
    private final ApplicationEventPublisher eventPublisher;
    private final MeterRegistry meterRegistry;
    
    private final Timer saveEventTimer;
    private final Timer loadEventsTimer;
    private final Counter saveEventSuccessCounter;
    private final Counter saveEventErrorCounter;
    
    public EventStoreService(EventStoreRepository eventRepository,
                           ObjectMapper objectMapper,
                           ApplicationEventPublisher eventPublisher,
                           MeterRegistry meterRegistry) {
        this.eventRepository = eventRepository;
        this.objectMapper = objectMapper;
        this.eventPublisher = eventPublisher;
        this.meterRegistry = meterRegistry;
        
        this.saveEventTimer = Timer.builder("event.store.save.duration")
            .description("Time taken to save events")
            .register(meterRegistry);
        this.loadEventsTimer = Timer.builder("event.store.load.duration")
            .description("Time taken to load events")
            .register(meterRegistry);
        this.saveEventSuccessCounter = Counter.builder("event.store.save.success")
            .description("Number of successful event saves")
            .register(meterRegistry);
        this.saveEventErrorCounter = Counter.builder("event.store.save.error")
            .description("Number of failed event saves")
            .register(meterRegistry);
    }
    
    public void saveEvents(String aggregateId, String aggregateType, 
                          List<DomainEvent> events, Long expectedVersion) {
        Timer.Sample sample = Timer.Sample.start(meterRegistry);
        
        try {
            Long currentVersion = getCurrentVersion(aggregateId);
            
            if (!Objects.equals(currentVersion, expectedVersion)) {
                throw new OptimisticLockingException(
                    String.format("Expected version %d but found %d for aggregate %s", 
                                expectedVersion, currentVersion, aggregateId));
            }
            
            List<EventEntity> eventEntities = new ArrayList<>();
            Long nextVersion = currentVersion + 1;
            
            for (DomainEvent event : events) {
                String eventData = objectMapper.writeValueAsString(event);
                String metadata = createMetadata(event);
                
                EventEntity eventEntity = new EventEntity(
                    aggregateId,
                    aggregateType,
                    event.getClass().getSimpleName(),
                    eventData,
                    nextVersion++,
                    metadata
                );
                
                eventEntities.add(eventEntity);
            }
            
            eventRepository.saveAll(eventEntities);
            
            // Publish events asynchronously
            events.forEach(event -> eventPublisher.publishEvent(event));
            
            saveEventSuccessCounter.increment();
            log.info("Saved {} events for aggregate {} with version range {}-{}", 
                    events.size(), aggregateId, expectedVersion + 1, nextVersion - 1);
            
        } catch (Exception e) {
            saveEventErrorCounter.increment();
            log.error("Failed to save events for aggregate: {}", aggregateId, e);
            throw new EventStoreException("Failed to save events", e);
        } finally {
            sample.stop(saveEventTimer);
        }
    }
    
    public List<DomainEvent> loadEvents(String aggregateId) {
        Timer.Sample sample = Timer.Sample.start(meterRegistry);
        
        try {
            List<EventEntity> eventEntities = eventRepository
                .findByAggregateIdOrderByEventVersion(aggregateId);
            
            return eventEntities.stream()
                .map(this::deserializeEvent)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
                
        } catch (Exception e) {
            log.error("Failed to load events for aggregate: {}", aggregateId, e);
            throw new EventStoreException("Failed to load events", e);
        } finally {
            sample.stop(loadEventsTimer);
        }
    }
    
    public List<DomainEvent> loadEventsFromVersion(String aggregateId, Long fromVersion) {
        try {
            List<EventEntity> eventEntities = eventRepository
                .findByAggregateIdAndEventVersionGreaterThan(aggregateId, fromVersion);
            
            return eventEntities.stream()
                .map(this::deserializeEvent)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
                
        } catch (Exception e) {
            log.error("Failed to load events from version {} for aggregate: {}", 
                     fromVersion, aggregateId, e);
            throw new EventStoreException("Failed to load events from version", e);
        }
    }
    
    private Long getCurrentVersion(String aggregateId) {
        return eventRepository.findByAggregateIdOrderByEventVersion(aggregateId)
            .stream()
            .mapToLong(EventEntity::getEventVersion)
            .max()
            .orElse(0L);
    }
    
    private DomainEvent deserializeEvent(EventEntity eventEntity) {
        try {
            Class<?> eventClass = Class.forName("com.example.events." + eventEntity.getEventType());
            return (DomainEvent) objectMapper.readValue(eventEntity.getEventData(), eventClass);
        } catch (Exception e) {
            log.error("Failed to deserialize event: {}", eventEntity.getEventType(), e);
            return null;
        }
    }
    
    private String createMetadata(DomainEvent event) {
        try {
            Map<String, Object> metadata = new HashMap<>();
            metadata.put("correlationId", event.getCorrelationId());
            metadata.put("causationId", event.getCausationId());
            metadata.put("userId", getCurrentUserId());
            metadata.put("timestamp", Instant.now());
            
            return objectMapper.writeValueAsString(metadata);
        } catch (Exception e) {
            log.warn("Failed to create metadata", e);
            return "{}";
        }
    }
    
    private String getCurrentUserId() {
        // Get current user from security context
        return "system"; // Placeholder
    }
}

Aggregate Root Pattern

Base Aggregate Root

Base class for all aggregates with event handling:

java
public abstract class AggregateRoot {
    
    protected String id;
    protected Long version;
    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
    
    protected AggregateRoot() {
        this.version = 0L;
    }
    
    protected AggregateRoot(String id) {
        this.id = id;
        this.version = 0L;
    }
    
    protected void addEvent(DomainEvent event) {
        event.setAggregateId(this.id);
        event.setEventVersion(this.version + uncommittedEvents.size() + 1);
        event.setTimestamp(Instant.now());
        uncommittedEvents.add(event);
    }
    
    public List<DomainEvent> getUncommittedEvents() {
        return new ArrayList<>(uncommittedEvents);
    }
    
    public void markEventsAsCommitted() {
        this.version += uncommittedEvents.size();
        uncommittedEvents.clear();
    }
    
    public void replayEvents(List<DomainEvent> events) {
        for (DomainEvent event : events) {
            apply(event);
            this.version = event.getEventVersion();
        }
    }
    
    protected abstract void apply(DomainEvent event);
    
    // Getters
    public String getId() { return id; }
    public Long getVersion() { return version; }
}

Domain Events

Domain event definitions:

java
public abstract class DomainEvent {
    private String eventId;
    private String aggregateId;
    private Long eventVersion;
    private Instant timestamp;
    private String correlationId;
    private String causationId;
    
    protected DomainEvent() {
        this.eventId = UUID.randomUUID().toString();
        this.timestamp = Instant.now();
    }
    
    // Getters and setters...
}

public class OrderCreatedEvent extends DomainEvent {
    private String orderId;
    private String customerId;
    private BigDecimal amount;
    private List<OrderItem> items;
    private Instant createdAt;
    
    public OrderCreatedEvent() {}
    
    public OrderCreatedEvent(String orderId, String customerId, 
                           BigDecimal amount, List<OrderItem> items) {
        super();
        this.orderId = orderId;
        this.customerId = customerId;
        this.amount = amount;
        this.items = items;
        this.createdAt = Instant.now();
    }
    
    // Getters and setters...
}

public class OrderConfirmedEvent extends DomainEvent {
    private String orderId;
    private Instant confirmedAt;
    private String confirmedBy;
    
    public OrderConfirmedEvent() {}
    
    public OrderConfirmedEvent(String orderId, String confirmedBy) {
        super();
        this.orderId = orderId;
        this.confirmedBy = confirmedBy;
        this.confirmedAt = Instant.now();
    }
    
    // Getters and setters...
}

public class OrderCancelledEvent extends DomainEvent {
    private String orderId;
    private String reason;
    private Instant cancelledAt;
    private String cancelledBy;
    
    public OrderCancelledEvent() {}
    
    public OrderCancelledEvent(String orderId, String reason, String cancelledBy) {
        super();
        this.orderId = orderId;
        this.reason = reason;
        this.cancelledBy = cancelledBy;
        this.cancelledAt = Instant.now();
    }
    
    // Getters and setters...
}

Order Aggregate

Order aggregate with event sourcing:

java
public class Order extends AggregateRoot {
    
    private String customerId;
    private OrderStatus status;
    private BigDecimal amount;
    private List<OrderItem> items;
    private Instant createdAt;
    private Instant confirmedAt;
    private Instant cancelledAt;
    private String cancellationReason;
    
    // For reconstruction from events
    public Order() {
        super();
    }
    
    // For new order creation
    public Order(String orderId, String customerId, BigDecimal amount, List<OrderItem> items) {
        super(orderId);
        
        if (amount.compareTo(BigDecimal.ZERO) <= 0) {
            throw new IllegalArgumentException("Order amount must be positive");
        }
        
        if (items == null || items.isEmpty()) {
            throw new IllegalArgumentException("Order must have at least one item");
        }
        
        addEvent(new OrderCreatedEvent(orderId, customerId, amount, items));
    }
    
    public void confirm(String confirmedBy) {
        if (status != OrderStatus.PENDING) {
            throw new IllegalStateException("Only pending orders can be confirmed");
        }
        
        addEvent(new OrderConfirmedEvent(this.id, confirmedBy));
    }
    
    public void cancel(String reason, String cancelledBy) {
        if (status == OrderStatus.CANCELLED) {
            throw new IllegalStateException("Order is already cancelled");
        }
        
        if (status == OrderStatus.DELIVERED) {
            throw new IllegalStateException("Delivered orders cannot be cancelled");
        }
        
        addEvent(new OrderCancelledEvent(this.id, reason, cancelledBy));
    }
    
    @Override
    protected void apply(DomainEvent event) {
        if (event instanceof OrderCreatedEvent) {
            apply((OrderCreatedEvent) event);
        } else if (event instanceof OrderConfirmedEvent) {
            apply((OrderConfirmedEvent) event);
        } else if (event instanceof OrderCancelledEvent) {
            apply((OrderCancelledEvent) event);
        }
    }
    
    private void apply(OrderCreatedEvent event) {
        this.id = event.getOrderId();
        this.customerId = event.getCustomerId();
        this.amount = event.getAmount();
        this.items = new ArrayList<>(event.getItems());
        this.status = OrderStatus.PENDING;
        this.createdAt = event.getCreatedAt();
    }
    
    private void apply(OrderConfirmedEvent event) {
        this.status = OrderStatus.CONFIRMED;
        this.confirmedAt = event.getConfirmedAt();
    }
    
    private void apply(OrderCancelledEvent event) {
        this.status = OrderStatus.CANCELLED;
        this.cancelledAt = event.getCancelledAt();
        this.cancellationReason = event.getReason();
    }
    
    // Getters
    public String getCustomerId() { return customerId; }
    public OrderStatus getStatus() { return status; }
    public BigDecimal getAmount() { return amount; }
    public List<OrderItem> getItems() { return new ArrayList<>(items); }
    public Instant getCreatedAt() { return createdAt; }
    public Instant getConfirmedAt() { return confirmedAt; }
    public Instant getCancelledAt() { return cancelledAt; }
    public String getCancellationReason() { return cancellationReason; }
}

public enum OrderStatus {
    PENDING,
    CONFIRMED,
    SHIPPED,
    DELIVERED,
    CANCELLED
}

Event Publishing

Event Publisher Service

Asynchronous event publishing with Kafka:

java
@Service
@Slf4j
public class EventPublisherService {
    
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;
    private final MeterRegistry meterRegistry;
    
    private final Timer publishTimer;
    private final Counter publishSuccessCounter;
    private final Counter publishErrorCounter;
    
    public EventPublisherService(KafkaTemplate<String, String> kafkaTemplate,
                               ObjectMapper objectMapper,
                               MeterRegistry meterRegistry) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
        this.meterRegistry = meterRegistry;
        
        this.publishTimer = Timer.builder("event.publish.duration")
            .description("Time taken to publish events")
            .register(meterRegistry);
        this.publishSuccessCounter = Counter.builder("event.publish.success")
            .description("Number of successful event publishes")
            .register(meterRegistry);
        this.publishErrorCounter = Counter.builder("event.publish.error")
            .description("Number of failed event publishes")
            .register(meterRegistry);
    }
    
    @EventListener
    @Async
    public void handleDomainEvent(DomainEvent domainEvent) {
        Timer.Sample sample = Timer.Sample.start(meterRegistry);
        
        try {
            String topic = getTopicForEvent(domainEvent);
            String key = domainEvent.getAggregateId();
            String eventJson = objectMapper.writeValueAsString(domainEvent);
            
            kafkaTemplate.send(topic, key, eventJson)
                .addCallback(
                    result -> {
                        publishSuccessCounter.increment();
                        log.info("Successfully published event {} to topic {}", 
                                domainEvent.getEventId(), topic);
                    },
                    failure -> {
                        publishErrorCounter.increment();
                        log.error("Failed to publish event {} to topic {}", 
                                 domainEvent.getEventId(), topic, failure);
                    }
                );
                
        } catch (Exception e) {
            publishErrorCounter.increment();
            log.error("Error publishing domain event: {}", domainEvent.getEventId(), e);
        } finally {
            sample.stop(publishTimer);
        }
    }
    
    private String getTopicForEvent(DomainEvent event) {
        String eventType = event.getClass().getSimpleName();
        return "domain-events-" + eventType.toLowerCase().replace("event", "");
    }
}

Event Handlers and Projections

Order Projection Handler

Event handler for read model updates:

java
@Component
@Slf4j
public class OrderProjectionHandler {
    
    private final OrderReadModelRepository orderReadModelRepository;
    private final CustomerStatsRepository customerStatsRepository;
    private final MeterRegistry meterRegistry;
    
    private final Counter eventProcessedCounter;
    private final Timer projectionUpdateTimer;
    
    public OrderProjectionHandler(OrderReadModelRepository orderReadModelRepository,
                                CustomerStatsRepository customerStatsRepository,
                                MeterRegistry meterRegistry) {
        this.orderReadModelRepository = orderReadModelRepository;
        this.customerStatsRepository = customerStatsRepository;
        this.meterRegistry = meterRegistry;
        
        this.eventProcessedCounter = Counter.builder("projection.events.processed")
            .description("Number of events processed by projections")
            .register(meterRegistry);
        this.projectionUpdateTimer = Timer.builder("projection.update.duration")
            .description("Time taken to update projections")
            .register(meterRegistry);
    }
    
    @EventListener
    @Async
    @Transactional
    public void handle(OrderCreatedEvent event) {
        Timer.Sample sample = Timer.Sample.start(meterRegistry);
        
        try {
            OrderReadModel readModel = new OrderReadModel(
                event.getOrderId(),
                event.getCustomerId(),
                event.getAmount(),
                "PENDING",
                event.getCreatedAt(),
                null,
                null
            );
            
            orderReadModelRepository.save(readModel);
            updateCustomerStats(event.getCustomerId(), event.getAmount(), 1);
            
            eventProcessedCounter.increment();
            log.debug("Updated read model for order created: {}", event.getOrderId());
            
        } catch (Exception e) {
            log.error("Failed to handle OrderCreatedEvent: {}", event.getEventId(), e);
            throw e;
        } finally {
            sample.stop(projectionUpdateTimer);
        }
    }
    
    @EventListener
    @Async
    @Transactional
    public void handle(OrderConfirmedEvent event) {
        Timer.Sample sample = Timer.Sample.start(meterRegistry);
        
        try {
            orderReadModelRepository.findByOrderId(event.getOrderId())
                .ifPresentOrElse(
                    readModel -> {
                        readModel.setStatus("CONFIRMED");
                        readModel.setConfirmedAt(event.getConfirmedAt());
                        readModel.setLastUpdated(Instant.now());
                        orderReadModelRepository.save(readModel);
                    },
                    () -> log.warn("Order read model not found for confirmation: {}", event.getOrderId())
                );
            
            eventProcessedCounter.increment();
            log.debug("Updated read model for order confirmed: {}", event.getOrderId());
            
        } catch (Exception e) {
            log.error("Failed to handle OrderConfirmedEvent: {}", event.getEventId(), e);
            throw e;
        } finally {
            sample.stop(projectionUpdateTimer);
        }
    }
    
    @EventListener
    @Async
    @Transactional
    public void handle(OrderCancelledEvent event) {
        Timer.Sample sample = Timer.Sample.start(meterRegistry);
        
        try {
            orderReadModelRepository.findByOrderId(event.getOrderId())
                .ifPresentOrElse(
                    readModel -> {
                        // Update customer stats - subtract cancelled order
                        updateCustomerStats(readModel.getCustomerId(), 
                                          readModel.getAmount().negate(), -1);
                        
                        readModel.setStatus("CANCELLED");
                        readModel.setCancelledAt(event.getCancelledAt());
                        readModel.setLastUpdated(Instant.now());
                        orderReadModelRepository.save(readModel);
                    },
                    () -> log.warn("Order read model not found for cancellation: {}", event.getOrderId())
                );
            
            eventProcessedCounter.increment();
            log.debug("Updated read model for order cancelled: {}", event.getOrderId());
            
        } catch (Exception e) {
            log.error("Failed to handle OrderCancelledEvent: {}", event.getEventId(), e);
            throw e;
        } finally {
            sample.stop(projectionUpdateTimer);
        }
    }
    
    private void updateCustomerStats(String customerId, BigDecimal amountDelta, int orderCountDelta) {
        CustomerStats stats = customerStatsRepository.findByCustomerId(customerId)
            .orElse(new CustomerStats(customerId));
        
        stats.setTotalSpent(stats.getTotalSpent().add(amountDelta));
        stats.setOrderCount(stats.getOrderCount() + orderCountDelta);
        stats.setLastOrderDate(Instant.now());
        stats.setLastUpdated(Instant.now());
        
        customerStatsRepository.save(stats);
    }
}

Snapshot Support

Snapshot Service

Snapshot creation for performance optimization:

java
@Service
@Slf4j
public class SnapshotService {
    
    private final SnapshotRepository snapshotRepository;
    private final EventStoreService eventStoreService;
    private final ObjectMapper objectMapper;
    
    private static final int SNAPSHOT_FREQUENCY = 100; // Create snapshot every 100 events
    
    public SnapshotService(SnapshotRepository snapshotRepository,
                         EventStoreService eventStoreService,
                         ObjectMapper objectMapper) {
        this.snapshotRepository = snapshotRepository;
        this.eventStoreService = eventStoreService;
        this.objectMapper = objectMapper;
    }
    
    public void saveSnapshot(AggregateRoot aggregate) {
        try {
            String aggregateData = objectMapper.writeValueAsString(aggregate);
            
            SnapshotEntity snapshot = new SnapshotEntity(
                aggregate.getId(),
                aggregate.getClass().getSimpleName(),
                aggregate.getVersion(),
                aggregateData
            );
            
            snapshotRepository.save(snapshot);
            log.info("Saved snapshot for aggregate {} at version {}", 
                    aggregate.getId(), aggregate.getVersion());
            
        } catch (Exception e) {
            log.error("Failed to save snapshot for aggregate: {}", aggregate.getId(), e);
        }
    }
    
    public <T extends AggregateRoot> Optional<T> loadSnapshot(String aggregateId, Class<T> aggregateClass) {
        try {
            return snapshotRepository.findLatestByAggregateId(aggregateId)
                .map(snapshot -> {
                    try {
                        T aggregate = objectMapper.readValue(snapshot.getAggregateData(), aggregateClass);
                        
                        // Load events after snapshot
                        List<DomainEvent> eventsAfterSnapshot = eventStoreService
                            .loadEventsFromVersion(aggregateId, snapshot.getVersion());
                        
                        aggregate.replayEvents(eventsAfterSnapshot);
                        
                        return aggregate;
                    } catch (Exception e) {
                        log.error("Failed to deserialize snapshot for aggregate: {}", aggregateId, e);
                        return null;
                    }
                })
                .filter(Objects::nonNull);
                
        } catch (Exception e) {
            log.error("Failed to load snapshot for aggregate: {}", aggregateId, e);
            return Optional.empty();
        }
    }
    
    public boolean shouldCreateSnapshot(String aggregateId) {
        try {
            Optional<SnapshotEntity> latestSnapshot = snapshotRepository.findLatestByAggregateId(aggregateId);
            
            if (latestSnapshot.isEmpty()) {
                return true; // No snapshot exists
            }
            
            Long currentVersion = eventStoreService.getCurrentVersion(aggregateId);
            Long snapshotVersion = latestSnapshot.get().getVersion();
            
            return (currentVersion - snapshotVersion) >= SNAPSHOT_FREQUENCY;
            
        } catch (Exception e) {
            log.error("Failed to check snapshot condition for aggregate: {}", aggregateId, e);
            return false;
        }
    }
    
    @Scheduled(fixedRate = 300000) // Every 5 minutes
    public void createPeriodicSnapshots() {
        try {
            List<String> aggregateIds = eventStoreService.getActiveAggregateIds();
            
            for (String aggregateId : aggregateIds) {
                if (shouldCreateSnapshot(aggregateId)) {
                    // Load aggregate and create snapshot
                    createSnapshotForAggregate(aggregateId);
                }
            }
            
        } catch (Exception e) {
            log.error("Failed to create periodic snapshots", e);
        }
    }
    
    private void createSnapshotForAggregate(String aggregateId) {
        try {
            // This would need to be implemented based on aggregate type
            // For now, we'll skip the implementation details
            log.debug("Creating snapshot for aggregate: {}", aggregateId);
        } catch (Exception e) {
            log.error("Failed to create snapshot for aggregate: {}", aggregateId, e);
        }
    }
}

Event Sourcing provides several key benefits:

  1. Complete Audit Trail: Every change is recorded as an immutable event
  2. Temporal Queries: Query system state at any point in time
  3. Event Replay: Rebuild system state from events
  4. Integration: Events serve as integration points between bounded contexts
  5. Debugging: Full history of what happened and when

The pattern is particularly useful for:

  • Financial systems requiring audit trails
  • Systems needing complex business logic replay
  • Integration scenarios with eventual consistency
  • Analytics and reporting on historical data

This implementation provides a production-ready Event Sourcing solution with Spring Boot, including optimistic locking, event publishing, projections, and snapshot support for performance optimization.

Created by Eren Demir.