Skip to content

4.1. Strong vs Eventual Consistency

Strong Consistency (Kesin Tutarlılık)

Temel Özellikler

  • Her okuma işlemi, en son yazılan veriyi döndürür
  • Linearizability garantisi sağlar
  • Sıralı işlem yürütme (sequential execution)
  • Atomic işlemler

Kullanım Senaryoları

  • Finansal işlemler
  • Kullanıcı kimlik doğrulama
  • Kritik veri yönetimi
  • Distributed locking

Implementasyon Örnekleri

  • RDBMS: ACID transactions
  • ZooKeeper: Atomic operations
  • etcd: Strong consistency guarantees
  • Consul: Consistent reads

Spring Boot Strong Consistency Implementation

Database Transaction Management

java
@Configuration
@EnableTransactionManagement
public class StrongConsistencyConfig {
    
    @Bean
    public PlatformTransactionManager transactionManager(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }
    
    @Bean
    public DataSource dataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:postgresql://localhost:5432/strongdb");
        config.setUsername("user");
        config.setPassword("password");
        
        // Strong consistency için connection ayarları
        config.setAutoCommit(false);
        config.setTransactionIsolation("TRANSACTION_SERIALIZABLE");
        config.setMaximumPoolSize(10);
        config.setConnectionTimeout(30000);
        
        return new HikariDataSource(config);
    }
}

Financial Transaction Service

java
@Service
@Transactional(isolation = Isolation.SERIALIZABLE)
public class BankTransactionService {
    
    @Autowired
    private AccountRepository accountRepository;
    
    @Autowired
    private TransactionHistoryRepository transactionHistoryRepository;
    
    @Transactional(rollbackFor = Exception.class)
    public TransferResult transferMoney(Long fromAccountId, Long toAccountId, 
                                       BigDecimal amount) {
        
        // Strong consistency garantisi için pessimistic locking
        Account fromAccount = accountRepository.findByIdWithLock(fromAccountId);
        Account toAccount = accountRepository.findByIdWithLock(toAccountId);
        
        if (fromAccount == null || toAccount == null) {
            throw new AccountNotFoundException("Account not found");
        }
        
        if (fromAccount.getBalance().compareTo(amount) < 0) {
            throw new InsufficientFundsException("Insufficient funds");
        }
        
        // Atomic operations
        fromAccount.setBalance(fromAccount.getBalance().subtract(amount));
        toAccount.setBalance(toAccount.getBalance().add(amount));
        
        // Aynı transaction içinde güncellemeler
        accountRepository.save(fromAccount);
        accountRepository.save(toAccount);
        
        // Transaction history
        TransactionHistory transaction = new TransactionHistory(
            fromAccountId, toAccountId, amount, Instant.now()
        );
        transactionHistoryRepository.save(transaction);
        
        return new TransferResult(true, "Transfer completed successfully");
    }
    
    @Transactional(readOnly = true, isolation = Isolation.SERIALIZABLE)
    public BigDecimal getAccountBalance(Long accountId) {
        Account account = accountRepository.findById(accountId)
            .orElseThrow(() -> new AccountNotFoundException("Account not found"));
        return account.getBalance();
    }
}

@Repository
public interface AccountRepository extends JpaRepository<Account, Long> {
    
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    @Query("SELECT a FROM Account a WHERE a.id = :id")
    Account findByIdWithLock(@Param("id") Long id);
}

Distributed Locking with Redis

java
@Component
public class RedisDistributedLock {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    public boolean acquireLock(String lockKey, String requestId, long expireTime) {
        String script = """
            if redis.call('get', KEYS[1]) == false then
                return redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])
            end
            return false
        """;
        
        List<String> keys = Arrays.asList(lockKey);
        List<String> args = Arrays.asList(requestId, String.valueOf(expireTime));
        
        Object result = redisTemplate.execute(
            (RedisCallback<Object>) connection -> 
                connection.eval(script.getBytes(), ReturnType.BOOLEAN, 
                    keys.size(), keys.toArray(new String[0]), 
                    args.toArray(new String[0]))
        );
        
        return Boolean.TRUE.equals(result);
    }
    
    public boolean releaseLock(String lockKey, String requestId) {
        String script = """
            if redis.call('get', KEYS[1]) == ARGV[1] then
                return redis.call('del', KEYS[1])
            else
                return 0
            end
        """;
        
        List<String> keys = Arrays.asList(lockKey);
        List<String> args = Arrays.asList(requestId);
        
        Object result = redisTemplate.execute(
            (RedisCallback<Object>) connection -> 
                connection.eval(script.getBytes(), ReturnType.INTEGER,
                    keys.size(), keys.toArray(new String[0]),
                    args.toArray(new String[0]))
        );
        
        return Long.valueOf(1).equals(result);
    }
}

@Service
public class CriticalOperationService {
    
    @Autowired
    private RedisDistributedLock distributedLock;
    
    public void performCriticalOperation(String operationId) {
        String lockKey = "critical_op:" + operationId;
        String requestId = UUID.randomUUID().toString();
        long expireTime = 30000; // 30 seconds
        
        if (distributedLock.acquireLock(lockKey, requestId, expireTime)) {
            try {
                // Critical operation with strong consistency
                executeCriticalLogic(operationId);
            } finally {
                distributedLock.releaseLock(lockKey, requestId);
            }
        } else {
            throw new ConcurrentOperationException("Operation already in progress");
        }
    }
    
    private void executeCriticalLogic(String operationId) {
        // Strong consistency gerektiren işlemler
        log.info("Executing critical operation: {}", operationId);
    }
}

Eventual Consistency (Sonlu Tutarlılık)

Temel Özellikler

  • Yazma işlemleri zamanla tüm çoğaltmalara ulaşır
  • Geçici tutarsızlıklara izin verir
  • Yüksek performans ve ölçeklenebilirlik
  • Asenkron replikasyon

Kullanım Senaryoları

  • Sosyal medya feed'leri
  • Content delivery networks
  • Analytics ve logging
  • Cache sistemleri

Implementasyon Örnekleri

  • Cassandra: Tunable consistency
  • DynamoDB: Eventually consistent reads
  • MongoDB: Read preferences
  • Redis: Asynchronous replication

Spring Boot Eventual Consistency Implementation

Event-Driven Architecture

java
@Configuration
@EnableJms
public class EventualConsistencyConfig {
    
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
        JmsTemplate template = new JmsTemplate(connectionFactory);
        template.setDeliveryPersistent(true);
        template.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
        return template;
    }
    
    @Bean
    public Queue userUpdateQueue() {
        return new ActiveMQQueue("user.updates");
    }
    
    @Bean
    public Queue profileUpdateQueue() {
        return new ActiveMQQueue("profile.updates");
    }
}

@Service
public class UserService {
    
    @Autowired
    private UserRepository userRepository;
    
    @Autowired
    private JmsTemplate jmsTemplate;
    
    @Autowired
    private ApplicationEventPublisher eventPublisher;
    
    @Transactional
    public User updateUser(Long userId, UserUpdateRequest request) {
        User user = userRepository.findById(userId)
            .orElseThrow(() -> new UserNotFoundException("User not found"));
        
        // Primary store update (immediate consistency)
        user.setName(request.getName());
        user.setEmail(request.getEmail());
        user.setUpdatedAt(Instant.now());
        User updatedUser = userRepository.save(user);
        
        // Eventual consistency için event publish
        UserUpdatedEvent event = new UserUpdatedEvent(
            userId, request.getName(), request.getEmail(), Instant.now()
        );
        
        // Asenkron processing
        eventPublisher.publishEvent(event);
        
        return updatedUser;
    }
}

@Component
public class UserEventHandler {
    
    @Autowired
    private ProfileService profileService;
    
    @Autowired
    private CacheService cacheService;
    
    @Autowired
    private SearchIndexService searchIndexService;
    
    @EventListener
    @Async
    public void handleUserUpdated(UserUpdatedEvent event) {
        try {
            // Profile service update (eventual consistency)
            profileService.updateUserProfile(event.getUserId(), 
                event.getName(), event.getEmail());
            
            // Cache invalidation
            cacheService.invalidateUserCache(event.getUserId());
            
            // Search index update
            searchIndexService.updateUserIndex(event.getUserId(), 
                event.getName(), event.getEmail());
            
        } catch (Exception e) {
            log.error("Failed to handle user update event: {}", event, e);
            // Retry mechanism or dead letter queue
        }
    }
}

Asynchronous Replication

java
@Service
public class AsyncReplicationService {
    
    @Autowired
    private PrimaryDataSource primaryDataSource;
    
    @Autowired
    private List<SecondaryDataSource> secondaryDataSources;
    
    @Autowired
    @Qualifier("asyncExecutor")
    private TaskExecutor asyncExecutor;
    
    @Transactional
    public void saveWithReplication(DataEntity entity) {
        // Primary write (immediate)
        primaryDataSource.save(entity);
        
        // Secondary writes (eventual consistency)
        for (SecondaryDataSource secondary : secondaryDataSources) {
            asyncExecutor.execute(() -> {
                try {
                    secondary.saveAsync(entity);
                } catch (Exception e) {
                    log.error("Failed to replicate to secondary: {}", 
                        secondary.getName(), e);
                    // Retry logic or compensating action
                }
            });
        }
    }
    
    @Scheduled(fixedDelay = 60000) // Her dakika
    public void reconcileData() {
        try {
            List<DataEntity> primaryData = primaryDataSource.findAll();
            
            for (SecondaryDataSource secondary : secondaryDataSources) {
                List<DataEntity> secondaryData = secondary.findAll();
                
                // Find inconsistencies
                List<DataEntity> missingInSecondary = findMissingEntities(
                    primaryData, secondaryData
                );
                
                // Repair inconsistencies
                for (DataEntity missing : missingInSecondary) {
                    secondary.saveAsync(missing);
                }
            }
        } catch (Exception e) {
            log.error("Data reconciliation failed", e);
        }
    }
    
    private List<DataEntity> findMissingEntities(List<DataEntity> primary, 
                                               List<DataEntity> secondary) {
        Set<Long> secondaryIds = secondary.stream()
            .map(DataEntity::getId)
            .collect(Collectors.toSet());
            
        return primary.stream()
            .filter(entity -> !secondaryIds.contains(entity.getId()))
            .collect(Collectors.toList());
    }
}

Cache-Aside Pattern with Eventual Consistency

java
@Service
public class CacheAsideService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private DatabaseService databaseService;
    
    @Value("${cache.ttl:300}") // 5 minutes default
    private long cacheTtl;
    
    public User getUser(Long userId) {
        String cacheKey = "user:" + userId;
        
        // Cache'den okuma
        User cachedUser = (User) redisTemplate.opsForValue().get(cacheKey);
        if (cachedUser != null) {
            return cachedUser;
        }
        
        // Cache miss - database'den okuma
        User user = databaseService.findUserById(userId);
        if (user != null) {
            // Cache'e yazma (eventual consistency)
            redisTemplate.opsForValue().set(cacheKey, user, 
                Duration.ofSeconds(cacheTtl));
        }
        
        return user;
    }
    
    public void updateUser(User user) {
        // Database update (immediate)
        databaseService.saveUser(user);
        
        // Cache invalidation (eventual consistency)
        String cacheKey = "user:" + user.getId();
        redisTemplate.delete(cacheKey);
        
        // Alternative: Update cache asynchronously
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(100); // Small delay for database propagation
                redisTemplate.opsForValue().set(cacheKey, user, 
                    Duration.ofSeconds(cacheTtl));
            } catch (Exception e) {
                log.error("Failed to update cache for user: {}", user.getId(), e);
            }
        });
    }
}

MongoDB Eventual Consistency Configuration

java
@Configuration
public class MongoEventualConsistencyConfig {
    
    @Bean
    public MongoClient mongoClient() {
        MongoClientSettings settings = MongoClientSettings.builder()
            .applyConnectionString(new ConnectionString(
                "mongodb://localhost:27017,localhost:27018,localhost:27019/mydb?replicaSet=rs0"
            ))
            .readPreference(ReadPreference.secondaryPreferred()) // Eventual consistency
            .writeConcern(WriteConcern.MAJORITY.withWTimeout(5000, TimeUnit.MILLISECONDS))
            .build();
            
        return MongoClients.create(settings);
    }
    
    @Bean
    public MongoTemplate mongoTemplate() {
        return new MongoTemplate(mongoClient(), "mydb");
    }
}

@Repository
public class EventualConsistentUserRepository {
    
    @Autowired
    private MongoTemplate mongoTemplate;
    
    public User save(User user) {
        // Write to primary with majority concern
        return mongoTemplate.save(user);
    }
    
    public User findById(String id) {
        // Read from secondary (eventual consistency)
        Query query = Query.query(Criteria.where("id").is(id));
        return mongoTemplate.findOne(query, User.class);
    }
    
    public List<User> findByStatus(String status) {
        // Secondary read for analytics queries
        Query query = Query.query(Criteria.where("status").is(status));
        return mongoTemplate.find(query, User.class);
    }
}

Social Media Feed Implementation

java
@Service
public class SocialMediaFeedService {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    @Autowired
    private PostRepository postRepository;
    
    @Autowired
    private FollowerRepository followerRepository;
    
    @Async
    public void publishPost(Long userId, String content) {
        // Post'u database'e kaydet
        Post post = new Post(userId, content, Instant.now());
        Post savedPost = postRepository.save(post);
        
        // Followers'ları al
        List<Long> followers = followerRepository.findFollowerIds(userId);
        
        // Her follower'ın feed'ine ekle (eventual consistency)
        for (Long followerId : followers) {
            String feedKey = "feed:" + followerId;
            
            CompletableFuture.runAsync(() -> {
                try {
                    redisTemplate.opsForList().leftPush(feedKey, savedPost);
                    redisTemplate.opsForList().trim(feedKey, 0, 99); // Keep latest 100
                    redisTemplate.expire(feedKey, Duration.ofDays(7));
                } catch (Exception e) {
                    log.error("Failed to update feed for user: {}", followerId, e);
                }
            });
        }
    }
    
    public List<Post> getUserFeed(Long userId, int page, int size) {
        String feedKey = "feed:" + userId;
        
        // Redis'den feed'i al
        long start = (long) page * size;
        long end = start + size - 1;
        
        List<Object> feedPosts = redisTemplate.opsForList().range(feedKey, start, end);
        
        if (feedPosts == null || feedPosts.isEmpty()) {
            // Fallback: Database'den generate et
            return generateFeedFromDatabase(userId, page, size);
        }
        
        return feedPosts.stream()
            .map(obj -> (Post) obj)
            .collect(Collectors.toList());
    }
    
    private List<Post> generateFeedFromDatabase(Long userId, int page, int size) {
        List<Long> followingIds = followerRepository.findFollowingIds(userId);
        Pageable pageable = PageRequest.of(page, size);
        return postRepository.findByUserIdInOrderByCreatedAtDesc(followingIds, pageable);
    }
}

Bu implementasyonlar, strong consistency ve eventual consistency modellerinin Spring Boot ekosisteminde nasıl uygulanabileceğini göstermektedir. Her model farklı senaryolar için optimize edilmiştir.

Eren Demir tarafından oluşturulmuştur.