Consensus Algorithms
Consensus algorithms are fundamental protocols that enable multiple nodes in a distributed system to agree on a single value or decision, even in the presence of failures. They are crucial for maintaining consistency and coordinating actions across distributed systems.
Paxos Algorithm
Paxos is a family of protocols for solving consensus in an asynchronous network of unreliable processors. It ensures that a single value is chosen among proposed values, even when some participants fail.
Paxos Phases
- Prepare Phase: Proposer sends prepare requests with proposal numbers
- Promise Phase: Acceptors respond with promises and previously accepted values
- Accept Phase: Proposer sends accept requests with chosen value
- Accepted Phase: Acceptors accept the proposal and notify learners
Spring Boot Paxos Implementation
Paxos Node Configuration
@Configuration
@EnableScheduling
public class PaxosConfig {
@Value("${paxos.node.id}")
private String nodeId;
@Value("${paxos.cluster.nodes}")
private List<String> clusterNodes;
@Bean
public PaxosNode paxosNode() {
return PaxosNode.builder()
.nodeId(nodeId)
.clusterNodes(clusterNodes)
.build();
}
@Bean
public RestTemplate restTemplate() {
RestTemplate template = new RestTemplate();
template.setRequestFactory(new HttpComponentsClientHttpRequestFactory());
return template;
}
}
@Component
@Slf4j
public class PaxosProposer {
@Autowired
private PaxosNode node;
@Autowired
private RestTemplate restTemplate;
@Autowired
private PaxosStateRepository stateRepository;
private final AtomicLong proposalNumber = new AtomicLong(0);
public CompletableFuture<Boolean> propose(String key, String value) {
long proposalId = generateProposalNumber();
log.info("Starting Paxos proposal: {} -> {} with proposal ID: {}", key, value, proposalId);
return CompletableFuture
.supplyAsync(() -> preparePhase(key, proposalId))
.thenCompose(prepareResult -> {
if (prepareResult.isSuccessful()) {
String finalValue = selectValue(value, prepareResult.getPromises());
return acceptPhase(key, proposalId, finalValue);
} else {
return CompletableFuture.completedFuture(false);
}
});
}
private PrepareResult preparePhase(String key, long proposalId) {
log.debug("Phase 1: Prepare for key {} with proposal {}", key, proposalId);
PrepareRequest request = PrepareRequest.builder()
.key(key)
.proposalId(proposalId)
.proposerId(node.getNodeId())
.build();
List<CompletableFuture<PrepareResponse>> futures = new ArrayList<>();
for (String nodeUrl : node.getClusterNodes()) {
if (!nodeUrl.equals(node.getNodeId())) {
CompletableFuture<PrepareResponse> future = sendPrepareRequest(nodeUrl, request);
futures.add(future);
}
}
// Wait for majority responses
List<PrepareResponse> responses = new ArrayList<>();
int majority = (node.getClusterNodes().size() / 2) + 1;
try {
for (CompletableFuture<PrepareResponse> future : futures) {
PrepareResponse response = future.get(5, TimeUnit.SECONDS);
if (response.isPromise()) {
responses.add(response);
if (responses.size() >= majority) {
break;
}
}
}
} catch (Exception e) {
log.error("Prepare phase failed", e);
}
boolean successful = responses.size() >= majority;
log.debug("Prepare phase result: {} (received {} of {} required promises)",
successful, responses.size(), majority);
return PrepareResult.builder()
.successful(successful)
.promises(responses)
.build();
}
private CompletableFuture<Boolean> acceptPhase(String key, long proposalId, String value) {
log.debug("Phase 2: Accept for key {} with proposal {} and value {}", key, proposalId, value);
AcceptRequest request = AcceptRequest.builder()
.key(key)
.proposalId(proposalId)
.value(value)
.proposerId(node.getNodeId())
.build();
List<CompletableFuture<AcceptResponse>> futures = new ArrayList<>();
for (String nodeUrl : node.getClusterNodes()) {
if (!nodeUrl.equals(node.getNodeId())) {
CompletableFuture<AcceptResponse> future = sendAcceptRequest(nodeUrl, request);
futures.add(future);
}
}
// Wait for majority accepts
AtomicInteger acceptCount = new AtomicInteger(0);
int majority = (node.getClusterNodes().size() / 2) + 1;
try {
for (CompletableFuture<AcceptResponse> future : futures) {
AcceptResponse response = future.get(5, TimeUnit.SECONDS);
if (response.isAccepted()) {
acceptCount.incrementAndGet();
}
}
} catch (Exception e) {
log.error("Accept phase failed", e);
}
boolean consensus = acceptCount.get() >= majority;
if (consensus) {
// Store the agreed value
PaxosState state = PaxosState.builder()
.key(key)
.value(value)
.proposalId(proposalId)
.timestamp(Instant.now())
.build();
stateRepository.save(state);
log.info("Consensus reached for key {}: {} (proposal {})", key, value, proposalId);
} else {
log.warn("Failed to reach consensus for key {} (got {} accepts, needed {})",
key, acceptCount.get(), majority);
}
return CompletableFuture.completedFuture(consensus);
}
private CompletableFuture<PrepareResponse> sendPrepareRequest(String nodeUrl, PrepareRequest request) {
return CompletableFuture.supplyAsync(() -> {
try {
ResponseEntity<PrepareResponse> response = restTemplate.postForEntity(
nodeUrl + "/paxos/prepare", request, PrepareResponse.class);
return response.getBody();
} catch (Exception e) {
log.warn("Failed to send prepare request to {}", nodeUrl, e);
return PrepareResponse.builder().promise(false).build();
}
});
}
private CompletableFuture<AcceptResponse> sendAcceptRequest(String nodeUrl, AcceptRequest request) {
return CompletableFuture.supplyAsync(() -> {
try {
ResponseEntity<AcceptResponse> response = restTemplate.postForEntity(
nodeUrl + "/paxos/accept", request, AcceptResponse.class);
return response.getBody();
} catch (Exception e) {
log.warn("Failed to send accept request to {}", nodeUrl, e);
return AcceptResponse.builder().accepted(false).build();
}
});
}
private long generateProposalNumber() {
// Generate globally unique, increasing proposal numbers
long timestamp = System.currentTimeMillis();
long nodeHash = node.getNodeId().hashCode() & 0xFFFF;
return (timestamp << 16) | nodeHash;
}
private String selectValue(String proposedValue, List<PrepareResponse> promises) {
// Select value with highest proposal number from promises, or use proposed value
return promises.stream()
.filter(p -> p.getAcceptedValue() != null)
.max(Comparator.comparing(PrepareResponse::getAcceptedProposalId))
.map(PrepareResponse::getAcceptedValue)
.orElse(proposedValue);
}
}
@Component
@Slf4j
public class PaxosAcceptor {
@Autowired
private PaxosStateRepository stateRepository;
private final Map<String, Long> promisedProposals = new ConcurrentHashMap<>();
private final Map<String, AcceptedValue> acceptedValues = new ConcurrentHashMap<>();
public PrepareResponse handlePrepare(PrepareRequest request) {
String key = request.getKey();
long proposalId = request.getProposalId();
log.debug("Handling prepare request for key {} with proposal {}", key, proposalId);
Long currentPromise = promisedProposals.get(key);
if (currentPromise == null || proposalId > currentPromise) {
// Promise to this proposal
promisedProposals.put(key, proposalId);
AcceptedValue acceptedValue = acceptedValues.get(key);
PrepareResponse response = PrepareResponse.builder()
.promise(true)
.acceptedProposalId(acceptedValue != null ? acceptedValue.getProposalId() : null)
.acceptedValue(acceptedValue != null ? acceptedValue.getValue() : null)
.build();
log.debug("Promised proposal {} for key {}", proposalId, key);
return response;
} else {
log.debug("Rejected prepare for key {} (proposal {} <= current promise {})",
key, proposalId, currentPromise);
return PrepareResponse.builder().promise(false).build();
}
}
public AcceptResponse handleAccept(AcceptRequest request) {
String key = request.getKey();
long proposalId = request.getProposalId();
String value = request.getValue();
log.debug("Handling accept request for key {} with proposal {} and value {}",
key, proposalId, value);
Long currentPromise = promisedProposals.get(key);
if (currentPromise != null && proposalId >= currentPromise) {
// Accept this proposal
AcceptedValue acceptedValue = AcceptedValue.builder()
.proposalId(proposalId)
.value(value)
.timestamp(Instant.now())
.build();
acceptedValues.put(key, acceptedValue);
// Persist the accepted value
PaxosState state = PaxosState.builder()
.key(key)
.value(value)
.proposalId(proposalId)
.timestamp(Instant.now())
.build();
stateRepository.save(state);
log.debug("Accepted proposal {} for key {} with value {}", proposalId, key, value);
return AcceptResponse.builder().accepted(true).build();
} else {
log.debug("Rejected accept for key {} (proposal {} < promised {})",
key, proposalId, currentPromise);
return AcceptResponse.builder().accepted(false).build();
}
}
}
@RestController
@RequestMapping("/paxos")
@Slf4j
public class PaxosController {
@Autowired
private PaxosAcceptor acceptor;
@Autowired
private PaxosProposer proposer;
@PostMapping("/prepare")
public ResponseEntity<PrepareResponse> prepare(@RequestBody PrepareRequest request) {
PrepareResponse response = acceptor.handlePrepare(request);
return ResponseEntity.ok(response);
}
@PostMapping("/accept")
public ResponseEntity<AcceptResponse> accept(@RequestBody AcceptRequest request) {
AcceptResponse response = acceptor.handleAccept(request);
return ResponseEntity.ok(response);
}
@PostMapping("/propose")
public ResponseEntity<ProposeResponse> propose(@RequestBody ProposeRequest request) {
CompletableFuture<Boolean> result = proposer.propose(request.getKey(), request.getValue());
try {
boolean success = result.get(10, TimeUnit.SECONDS);
return ResponseEntity.ok(ProposeResponse.builder()
.success(success)
.message(success ? "Consensus reached" : "Failed to reach consensus")
.build());
} catch (Exception e) {
return ResponseEntity.ok(ProposeResponse.builder()
.success(false)
.message("Proposal timeout or failed: " + e.getMessage())
.build());
}
}
}
Paxos Data Models
@Entity
@Table(name = "paxos_state")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class PaxosState {
@Id
private String key;
@Column(name = "agreed_value")
private String value;
@Column(name = "proposal_id")
private Long proposalId;
@CreationTimestamp
private Instant timestamp;
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class PrepareRequest {
private String key;
private Long proposalId;
private String proposerId;
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class PrepareResponse {
private boolean promise;
private Long acceptedProposalId;
private String acceptedValue;
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class AcceptRequest {
private String key;
private Long proposalId;
private String value;
private String proposerId;
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class AcceptResponse {
private boolean accepted;
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class AcceptedValue {
private Long proposalId;
private String value;
private Instant timestamp;
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class PrepareResult {
private boolean successful;
private List<PrepareResponse> promises;
}
Raft Algorithm
Raft is a consensus algorithm designed to be more understandable than Paxos while providing the same safety guarantees. It uses leader election and log replication to achieve consensus.
Raft Components
- Leader Election: Elects a single leader to manage log replication
- Log Replication: Leader receives client requests and replicates to followers
- Safety: Ensures that committed entries are durable and consistent
Spring Boot Raft Implementation
Raft Node Implementation
@Component
@Slf4j
public class RaftNode {
@Value("${raft.node.id}")
private String nodeId;
@Value("${raft.cluster.nodes}")
private List<String> clusterNodes;
@Autowired
private RaftLogRepository logRepository;
@Autowired
private RaftStateRepository stateRepository;
@Autowired
private RestTemplate restTemplate;
// Raft state
private volatile RaftState state = RaftState.FOLLOWER;
private volatile String currentLeader;
private volatile long currentTerm = 0;
private volatile String votedFor;
private volatile long commitIndex = 0;
private volatile long lastApplied = 0;
// Leader state
private final Map<String, Long> nextIndex = new ConcurrentHashMap<>();
private final Map<String, Long> matchIndex = new ConcurrentHashMap<>();
// Election timing
private volatile long lastHeartbeat = System.currentTimeMillis();
private final Random random = new Random();
@PostConstruct
public void initialize() {
loadPersistedState();
startElectionTimer();
startHeartbeatTimer();
}
@Scheduled(fixedDelay = 50) // Check every 50ms
public void electionTimeoutCheck() {
if (state == RaftState.FOLLOWER || state == RaftState.CANDIDATE) {
long electionTimeout = 150 + random.nextInt(150); // 150-300ms
if (System.currentTimeMillis() - lastHeartbeat > electionTimeout) {
startElection();
}
}
}
@Scheduled(fixedDelay = 50) // Send heartbeats every 50ms
public void leaderHeartbeat() {
if (state == RaftState.LEADER) {
sendHeartbeats();
}
}
private void startElection() {
log.info("Starting election for term {}", currentTerm + 1);
state = RaftState.CANDIDATE;
currentTerm++;
votedFor = nodeId;
lastHeartbeat = System.currentTimeMillis();
persistState();
// Vote for self
AtomicInteger voteCount = new AtomicInteger(1);
int majority = (clusterNodes.size() / 2) + 1;
// Request votes from other nodes
List<CompletableFuture<VoteResponse>> voteRequests = new ArrayList<>();
for (String node : clusterNodes) {
if (!node.equals(nodeId)) {
CompletableFuture<VoteResponse> voteRequest = requestVote(node);
voteRequests.add(voteRequest);
}
}
// Count votes
CompletableFuture.allOf(voteRequests.toArray(new CompletableFuture[0]))
.whenComplete((result, throwable) -> {
for (CompletableFuture<VoteResponse> request : voteRequests) {
try {
VoteResponse response = request.get();
if (response.isVoteGranted()) {
voteCount.incrementAndGet();
}
} catch (Exception e) {
log.warn("Vote request failed", e);
}
}
if (voteCount.get() >= majority && state == RaftState.CANDIDATE) {
becomeLeader();
} else {
becomeFollower(currentTerm, null);
}
});
}
private CompletableFuture<VoteResponse> requestVote(String targetNode) {
return CompletableFuture.supplyAsync(() -> {
try {
long lastLogIndex = getLastLogIndex();
long lastLogTerm = getLastLogTerm();
VoteRequest request = VoteRequest.builder()
.term(currentTerm)
.candidateId(nodeId)
.lastLogIndex(lastLogIndex)
.lastLogTerm(lastLogTerm)
.build();
ResponseEntity<VoteResponse> response = restTemplate.postForEntity(
targetNode + "/raft/vote", request, VoteResponse.class);
return response.getBody();
} catch (Exception e) {
log.warn("Failed to request vote from {}", targetNode, e);
return VoteResponse.builder().voteGranted(false).term(currentTerm).build();
}
});
}
private void becomeLeader() {
log.info("Became leader for term {}", currentTerm);
state = RaftState.LEADER;
currentLeader = nodeId;
// Initialize leader state
long lastLogIndex = getLastLogIndex();
for (String node : clusterNodes) {
if (!node.equals(nodeId)) {
nextIndex.put(node, lastLogIndex + 1);
matchIndex.put(node, 0L);
}
}
// Send initial heartbeats
sendHeartbeats();
}
private void becomeFollower(long term, String leader) {
log.debug("Became follower for term {}, leader: {}", term, leader);
state = RaftState.FOLLOWER;
currentTerm = term;
currentLeader = leader;
votedFor = null;
lastHeartbeat = System.currentTimeMillis();
persistState();
}
private void sendHeartbeats() {
for (String node : clusterNodes) {
if (!node.equals(nodeId)) {
sendAppendEntries(node, Collections.emptyList());
}
}
}
public CompletableFuture<Boolean> appendEntry(String command) {
if (state != RaftState.LEADER) {
return CompletableFuture.completedFuture(false);
}
// Create log entry
RaftLogEntry entry = RaftLogEntry.builder()
.term(currentTerm)
.index(getLastLogIndex() + 1)
.command(command)
.timestamp(Instant.now())
.build();
logRepository.save(entry);
log.info("Leader appended entry: {} at index {}", command, entry.getIndex());
// Replicate to followers
return replicateEntry(entry);
}
private CompletableFuture<Boolean> replicateEntry(RaftLogEntry entry) {
List<CompletableFuture<Boolean>> replicationFutures = new ArrayList<>();
for (String node : clusterNodes) {
if (!node.equals(nodeId)) {
CompletableFuture<Boolean> future = sendAppendEntries(node, List.of(entry));
replicationFutures.add(future);
}
}
return CompletableFuture.allOf(replicationFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> {
// Count successful replications
long successCount = replicationFutures.stream()
.mapToLong(f -> {
try {
return f.get() ? 1 : 0;
} catch (Exception e) {
return 0;
}
})
.sum();
// Include leader's own copy
successCount++;
int majority = (clusterNodes.size() / 2) + 1;
boolean committed = successCount >= majority;
if (committed) {
commitIndex = entry.getIndex();
log.info("Entry committed at index {}: {}", entry.getIndex(), entry.getCommand());
}
return committed;
});
}
private CompletableFuture<Boolean> sendAppendEntries(String targetNode, List<RaftLogEntry> entries) {
return CompletableFuture.supplyAsync(() -> {
try {
long prevLogIndex = nextIndex.get(targetNode) - 1;
long prevLogTerm = getLogTerm(prevLogIndex);
AppendEntriesRequest request = AppendEntriesRequest.builder()
.term(currentTerm)
.leaderId(nodeId)
.prevLogIndex(prevLogIndex)
.prevLogTerm(prevLogTerm)
.entries(entries)
.leaderCommit(commitIndex)
.build();
ResponseEntity<AppendEntriesResponse> response = restTemplate.postForEntity(
targetNode + "/raft/append", request, AppendEntriesResponse.class);
AppendEntriesResponse responseBody = response.getBody();
if (responseBody.isSuccess()) {
if (!entries.isEmpty()) {
nextIndex.put(targetNode, entries.get(entries.size() - 1).getIndex() + 1);
matchIndex.put(targetNode, entries.get(entries.size() - 1).getIndex());
}
return true;
} else {
// Decrement nextIndex and retry
nextIndex.put(targetNode, Math.max(1, nextIndex.get(targetNode) - 1));
return false;
}
} catch (Exception e) {
log.warn("Failed to send append entries to {}", targetNode, e);
return false;
}
});
}
public VoteResponse handleVoteRequest(VoteRequest request) {
if (request.getTerm() > currentTerm) {
becomeFollower(request.getTerm(), null);
}
boolean voteGranted = request.getTerm() == currentTerm &&
(votedFor == null || votedFor.equals(request.getCandidateId())) &&
isLogUpToDate(request.getLastLogIndex(), request.getLastLogTerm());
if (voteGranted) {
votedFor = request.getCandidateId();
lastHeartbeat = System.currentTimeMillis();
persistState();
}
log.debug("Vote request from {}: {} (term {}, my term {})",
request.getCandidateId(), voteGranted ? "GRANTED" : "DENIED",
request.getTerm(), currentTerm);
return VoteResponse.builder()
.term(currentTerm)
.voteGranted(voteGranted)
.build();
}
public AppendEntriesResponse handleAppendEntries(AppendEntriesRequest request) {
if (request.getTerm() > currentTerm) {
becomeFollower(request.getTerm(), request.getLeaderId());
}
if (request.getTerm() == currentTerm) {
becomeFollower(currentTerm, request.getLeaderId());
}
// Check log consistency
if (request.getPrevLogIndex() > 0) {
if (getLastLogIndex() < request.getPrevLogIndex() ||
getLogTerm(request.getPrevLogIndex()) != request.getPrevLogTerm()) {
return AppendEntriesResponse.builder()
.term(currentTerm)
.success(false)
.build();
}
}
// Append entries
if (!request.getEntries().isEmpty()) {
for (RaftLogEntry entry : request.getEntries()) {
logRepository.save(entry);
}
}
// Update commit index
if (request.getLeaderCommit() > commitIndex) {
commitIndex = Math.min(request.getLeaderCommit(), getLastLogIndex());
}
return AppendEntriesResponse.builder()
.term(currentTerm)
.success(true)
.build();
}
private long getLastLogIndex() {
return logRepository.findTopByOrderByIndexDesc()
.map(RaftLogEntry::getIndex)
.orElse(0L);
}
private long getLastLogTerm() {
return logRepository.findTopByOrderByIndexDesc()
.map(RaftLogEntry::getTerm)
.orElse(0L);
}
private long getLogTerm(long index) {
if (index == 0) return 0;
return logRepository.findByIndex(index)
.map(RaftLogEntry::getTerm)
.orElse(0L);
}
private boolean isLogUpToDate(long lastLogIndex, long lastLogTerm) {
long myLastLogTerm = getLastLogTerm();
long myLastLogIndex = getLastLogIndex();
return lastLogTerm > myLastLogTerm ||
(lastLogTerm == myLastLogTerm && lastLogIndex >= myLastLogIndex);
}
private void persistState() {
RaftNodeState nodeState = RaftNodeState.builder()
.nodeId(nodeId)
.currentTerm(currentTerm)
.votedFor(votedFor)
.build();
stateRepository.save(nodeState);
}
private void loadPersistedState() {
stateRepository.findByNodeId(nodeId).ifPresent(state -> {
this.currentTerm = state.getCurrentTerm();
this.votedFor = state.getVotedFor();
});
}
// Getters
public RaftState getState() { return state; }
public String getCurrentLeader() { return currentLeader; }
public long getCurrentTerm() { return currentTerm; }
}
@RestController
@RequestMapping("/raft")
@Slf4j
public class RaftController {
@Autowired
private RaftNode raftNode;
@PostMapping("/vote")
public ResponseEntity<VoteResponse> vote(@RequestBody VoteRequest request) {
VoteResponse response = raftNode.handleVoteRequest(request);
return ResponseEntity.ok(response);
}
@PostMapping("/append")
public ResponseEntity<AppendEntriesResponse> appendEntries(@RequestBody AppendEntriesRequest request) {
AppendEntriesResponse response = raftNode.handleAppendEntries(request);
return ResponseEntity.ok(response);
}
@PostMapping("/command")
public ResponseEntity<CommandResponse> submitCommand(@RequestBody CommandRequest request) {
if (raftNode.getState() != RaftState.LEADER) {
return ResponseEntity.status(HttpStatus.REDIRECT)
.header("Location", raftNode.getCurrentLeader() + "/raft/command")
.body(CommandResponse.builder()
.success(false)
.message("Not leader, redirect to: " + raftNode.getCurrentLeader())
.build());
}
CompletableFuture<Boolean> result = raftNode.appendEntry(request.getCommand());
try {
boolean success = result.get(5, TimeUnit.SECONDS);
return ResponseEntity.ok(CommandResponse.builder()
.success(success)
.message(success ? "Command committed" : "Failed to commit command")
.build());
} catch (Exception e) {
return ResponseEntity.ok(CommandResponse.builder()
.success(false)
.message("Command timeout: " + e.getMessage())
.build());
}
}
@GetMapping("/status")
public ResponseEntity<RaftStatus> getStatus() {
RaftStatus status = RaftStatus.builder()
.nodeId(raftNode.getCurrentLeader())
.state(raftNode.getState().toString())
.term(raftNode.getCurrentTerm())
.leader(raftNode.getCurrentLeader())
.build();
return ResponseEntity.ok(status);
}
}
Raft Data Models
@Entity
@Table(name = "raft_log")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RaftLogEntry {
@Id
private Long index;
private Long term;
@Column(columnDefinition = "TEXT")
private String command;
@CreationTimestamp
private Instant timestamp;
}
@Entity
@Table(name = "raft_state")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RaftNodeState {
@Id
private String nodeId;
private Long currentTerm;
private String votedFor;
@UpdateTimestamp
private Instant updatedAt;
}
public enum RaftState {
FOLLOWER, CANDIDATE, LEADER
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class VoteRequest {
private Long term;
private String candidateId;
private Long lastLogIndex;
private Long lastLogTerm;
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class VoteResponse {
private Long term;
private boolean voteGranted;
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class AppendEntriesRequest {
private Long term;
private String leaderId;
private Long prevLogIndex;
private Long prevLogTerm;
private List<RaftLogEntry> entries;
private Long leaderCommit;
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class AppendEntriesResponse {
private Long term;
private boolean success;
}
Configuration and Best Practices
Application Properties
# application.yml for Paxos
paxos:
node:
id: "node-1"
cluster:
nodes:
- "http://node-1:8080"
- "http://node-2:8080"
- "http://node-3:8080"
timeout: 5000
retry:
max-attempts: 3
backoff: 1000
---
# application.yml for Raft
raft:
node:
id: "node-1"
cluster:
nodes:
- "http://node-1:8080"
- "http://node-2:8080"
- "http://node-3:8080"
election:
timeout-min: 150
timeout-max: 300
heartbeat:
interval: 50
spring:
datasource:
url: jdbc:h2:mem:consensus
driver-class-name: org.h2.Driver
jpa:
hibernate:
ddl-auto: create-drop
show-sql: false
Consensus algorithms like Paxos and Raft are essential for building reliable distributed systems. Paxos provides strong theoretical foundations but can be complex to implement correctly. Raft offers a more understandable alternative while maintaining the same safety guarantees. Both algorithms ensure that distributed systems can reach agreement even in the presence of failures.