Implementing Raft: Part 2: Commands and Log Replication

This is Part 2 in a series of posts describing the Raft distributed consensus algorithm and its complete implementation in Go. Here is a list of posts in the series:

  • Part 0: Introduction
  • Part 1: Elections
  • Part 2: Commands and log replication (this post)
  • Part 3: Persistence and optimizations (coming soon)

In this part, we'll greatly enhance the Raft implementation to actually handle commands submitted by clients and replicate them across the Raft cluster. The code structure remains the same as in Part 1. There are a couple of new structs and functions and some changes to existing code – I'll present and explain all of these shortly.

All the code in this part is located in this directory.

Client interaction

We already had a brief discussion of client interaction in Part 0; I strongly encourage you to go back and re-read that section. Here we're not going to focus on how a client finds the leader; rather, we'll talk about what happens when it finds one.

Some terminology first; as discussed earlier, clients use Raft to replicate a sequence of commands, which can be seen as inputs to a generic state machine. As far as our Raft implementation is concerned, these commands are completely arbitrary and we represent them with Go's empty interface type (interface{}) - a concrete sketch of one possible command follows the list below. A command goes through the following process in its Raft consensus journey:

 1. First, a command is submitted to the leader by a client. In a cluster of Raft peers, a command is typically submitted only to a single peer.
 2. The leader replicates the command to its followers.
 3. Finally, once the leader is satisfied that the command is sufficiently replicated (that is, a majority of cluster peers acknowledge they have the command in their logs) [1], the command is committed and all clients are notified of a new commit.

Note the asymmetry between submitting and committing a command - this is important to keep in mind when examining the implementation decisions we're going to be discussing soon. A command is submitted to a single Raft peer, but multiple peers (specifically, all connected/live peers) commit it after some time and notify their clients.
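To make "completely arbitrary command" concrete, here is a minimal sketch of what a command for a key-value service might look like. The Put type is hypothetical - it's not part of this post's code; Raft never inspects it and simply carries it through the log as an interface{} value:

    // Put is a hypothetical client command for a key-value service.
    // Raft treats it as an opaque interface{} value.
    type Put struct {
      Key   string
      Value int
    }

A client would then submit something like Put{Key: "x", Value: 4} to the leader, and the same value would eventually pop out on every peer's commit channel.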

    Recall this diagram from Part 0:

    Raft consensus module and log connected to state machine

The state machine represents an arbitrary service using Raft for replication; for example, this could be a key-value database. Committed commands change the state of the service (e.g. add a key/value pair to the database).

When we're talking about a client in the context of a Raft ConsensusModule, we usually mean this service, because this is where the commits are reported to. In other words, the black arrow from Consensus Module to the service state machine is that notification.

There is another notion of client, which is the client of the service (e.g. a user of the key-value database). The service's interaction with its clients is its own business; in this post we focus just on Raft's interaction with the service.

Implementation: commit channel

In our implementation, when a ConsensusModule is created it takes in a commit channel - a channel which it uses to send committed commands to the caller: commitChan chan<- CommitEntry.

CommitEntry is defined as follows:

    // CommitEntry is the data reported by Raft to the commit channel. Each commit
    // entry notifies the client that consensus was reached on a command and it can
    // be applied to the client's state machine.
    type CommitEntry struct {
      // Command is the client command being committed.
      Command interface{}

      // Index is the log index at which the client command is committed.
      Index int

      // Term is the Raft term at which the client command is committed.
      Term int
    }
Using a channel for this is a design choice, but it's not the only way to solve it. We could use a callback instead; when creating a ConsensusModule, the caller would register a callback function that we'd invoke whenever we have a command to commit.
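For illustration, a sketch of the callback flavor might look like this; the OnCommit name and the config struct are assumptions, not part of the actual code:

    // Hypothetical callback-based alternative to the commit channel.
    type ConsensusModuleConfig struct {
      // OnCommit is invoked by the CM whenever a command is committed.
      OnCommit func(entry CommitEntry)
    }

    // Inside the CM, instead of sending on commitChan we'd call:
    //
    //   cm.onCommit(CommitEntry{Command: cmd, Index: i, Term: term})

One reason to prefer a channel is that it decouples the CM from the speed at which the client processes commits, and it meshes naturally with Go's concurrency idioms.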

We'll see the code that sends entries on the channel shortly; first, we have to discuss how Raft servers replicate commands and decide that commands are committed.

Raft log

The Raft log has been mentioned a number of times in this series already, but we haven't yet said much about it. The log is simply the linear sequence of commands that should be applied to the state machine; the log should be sufficient to "replay" the state machine from some starting state, if needed. During normal operation, the logs of all Raft peers are identical; when the leader gets a new command, it places it in its own log and then replicates it to followers. Followers place the command in their logs and acknowledge it to the leader, which keeps count of the latest log index that was safely replicated to a majority of servers in the cluster.

    The Raft paper has several diagrams of logs that look somewhat like this:

Each box is a log entry; a number in the top part of the box is the term in which it was added to the log (these are the terms from Part 1). The bottom part is the key-value command that this log contains. Each log entry has a linear index [2]. The colors of the boxes are another representation of terms.

If this log is applied to an empty key-value store, the end result will have the values x=4, y=7.
In our implementation, log entries are represented by:

    type LogEntry struct {
      Command interface{}
      Term    int
    }

And each ConsensusModule's log is then simply log []LogEntry. The clients typically don't care about terms; terms are critical to the correctness of Raft, however, so they're important to keep in mind when reading the code.
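As a quick sanity check of the "replay" idea, here is a small self-contained sketch showing how applying a log to an empty key-value store reconstructs the state, ending with x=4, y=7 as in the diagram above. The kvPut command type and the specific log contents are made up for illustration:

    package main

    import "fmt"

    type LogEntry struct {
      Command interface{}
      Term    int
    }

    // kvPut is a hypothetical key-value command used only for this example.
    type kvPut struct {
      Key   string
      Value int
    }

    func main() {
      // A made-up log; terms grow monotonically as entries are appended.
      log := []LogEntry{
        {Command: kvPut{"x", 1}, Term: 1},
        {Command: kvPut{"y", 7}, Term: 2},
        {Command: kvPut{"x", 4}, Term: 3},
      }

      // Replaying the log from an empty store yields the final state.
      store := make(map[string]int)
      for _, entry := range log {
        cmd := entry.Command.(kvPut)
        store[cmd.Key] = cmd.Value
      }
      fmt.Println(store) // map[x:4 y:7]
    }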

Submitting new commands

Let's start with the new Submit method, which lets clients submit new commands:

    func (cm *ConsensusModule) Submit(command interface{}) bool {
      cm.mu.Lock()
      defer cm.mu.Unlock()

      cm.dlog("Submit received by %v: %v", cm.state, command)
      if cm.state == Leader {
        cm.log = append(cm.log, LogEntry{Command: command, Term: cm.currentTerm})
        cm.dlog("... log=%v", cm.log)
        return true
      }
      return false
    }

Very straightforward; if this CM is a leader, the new command is appended to the log and true is returned. Otherwise, it's ignored and false is returned.

Q: is a true value returned from Submit sufficient indication for the client that it has submitted a command to the leader?

A: Unfortunately not. In rare cases, a leader may become partitioned from the other Raft servers, who will go on to elect a new leader after a while. Clients may still be contacting the old leader, however. A client should wait for some reasonable time for the command it submitted to appear on the commit channel; if it does not, it means that it contacted the wrong leader and it should retry with a different leader.
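Here is a rough sketch of what such client-side logic could look like. Everything in it is an assumption for illustration: submitToPeer stands in for an RPC wrapper around Submit, the 500 ms timeout is arbitrary, and the time package is assumed to be imported:

    // submitWithRetry is a hypothetical client helper. It submits a command to
    // peers one by one and waits for the command to appear on the commit
    // channel; a timeout suggests we hit a deposed leader, so we move on.
    func submitWithRetry(peerIds []int, command interface{}, commits <-chan CommitEntry) bool {
    PeerLoop:
      for _, peerId := range peerIds {
        if !submitToPeer(peerId, command) { // assumed RPC wrapper; relays Submit's bool
          continue // this peer isn't a leader; try the next one
        }
        timeout := time.After(500 * time.Millisecond) // arbitrary waiting period
        for {
          select {
          case entry := <-commits:
            if entry.Command == command { // works for comparable command types
              return true // our command was committed
            }
            // Some other command was committed; keep waiting for ours.
          case <-timeout:
            continue PeerLoop // likely an old, partitioned leader; retry elsewhere
          }
        }
      }
      return false
    }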

Replicating log entries

We've just seen that a new command submitted to the leader is appended at the end of the log. How does this new command get to followers? The steps followed by leaders are precisely described in the Raft paper, Figure 2, in the "Rules for Servers" section. Our implementation does this in leaderSendHeartbeats; this is the new method [3]:
     
    func (cm *ConsensusModule) leaderSendHeartbeats() {
      cm.mu.Lock()
      savedCurrentTerm := cm.currentTerm
      cm.mu.Unlock()

      for _, peerId := range cm.peerIds {
        go func(peerId int) {
          cm.mu.Lock()
          ni := cm.nextIndex[peerId]
          prevLogIndex := ni - 1
          prevLogTerm := -1
          if prevLogIndex >= 0 {
            prevLogTerm = cm.log[prevLogIndex].Term
          }
          entries := cm.log[ni:]

          args := AppendEntriesArgs{
            Term:         savedCurrentTerm,
            LeaderId:     cm.id,
            PrevLogIndex: prevLogIndex,
            PrevLogTerm:  prevLogTerm,
            Entries:      entries,
            LeaderCommit: cm.commitIndex,
          }
          cm.mu.Unlock()
          cm.dlog("sending AppendEntries to %v: ni=%d, args=%+v", peerId, ni, args)
          var reply AppendEntriesReply
          if err := cm.server.Call(peerId, "ConsensusModule.AppendEntries", args, &reply); err == nil {
            cm.mu.Lock()
            defer cm.mu.Unlock()
            if reply.Term > savedCurrentTerm {
              cm.dlog("term out of date in heartbeat reply")
              cm.becomeFollower(reply.Term)
              return
            }

            if cm.state == Leader && savedCurrentTerm == reply.Term {
              if reply.Success {
                cm.nextIndex[peerId] = ni + len(entries)
                cm.matchIndex[peerId] = cm.nextIndex[peerId] - 1
                cm.dlog("AppendEntries reply from %d success: nextIndex := %v, matchIndex := %v", peerId, cm.nextIndex, cm.matchIndex)

                savedCommitIndex := cm.commitIndex
                for i := cm.commitIndex + 1; i < len(cm.log); i++ {
                  if cm.log[i].Term == cm.currentTerm {
                    matchCount := 1
                    for _, peerId := range cm.peerIds {
                      if cm.matchIndex[peerId] >= i {
                        matchCount++
                      }
                    }
                    if matchCount*2 > len(cm.peerIds)+1 {
                      cm.commitIndex = i
                    }
                  }
                }
                if cm.commitIndex != savedCommitIndex {
                  cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
                  cm.newCommitReadyChan <- struct{}{}
                }
              } else {
                cm.nextIndex[peerId] = ni - 1
                cm.dlog("AppendEntries reply from %d !success: nextIndex := %d", peerId, ni-1)
              }
            }
          }
        }(peerId)
      }
    }

This is certainly more complicated than what we did in Part 1, but it really just follows Figure 2 in the paper. A few notes on this code:

  • The fields of an AE RPC are fully populated now: see Figure 2 in the paper for their meaning.
  • AE responses have a success field that tells the leader whether the follower saw a match for prevLogIndex and prevLogTerm. Based on this field, the leader updates nextIndex for this follower.
  • commitIndex is updated based on the count of followers that replicated a certain log index. If an index is replicated by a majority, commitIndex advances to it.

This part of the code is particularly important in relation to our earlier discussion of client interaction:

    if cm.commitIndex != savedCommitIndex {
      cm.dlog("leader sets commitIndex := %d", cm.commitIndex)
      cm.newCommitReadyChan <- struct{}{}
    }
    newCommitReadyChan is a channel used internally by the CM to signal that new entries are ready to be sent on the commit channel to the client. It's acted upon by this method which is run in a goroutine on CM start-up:
    func (cm *ConsensusModule) commitChanSender() {
      for range cm.newCommitReadyChan {
        // Find which entries we have to apply.
        cm.mu.Lock()
        savedTerm := cm.currentTerm
        savedLastApplied := cm.lastApplied
        var entries []LogEntry
        if cm.commitIndex > cm.lastApplied {
          entries = cm.log[cm.lastApplied+1 : cm.commitIndex+1]
          cm.lastApplied = cm.commitIndex
        }
        cm.mu.Unlock()
        cm.dlog("commitChanSender entries=%v, savedLastApplied=%d", entries, savedLastApplied)

        for i, entry := range entries {
          cm.commitChan <- CommitEntry{
            Command: entry.Command,
            Index:   savedLastApplied + i + 1,
            Term:    savedTerm,
          }
        }
      }
      cm.dlog("commitChanSender done")
    }