package raft

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/armon/go-metrics"
)

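// maxFailureScale and failureWait define the retry backoff applied to a
// follower after failed RPCs: failureWait is the base delay and
// maxFailureScale caps how far backoff() may scale it (see the backoff()
// calls in replicateTo and heartbeat below).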
const (
	maxFailureScale = 12
	failureWait     = 10 * time.Millisecond
)

var (
	// ErrLogNotFound indicates a given log entry is not available.
	ErrLogNotFound = errors.New("log not found")

	// ErrPipelineReplicationNotSupported can be returned by the transport to
	// signal that pipeline replication is not supported in general, and that
	// no error message should be produced.
	ErrPipelineReplicationNotSupported = errors.New("pipeline replication not supported")
)

// followerReplication is in charge of sending snapshots and log entries from
// this leader during this particular term to a remote follower.
type followerReplication struct {
	// currentTerm and nextIndex must be kept at the top of the struct so
	// they're 64 bit aligned which is a requirement for atomic ops on 32 bit
	// platforms.

	// currentTerm is the term of this leader, to be included in AppendEntries
	// requests.
	currentTerm uint64

	// nextIndex is the index of the next log entry to send to the follower,
	// which may fall past the end of the log.
	nextIndex uint64

	// peer contains the network address and ID of the remote follower.
	peer Server

	// commitment tracks the entries acknowledged by followers so that the
	// leader's commit index can advance. It is updated on successful
	// AppendEntries responses.
	commitment *commitment

	// stopCh is notified/closed when this leader steps down or the follower is
	// removed from the cluster. In the follower removed case, it carries a log
	// index; replication should be attempted on a best-effort basis up through
	// that index before exiting.
	stopCh chan uint64

	// triggerCh is notified every time new entries are appended to the log.
	triggerCh chan struct{}

	// triggerDeferErrorCh is used to provide a backchannel. By sending a
	// deferErr, the sender can be notified when the replication is done.
	triggerDeferErrorCh chan *deferError

	// lastContact is updated to the current time whenever any response is
	// received from the follower (successful or not). This is used to check
	// whether the leader should step down (Raft.checkLeaderLease()).
	lastContact time.Time
	// lastContactLock protects 'lastContact'.
	lastContactLock sync.RWMutex

	// failures counts the number of failed RPCs since the last success, which is
	// used to apply backoff.
	failures uint64

	// notifyCh is notified to send out a heartbeat, which is used to check that
	// this server is still leader.
	notifyCh chan struct{}
	// notify is a map of futures to be resolved upon receipt of an
	// acknowledgement, then cleared from this map.
	notify map[*verifyFuture]struct{}
	// notifyLock protects 'notify'.
	notifyLock sync.Mutex

	// stepDown is used to indicate to the leader that we
	// should step down based on information from a follower.
	stepDown chan struct{}

	// allowPipeline is used to determine when to pipeline the AppendEntries RPCs.
	// It is private to this replication goroutine.
	allowPipeline bool
}

// notifyAll is used to notify all the waiting verify futures
// if the follower believes we are still the leader.
func (s *followerReplication) notifyAll(leader bool) {
	// Clear the waiting notifies minimizing lock time
	s.notifyLock.Lock()
	n := s.notify
	s.notify = make(map[*verifyFuture]struct{})
	s.notifyLock.Unlock()

	// Submit our votes
	for v := range n {
		v.vote(leader)
	}
}

// cleanNotify is used to remove a pending verify future from the notify map.
func (s *followerReplication) cleanNotify(v *verifyFuture) {
	s.notifyLock.Lock()
	delete(s.notify, v)
	s.notifyLock.Unlock()
}

// LastContact returns the time of last contact.
func (s *followerReplication) LastContact() time.Time {
	s.lastContactLock.RLock()
	last := s.lastContact
	s.lastContactLock.RUnlock()
	return last
}

// setLastContact sets the last contact to the current time.
func (s *followerReplication) setLastContact() {
	s.lastContactLock.Lock()
	s.lastContact = time.Now()
	s.lastContactLock.Unlock()
}

// replicate is a long running routine that replicates log entries to a single
// follower.
func (r *Raft) replicate(s *followerReplication) {
	// Start an async heartbeating routine
	stopHeartbeat := make(chan struct{})
	defer close(stopHeartbeat)
	r.goFunc(func() { r.heartbeat(s, stopHeartbeat) })

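	// This routine alternates between two modes: the RPC loop below, which
	// issues one AppendEntries at a time and can recover from errors, and the
	// PIPELINE mode entered once replication looks healthy, which streams
	// AppendEntries for throughput and falls back here on any failure.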
RPC:
	shouldStop := false
	for !shouldStop {
		select {
		case maxIndex := <-s.stopCh:
			// Make a best effort to replicate up to this index
			if maxIndex > 0 {
				r.replicateTo(s, maxIndex)
			}
			return
		case deferErr := <-s.triggerDeferErrorCh:
			lastLogIdx, _ := r.getLastLog()
			shouldStop = r.replicateTo(s, lastLogIdx)
			if !shouldStop {
				deferErr.respond(nil)
			} else {
				deferErr.respond(fmt.Errorf("replication failed"))
			}
		case <-s.triggerCh:
			lastLogIdx, _ := r.getLastLog()
			shouldStop = r.replicateTo(s, lastLogIdx)
		// This is _not_ our heartbeat mechanism but is to ensure
		// followers quickly learn the leader's commit index when
		// raft commits stop flowing naturally. The actual heartbeats
		// can't do this to keep them unblocked by disk IO on the
		// follower. See https://github.com/hashicorp/raft/issues/282.
		case <-randomTimeout(r.conf.CommitTimeout):
			lastLogIdx, _ := r.getLastLog()
			shouldStop = r.replicateTo(s, lastLogIdx)
		}

		// If things look healthy, switch to pipeline mode
		if !shouldStop && s.allowPipeline {
			goto PIPELINE
		}
	}
	return

PIPELINE:
	// Disable until re-enabled
	s.allowPipeline = false

	// Replicates using a pipeline for high performance. This method
	// is not able to gracefully recover from errors, and so we fall back
	// to standard mode on failure.
	if err := r.pipelineReplicate(s); err != nil {
		if err != ErrPipelineReplicationNotSupported {
			r.logger.Error("failed to start pipeline replication to", "peer", s.peer, "error", err)
		}
	}
	goto RPC
}

// replicateTo is a helper to replicate(), used to replicate the logs up to a
// given last index.
// If the follower log is behind, we take care to bring them up to date.
func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool) {
	// Create the base request
	var req AppendEntriesRequest
	var resp AppendEntriesResponse
	var start time.Time
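	// The flow below is label driven: START sends one AppendEntries batch
	// beginning at the follower's nextIndex, SEND_SNAP ships a snapshot when
	// the required logs are no longer available, and CHECK_MORE loops back to
	// START until the follower has caught up to lastIndex or we are stopped.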
START:
	// Prevent an excessive retry rate on errors
	if s.failures > 0 {
		select {
		case <-time.After(backoff(failureWait, s.failures, maxFailureScale)):
		case <-r.shutdownCh:
		}
	}

	// Setup the request
	if err := r.setupAppendEntries(s, &req, atomic.LoadUint64(&s.nextIndex), lastIndex); err == ErrLogNotFound {
		goto SEND_SNAP
	} else if err != nil {
		return
	}

	// Make the RPC call
	start = time.Now()
	if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
		r.logger.Error("failed to appendEntries to", "peer", s.peer, "error", err)
		s.failures++
		return
	}
	appendStats(string(s.peer.ID), start, float32(len(req.Entries)))

	// Check for a newer term, stop running
	if resp.Term > req.Term {
		r.handleStaleTerm(s)
		return true
	}

	// Update the last contact
	s.setLastContact()

	// Update s based on success
	if resp.Success {
		// Update our replication state
		updateLastAppended(s, &req)

		// Clear any failures, allow pipelining
		s.failures = 0
		s.allowPipeline = true
	} else {
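		// The follower rejected the entries: back nextIndex up to the smaller
		// of nextIndex-1 and the follower's reported LastLog+1 (never below
		// 1), so the next attempt starts from a point the follower may accept.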
		atomic.StoreUint64(&s.nextIndex, max(min(s.nextIndex-1, resp.LastLog+1), 1))
		if resp.NoRetryBackoff {
			s.failures = 0
		} else {
			s.failures++
		}
		r.logger.Warn("appendEntries rejected, sending older logs", "peer", s.peer, "next", atomic.LoadUint64(&s.nextIndex))
	}

CHECK_MORE:
	// Poll the stop channel here in case we are looping and have been asked
	// to stop, or have stepped down as leader. Even for the best effort case
	// where we are asked to replicate to a given index and then shutdown,
	// it's better to not loop in here to send lots of entries to a straggler
	// that's leaving the cluster anyways.
	select {
	case <-s.stopCh:
		return true
	default:
	}

	// Check if there are more logs to replicate
	if atomic.LoadUint64(&s.nextIndex) <= lastIndex {
		goto START
	}
	return

	// SEND_SNAP is used when we fail to get a log, usually because the follower
	// is too far behind, and we must ship a snapshot down instead
SEND_SNAP:
	if stop, err := r.sendLatestSnapshot(s); stop {
		return true
	} else if err != nil {
		r.logger.Error("failed to send snapshot to", "peer", s.peer, "error", err)
		return
	}

	// Check if there is more to replicate
	goto CHECK_MORE
}

// sendLatestSnapshot is used to send the latest snapshot we have
// down to our follower.
func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
	// Get the snapshots
	snapshots, err := r.snapshots.List()
	if err != nil {
		r.logger.Error("failed to list snapshots", "error", err)
		return false, err
	}

	// Check we have at least a single snapshot
	if len(snapshots) == 0 {
		return false, fmt.Errorf("no snapshots found")
	}

	// Open the most recent snapshot
	snapID := snapshots[0].ID
	meta, snapshot, err := r.snapshots.Open(snapID)
	if err != nil {
		r.logger.Error("failed to open snapshot", "id", snapID, "error", err)
		return false, err
	}
	defer snapshot.Close()

	// Setup the request
	req := InstallSnapshotRequest{
		RPCHeader:          r.getRPCHeader(),
		SnapshotVersion:    meta.Version,
		Term:               s.currentTerm,
		Leader:             r.trans.EncodePeer(r.localID, r.localAddr),
		LastLogIndex:       meta.Index,
		LastLogTerm:        meta.Term,
		Peers:              meta.Peers,
		Size:               meta.Size,
		Configuration:      EncodeConfiguration(meta.Configuration),
		ConfigurationIndex: meta.ConfigurationIndex,
	}

	// Make the call
	start := time.Now()
	var resp InstallSnapshotResponse
	if err := r.trans.InstallSnapshot(s.peer.ID, s.peer.Address, &req, &resp, snapshot); err != nil {
		r.logger.Error("failed to install snapshot", "id", snapID, "error", err)
		s.failures++
		return false, err
	}
	metrics.MeasureSince([]string{"raft", "replication", "installSnapshot", string(s.peer.ID)}, start)

	// Check for a newer term, stop running
	if resp.Term > req.Term {
		r.handleStaleTerm(s)
		return true, nil
	}

	// Update the last contact
	s.setLastContact()

	// Check for success
	if resp.Success {
		// Update the indexes
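		// The snapshot covers the log through meta.Index, so replication to
		// this follower resumes at meta.Index+1.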
		atomic.StoreUint64(&s.nextIndex, meta.Index+1)
		s.commitment.match(s.peer.ID, meta.Index)

		// Clear any failures
		s.failures = 0

		// Notify we are still leader
		s.notifyAll(true)
	} else {
		s.failures++
		r.logger.Warn("installSnapshot rejected to", "peer", s.peer)
	}
	return false, nil
}

// heartbeat is used to periodically invoke AppendEntries on a peer
// to ensure they don't time out. This is done async of replicate(),
// since that routine could potentially be blocked on disk IO.
func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
	var failures uint64
	req := AppendEntriesRequest{
		RPCHeader: r.getRPCHeader(),
		Term:      s.currentTerm,
		Leader:    r.trans.EncodePeer(r.localID, r.localAddr),
	}
	var resp AppendEntriesResponse
	for {
		// Wait for the next heartbeat interval or forced notify
		select {
		case <-s.notifyCh:
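		// Heartbeats fire on a randomized timer of roughly a tenth of the
		// configured HeartbeatTimeout, so followers hear from the leader
		// well before their own election timers can expire.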
		case <-randomTimeout(r.conf.HeartbeatTimeout / 10):
		case <-stopCh:
			return
		}

		start := time.Now()
		if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
			r.logger.Error("failed to heartbeat to", "peer", s.peer.Address, "error", err)
			failures++
			select {
			case <-time.After(backoff(failureWait, failures, maxFailureScale)):
			case <-stopCh:
			}
		} else {
			s.setLastContact()
			failures = 0
			metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(s.peer.ID)}, start)
			s.notifyAll(resp.Success)
		}
	}
}

// pipelineReplicate is used when we have synchronized our state with the follower,
// and want to switch to a higher performance pipeline mode of replication.
// We only pipeline AppendEntries commands, and if we ever hit an error, we fall
// back to the standard replication which can handle more complex situations.
func (r *Raft) pipelineReplicate(s *followerReplication) error {
	// Create a new pipeline
	pipeline, err := r.trans.AppendEntriesPipeline(s.peer.ID, s.peer.Address)
	if err != nil {
		return err
	}
	defer pipeline.Close()

	// Log start and stop of pipeline
	r.logger.Info("pipelining replication", "peer", s.peer)
	defer r.logger.Info("aborting pipeline replication", "peer", s.peer)

	// Create a shutdown and finish channel
	stopCh := make(chan struct{})
	finishCh := make(chan struct{})

	// Start a dedicated decoder
	r.goFunc(func() { r.pipelineDecode(s, pipeline, stopCh, finishCh) })

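	// From here on, pipelineSend pushes requests in flight and advances the
	// local nextIndex optimistically, while the decoder goroutine above
	// consumes responses and updates the shared replication state.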
	// Start pipeline sends at the last good nextIndex
	nextIndex := atomic.LoadUint64(&s.nextIndex)

	shouldStop := false
SEND:
	for !shouldStop {
		select {
		case <-finishCh:
			break SEND
		case maxIndex := <-s.stopCh:
			// Make a best effort to replicate up to this index
			if maxIndex > 0 {
				r.pipelineSend(s, pipeline, &nextIndex, maxIndex)
			}
			break SEND
		case deferErr := <-s.triggerDeferErrorCh:
			lastLogIdx, _ := r.getLastLog()
			shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx)
			if !shouldStop {
				deferErr.respond(nil)
			} else {
				deferErr.respond(fmt.Errorf("replication failed"))
			}
		case <-s.triggerCh:
			lastLogIdx, _ := r.getLastLog()
			shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx)
		case <-randomTimeout(r.conf.CommitTimeout):
			lastLogIdx, _ := r.getLastLog()
			shouldStop = r.pipelineSend(s, pipeline, &nextIndex, lastLogIdx)
		}
	}

	// Stop our decoder, and wait for it to finish
	close(stopCh)
	select {
	case <-finishCh:
	case <-r.shutdownCh:
	}
	return nil
}

// pipelineSend is used to send data over a pipeline. It is a helper to
// pipelineReplicate.
func (r *Raft) pipelineSend(s *followerReplication, p AppendPipeline, nextIdx *uint64, lastIndex uint64) (shouldStop bool) {
	// Create a new append request
	req := new(AppendEntriesRequest)
	if err := r.setupAppendEntries(s, req, *nextIdx, lastIndex); err != nil {
		return true
	}

	// Pipeline the append entries
	if _, err := p.AppendEntries(req, new(AppendEntriesResponse)); err != nil {
		r.logger.Error("failed to pipeline appendEntries", "peer", s.peer, "error", err)
		return true
	}

	// Increase the next send log to avoid re-sending old logs
	if n := len(req.Entries); n > 0 {
		last := req.Entries[n-1]
		atomic.StoreUint64(nextIdx, last.Index+1)
	}
	return false
}

// pipelineDecode is used to decode the responses of pipelined requests.
func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh, finishCh chan struct{}) {
	defer close(finishCh)
	respCh := p.Consumer()
	for {
		select {
		case ready := <-respCh:
			req, resp := ready.Request(), ready.Response()
			appendStats(string(s.peer.ID), ready.Start(), float32(len(req.Entries)))

			// Check for a newer term, stop running
			if resp.Term > req.Term {
				r.handleStaleTerm(s)
				return
			}

			// Update the last contact
			s.setLastContact()

			// Abort pipeline if not successful
			if !resp.Success {
				return
			}

			// Update our replication state
			updateLastAppended(s, req)
		case <-stopCh:
			return
		}
	}
}

// setupAppendEntries is used to setup an append entries request.
func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequest, nextIndex, lastIndex uint64) error {
	req.RPCHeader = r.getRPCHeader()
	req.Term = s.currentTerm
	req.Leader = r.trans.EncodePeer(r.localID, r.localAddr)
	req.LeaderCommitIndex = r.getCommitIndex()
	if err := r.setPreviousLog(req, nextIndex); err != nil {
		return err
	}
	if err := r.setNewLogs(req, nextIndex, lastIndex); err != nil {
		return err
	}
	return nil
}

// setPreviousLog is used to setup the PrevLogEntry and PrevLogTerm for an
// AppendEntriesRequest given the next index to replicate.
func (r *Raft) setPreviousLog(req *AppendEntriesRequest, nextIndex uint64) error {
	// Guard for the first index, since there is no 0 log entry
	// Guard against the previous index being a snapshot as well
	lastSnapIdx, lastSnapTerm := r.getLastSnapshot()
	if nextIndex == 1 {
		req.PrevLogEntry = 0
		req.PrevLogTerm = 0

	} else if (nextIndex - 1) == lastSnapIdx {
		req.PrevLogEntry = lastSnapIdx
		req.PrevLogTerm = lastSnapTerm

	} else {
		var l Log
		if err := r.logs.GetLog(nextIndex-1, &l); err != nil {
			r.logger.Error("failed to get log", "index", nextIndex-1, "error", err)
			return err
		}

		// Set the previous index and term from the preceding log entry
		req.PrevLogEntry = l.Index
		req.PrevLogTerm = l.Term
	}
	return nil
}

// setNewLogs is used to setup the logs which should be appended for a request.
func (r *Raft) setNewLogs(req *AppendEntriesRequest, nextIndex, lastIndex uint64) error {
	// Append up to MaxAppendEntries or up to the lastIndex
	req.Entries = make([]*Log, 0, r.conf.MaxAppendEntries)
	maxIndex := min(nextIndex+uint64(r.conf.MaxAppendEntries)-1, lastIndex)
	for i := nextIndex; i <= maxIndex; i++ {
		oldLog := new(Log)
		if err := r.logs.GetLog(i, oldLog); err != nil {
			r.logger.Error("failed to get log", "index", i, "error", err)
			return err
		}
		req.Entries = append(req.Entries, oldLog)
	}
	return nil
}

// appendStats is used to emit stats about an AppendEntries invocation.
func appendStats(peer string, start time.Time, logs float32) {
	metrics.MeasureSince([]string{"raft", "replication", "appendEntries", "rpc", peer}, start)
	metrics.IncrCounter([]string{"raft", "replication", "appendEntries", "logs", peer}, logs)
}

// handleStaleTerm is used when a follower indicates that we have a stale term.
func (r *Raft) handleStaleTerm(s *followerReplication) {
	r.logger.Error("peer has newer term, stopping replication", "peer", s.peer)
	s.notifyAll(false) // No longer leader
	asyncNotifyCh(s.stepDown)
}

// updateLastAppended is used to update follower replication state after a
// successful AppendEntries RPC.
// TODO: This isn't used during InstallSnapshot, but the code there is similar.
func updateLastAppended(s *followerReplication, req *AppendEntriesRequest) {
	// Mark any inflight logs as committed
	if logs := req.Entries; len(logs) > 0 {
		last := logs[len(logs)-1]
		atomic.StoreUint64(&s.nextIndex, last.Index+1)
		s.commitment.match(s.peer.ID, last.Index)
	}

	// Notify still leader
	s.notifyAll(true)
}