package raft import ( "bytes" "container/list" "fmt" "io" "io/ioutil" "time" "github.com/armon/go-metrics" ) const ( minCheckInterval = 10 * time.Millisecond ) var ( keyCurrentTerm = []byte("CurrentTerm") keyLastVoteTerm = []byte("LastVoteTerm") keyLastVoteCand = []byte("LastVoteCand") ) // getRPCHeader returns an initialized RPCHeader struct for the given // Raft instance. This structure is sent along with RPC requests and // responses. func (r *Raft) getRPCHeader() RPCHeader { return RPCHeader{ ProtocolVersion: r.conf.ProtocolVersion, } } // checkRPCHeader houses logic about whether this instance of Raft can process // the given RPC message. func (r *Raft) checkRPCHeader(rpc RPC) error { // Get the header off the RPC message. wh, ok := rpc.Command.(WithRPCHeader) if !ok { return fmt.Errorf("RPC does not have a header") } header := wh.GetRPCHeader() // First check is to just make sure the code can understand the // protocol at all. if header.ProtocolVersion < ProtocolVersionMin || header.ProtocolVersion > ProtocolVersionMax { return ErrUnsupportedProtocol } // Second check is whether we should support this message, given the // current protocol we are configured to run. This will drop support // for protocol version 0 starting at protocol version 2, which is // currently what we want, and in general support one version back. We // may need to revisit this policy depending on how future protocol // changes evolve. if header.ProtocolVersion < r.conf.ProtocolVersion-1 { return ErrUnsupportedProtocol } return nil } // getSnapshotVersion returns the snapshot version that should be used when // creating snapshots, given the protocol version in use. func getSnapshotVersion(protocolVersion ProtocolVersion) SnapshotVersion { // Right now we only have two versions and they are backwards compatible // so we don't need to look at the protocol version. return 1 } // commitTuple is used to send an index that was committed, // with an optional associated future that should be invoked. type commitTuple struct { log *Log future *logFuture } // leaderState is state that is used while we are a leader. type leaderState struct { commitCh chan struct{} commitment *commitment inflight *list.List // list of logFuture in log index order replState map[ServerID]*followerReplication notify map[*verifyFuture]struct{} stepDown chan struct{} } // setLeader is used to modify the current leader of the cluster func (r *Raft) setLeader(leader ServerAddress) { r.leaderLock.Lock() oldLeader := r.leader r.leader = leader r.leaderLock.Unlock() if oldLeader != leader { r.observe(LeaderObservation{leader: leader}) } } // requestConfigChange is a helper for the above functions that make // configuration change requests. 'req' describes the change. For timeout, // see AddVoter. func (r *Raft) requestConfigChange(req configurationChangeRequest, timeout time.Duration) IndexFuture { var timer <-chan time.Time if timeout > 0 { timer = time.After(timeout) } future := &configurationChangeFuture{ req: req, } future.init() select { case <-timer: return errorFuture{ErrEnqueueTimeout} case r.configurationChangeCh <- future: return future case <-r.shutdownCh: return errorFuture{ErrRaftShutdown} } } // run is a long running goroutine that runs the Raft FSM. func (r *Raft) run() { for { // Check if we are doing a shutdown select { case <-r.shutdownCh: // Clear the leader to prevent forwarding r.setLeader("") return default: } // Enter into a sub-FSM switch r.getState() { case Follower: r.runFollower() case Candidate: r.runCandidate() case Leader: r.runLeader() } } } // runFollower runs the FSM for a follower. func (r *Raft) runFollower() { didWarn := false r.logger.Printf("[INFO] raft: %v entering Follower state (Leader: %q)", r, r.Leader()) metrics.IncrCounter([]string{"raft", "state", "follower"}, 1) heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout) for { select { case rpc := <-r.rpcCh: r.processRPC(rpc) case c := <-r.configurationChangeCh: // Reject any operations since we are not the leader c.respond(ErrNotLeader) case a := <-r.applyCh: // Reject any operations since we are not the leader a.respond(ErrNotLeader) case v := <-r.verifyCh: // Reject any operations since we are not the leader v.respond(ErrNotLeader) case r := <-r.userRestoreCh: // Reject any restores since we are not the leader r.respond(ErrNotLeader) case c := <-r.configurationsCh: c.configurations = r.configurations.Clone() c.respond(nil) case b := <-r.bootstrapCh: b.respond(r.liveBootstrap(b.configuration)) case <-heartbeatTimer: // Restart the heartbeat timer heartbeatTimer = randomTimeout(r.conf.HeartbeatTimeout) // Check if we have had a successful contact lastContact := r.LastContact() if time.Now().Sub(lastContact) < r.conf.HeartbeatTimeout { continue } // Heartbeat failed! Transition to the candidate state lastLeader := r.Leader() r.setLeader("") if r.configurations.latestIndex == 0 { if !didWarn { r.logger.Printf("[WARN] raft: no known peers, aborting election") didWarn = true } } else if r.configurations.latestIndex == r.configurations.committedIndex && !hasVote(r.configurations.latest, r.localID) { if !didWarn { r.logger.Printf("[WARN] raft: not part of stable configuration, aborting election") didWarn = true } } else { r.logger.Printf(`[WARN] raft: Heartbeat timeout from %q reached, starting election`, lastLeader) metrics.IncrCounter([]string{"raft", "transition", "heartbeat_timeout"}, 1) r.setState(Candidate) return } case <-r.shutdownCh: return } } } // liveBootstrap attempts to seed an initial configuration for the cluster. See // the Raft object's member BootstrapCluster for more details. This must only be // called on the main thread, and only makes sense in the follower state. func (r *Raft) liveBootstrap(configuration Configuration) error { // Use the pre-init API to make the static updates. err := BootstrapCluster(&r.conf, r.logs, r.stable, r.snapshots, r.trans, configuration) if err != nil { return err } // Make the configuration live. var entry Log if err := r.logs.GetLog(1, &entry); err != nil { panic(err) } r.setCurrentTerm(1) r.setLastLog(entry.Index, entry.Term) r.processConfigurationLogEntry(&entry) return nil } // runCandidate runs the FSM for a candidate. func (r *Raft) runCandidate() { r.logger.Printf("[INFO] raft: %v entering Candidate state in term %v", r, r.getCurrentTerm()+1) metrics.IncrCounter([]string{"raft", "state", "candidate"}, 1) // Start vote for us, and set a timeout voteCh := r.electSelf() electionTimer := randomTimeout(r.conf.ElectionTimeout) // Tally the votes, need a simple majority grantedVotes := 0 votesNeeded := r.quorumSize() r.logger.Printf("[DEBUG] raft: Votes needed: %d", votesNeeded) for r.getState() == Candidate { select { case rpc := <-r.rpcCh: r.processRPC(rpc) case vote := <-voteCh: // Check if the term is greater than ours, bail if vote.Term > r.getCurrentTerm() { r.logger.Printf("[DEBUG] raft: Newer term discovered, fallback to follower") r.setState(Follower) r.setCurrentTerm(vote.Term) return } // Check if the vote is granted if vote.Granted { grantedVotes++ r.logger.Printf("[DEBUG] raft: Vote granted from %s in term %v. Tally: %d", vote.voterID, vote.Term, grantedVotes) } // Check if we've become the leader if grantedVotes >= votesNeeded { r.logger.Printf("[INFO] raft: Election won. Tally: %d", grantedVotes) r.setState(Leader) r.setLeader(r.localAddr) return } case c := <-r.configurationChangeCh: // Reject any operations since we are not the leader c.respond(ErrNotLeader) case a := <-r.applyCh: // Reject any operations since we are not the leader a.respond(ErrNotLeader) case v := <-r.verifyCh: // Reject any operations since we are not the leader v.respond(ErrNotLeader) case r := <-r.userRestoreCh: // Reject any restores since we are not the leader r.respond(ErrNotLeader) case c := <-r.configurationsCh: c.configurations = r.configurations.Clone() c.respond(nil) case b := <-r.bootstrapCh: b.respond(ErrCantBootstrap) case <-electionTimer: // Election failed! Restart the election. We simply return, // which will kick us back into runCandidate r.logger.Printf("[WARN] raft: Election timeout reached, restarting election") return case <-r.shutdownCh: return } } } // runLeader runs the FSM for a leader. Do the setup here and drop into // the leaderLoop for the hot loop. func (r *Raft) runLeader() { r.logger.Printf("[INFO] raft: %v entering Leader state", r) metrics.IncrCounter([]string{"raft", "state", "leader"}, 1) // Notify that we are the leader asyncNotifyBool(r.leaderCh, true) // Push to the notify channel if given if notify := r.conf.NotifyCh; notify != nil { select { case notify <- true: case <-r.shutdownCh: } } // Setup leader state r.leaderState.commitCh = make(chan struct{}, 1) r.leaderState.commitment = newCommitment(r.leaderState.commitCh, r.configurations.latest, r.getLastIndex()+1 /* first index that may be committed in this term */) r.leaderState.inflight = list.New() r.leaderState.replState = make(map[ServerID]*followerReplication) r.leaderState.notify = make(map[*verifyFuture]struct{}) r.leaderState.stepDown = make(chan struct{}, 1) // Cleanup state on step down defer func() { // Since we were the leader previously, we update our // last contact time when we step down, so that we are not // reporting a last contact time from before we were the // leader. Otherwise, to a client it would seem our data // is extremely stale. r.setLastContact() // Stop replication for _, p := range r.leaderState.replState { close(p.stopCh) } // Respond to all inflight operations for e := r.leaderState.inflight.Front(); e != nil; e = e.Next() { e.Value.(*logFuture).respond(ErrLeadershipLost) } // Respond to any pending verify requests for future := range r.leaderState.notify { future.respond(ErrLeadershipLost) } // Clear all the state r.leaderState.commitCh = nil r.leaderState.commitment = nil r.leaderState.inflight = nil r.leaderState.replState = nil r.leaderState.notify = nil r.leaderState.stepDown = nil // If we are stepping down for some reason, no known leader. // We may have stepped down due to an RPC call, which would // provide the leader, so we cannot always blank this out. r.leaderLock.Lock() if r.leader == r.localAddr { r.leader = "" } r.leaderLock.Unlock() // Notify that we are not the leader asyncNotifyBool(r.leaderCh, false) // Push to the notify channel if given if notify := r.conf.NotifyCh; notify != nil { select { case notify <- false: case <-r.shutdownCh: // On shutdown, make a best effort but do not block select { case notify <- false: default: } } } }() // Start a replication routine for each peer r.startStopReplication() // Dispatch a no-op log entry first. This gets this leader up to the latest // possible commit index, even in the absence of client commands. This used // to append a configuration entry instead of a noop. However, that permits // an unbounded number of uncommitted configurations in the log. We now // maintain that there exists at most one uncommitted configuration entry in // any log, so we have to do proper no-ops here. noop := &logFuture{ log: Log{ Type: LogNoop, }, } r.dispatchLogs([]*logFuture{noop}) // Sit in the leader loop until we step down r.leaderLoop() } // startStopReplication will set up state and start asynchronous replication to // new peers, and stop replication to removed peers. Before removing a peer, // it'll instruct the replication routines to try to replicate to the current // index. This must only be called from the main thread. func (r *Raft) startStopReplication() { inConfig := make(map[ServerID]bool, len(r.configurations.latest.Servers)) lastIdx := r.getLastIndex() // Start replication goroutines that need starting for _, server := range r.configurations.latest.Servers { if server.ID == r.localID { continue } inConfig[server.ID] = true if _, ok := r.leaderState.replState[server.ID]; !ok { r.logger.Printf("[INFO] raft: Added peer %v, starting replication", server.ID) s := &followerReplication{ peer: server, commitment: r.leaderState.commitment, stopCh: make(chan uint64, 1), triggerCh: make(chan struct{}, 1), currentTerm: r.getCurrentTerm(), nextIndex: lastIdx + 1, lastContact: time.Now(), notify: make(map[*verifyFuture]struct{}), notifyCh: make(chan struct{}, 1), stepDown: r.leaderState.stepDown, } r.leaderState.replState[server.ID] = s r.goFunc(func() { r.replicate(s) }) asyncNotifyCh(s.triggerCh) } } // Stop replication goroutines that need stopping for serverID, repl := range r.leaderState.replState { if inConfig[serverID] { continue } // Replicate up to lastIdx and stop r.logger.Printf("[INFO] raft: Removed peer %v, stopping replication after %v", serverID, lastIdx) repl.stopCh <- lastIdx close(repl.stopCh) delete(r.leaderState.replState, serverID) } } // configurationChangeChIfStable returns r.configurationChangeCh if it's safe // to process requests from it, or nil otherwise. This must only be called // from the main thread. // // Note that if the conditions here were to change outside of leaderLoop to take // this from nil to non-nil, we would need leaderLoop to be kicked. func (r *Raft) configurationChangeChIfStable() chan *configurationChangeFuture { // Have to wait until: // 1. The latest configuration is committed, and // 2. This leader has committed some entry (the noop) in this term // https://groups.google.com/forum/#!msg/raft-dev/t4xj6dJTP6E/d2D9LrWRza8J if r.configurations.latestIndex == r.configurations.committedIndex && r.getCommitIndex() >= r.leaderState.commitment.startIndex { return r.configurationChangeCh } return nil } // leaderLoop is the hot loop for a leader. It is invoked // after all the various leader setup is done. func (r *Raft) leaderLoop() { // stepDown is used to track if there is an inflight log that // would cause us to lose leadership (specifically a RemovePeer of // ourselves). If this is the case, we must not allow any logs to // be processed in parallel, otherwise we are basing commit on // only a single peer (ourself) and replicating to an undefined set // of peers. stepDown := false lease := time.After(r.conf.LeaderLeaseTimeout) for r.getState() == Leader { select { case rpc := <-r.rpcCh: r.processRPC(rpc) case <-r.leaderState.stepDown: r.setState(Follower) case <-r.leaderState.commitCh: // Process the newly committed entries oldCommitIndex := r.getCommitIndex() commitIndex := r.leaderState.commitment.getCommitIndex() r.setCommitIndex(commitIndex) if r.configurations.latestIndex > oldCommitIndex && r.configurations.latestIndex <= commitIndex { r.configurations.committed = r.configurations.latest r.configurations.committedIndex = r.configurations.latestIndex if !hasVote(r.configurations.committed, r.localID) { stepDown = true } } var numProcessed int start := time.Now() for { e := r.leaderState.inflight.Front() if e == nil { break } commitLog := e.Value.(*logFuture) idx := commitLog.log.Index if idx > commitIndex { break } // Measure the commit time metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch) r.processLogs(idx, commitLog) r.leaderState.inflight.Remove(e) numProcessed++ } // Measure the time to enqueue batch of logs for FSM to apply metrics.MeasureSince([]string{"raft", "fsm", "enqueue"}, start) // Count the number of logs enqueued metrics.SetGauge([]string{"raft", "commitNumLogs"}, float32(numProcessed)) if stepDown { if r.conf.ShutdownOnRemove { r.logger.Printf("[INFO] raft: Removed ourself, shutting down") r.Shutdown() } else { r.logger.Printf("[INFO] raft: Removed ourself, transitioning to follower") r.setState(Follower) } } case v := <-r.verifyCh: if v.quorumSize == 0 { // Just dispatched, start the verification r.verifyLeader(v) } else if v.votes < v.quorumSize { // Early return, means there must be a new leader r.logger.Printf("[WARN] raft: New leader elected, stepping down") r.setState(Follower) delete(r.leaderState.notify, v) for _, repl := range r.leaderState.replState { repl.cleanNotify(v) } v.respond(ErrNotLeader) } else { // Quorum of members agree, we are still leader delete(r.leaderState.notify, v) for _, repl := range r.leaderState.replState { repl.cleanNotify(v) } v.respond(nil) } case future := <-r.userRestoreCh: err := r.restoreUserSnapshot(future.meta, future.reader) future.respond(err) case c := <-r.configurationsCh: c.configurations = r.configurations.Clone() c.respond(nil) case future := <-r.configurationChangeChIfStable(): r.appendConfigurationEntry(future) case b := <-r.bootstrapCh: b.respond(ErrCantBootstrap) case newLog := <-r.applyCh: // Group commit, gather all the ready commits ready := []*logFuture{newLog} for i := 0; i < r.conf.MaxAppendEntries; i++ { select { case newLog := <-r.applyCh: ready = append(ready, newLog) default: break } } // Dispatch the logs if stepDown { // we're in the process of stepping down as leader, don't process anything new for i := range ready { ready[i].respond(ErrNotLeader) } } else { r.dispatchLogs(ready) } case <-lease: // Check if we've exceeded the lease, potentially stepping down maxDiff := r.checkLeaderLease() // Next check interval should adjust for the last node we've // contacted, without going negative checkInterval := r.conf.LeaderLeaseTimeout - maxDiff if checkInterval < minCheckInterval { checkInterval = minCheckInterval } // Renew the lease timer lease = time.After(checkInterval) case <-r.shutdownCh: return } } } // verifyLeader must be called from the main thread for safety. // Causes the followers to attempt an immediate heartbeat. func (r *Raft) verifyLeader(v *verifyFuture) { // Current leader always votes for self v.votes = 1 // Set the quorum size, hot-path for single node v.quorumSize = r.quorumSize() if v.quorumSize == 1 { v.respond(nil) return } // Track this request v.notifyCh = r.verifyCh r.leaderState.notify[v] = struct{}{} // Trigger immediate heartbeats for _, repl := range r.leaderState.replState { repl.notifyLock.Lock() repl.notify[v] = struct{}{} repl.notifyLock.Unlock() asyncNotifyCh(repl.notifyCh) } } // checkLeaderLease is used to check if we can contact a quorum of nodes // within the last leader lease interval. If not, we need to step down, // as we may have lost connectivity. Returns the maximum duration without // contact. This must only be called from the main thread. func (r *Raft) checkLeaderLease() time.Duration { // Track contacted nodes, we can always contact ourself contacted := 1 // Check each follower var maxDiff time.Duration now := time.Now() for peer, f := range r.leaderState.replState { diff := now.Sub(f.LastContact()) if diff <= r.conf.LeaderLeaseTimeout { contacted++ if diff > maxDiff { maxDiff = diff } } else { // Log at least once at high value, then debug. Otherwise it gets very verbose. if diff <= 3*r.conf.LeaderLeaseTimeout { r.logger.Printf("[WARN] raft: Failed to contact %v in %v", peer, diff) } else { r.logger.Printf("[DEBUG] raft: Failed to contact %v in %v", peer, diff) } } metrics.AddSample([]string{"raft", "leader", "lastContact"}, float32(diff/time.Millisecond)) } // Verify we can contact a quorum quorum := r.quorumSize() if contacted < quorum { r.logger.Printf("[WARN] raft: Failed to contact quorum of nodes, stepping down") r.setState(Follower) metrics.IncrCounter([]string{"raft", "transition", "leader_lease_timeout"}, 1) } return maxDiff } // quorumSize is used to return the quorum size. This must only be called on // the main thread. // TODO: revisit usage func (r *Raft) quorumSize() int { voters := 0 for _, server := range r.configurations.latest.Servers { if server.Suffrage == Voter { voters++ } } return voters/2 + 1 } // restoreUserSnapshot is used to manually consume an external snapshot, such // as if restoring from a backup. We will use the current Raft configuration, // not the one from the snapshot, so that we can restore into a new cluster. We // will also use the higher of the index of the snapshot, or the current index, // and then add 1 to that, so we force a new state with a hole in the Raft log, // so that the snapshot will be sent to followers and used for any new joiners. // This can only be run on the leader, and returns a future that can be used to // block until complete. func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.Reader) error { defer metrics.MeasureSince([]string{"raft", "restoreUserSnapshot"}, time.Now()) // Sanity check the version. version := meta.Version if version < SnapshotVersionMin || version > SnapshotVersionMax { return fmt.Errorf("unsupported snapshot version %d", version) } // We don't support snapshots while there's a config change // outstanding since the snapshot doesn't have a means to // represent this state. committedIndex := r.configurations.committedIndex latestIndex := r.configurations.latestIndex if committedIndex != latestIndex { return fmt.Errorf("cannot restore snapshot now, wait until the configuration entry at %v has been applied (have applied %v)", latestIndex, committedIndex) } // Cancel any inflight requests. for { e := r.leaderState.inflight.Front() if e == nil { break } e.Value.(*logFuture).respond(ErrAbortedByRestore) r.leaderState.inflight.Remove(e) } // We will overwrite the snapshot metadata with the current term, // an index that's greater than the current index, or the last // index in the snapshot. It's important that we leave a hole in // the index so we know there's nothing in the Raft log there and // replication will fault and send the snapshot. term := r.getCurrentTerm() lastIndex := r.getLastIndex() if meta.Index > lastIndex { lastIndex = meta.Index } lastIndex++ // Dump the snapshot. Note that we use the latest configuration, // not the one that came with the snapshot. sink, err := r.snapshots.Create(version, lastIndex, term, r.configurations.latest, r.configurations.latestIndex, r.trans) if err != nil { return fmt.Errorf("failed to create snapshot: %v", err) } n, err := io.Copy(sink, reader) if err != nil { sink.Cancel() return fmt.Errorf("failed to write snapshot: %v", err) } if n != meta.Size { sink.Cancel() return fmt.Errorf("failed to write snapshot, size didn't match (%d != %d)", n, meta.Size) } if err := sink.Close(); err != nil { return fmt.Errorf("failed to close snapshot: %v", err) } r.logger.Printf("[INFO] raft: Copied %d bytes to local snapshot", n) // Restore the snapshot into the FSM. If this fails we are in a // bad state so we panic to take ourselves out. fsm := &restoreFuture{ID: sink.ID()} fsm.init() select { case r.fsmMutateCh <- fsm: case <-r.shutdownCh: return ErrRaftShutdown } if err := fsm.Error(); err != nil { panic(fmt.Errorf("failed to restore snapshot: %v", err)) } // We set the last log so it looks like we've stored the empty // index we burned. The last applied is set because we made the // FSM take the snapshot state, and we store the last snapshot // in the stable store since we created a snapshot as part of // this process. r.setLastLog(lastIndex, term) r.setLastApplied(lastIndex) r.setLastSnapshot(lastIndex, term) r.logger.Printf("[INFO] raft: Restored user snapshot (index %d)", lastIndex) return nil } // appendConfigurationEntry changes the configuration and adds a new // configuration entry to the log. This must only be called from the // main thread. func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) { configuration, err := nextConfiguration(r.configurations.latest, r.configurations.latestIndex, future.req) if err != nil { future.respond(err) return } r.logger.Printf("[INFO] raft: Updating configuration with %s (%v, %v) to %+v", future.req.command, future.req.serverID, future.req.serverAddress, configuration.Servers) // In pre-ID compatibility mode we translate all configuration changes // in to an old remove peer message, which can handle all supported // cases for peer changes in the pre-ID world (adding and removing // voters). Both add peer and remove peer log entries are handled // similarly on old Raft servers, but remove peer does extra checks to // see if a leader needs to step down. Since they both assert the full // configuration, then we can safely call remove peer for everything. if r.protocolVersion < 2 { future.log = Log{ Type: LogRemovePeerDeprecated, Data: encodePeers(configuration, r.trans), } } else { future.log = Log{ Type: LogConfiguration, Data: encodeConfiguration(configuration), } } r.dispatchLogs([]*logFuture{&future.logFuture}) index := future.Index() r.configurations.latest = configuration r.configurations.latestIndex = index r.leaderState.commitment.setConfiguration(configuration) r.startStopReplication() } // dispatchLog is called on the leader to push a log to disk, mark it // as inflight and begin replication of it. func (r *Raft) dispatchLogs(applyLogs []*logFuture) { now := time.Now() defer metrics.MeasureSince([]string{"raft", "leader", "dispatchLog"}, now) term := r.getCurrentTerm() lastIndex := r.getLastIndex() n := len(applyLogs) logs := make([]*Log, n) metrics.SetGauge([]string{"raft", "leader", "dispatchNumLogs"}, float32(n)) for idx, applyLog := range applyLogs { applyLog.dispatch = now lastIndex++ applyLog.log.Index = lastIndex applyLog.log.Term = term logs[idx] = &applyLog.log r.leaderState.inflight.PushBack(applyLog) } // Write the log entry locally if err := r.logs.StoreLogs(logs); err != nil { r.logger.Printf("[ERR] raft: Failed to commit logs: %v", err) for _, applyLog := range applyLogs { applyLog.respond(err) } r.setState(Follower) return } r.leaderState.commitment.match(r.localID, lastIndex) // Update the last log since it's on disk now r.setLastLog(lastIndex, term) // Notify the replicators of the new log for _, f := range r.leaderState.replState { asyncNotifyCh(f.triggerCh) } } // processLogs is used to apply all the committed entries that haven't been // applied up to the given index limit. // This can be called from both leaders and followers. // Followers call this from AppendEntries, for n entries at a time, and always // pass future=nil. // Leaders call this once per inflight when entries are committed. They pass // the future from inflights. func (r *Raft) processLogs(index uint64, future *logFuture) { // Reject logs we've applied already lastApplied := r.getLastApplied() if index <= lastApplied { r.logger.Printf("[WARN] raft: Skipping application of old log: %d", index) return } // Apply all the preceding logs for idx := r.getLastApplied() + 1; idx <= index; idx++ { // Get the log, either from the future or from our log store if future != nil && future.log.Index == idx { r.processLog(&future.log, future) } else { l := new(Log) if err := r.logs.GetLog(idx, l); err != nil { r.logger.Printf("[ERR] raft: Failed to get log at %d: %v", idx, err) panic(err) } r.processLog(l, nil) } // Update the lastApplied index and term r.setLastApplied(idx) } } // processLog is invoked to process the application of a single committed log entry. func (r *Raft) processLog(l *Log, future *logFuture) { switch l.Type { case LogBarrier: // Barrier is handled by the FSM fallthrough case LogCommand: // Forward to the fsm handler select { case r.fsmMutateCh <- &commitTuple{l, future}: case <-r.shutdownCh: if future != nil { future.respond(ErrRaftShutdown) } } // Return so that the future is only responded to // by the FSM handler when the application is done return case LogConfiguration: case LogAddPeerDeprecated: case LogRemovePeerDeprecated: case LogNoop: // Ignore the no-op default: panic(fmt.Errorf("unrecognized log type: %#v", l)) } // Invoke the future if given if future != nil { future.respond(nil) } } // processRPC is called to handle an incoming RPC request. This must only be // called from the main thread. func (r *Raft) processRPC(rpc RPC) { if err := r.checkRPCHeader(rpc); err != nil { rpc.Respond(nil, err) return } switch cmd := rpc.Command.(type) { case *AppendEntriesRequest: r.appendEntries(rpc, cmd) case *RequestVoteRequest: r.requestVote(rpc, cmd) case *InstallSnapshotRequest: r.installSnapshot(rpc, cmd) default: r.logger.Printf("[ERR] raft: Got unexpected command: %#v", rpc.Command) rpc.Respond(nil, fmt.Errorf("unexpected command")) } } // processHeartbeat is a special handler used just for heartbeat requests // so that they can be fast-pathed if a transport supports it. This must only // be called from the main thread. func (r *Raft) processHeartbeat(rpc RPC) { defer metrics.MeasureSince([]string{"raft", "rpc", "processHeartbeat"}, time.Now()) // Check if we are shutdown, just ignore the RPC select { case <-r.shutdownCh: return default: } // Ensure we are only handling a heartbeat switch cmd := rpc.Command.(type) { case *AppendEntriesRequest: r.appendEntries(rpc, cmd) default: r.logger.Printf("[ERR] raft: Expected heartbeat, got command: %#v", rpc.Command) rpc.Respond(nil, fmt.Errorf("unexpected command")) } } // appendEntries is invoked when we get an append entries RPC call. This must // only be called from the main thread. func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) { defer metrics.MeasureSince([]string{"raft", "rpc", "appendEntries"}, time.Now()) // Setup a response resp := &AppendEntriesResponse{ RPCHeader: r.getRPCHeader(), Term: r.getCurrentTerm(), LastLog: r.getLastIndex(), Success: false, NoRetryBackoff: false, } var rpcErr error defer func() { rpc.Respond(resp, rpcErr) }() // Ignore an older term if a.Term < r.getCurrentTerm() { return } // Increase the term if we see a newer one, also transition to follower // if we ever get an appendEntries call if a.Term > r.getCurrentTerm() || r.getState() != Follower { // Ensure transition to follower r.setState(Follower) r.setCurrentTerm(a.Term) resp.Term = a.Term } // Save the current leader r.setLeader(ServerAddress(r.trans.DecodePeer(a.Leader))) // Verify the last log entry if a.PrevLogEntry > 0 { lastIdx, lastTerm := r.getLastEntry() var prevLogTerm uint64 if a.PrevLogEntry == lastIdx { prevLogTerm = lastTerm } else { var prevLog Log if err := r.logs.GetLog(a.PrevLogEntry, &prevLog); err != nil { r.logger.Printf("[WARN] raft: Failed to get previous log: %d %v (last: %d)", a.PrevLogEntry, err, lastIdx) resp.NoRetryBackoff = true return } prevLogTerm = prevLog.Term } if a.PrevLogTerm != prevLogTerm { r.logger.Printf("[WARN] raft: Previous log term mis-match: ours: %d remote: %d", prevLogTerm, a.PrevLogTerm) resp.NoRetryBackoff = true return } } // Process any new entries if len(a.Entries) > 0 { start := time.Now() // Delete any conflicting entries, skip any duplicates lastLogIdx, _ := r.getLastLog() var newEntries []*Log for i, entry := range a.Entries { if entry.Index > lastLogIdx { newEntries = a.Entries[i:] break } var storeEntry Log if err := r.logs.GetLog(entry.Index, &storeEntry); err != nil { r.logger.Printf("[WARN] raft: Failed to get log entry %d: %v", entry.Index, err) return } if entry.Term != storeEntry.Term { r.logger.Printf("[WARN] raft: Clearing log suffix from %d to %d", entry.Index, lastLogIdx) if err := r.logs.DeleteRange(entry.Index, lastLogIdx); err != nil { r.logger.Printf("[ERR] raft: Failed to clear log suffix: %v", err) return } if entry.Index <= r.configurations.latestIndex { r.configurations.latest = r.configurations.committed r.configurations.latestIndex = r.configurations.committedIndex } newEntries = a.Entries[i:] break } } if n := len(newEntries); n > 0 { // Append the new entries if err := r.logs.StoreLogs(newEntries); err != nil { r.logger.Printf("[ERR] raft: Failed to append to logs: %v", err) // TODO: leaving r.getLastLog() in the wrong // state if there was a truncation above return } // Handle any new configuration changes for _, newEntry := range newEntries { r.processConfigurationLogEntry(newEntry) } // Update the lastLog last := newEntries[n-1] r.setLastLog(last.Index, last.Term) } metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "storeLogs"}, start) } // Update the commit index if a.LeaderCommitIndex > 0 && a.LeaderCommitIndex > r.getCommitIndex() { start := time.Now() idx := min(a.LeaderCommitIndex, r.getLastIndex()) r.setCommitIndex(idx) if r.configurations.latestIndex <= idx { r.configurations.committed = r.configurations.latest r.configurations.committedIndex = r.configurations.latestIndex } r.processLogs(idx, nil) metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "processLogs"}, start) } // Everything went well, set success resp.Success = true r.setLastContact() return } // processConfigurationLogEntry takes a log entry and updates the latest // configuration if the entry results in a new configuration. This must only be // called from the main thread, or from NewRaft() before any threads have begun. func (r *Raft) processConfigurationLogEntry(entry *Log) { if entry.Type == LogConfiguration { r.configurations.committed = r.configurations.latest r.configurations.committedIndex = r.configurations.latestIndex r.configurations.latest = decodeConfiguration(entry.Data) r.configurations.latestIndex = entry.Index } else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated { r.configurations.committed = r.configurations.latest r.configurations.committedIndex = r.configurations.latestIndex r.configurations.latest = decodePeers(entry.Data, r.trans) r.configurations.latestIndex = entry.Index } } // requestVote is invoked when we get an request vote RPC call. func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) { defer metrics.MeasureSince([]string{"raft", "rpc", "requestVote"}, time.Now()) r.observe(*req) // Setup a response resp := &RequestVoteResponse{ RPCHeader: r.getRPCHeader(), Term: r.getCurrentTerm(), Granted: false, } var rpcErr error defer func() { rpc.Respond(resp, rpcErr) }() // Version 0 servers will panic unless the peers is present. It's only // used on them to produce a warning message. if r.protocolVersion < 2 { resp.Peers = encodePeers(r.configurations.latest, r.trans) } // Check if we have an existing leader [who's not the candidate] candidate := r.trans.DecodePeer(req.Candidate) if leader := r.Leader(); leader != "" && leader != candidate { r.logger.Printf("[WARN] raft: Rejecting vote request from %v since we have a leader: %v", candidate, leader) return } // Ignore an older term if req.Term < r.getCurrentTerm() { return } // Increase the term if we see a newer one if req.Term > r.getCurrentTerm() { // Ensure transition to follower r.setState(Follower) r.setCurrentTerm(req.Term) resp.Term = req.Term } // Check if we have voted yet lastVoteTerm, err := r.stable.GetUint64(keyLastVoteTerm) if err != nil && err.Error() != "not found" { r.logger.Printf("[ERR] raft: Failed to get last vote term: %v", err) return } lastVoteCandBytes, err := r.stable.Get(keyLastVoteCand) if err != nil && err.Error() != "not found" { r.logger.Printf("[ERR] raft: Failed to get last vote candidate: %v", err) return } // Check if we've voted in this election before if lastVoteTerm == req.Term && lastVoteCandBytes != nil { r.logger.Printf("[INFO] raft: Duplicate RequestVote for same term: %d", req.Term) if bytes.Compare(lastVoteCandBytes, req.Candidate) == 0 { r.logger.Printf("[WARN] raft: Duplicate RequestVote from candidate: %s", req.Candidate) resp.Granted = true } return } // Reject if their term is older lastIdx, lastTerm := r.getLastEntry() if lastTerm > req.LastLogTerm { r.logger.Printf("[WARN] raft: Rejecting vote request from %v since our last term is greater (%d, %d)", candidate, lastTerm, req.LastLogTerm) return } if lastTerm == req.LastLogTerm && lastIdx > req.LastLogIndex { r.logger.Printf("[WARN] raft: Rejecting vote request from %v since our last index is greater (%d, %d)", candidate, lastIdx, req.LastLogIndex) return } // Persist a vote for safety if err := r.persistVote(req.Term, req.Candidate); err != nil { r.logger.Printf("[ERR] raft: Failed to persist vote: %v", err) return } resp.Granted = true r.setLastContact() return } // installSnapshot is invoked when we get a InstallSnapshot RPC call. // We must be in the follower state for this, since it means we are // too far behind a leader for log replay. This must only be called // from the main thread. func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) { defer metrics.MeasureSince([]string{"raft", "rpc", "installSnapshot"}, time.Now()) // Setup a response resp := &InstallSnapshotResponse{ Term: r.getCurrentTerm(), Success: false, } var rpcErr error defer func() { io.Copy(ioutil.Discard, rpc.Reader) // ensure we always consume all the snapshot data from the stream [see issue #212] rpc.Respond(resp, rpcErr) }() // Sanity check the version if req.SnapshotVersion < SnapshotVersionMin || req.SnapshotVersion > SnapshotVersionMax { rpcErr = fmt.Errorf("unsupported snapshot version %d", req.SnapshotVersion) return } // Ignore an older term if req.Term < r.getCurrentTerm() { r.logger.Printf("[INFO] raft: Ignoring installSnapshot request with older term of %d vs currentTerm %d", req.Term, r.getCurrentTerm()) return } // Increase the term if we see a newer one if req.Term > r.getCurrentTerm() { // Ensure transition to follower r.setState(Follower) r.setCurrentTerm(req.Term) resp.Term = req.Term } // Save the current leader r.setLeader(ServerAddress(r.trans.DecodePeer(req.Leader))) // Create a new snapshot var reqConfiguration Configuration var reqConfigurationIndex uint64 if req.SnapshotVersion > 0 { reqConfiguration = decodeConfiguration(req.Configuration) reqConfigurationIndex = req.ConfigurationIndex } else { reqConfiguration = decodePeers(req.Peers, r.trans) reqConfigurationIndex = req.LastLogIndex } version := getSnapshotVersion(r.protocolVersion) sink, err := r.snapshots.Create(version, req.LastLogIndex, req.LastLogTerm, reqConfiguration, reqConfigurationIndex, r.trans) if err != nil { r.logger.Printf("[ERR] raft: Failed to create snapshot to install: %v", err) rpcErr = fmt.Errorf("failed to create snapshot: %v", err) return } // Spill the remote snapshot to disk n, err := io.Copy(sink, rpc.Reader) if err != nil { sink.Cancel() r.logger.Printf("[ERR] raft: Failed to copy snapshot: %v", err) rpcErr = err return } // Check that we received it all if n != req.Size { sink.Cancel() r.logger.Printf("[ERR] raft: Failed to receive whole snapshot: %d / %d", n, req.Size) rpcErr = fmt.Errorf("short read") return } // Finalize the snapshot if err := sink.Close(); err != nil { r.logger.Printf("[ERR] raft: Failed to finalize snapshot: %v", err) rpcErr = err return } r.logger.Printf("[INFO] raft: Copied %d bytes to local snapshot", n) // Restore snapshot future := &restoreFuture{ID: sink.ID()} future.init() select { case r.fsmMutateCh <- future: case <-r.shutdownCh: future.respond(ErrRaftShutdown) return } // Wait for the restore to happen if err := future.Error(); err != nil { r.logger.Printf("[ERR] raft: Failed to restore snapshot: %v", err) rpcErr = err return } // Update the lastApplied so we don't replay old logs r.setLastApplied(req.LastLogIndex) // Update the last stable snapshot info r.setLastSnapshot(req.LastLogIndex, req.LastLogTerm) // Restore the peer set r.configurations.latest = reqConfiguration r.configurations.latestIndex = reqConfigurationIndex r.configurations.committed = reqConfiguration r.configurations.committedIndex = reqConfigurationIndex // Compact logs, continue even if this fails if err := r.compactLogs(req.LastLogIndex); err != nil { r.logger.Printf("[ERR] raft: Failed to compact logs: %v", err) } r.logger.Printf("[INFO] raft: Installed remote snapshot") resp.Success = true r.setLastContact() return } // setLastContact is used to set the last contact time to now func (r *Raft) setLastContact() { r.lastContactLock.Lock() r.lastContact = time.Now() r.lastContactLock.Unlock() } type voteResult struct { RequestVoteResponse voterID ServerID } // electSelf is used to send a RequestVote RPC to all peers, and vote for // ourself. This has the side affecting of incrementing the current term. The // response channel returned is used to wait for all the responses (including a // vote for ourself). This must only be called from the main thread. func (r *Raft) electSelf() <-chan *voteResult { // Create a response channel respCh := make(chan *voteResult, len(r.configurations.latest.Servers)) // Increment the term r.setCurrentTerm(r.getCurrentTerm() + 1) // Construct the request lastIdx, lastTerm := r.getLastEntry() req := &RequestVoteRequest{ RPCHeader: r.getRPCHeader(), Term: r.getCurrentTerm(), Candidate: r.trans.EncodePeer(r.localID, r.localAddr), LastLogIndex: lastIdx, LastLogTerm: lastTerm, } // Construct a function to ask for a vote askPeer := func(peer Server) { r.goFunc(func() { defer metrics.MeasureSince([]string{"raft", "candidate", "electSelf"}, time.Now()) resp := &voteResult{voterID: peer.ID} err := r.trans.RequestVote(peer.ID, peer.Address, req, &resp.RequestVoteResponse) if err != nil { r.logger.Printf("[ERR] raft: Failed to make RequestVote RPC to %v: %v", peer, err) resp.Term = req.Term resp.Granted = false } respCh <- resp }) } // For each peer, request a vote for _, server := range r.configurations.latest.Servers { if server.Suffrage == Voter { if server.ID == r.localID { // Persist a vote for ourselves if err := r.persistVote(req.Term, req.Candidate); err != nil { r.logger.Printf("[ERR] raft: Failed to persist vote : %v", err) return nil } // Include our own vote respCh <- &voteResult{ RequestVoteResponse: RequestVoteResponse{ RPCHeader: r.getRPCHeader(), Term: req.Term, Granted: true, }, voterID: r.localID, } } else { askPeer(server) } } } return respCh } // persistVote is used to persist our vote for safety. func (r *Raft) persistVote(term uint64, candidate []byte) error { if err := r.stable.SetUint64(keyLastVoteTerm, term); err != nil { return err } if err := r.stable.Set(keyLastVoteCand, candidate); err != nil { return err } return nil } // setCurrentTerm is used to set the current term in a durable manner. func (r *Raft) setCurrentTerm(t uint64) { // Persist to disk first if err := r.stable.SetUint64(keyCurrentTerm, t); err != nil { panic(fmt.Errorf("failed to save current term: %v", err)) } r.raftState.setCurrentTerm(t) } // setState is used to update the current state. Any state // transition causes the known leader to be cleared. This means // that leader should be set only after updating the state. func (r *Raft) setState(state RaftState) { r.setLeader("") oldState := r.raftState.getState() r.raftState.setState(state) if oldState != state { r.observe(state) } }