-
Notifications
You must be signed in to change notification settings - Fork 24
raft: consensus protocol design doc #100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
5a7a27b
df185c3
9657559
099498a
43e600e
bd90b1f
7d26490
5923f4f
02239c8
2b900c0
8527df5
b18332a
b11486e
4e559a5
7eef110
9b209f1
17a8e1a
1cb7555
1209205
8d25041
01d0c32
d354ab9
2e71890
2a8c4d9
c32828c
6815e8c
281906e
b180d33
981d873
fd1f272
a16b909
c312812
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,210 @@ | ||
| # The consensus | ||
|
|
||
| Before talking about consensus, we need to discuss some logistics based on how the systems can co-exist. | ||
|
|
||
| * Communication: Distributed systems need a method to communicate between each other. Remote Procedure Calls is the mechanism using which a standalone server can talk to another. The existing `network` layer of the database will handle all the communication between servers. However, the messages to be passed are decided by the raft layer. | ||
| * Security: Access control mechanisms need to be in place to decide on access to functions in the servers based on their state (leader, follower, candidate) | ||
| * Routing to leader: One of the issues with a varying leader is for the clients to know which IP address to contact for the service. We can solve this problem by advertising any/all IPs of the cluster and the client returns the IP of the leader if its not the leader. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand. Any node knows all IPs and can answer that question. Why should follower nodes only respond with the leader IP?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. At any point of time when raft is in the "working phase" (which is the leader is up, no election is happenning), there are only 2 kinds of nodes; the leader and the follower. So if the client doesn't hit the leader, it means it hit the follower. |
||
| * The servers will be implemented in the `interal/node` folders which will import the raft API and perform their functions. | ||
|
|
||
| Maintaining consensus is one of the major parts of a distributed system. To know to have achieved a stable system, we need the following two parts of implementation. | ||
|
|
||
| ## The Raft protocol | ||
|
|
||
| The raft protocol will be implemented in `internal/raft` and will implement APIs that each node can call. | ||
|
|
||
| Raft is an algorithm to handle replicated log, and we maintain the "log" of the SQL statements applied on a DB and have a completely replicated cluster. | ||
|
|
||
| #### General Implementation basics: | ||
| * All RPC calls are done parallely to obtain the best performance. | ||
| * Request retries are done in case of network failures. | ||
| * Raft does not assume network preserves ordering of the packets. | ||
| * The `raft/raft.go` file has all the state parameters and general functions that raft uses. | ||
| * A raft server is implemented as: | ||
| ``` | ||
| type simpleServer struct { | ||
| node *Node | ||
| cluster Cluster | ||
| onReplication ReplicationHandler | ||
| log zerolog.Logger | ||
| } | ||
|
|
||
| type Node struct { | ||
| State string | ||
|
|
||
| PersistentState *PersistentState | ||
| VolatileState *VolatileState | ||
| VolatileStateLeader *VolatileStateLeader | ||
| } | ||
|
|
||
|
Comment on lines
+23
to
+39
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is there implementation in a design doc?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well if I add the API, they struct's would be a good reference to have, was my thought.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Keep in mind that you are writing a design doc, not an implementation guide. Such a concrete struct may have heavy implications on the rest of the implementation. The point of a design doc is, to just outline the components and how they interact, and leave the concrete implementation to the developer. |
||
| // PersistentState describes the persistent state data on a raft node. | ||
| type PersistentState struct { | ||
| CurrentTerm int32 | ||
| VotedFor id.ID // VotedFor is nil at init, and id.ID of the node after voting is complete. | ||
| Log []*message.LogData | ||
|
|
||
| SelfID id.ID | ||
| LeaderID id.ID // LeaderID is nil at init, and the id.ID of the node after the leader is elected. | ||
| PeerIPs []network.Conn // PeerIPs has the connection variables of all the other nodes in the cluster. | ||
| mu sync.Mutex | ||
| } | ||
|
|
||
| // VolatileState describes the volatile state data on a raft node. | ||
| type VolatileState struct { | ||
| CommitIndex int | ||
| LastApplied int | ||
| } | ||
|
|
||
| // VolatileStateLeader describes the volatile state data that exists on a raft leader. | ||
| type VolatileStateLeader struct { | ||
| NextIndex []int // Holds the nextIndex value for each of the followers in the cluster. | ||
| MatchIndex []int // Holds the matchIndex value for each of the followers in the cluster. | ||
| } | ||
| ``` | ||
| * A raft node must always be listening to incoming requests; in absence of any, it starts a leader election after a timeout. This is implemented as: | ||
| ``` | ||
| go func() { | ||
| for { | ||
| // Parallely start waiting for incoming data. | ||
| conn, msg, err := s.cluster.Receive(ctx) | ||
| liveChan <- &incomingData{ | ||
| conn, | ||
| msg, | ||
| } | ||
| if err != nil { | ||
| return | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| for { | ||
| // If any sort of request (heartbeat,appendEntries,requestVote) | ||
| // isn't received by the server(node) it restarts leader election. | ||
| select { | ||
| case <-randomTimer().C: | ||
| err = StartElection(node) | ||
| if err != nil { | ||
| return | ||
| } | ||
| case data := <-liveChan: | ||
| err = processIncomingData(data, node) | ||
| if err != nil { | ||
| return | ||
| } | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| A raft server may be in any of the 3 states; leader, follower or candidate. All requests are serviced through the leader and it then decides how and if the logs must be replicated in the follower machines. The raft protocol has 3 almost independent modules: | ||
tsatke marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| 1. Leader Election | ||
| 2. Log Replication | ||
| 3. Safety | ||
|
|
||
| A detailed description of all the modules and their implementation follow: | ||
|
|
||
| ### Leader Election | ||
|
|
||
| #### Spec | ||
| * Startup: All servers start in the follower state and begin by requesting votes to be elected as a leader. | ||
tsatke marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| * Pre-election: The server increments its `currentTerm` by one, changes to `candidate` state and sends out `RequestVotes` RPC parallely to all the peers. | ||
| * Vote condition: FCFS basis. If there was no request to the server, it votes for itself. | ||
| * Election timeout: A preset time for which the server waits to see if a peer requested a vote. It is randomly chosen between 150-300ms. | ||
| * Election is repeated after an election timeout until: | ||
| 1. The server wins the election | ||
| 2. A peer establishes itself as leader. | ||
| 3. Election timer times out or a split vote occurs (leading to no leader) and the process will be repeated. | ||
| * Election win: Majority votes in the term. (More details in safety) The state of the winner is now `Leader` and the others are `Followers`. | ||
| * The term problem: Current terms are exchanged when-ever servers communicate; if one server’s current term is smaller than the other’s, then it updates its current term to the larger value. If a candidate or leader discovers that its term is out of date,it immediately reverts to follower state. If a server receives a request with a stale term number, itrejects the request. All terms which are not the current terms are considered _out of date_. | ||
| * Maintaining leaders reign: The leader sends `heartbeats` to all servers to establish its reign. This also checks whether other servers are alive based on the response and informs other servers that the leader still is alive too. If the servers do not get timely heartbeat messages, they transform from the `follower` state to `candidate` state. | ||
| * Transition from leader's normal working state to Election happens when a leader fails. | ||
| * Maintaining sanity: While waiting for votes, if a `AppendEntriesRPC` is received by the server, and the term of the leader is greater than of equal to the waiting node's term, the leader is considered to be legitimate and the waiter becomes a follower of the leader. If the term of the leader is lesser, it is rejected. | ||
| * The split vote problem: Though not that common, split votes can occur. To make sure this doesn't continue indefinitely, election timeouts are randomised, making the split votes less probable. | ||
|
|
||
|
|
||
| #### Implementation | ||
|
|
||
| * The implementation of leader election will span over `raft/leader_election.go`, request votes over `raft/request_votes.go` and append entries over `append_entries.go`. | ||
| * The raft module will provide a `StartElection` function that enables a node to begin election. This function just begins the election and doesnt return any result of the election. The decision of the election will be handled by the votes and each server independently. | ||
| * The Leader node is the only one to know its the leader in the beginning. It realises it has obtained the majority votes, and starts behaving like the leader node. During this period, other nodes wait for a possible leader and begin to proceed in the candidate state by advancing to the next term unless the leader contacts them. | ||
|
|
||
| ### Log Replication | ||
|
|
||
| #### Spec | ||
|
|
||
| * Pre-log replication: Once a leader is elected, it starts servicing the client. The leader appends a new request to its `New Entry` log then issues `AppendEntriesRPC` in parallel to all its peers. | ||
| * Successful log: When all logs have been applied successfully to all follower machines, the leader applies the entry to its state machine and returns the result to the client. | ||
| * Repeating `AppendEntries`: `AppendEntriesRPC` are repeated indefinitely until all followers eventually store all log entries. | ||
| * Log entry storage: Log entries are a queue of state machine commands which are applied to that particular state machine. Log entries are associated with a term number to indicate the term of application of that log along with an integer index to identify a particular logs position. | ||
| * A committed entry: When a leader decides that the log entry is safe to apply to other state machines, that entry is called committed. All committed entries are durable and _will eventually be executed_ by all state machines. | ||
| * An entry -> Committed entry: A log entry is called committed once its replicated on the majority of the servers in the cluster. Once an entry is committed, it commits all the previous entries in the leaders log, including the entries created by the previous leaders. | ||
| * The leader keeps track of the highest known index that it knows is committed and it is included in all the future `AppendEntriesRPC` (including heartbeats) to inform other servers. | ||
| * Theres some issue about log committing - "A log entry is committed once the leader that created the entry has replicated it on a majority of the servers" and " Once a follower learns that a log entry is committed, it applies theentry to its local state machine (in log order)." are not clear whether replicating and applying to state machine are the same. If they are its kind of a contradiction, else "application" can mean executing the STMT in the DB in our case. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. resolve the issue and add one or more solutions to this doc |
||
| * Log matching property: | ||
| * If two entries in different logs have the same index and term, then they store the same command. | ||
| * If two entries in different logs have the same index and term, then the logs are identical in all preceding entries. | ||
| * When sending an AppendEntriesRPC, the leader includes the index and term of the entry in its log that immediately precedes the new entries. If the follower does not find an entry in its log with the same index and term, then it refuses the new entries. This helps in log matching. Which implies, a successful `AppendEntriesRPC` means a synced log. | ||
| * Leader crashes inducing inconsistencies in logs: In Raft, the leader handles inconsistencies by forcing the followers’ logs to duplicate its own (the leader's). To do so, the leader must find the latest log entry where the two logs agree, delete any entries in the follower’s log after that point, and send the follower all of the leader’s entries after that point. All of these actions happen in response to the consistency check performed by AppendEntries RPCs. Meaning, the leader checks for the consistency by maintaining a `nextIndex` value and dropping it down and sending `AppendEntriesRPC` (which does the consistency check and fails unless they're same) until a success is returned. These "check `AppendEntries` can be empty to save BandWidth(BW). The follower can also help here by sending the smallest agreeing index in the first RPC instead of the leader probing until it reaches the index. | ||
| * Leader append-only property: A leader never overwrites or deletes entries in its own log. | ||
|
|
||
| #### Implementation | ||
|
|
||
| * Log replication implementation will span over the `raft/log_replication.go` file. | ||
|
|
||
| ### Safety | ||
|
|
||
| This module ensures that the above protocol runs as expected, eliminating the corner cases. | ||
|
|
||
| #### Spec | ||
|
|
||
| * Election restriction: Raft uses the voting process to prevent a candidate from winning an election unless its log contains all committed entries. A candidate must contact a majority of the cluster in order to be elected, which means that every committed entry must be present in at least one of those servers. If the candidate’s log is at least as up-to-date as any other log in that majority, then it will hold all the committed entries. The `RequestVote` RPC implements thisrestriction: the RPC includes information about the candidate’s log, and the voter denies its vote if its own log is more up-to-date than that of the candidate. | ||
| * The decision of _which is the more updated_ log is based on the index and the term of the last log. Term gets priority, if terms are same, index is checked for precedence. | ||
| * Committing from previous term: Raft never commits log entries from previous terms by counting replicas. Only log entries from the leader’s current term are committed bycounting replicas; once an entry from the current term has been committed in this way, then all prior entries are committed indirectly because of the Log Matching Property. | ||
| * Follower or Candidate crashes: `RequestVotes` or `AppendEntries` RPC are tried indefinetely. Raft RPC are _idempotent_ making the retries harmless. (Possible judgement call: Should I wait for the results of the votes to base it for my decision in the election?) | ||
| * `Current term`, `last applied index` and `vote` that was casted along with the logs must be persisted in case of server crashes. | ||
|
|
||
| #### Implementation | ||
|
|
||
| ### Client interaction: | ||
| * Idempotency is maintained by having a unique client ID and the request ID. The same request by the same client cannot be requested twice, we assume here that the client didn't receive the responses due to network errors etc. Each server maintains a _session_ for each client. The session tracks the latest serial number processed for the client, along with the associated response. If a server receives a command whose serial number has already been executed, it responds immediately without re-executing the request. | ||
| * With each request, the client includes the lowest sequencenumber for which it has not yet received a response, and the state machine then discards all responsesfor lower sequence numbers. Quite similar to TCP. | ||
| * The session for a client are _open on all nodes_. Session expiry happens on all nodes at once and in a deterministic way. LRU or an upper bound for sessions can be used. Some sort of timely probing is done to remove stale sessions. | ||
| * Live clients issue keep-alive requests during periods of inactivity, which are also augmented withthe leader’s timestamp and committed to the Raft log, in order to maintain their sessions. | ||
| * Reads can bypass the Raft log only if: | ||
| * If the leader has not yet marked an entry from its current term committed, it waits until ithas done so. The Leader Completeness Property guarantees that a leader has all committed entries, but at the start of its term, it may not know which those are. To find out, it needs to commit an entry from its term. Raft handles this by having each leader commit a blank `no-op` entry into the log at the start of its term. As soon as this no-op entry is committed, the leader’scommit index will be at least as large as any other servers’ during its term. | ||
| * The leader saves its current commit index in a local variable `readIndex`. This will be used as a lower bound for the version of the state that the query operates against. | ||
| * The leader needs to make sure it hasn’t been superseded by a newer leader of which it is unaware. It issues a new round of heartbeats and waits for their acknowledgments from amajority of the cluster. Once these acknowledgments are received, the leader knows that there could not have existed a leader for a greater term at the moment it sent the heartbeats. Thus, the readIndex was, at the time, the largest commit index ever seen by any server in the cluster. | ||
| * The leader waits for its state machine to advance at least as far as the readIndex; this is current enough to satisfy linearizability | ||
| * Finally, the leader issues the query against its state machine and replies to the client with the results. | ||
|
|
||
| ### How the modules interact | ||
|
|
||
| * Leader election is called by every node at init. | ||
| * All nodes send a `RequestVotesRPC` parallely to all other nodes. | ||
| * If a leader is elected, the leader begins to send `AppendEntriesRPC` to other nodes (followers) to establish its leadership. | ||
| * Log replication occurs when the `AppendEntriesRPC` is received by the follower. | ||
| ## A strict testing mechanism | ||
|
|
||
| The testing mechanism to be implemented will enable us in figuring out the problems existing in the implementation leading to a more resilient system. | ||
| We have to test for the following basic failures: | ||
| 1. Network partitioning. | ||
| 2. Un-responsive peers. | ||
| 3. Overloading peer. | ||
| 4. Corruption of data in transit. | ||
|
|
||
| ## Graceful handling of failures | ||
|
|
||
| Accepting failures exist and handling them gracefully enables creation of more resilient and stable systems. Having _circuit breakers_, _backoff mechanisms in clients_ and _validation and coordination mechanisms_ are some of the pointers to be followed. | ||
|
|
||
| ## Running Lbadd on Raft | ||
|
|
||
| * Background: Raft is just a consensus protocol that helps keep different database servers in sync. We need methods to issue a command and enable the sync between servers. | ||
| * Logistics: The `AppendEntriesRPC` will have the command to be executed by the client. This command goes through the leader, is applied by all the followers and then committed by the leader. Thus ensuring an in-sync distributed database. | ||
SUMUKHA-PK marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| * Each server will independently run the SQL statement once the statement is committed. This ensures that the state of the database is in sync. The data that moves around in the raft cluster can be a compiled SQL statement that each node will run independently. | ||
|
|
||
|
|
||
| ## Appendix | ||
|
|
||
| * The difference between _commit_, _replicate_ and _apply_ with respect to raft: Applying implies letting the log run through the node's state machine. This is the end process, happens after a commit. A commit is once replication happens on a majority of the nodes. While replication is simply appending of a log on _one_ node. | ||
| * Some gotchas: | ||
| * Client connects to the leader and leader crashes -> reject the connection. Let the client connect when the new leader is established. | ||
| * Some sort of idempotency must be maintained w.r.t. the client-cluster communication. Multiple requests submitted by the client should not cause problems due to network errors. | ||
Uh oh!
There was an error while loading. Please reload this page.