4.3. Paxos ve Raft Konsensüs Algoritmaları
Paxos
Temel Kavramlar
- Multi-Paxos ve Single-Decree (Basic) Paxos
- Öneren (Proposer), Kabul Eden (Acceptor) ve Öğrenen (Learner) rolleri
- Promise ve Accept mesajları
- Çoğunluk oylama (quorum-based voting)
Protokol Aşamaları
1. Hazırlık Aşaması (Prepare Phase)
- Teklif numarası üretimi
- Promise istekleri
- Çoğunluk kabulü
2. Kabul Aşaması (Accept Phase)
- Değer önerisi
- Accept istekleri
- Değerin taahhüdü
3. Öğrenme Aşaması (Learn Phase)
- Değerin yayılması
- Durum çoğaltma
- Tutarlılığın korunması
Kullanım Senaryoları
- Dağıtık kilitleme
- Konfigürasyon yönetimi
- Durum makinesi çoğaltma
- Lider seçimi
Spring Boot Paxos-Tarzı Uygulama
Dağıtık Konfigürasyon Servisi
java
/**
 * Paxos-style agreement on configuration values, using one Redis instance per
 * {@link ConfigurationNode} as the acceptor state.
 *
 * <p>For a configuration {@code key}, each acceptor keeps two Redis entries:
 * {@code promise:<key>} (highest proposal number promised) and
 * {@code accepted:<key>} (hash with the accepted {@code number} and {@code value}).
 * Both expire after 300 seconds. A proposal succeeds when a strict majority of
 * {@code nodes} answers positively in both the prepare and the accept phase.
 *
 * <p>NOTE(review): proposal numbers come from an in-memory counter, so they are
 * only unique within one running instance of this service. Multiple proposer
 * processes (or a restart) can reuse numbers; confirm whether that matters for
 * the intended deployment.
 *
 * <p>{@code log} is the logger used throughout this file; its declaration is not
 * part of this listing.
 */
@Component
public class PaxosStyleConfigurationService {

    /** Charset for every script, key and argument sent to Redis, and for decoding replies. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** Number of entries at the front of the eval argument array that are KEYS (promise, accepted). */
    private static final int KEY_COUNT = 2;

    /**
     * Prepare phase, executed atomically on one acceptor. Grants the promise only
     * when no promise exists or the existing one is lower than the proposal number.
     * Replies {@code {1, acceptedNumber, acceptedValue}} when granted, else {@code {0, 0, ''}}.
     */
    private static final String PREPARE_SCRIPT = """
            local promise_key = KEYS[1]
            local accepted_key = KEYS[2]
            local proposal_num = tonumber(ARGV[1])
            local current_promise = redis.call('get', promise_key)
            local current_accepted = redis.call('hmget', accepted_key, 'number', 'value')
            if current_promise == false or tonumber(current_promise) < proposal_num then
                redis.call('set', promise_key, proposal_num)
                redis.call('expire', promise_key, 300)
                local accepted_num = tonumber(current_accepted[1]) or 0
                local accepted_val = current_accepted[2] or ''
                return {1, accepted_num, accepted_val}
            else
                return {0, 0, ''}
            end
            """;

    /**
     * Accept phase, executed atomically on one acceptor. Accepts when a promise
     * exists and is not higher than the proposal number. On acceptance the promise
     * is raised to the accepted proposal number so that a lower-numbered proposal
     * cannot overwrite the accepted value afterwards. Replies 1 or 0.
     */
    private static final String ACCEPT_SCRIPT = """
            local promise_key = KEYS[1]
            local accepted_key = KEYS[2]
            local proposal_num = tonumber(ARGV[1])
            local value = ARGV[2]
            local current_promise = redis.call('get', promise_key)
            if current_promise and tonumber(current_promise) <= proposal_num then
                redis.call('set', promise_key, proposal_num)
                redis.call('expire', promise_key, 300)
                redis.call('hmset', accepted_key, 'number', proposal_num, 'value', value)
                redis.call('expire', accepted_key, 300)
                return 1
            else
                return 0
            end
            """;

    @Autowired
    private List<ConfigurationNode> nodes;

    // Not used by any method in this class; kept so the bean's injection contract is unchanged.
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private final AtomicLong proposalNumber = new AtomicLong(0);

    /**
     * Runs one prepare/accept/learn round for {@code key}.
     *
     * <p>If any acceptor in the prepare majority reports an already accepted value,
     * that value (the one with the highest accepted number) is proposed instead of
     * {@code value}; the method still returns {@code true} once it has been accepted.
     *
     * @param key   configuration key; must not be null
     * @param value value the caller wants to store; must not be null
     * @return {@code true} when a majority accepted the proposed value and the learn
     *         broadcast was started, {@code false} when either phase missed a majority
     * @throws NullPointerException if {@code key} or {@code value} is null
     */
    public boolean proposeConfiguration(String key, String value) {
        java.util.Objects.requireNonNull(key, "key");
        java.util.Objects.requireNonNull(value, "value");
        long currentProposal = proposalNumber.incrementAndGet();

        // Phase 1: Prepare
        PrepareResult prepareResult = prepare(key, currentProposal);
        if (!prepareResult.isAccepted()) {
            log.warn("Prepare phase failed for proposal: {}", currentProposal);
            return false;
        }

        // A value accepted under an earlier proposal takes precedence over the caller's value.
        String previouslyAccepted = prepareResult.getHighestAcceptedValue();
        String proposedValue = previouslyAccepted != null ? previouslyAccepted : value;
        if (!proposedValue.equals(value)) {
            log.warn("Proposal {} for key {} adopts a previously accepted value instead of the requested one",
                    currentProposal, key);
        }

        // Phase 2: Accept
        AcceptResult acceptResult = accept(key, currentProposal, proposedValue);
        if (!acceptResult.isAccepted()) {
            log.warn("Accept phase failed for proposal: {}", currentProposal);
            return false;
        }

        // Phase 3: Learn
        learn(key, proposedValue);
        return true;
    }

    /**
     * Sends the prepare script to every node and collects the promises.
     *
     * @return result whose {@code accepted} flag is true when more than half of the
     *         nodes promised; carries the accepted value with the highest accepted
     *         number, or {@code null} value / 0 number when no node reported one
     */
    private PrepareResult prepare(String key, long proposalNumber) {
        byte[] script = PREPARE_SCRIPT.getBytes(UTF_8);
        byte[][] keysAndArgs = keysAndArgs(key, String.valueOf(proposalNumber));

        int promiseCount = 0;
        String highestAcceptedValue = null;
        long highestAcceptedNumber = 0;

        for (ConfigurationNode node : nodes) {
            try {
                Object reply = node.getRedisTemplate().execute(
                        (RedisCallback<Object>) connection ->
                                connection.eval(script, ReturnType.MULTI, KEY_COUNT, keysAndArgs));
                if (!(reply instanceof List)) {
                    continue;
                }
                List<?> fields = (List<?>) reply;
                if (fields.size() < 3 || toLong(fields.get(0)) != 1L) {
                    continue;
                }
                // Decode before counting so a malformed reply is not counted as a promise.
                long acceptedNum = toLong(fields.get(1));
                String acceptedVal = toText(fields.get(2));
                promiseCount++;
                if (acceptedNum > highestAcceptedNumber) {
                    highestAcceptedNumber = acceptedNum;
                    highestAcceptedValue = acceptedVal;
                }
            } catch (Exception e) {
                // An unreachable or failing node simply does not contribute to the quorum.
                log.error("Prepare failed for node: {}", node.getId(), e);
            }
        }

        boolean accepted = promiseCount > nodes.size() / 2;
        return new PrepareResult(accepted, highestAcceptedValue, highestAcceptedNumber);
    }

    /**
     * Sends the accept script to every node.
     *
     * @return result whose {@code accepted} flag is true when more than half of the
     *         nodes accepted; also carries the number of accepting nodes
     */
    private AcceptResult accept(String key, long proposalNumber, String value) {
        byte[] script = ACCEPT_SCRIPT.getBytes(UTF_8);
        byte[][] keysAndArgs = keysAndArgs(key, String.valueOf(proposalNumber), value);

        int acceptedCount = 0;
        for (ConfigurationNode node : nodes) {
            try {
                Object reply = node.getRedisTemplate().execute(
                        (RedisCallback<Object>) connection ->
                                connection.eval(script, ReturnType.INTEGER, KEY_COUNT, keysAndArgs));
                if (toLong(reply) == 1L) {
                    acceptedCount++;
                }
            } catch (Exception e) {
                log.error("Accept failed for node: {}", node.getId(), e);
            }
        }

        boolean accepted = acceptedCount > nodes.size() / 2;
        return new AcceptResult(accepted, acceptedCount);
    }

    /**
     * Writes the chosen value to {@code config:<key>} on every node, one asynchronous
     * task per node. Failures are logged and not reported to the caller.
     */
    private void learn(String key, String value) {
        String learnedKey = "config:" + key;
        for (ConfigurationNode node : nodes) {
            CompletableFuture.runAsync(() -> {
                try {
                    node.getRedisTemplate().opsForValue().set(learnedKey, value);
                    log.info("Configuration learned on node {}: {} = {}",
                            node.getId(), key, value);
                } catch (Exception e) {
                    log.error("Learn failed for node: {}", node.getId(), e);
                }
            });
        }
    }

    /**
     * Builds the single keys-then-arguments array expected by the eval call:
     * {@code promise:<key>}, {@code accepted:<key>}, followed by {@code args}.
     */
    private static byte[][] keysAndArgs(String key, String... args) {
        byte[][] encoded = new byte[KEY_COUNT + args.length][];
        encoded[0] = ("promise:" + key).getBytes(UTF_8);
        encoded[1] = ("accepted:" + key).getBytes(UTF_8);
        for (int i = 0; i < args.length; i++) {
            encoded[KEY_COUNT + i] = args[i].getBytes(UTF_8);
        }
        return encoded;
    }

    /** Decodes a script reply element that may be raw bytes or already a string; null stays null. */
    private static String toText(Object raw) {
        if (raw == null) {
            return null;
        }
        if (raw instanceof byte[]) {
            return new String((byte[]) raw, UTF_8);
        }
        return raw.toString();
    }

    /** Reads a script reply element as a number; null or empty text counts as 0. */
    private static long toLong(Object raw) {
        if (raw instanceof Number) {
            return ((Number) raw).longValue();
        }
        String text = toText(raw);
        return (text == null || text.isEmpty()) ? 0L : Long.parseLong(text);
    }

    // Helper classes

    /** Outcome of the prepare phase. */
    public static class PrepareResult {
        private final boolean accepted;
        private final String highestAcceptedValue;
        private final long highestAcceptedNumber;

        public PrepareResult(boolean accepted, String highestAcceptedValue,
                             long highestAcceptedNumber) {
            this.accepted = accepted;
            this.highestAcceptedValue = highestAcceptedValue;
            this.highestAcceptedNumber = highestAcceptedNumber;
        }

        /** @return true when a majority of nodes promised */
        public boolean isAccepted() { return accepted; }

        /** @return previously accepted value with the highest number, or null if none was reported */
        public String getHighestAcceptedValue() { return highestAcceptedValue; }

        /** @return highest accepted proposal number reported, or 0 if none */
        public long getHighestAcceptedNumber() { return highestAcceptedNumber; }
    }

    /** Outcome of the accept phase. */
    public static class AcceptResult {
        private final boolean accepted;
        private final int acceptedCount;

        public AcceptResult(boolean accepted, int acceptedCount) {
            this.accepted = accepted;
            this.acceptedCount = acceptedCount;
        }

        /** @return true when a majority of nodes accepted */
        public boolean isAccepted() { return accepted; }

        /** @return number of nodes that accepted */
        public int getAcceptedCount() { return acceptedCount; }
    }
}
/**
 * One participant node: an identifier paired with the Redis template used to
 * reach that node's Redis instance.
 */
@Component
public class ConfigurationNode {

    private final String id;
    private final RedisTemplate<String, Object> redisTemplate;

    /**
     * @param id            node identifier, resolved from the {@code node.id} property
     * @param redisTemplate template used for all Redis access on behalf of this node
     */
    public ConfigurationNode(
            @Value("${node.id}") String id,
            RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
        this.id = id;
    }

    /** @return the identifier given at construction */
    public String getId() {
        return this.id;
    }

    /** @return the Redis template given at construction */
    public RedisTemplate<String, Object> getRedisTemplate() {
        return this.redisTemplate;
    }
}
Raft
Temel Bileşenler
Lider Seçimi (Leader Election)
- Dönem tabanlı oylama
- Heartbeat mekanizması
- Zaman aşımı yönetimi
- Log tutarlılık kontrolü
Log Çoğaltma (Log Replication)
- Sadece ekleme yapılan log
- Log eşleşme özelliği
- Log sıkıştırma
- Snapshot mekanizması
Güvenlik Mekanizmaları
- Sadece lider güncelleyebilir
- Log tutarlılığı
- Dönem doğrulama
- Çakışma çözümü
Uygulama Örnekleri
etcd
- Anahtar-değer deposu
- İzleme mekanizması
- Kira sistemi (lease system)
- Dağıtık kilitleme
Consul
- Servis keşfi
- Sağlık kontrolü
- Anahtar-değer deposu
- Dağıtık kilitleme
Kubernetes
- etcd entegrasyonu
- Durum yönetimi
- Konfigürasyon saklama
- Lider seçimi
Spring Boot Raft-Tarzı Uygulama
Dağıtık Durum Makinesi
java
@Component
public class RaftStyleStateMachine {
private final String nodeId;
private final List<RaftNode> cluster;
private volatile RaftState state = RaftState.FOLLOWER;
private volatile String currentLeader;
private volatile long currentTerm = 0;
private volatile String votedFor;
private final List<LogEntry> log = new CopyOnWriteArrayList<>();
private volatile long commitIndex = 0;
private volatile long lastApplied = 0;
// Leader state
private final Map<String, Long> nextIndex = new ConcurrentHashMap<>();
private final Map<String, Long> matchIndex = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
@Autowired
private RedisTemplate<String, Object> redisTemplate;
public RaftStyleStateMachine(@Value("${raft.node.id}") String nodeId,
List<RaftNode> cluster) {
this.nodeId = nodeId;
this.cluster = cluster;
startElectionTimer();
}
@PostConstruct
public void initialize() {
// Initialize leader state for all nodes
for (RaftNode node : cluster) {
nextIndex.put(node.getId(), log.size() + 1);
matchIndex.put(node.getId(), 0L);
}
}
// Leader Election
public void startElection() {
synchronized (this) {
state = RaftState.CANDIDATE;
currentTerm++;
votedFor = nodeId;
log.info("Node {} starting election for term {}", nodeId, currentTerm);
}
int votes = 1; // Vote for self
long lastLogIndex = log.size();
long lastLogTerm = lastLogIndex > 0 ? log.get((int) lastLogIndex - 1).getTerm() : 0;
List<CompletableFuture<Boolean>> voteRequests = new ArrayList<>();
for (RaftNode node : cluster) {
if (!node.getId().equals(nodeId)) {
CompletableFuture<Boolean> voteRequest = CompletableFuture.supplyAsync(() ->
requestVote(node, currentTerm, lastLogIndex, lastLogTerm)
);
voteRequests.add(voteRequest);
}
}
// Wait for vote responses
CompletableFuture.allOf(voteRequests.toArray(new CompletableFuture[0]))
.thenRun(() -> {
long receivedVotes = voteRequests.stream()
.mapToLong(future -> {
try {
return future.get() ? 1 : 0;
} catch (Exception e) {
return 0;
}
})
.sum() + 1; // +1 for self vote
if (receivedVotes > cluster.size() / 2) {
becomeLeader();
} else {
becomeFollower();
}
});
}
private boolean requestVote(RaftNode node, long term, long lastLogIndex, long lastLogTerm) {
try {
VoteRequest request = new VoteRequest(nodeId, term, lastLogIndex, lastLogTerm);
VoteResponse response = node.requestVote(request);
if (response.getTerm() > currentTerm) {
synchronized (this) {
currentTerm = response.getTerm();
votedFor = null;
becomeFollower();
}
return false;
}
return response.isVoteGranted();
} catch (Exception e) {
log.error("Vote request failed for node: {}", node.getId(), e);
return false;
}
}
public VoteResponse handleVoteRequest(VoteRequest request) {
synchronized (this) {
if (request.getTerm() < currentTerm) {
return new VoteResponse(currentTerm, false);
}
if (request.getTerm() > currentTerm) {
currentTerm = request.getTerm();
votedFor = null;
becomeFollower();
}
boolean logUpToDate = isLogUpToDate(request.getLastLogIndex(), request.getLastLogTerm());
if ((votedFor == null || votedFor.equals(request.getCandidateId())) && logUpToDate) {
votedFor = request.getCandidateId();
resetElectionTimer();
return new VoteResponse(currentTerm, true);
}
return new VoteResponse(currentTerm, false);
}
}
private boolean isLogUpToDate(long lastLogIndex, long lastLogTerm) {
if (log.isEmpty()) {
return true;
}
LogEntry lastEntry = log.get(log.size() - 1);
if (lastLogTerm > lastEntry.getTerm()) {
return true;
}
return lastLogTerm == lastEntry.getTerm() && lastLogIndex >= log.size();
}
// Log Replication
private void becomeLeader() {
synchronized (this) {
state = RaftState.LEADER;
currentLeader = nodeId;
// Initialize leader state
for (RaftNode node : cluster) {
nextIndex.put(node.getId(), log.size() + 1);
matchIndex.put(node.getId(), 0L);
}
}
log.info("Node {} became leader for term {}", nodeId, currentTerm);
// Start sending heartbeats
startHeartbeat();
}
private void startHeartbeat() {
scheduler.scheduleWithFixedDelay(() -> {
if (state == RaftState.LEADER) {
sendHeartbeats();
}
}, 0, 50, TimeUnit.MILLISECONDS); // 50ms heartbeat interval
}
private void sendHeartbeats() {
for (RaftNode node : cluster) {
if (!node.getId().equals(nodeId)) {
CompletableFuture.runAsync(() -> sendAppendEntries(node));
}
}
}
private void sendAppendEntries(RaftNode node) {
long prevLogIndex = nextIndex.get(node.getId()) - 1;
long prevLogTerm = prevLogIndex > 0 ? log.get((int) prevLogIndex - 1).getTerm() : 0;
List<LogEntry> entries = new ArrayList<>();
if (nextIndex.get(node.getId()) <= log.size()) {
entries = log.subList((int) (nextIndex.get(node.getId()) - 1), log.size());
}
AppendEntriesRequest request = new AppendEntriesRequest(
currentTerm, nodeId, prevLogIndex, prevLogTerm, entries, commitIndex
);
try {
AppendEntriesResponse response = node.appendEntries(request);
if (response.getTerm() > currentTerm) {
synchronized (this) {
currentTerm = response.getTerm();
becomeFollower();
}
return;
}
if (response.isSuccess()) {
// Update nextIndex and matchIndex
nextIndex.put(node.getId(), prevLogIndex + entries.size() + 1);
matchIndex.put(node.getId(), prevLogIndex + entries.size());
// Update commit index
updateCommitIndex();
} else {
// Decrement nextIndex and retry
nextIndex.put(node.getId(), Math.max(1, nextIndex.get(node.getId()) - 1));
}
} catch (Exception e) {
log.error("AppendEntries failed for node: {}", node.getId(), e);
}
}
public AppendEntriesResponse handleAppendEntries(AppendEntriesRequest request) {
resetElectionTimer();
synchronized (this) {
if (request.getTerm() < currentTerm) {
return new AppendEntriesResponse(currentTerm, false);
}
if (request.getTerm() > currentTerm) {
currentTerm = request.getTerm();
votedFor = null;
}
becomeFollower();
currentLeader = request.getLeaderId();
// Log consistency check
if (request.getPrevLogIndex() > 0) {
if (log.size() < request.getPrevLogIndex() ||
log.get((int) request.getPrevLogIndex() - 1).getTerm() != request.getPrevLogTerm()) {
return new AppendEntriesResponse(currentTerm, false);
}
}
// Append entries
if (!request.getEntries().isEmpty()) {
// Remove conflicting entries
if (log.size() > request.getPrevLogIndex()) {
log.subList((int) request.getPrevLogIndex(), log.size()).clear();
}
// Append new entries
log.addAll(request.getEntries());
}
// Update commit index
if (request.getLeaderCommit() > commitIndex) {
commitIndex = Math.min(request.getLeaderCommit(), log.size());
applyLogEntries();
}
return new AppendEntriesResponse(currentTerm, true);
}
}
private void updateCommitIndex() {
if (state != RaftState.LEADER) return;
for (long index = commitIndex + 1; index <= log.size(); index++) {
long replicationCount = 1; // Leader itself
for (RaftNode node : cluster) {
if (!node.getId().equals(nodeId) && matchIndex.get(node.getId()) >= index) {
replicationCount++;
}
}
if (replicationCount > cluster.size() / 2 &&
log.get((int) index - 1).getTerm() == currentTerm) {
commitIndex = index;
}
}
applyLogEntries();
}
private void applyLogEntries() {
while (lastApplied < commitIndex) {
lastApplied++;
LogEntry entry = log.get((int) lastApplied - 1);
applyToStateMachine(entry);
}
}
private void applyToStateMachine(LogEntry entry) {
// Apply command to state machine
log.info("Applying log entry: {}", entry.getCommand());
// Store in Redis for persistence
String key = "state:" + entry.getCommand().getKey();
redisTemplate.opsForValue().set(key, entry.getCommand().getValue());
}
// Client operations
public boolean submitCommand(Command command) {
if (state != RaftState.LEADER) {
log.warn("Not leader, cannot submit command");
return false;
}
LogEntry entry = new LogEntry(currentTerm, command);
synchronized (this) {
log.add(entry);
}
// Replicate to followers
sendHeartbeats();
return true;
}
private void becomeFollower() {
state = RaftState.FOLLOWER;
currentLeader = null;
resetElectionTimer();
}
private void startElectionTimer() {
scheduler.schedule(() -> {
if (state != RaftState.LEADER) {
startElection();
}
}, getRandomElectionTimeout(), TimeUnit.MILLISECONDS);
}
private void resetElectionTimer() {
startElectionTimer();
}
private long getRandomElectionTimeout() {
return 150 + (long) (Math.random() * 150); // 150-300ms
}
// Helper classes and enums
public enum RaftState {
FOLLOWER, CANDIDATE, LEADER
}
public static class LogEntry {
private final long term;
private final Command command;
public LogEntry(long term, Command command) {
this.term = term;
this.command = command;
}
public long getTerm() { return term; }
public Command getCommand() { return command; }
}
public static class Command {
private final String key;
private final String value;
public Command(String key, String value) {
this.key = key;
this.value = value;
}
public String getKey() { return key; }
public String getValue() { return value; }
}
// Request/Response classes would be defined here...
}
Karşılaştırma
Paxos
- Daha karmaşık protokol
- Optimize edilmiş mesaj sayısı
- Daha esnek yapı
- Zor uygulama
Raft
- Daha basit protokol
- Daha fazla mesaj trafiği
- Daha anlaşılır yapı
- Kolay uygulama
Spring Boot ile Konsensüs Servisi
java
@Service
public class ConsensusService {
@Autowired
private RaftStyleStateMachine raftStateMachine;
@Autowired
private PaxosStyleConfigurationService paxosService;
public boolean distributeConfiguration(String key, String value,
ConsensusAlgorithm algorithm) {
switch (algorithm) {
case RAFT:
Command command = new Command(key, value);
return raftStateMachine.submitCommand(command);
case PAXOS:
return paxosService.proposeConfiguration(key, value);
default:
throw new IllegalArgumentException("Unsupported algorithm: " + algorithm);
}
}
public enum ConsensusAlgorithm {
RAFT, PAXOS
}
}
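Bu uygulamalar, Paxos ve Raft algoritmalarının temel kavramlarını Spring Boot ekosisteminde nasıl uygulayabileceğinizi göstermektedir. Örnekler öğretim amacıyla sadeleştirilmiştir; üretim ortamında etcd veya Consul gibi kanıtlanmış bir konsensüs uygulaması tercih edilmelidir.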