2018-07-06 16:09:34 +00:00
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raft
import (
"bytes"
"errors"
"fmt"
"math"
"math/rand"
"sort"
"strings"
"sync"
"time"
2020-01-29 14:16:38 +00:00
"go.etcd.io/etcd/raft/confchange"
"go.etcd.io/etcd/raft/quorum"
2018-10-03 16:55:26 +00:00
pb "go.etcd.io/etcd/raft/raftpb"
2020-01-29 14:16:38 +00:00
"go.etcd.io/etcd/raft/tracker"
2018-07-06 16:09:34 +00:00
)
// None is a placeholder node ID used when there is no leader.
const None uint64 = 0
const noLimit = math . MaxUint64
// Possible values for StateType.
const (
StateFollower StateType = iota
StateCandidate
StateLeader
StatePreCandidate
numStates
)
type ReadOnlyOption int
const (
// ReadOnlySafe guarantees the linearizability of the read only request by
// communicating with the quorum. It is the default and suggested option.
ReadOnlySafe ReadOnlyOption = iota
// ReadOnlyLeaseBased ensures linearizability of the read only request by
// relying on the leader lease. It can be affected by clock drift.
// If the clock drift is unbounded, leader might keep the lease longer than it
// should (clock can move backward/pause without any bound). ReadIndex is not safe
// in that case.
ReadOnlyLeaseBased
)
// Possible values for CampaignType
const (
// campaignPreElection represents the first phase of a normal election when
// Config.PreVote is true.
campaignPreElection CampaignType = "CampaignPreElection"
// campaignElection represents a normal (time-based) election (the second phase
// of the election when Config.PreVote is true).
campaignElection CampaignType = "CampaignElection"
// campaignTransfer represents the type of leader transfer
campaignTransfer CampaignType = "CampaignTransfer"
)
// ErrProposalDropped is returned when the proposal is ignored by some cases,
// so that the proposer can be notified and fail fast.
var ErrProposalDropped = errors . New ( "raft proposal dropped" )
// lockedRand is a small wrapper around rand.Rand to provide
// synchronization among multiple raft groups. Only the methods needed
// by the code are exposed (e.g. Intn).
type lockedRand struct {
mu sync . Mutex
rand * rand . Rand
}
func ( r * lockedRand ) Intn ( n int ) int {
r . mu . Lock ( )
v := r . rand . Intn ( n )
r . mu . Unlock ( )
return v
}
var globalRand = & lockedRand {
rand : rand . New ( rand . NewSource ( time . Now ( ) . UnixNano ( ) ) ) ,
}
// CampaignType represents the type of campaigning
// the reason we use the type of string instead of uint64
// is because it's simpler to compare and fill in raft entries
type CampaignType string
// StateType represents the role of a node in a cluster.
type StateType uint64
var stmap = [ ... ] string {
"StateFollower" ,
"StateCandidate" ,
"StateLeader" ,
"StatePreCandidate" ,
}
func ( st StateType ) String ( ) string {
return stmap [ uint64 ( st ) ]
}
// Config contains the parameters to start a raft.
type Config struct {
// ID is the identity of the local raft. ID cannot be 0.
ID uint64
// peers contains the IDs of all nodes (including self) in the raft cluster. It
// should only be set when starting a new raft cluster. Restarting raft from
// previous configuration will panic if peers is set. peer is private and only
// used for testing right now.
peers [ ] uint64
// learners contains the IDs of all learner nodes (including self if the
// local node is a learner) in the raft cluster. learners only receives
// entries from the leader node. It does not vote or promote itself.
learners [ ] uint64
// ElectionTick is the number of Node.Tick invocations that must pass between
// elections. That is, if a follower does not receive any message from the
// leader of current term before ElectionTick has elapsed, it will become
// candidate and start an election. ElectionTick must be greater than
// HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid
// unnecessary leader switching.
ElectionTick int
// HeartbeatTick is the number of Node.Tick invocations that must pass between
// heartbeats. That is, a leader sends heartbeat messages to maintain its
// leadership every HeartbeatTick ticks.
HeartbeatTick int
// Storage is the storage for raft. raft generates entries and states to be
// stored in storage. raft reads the persisted entries and states out of
// Storage when it needs. raft reads out the previous state and configuration
// out of storage when restarting.
Storage Storage
// Applied is the last applied index. It should only be set when restarting
// raft. raft will not return entries to the application smaller or equal to
// Applied. If Applied is unset when restarting, raft might return previous
// applied entries. This is a very application dependent configuration.
Applied uint64
2018-10-15 21:36:55 +00:00
// MaxSizePerMsg limits the max byte size of each append message. Smaller
// value lowers the raft recovery cost(initial probing and message lost
// during normal operation). On the other side, it might affect the
// throughput during normal replication. Note: math.MaxUint64 for unlimited,
// 0 for at most one entry per message.
2018-07-06 16:09:34 +00:00
MaxSizePerMsg uint64
2019-01-23 19:35:03 +00:00
// MaxCommittedSizePerReady limits the size of the committed entries which
// can be applied.
MaxCommittedSizePerReady uint64
2018-10-15 21:36:55 +00:00
// MaxUncommittedEntriesSize limits the aggregate byte size of the
// uncommitted entries that may be appended to a leader's log. Once this
// limit is exceeded, proposals will begin to return ErrProposalDropped
// errors. Note: 0 for no limit.
MaxUncommittedEntriesSize uint64
2018-07-06 16:09:34 +00:00
// MaxInflightMsgs limits the max number of in-flight append messages during
// optimistic replication phase. The application transportation layer usually
// has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid
// overflowing that sending buffer. TODO (xiangli): feedback to application to
// limit the proposal rate?
MaxInflightMsgs int
// CheckQuorum specifies if the leader should check quorum activity. Leader
// steps down when quorum is not active for an electionTimeout.
CheckQuorum bool
// PreVote enables the Pre-Vote algorithm described in raft thesis section
// 9.6. This prevents disruption when a node that has been partitioned away
// rejoins the cluster.
PreVote bool
// ReadOnlyOption specifies how the read only request is processed.
//
// ReadOnlySafe guarantees the linearizability of the read only request by
// communicating with the quorum. It is the default and suggested option.
//
// ReadOnlyLeaseBased ensures linearizability of the read only request by
// relying on the leader lease. It can be affected by clock drift.
// If the clock drift is unbounded, leader might keep the lease longer than it
// should (clock can move backward/pause without any bound). ReadIndex is not safe
// in that case.
// CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased.
ReadOnlyOption ReadOnlyOption
// Logger is the logger used for raft log. For multinode which can host
// multiple raft group, each raft group can have its own logger
Logger Logger
// DisableProposalForwarding set to true means that followers will drop
// proposals, rather than forwarding them to the leader. One use case for
// this feature would be in a situation where the Raft leader is used to
// compute the data of a proposal, for example, adding a timestamp from a
// hybrid logical clock to data in a monotonically increasing way. Forwarding
// should be disabled to prevent a follower with an inaccurate hybrid
// logical clock from assigning the timestamp and then forwarding the data
// to the leader.
DisableProposalForwarding bool
}
func ( c * Config ) validate ( ) error {
if c . ID == None {
return errors . New ( "cannot use none as id" )
}
if c . HeartbeatTick <= 0 {
return errors . New ( "heartbeat tick must be greater than 0" )
}
if c . ElectionTick <= c . HeartbeatTick {
return errors . New ( "election tick must be greater than heartbeat tick" )
}
if c . Storage == nil {
return errors . New ( "storage cannot be nil" )
}
2018-10-15 21:36:55 +00:00
if c . MaxUncommittedEntriesSize == 0 {
c . MaxUncommittedEntriesSize = noLimit
}
2019-01-23 19:35:03 +00:00
// default MaxCommittedSizePerReady to MaxSizePerMsg because they were
// previously the same parameter.
if c . MaxCommittedSizePerReady == 0 {
c . MaxCommittedSizePerReady = c . MaxSizePerMsg
}
2018-07-06 16:09:34 +00:00
if c . MaxInflightMsgs <= 0 {
return errors . New ( "max inflight messages must be greater than 0" )
}
if c . Logger == nil {
c . Logger = raftLogger
}
if c . ReadOnlyOption == ReadOnlyLeaseBased && ! c . CheckQuorum {
return errors . New ( "CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased" )
}
return nil
}
type raft struct {
id uint64
Term uint64
Vote uint64
readStates [ ] ReadState
// the log
raftLog * raftLog
2018-10-15 21:36:55 +00:00
maxMsgSize uint64
maxUncommittedSize uint64
2020-01-29 14:16:38 +00:00
// TODO(tbg): rename to trk.
prs tracker . ProgressTracker
2018-07-06 16:09:34 +00:00
state StateType
// isLearner is true if the local raft node is a learner.
isLearner bool
msgs [ ] pb . Message
// the leader id
lead uint64
// leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10.
leadTransferee uint64
// Only one conf change may be pending (in the log, but not yet
// applied) at a time. This is enforced via pendingConfIndex, which
// is set to a value >= the log index of the latest pending
// configuration change (if any). Config changes are only allowed to
// be proposed if the leader's applied index is greater than this
// value.
pendingConfIndex uint64
2018-10-15 21:36:55 +00:00
// an estimate of the size of the uncommitted tail of the Raft log. Used to
// prevent unbounded log growth. Only maintained by the leader. Reset on
// term changes.
uncommittedSize uint64
2018-07-06 16:09:34 +00:00
readOnly * readOnly
// number of ticks since it reached last electionTimeout when it is leader
// or candidate.
// number of ticks since it reached last electionTimeout or received a
// valid message from current leader when it is a follower.
electionElapsed int
// number of ticks since it reached last heartbeatTimeout.
// only leader keeps heartbeatElapsed.
heartbeatElapsed int
checkQuorum bool
preVote bool
heartbeatTimeout int
electionTimeout int
// randomizedElectionTimeout is a random number between
// [electiontimeout, 2 * electiontimeout - 1]. It gets reset
// when raft changes its state to follower or candidate.
randomizedElectionTimeout int
disableProposalForwarding bool
tick func ( )
step stepFunc
logger Logger
}
func newRaft ( c * Config ) * raft {
if err := c . validate ( ) ; err != nil {
panic ( err . Error ( ) )
}
2019-01-23 19:35:03 +00:00
raftlog := newLogWithSize ( c . Storage , c . Logger , c . MaxCommittedSizePerReady )
2018-07-06 16:09:34 +00:00
hs , cs , err := c . Storage . InitialState ( )
if err != nil {
panic ( err ) // TODO(bdarnell)
}
2020-01-29 14:16:38 +00:00
if len ( c . peers ) > 0 || len ( c . learners ) > 0 {
if len ( cs . Voters ) > 0 || len ( cs . Learners ) > 0 {
2018-07-06 16:09:34 +00:00
// TODO(bdarnell): the peers argument is always nil except in
// tests; the argument should be removed and these tests should be
// updated to specify their nodes through a snapshot.
2020-01-29 14:16:38 +00:00
panic ( "cannot specify both newRaft(peers, learners) and ConfState.(Voters, Learners)" )
2018-07-06 16:09:34 +00:00
}
2020-01-29 14:16:38 +00:00
cs . Voters = c . peers
cs . Learners = c . learners
2018-07-06 16:09:34 +00:00
}
2020-01-29 14:16:38 +00:00
2018-07-06 16:09:34 +00:00
r := & raft {
id : c . ID ,
lead : None ,
isLearner : false ,
raftLog : raftlog ,
maxMsgSize : c . MaxSizePerMsg ,
2018-10-15 21:36:55 +00:00
maxUncommittedSize : c . MaxUncommittedEntriesSize ,
2020-01-29 14:16:38 +00:00
prs : tracker . MakeProgressTracker ( c . MaxInflightMsgs ) ,
2018-07-06 16:09:34 +00:00
electionTimeout : c . ElectionTick ,
heartbeatTimeout : c . HeartbeatTick ,
logger : c . Logger ,
checkQuorum : c . CheckQuorum ,
preVote : c . PreVote ,
readOnly : newReadOnly ( c . ReadOnlyOption ) ,
disableProposalForwarding : c . DisableProposalForwarding ,
}
2020-01-29 14:16:38 +00:00
cfg , prs , err := confchange . Restore ( confchange . Changer {
Tracker : r . prs ,
LastIndex : raftlog . lastIndex ( ) ,
} , cs )
if err != nil {
panic ( err )
2018-07-06 16:09:34 +00:00
}
2020-01-29 14:16:38 +00:00
assertConfStatesEquivalent ( r . logger , cs , r . switchToConfig ( cfg , prs ) )
2018-07-06 16:09:34 +00:00
2020-01-29 14:16:38 +00:00
if ! IsEmptyHardState ( hs ) {
2018-07-06 16:09:34 +00:00
r . loadState ( hs )
}
if c . Applied > 0 {
raftlog . appliedTo ( c . Applied )
}
r . becomeFollower ( r . Term , None )
var nodesStrs [ ] string
2020-01-29 14:16:38 +00:00
for _ , n := range r . prs . VoterNodes ( ) {
2018-07-06 16:09:34 +00:00
nodesStrs = append ( nodesStrs , fmt . Sprintf ( "%x" , n ) )
}
r . logger . Infof ( "newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]" ,
r . id , strings . Join ( nodesStrs , "," ) , r . Term , r . raftLog . committed , r . raftLog . applied , r . raftLog . lastIndex ( ) , r . raftLog . lastTerm ( ) )
return r
}
func ( r * raft ) hasLeader ( ) bool { return r . lead != None }
func ( r * raft ) softState ( ) * SoftState { return & SoftState { Lead : r . lead , RaftState : r . state } }
func ( r * raft ) hardState ( ) pb . HardState {
return pb . HardState {
Term : r . Term ,
Vote : r . Vote ,
Commit : r . raftLog . committed ,
}
}
// send persists state to stable storage and then sends to its mailbox.
func ( r * raft ) send ( m pb . Message ) {
m . From = r . id
if m . Type == pb . MsgVote || m . Type == pb . MsgVoteResp || m . Type == pb . MsgPreVote || m . Type == pb . MsgPreVoteResp {
if m . Term == 0 {
// All {pre-,}campaign messages need to have the term set when
// sending.
// - MsgVote: m.Term is the term the node is campaigning for,
// non-zero as we increment the term when campaigning.
// - MsgVoteResp: m.Term is the new r.Term if the MsgVote was
// granted, non-zero for the same reason MsgVote is
// - MsgPreVote: m.Term is the term the node will campaign,
// non-zero as we use m.Term to indicate the next term we'll be
// campaigning for
// - MsgPreVoteResp: m.Term is the term received in the original
// MsgPreVote if the pre-vote was granted, non-zero for the
// same reasons MsgPreVote is
panic ( fmt . Sprintf ( "term should be set when sending %s" , m . Type ) )
}
} else {
if m . Term != 0 {
panic ( fmt . Sprintf ( "term should not be set when sending %s (was %d)" , m . Type , m . Term ) )
}
// do not attach term to MsgProp, MsgReadIndex
// proposals are a way to forward to the leader and
// should be treated as local message.
// MsgReadIndex is also forwarded to leader.
if m . Type != pb . MsgProp && m . Type != pb . MsgReadIndex {
m . Term = r . Term
}
}
r . msgs = append ( r . msgs , m )
}
2018-10-03 16:55:26 +00:00
// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
2018-07-06 16:09:34 +00:00
func ( r * raft ) sendAppend ( to uint64 ) {
2018-10-03 16:55:26 +00:00
r . maybeSendAppend ( to , true )
}
// maybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
func ( r * raft ) maybeSendAppend ( to uint64 , sendIfEmpty bool ) bool {
2020-01-29 14:16:38 +00:00
pr := r . prs . Progress [ to ]
2018-07-06 16:09:34 +00:00
if pr . IsPaused ( ) {
2018-10-03 16:55:26 +00:00
return false
2018-07-06 16:09:34 +00:00
}
m := pb . Message { }
m . To = to
term , errt := r . raftLog . term ( pr . Next - 1 )
ents , erre := r . raftLog . entries ( pr . Next , r . maxMsgSize )
2018-10-03 16:55:26 +00:00
if len ( ents ) == 0 && ! sendIfEmpty {
return false
}
2018-07-06 16:09:34 +00:00
if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
if ! pr . RecentActive {
r . logger . Debugf ( "ignore sending snapshot to %x since it is not recently active" , to )
2018-10-03 16:55:26 +00:00
return false
2018-07-06 16:09:34 +00:00
}
m . Type = pb . MsgSnap
snapshot , err := r . raftLog . snapshot ( )
if err != nil {
if err == ErrSnapshotTemporarilyUnavailable {
r . logger . Debugf ( "%x failed to send snapshot to %x because snapshot is temporarily unavailable" , r . id , to )
2018-10-03 16:55:26 +00:00
return false
2018-07-06 16:09:34 +00:00
}
panic ( err ) // TODO(bdarnell)
}
if IsEmptySnap ( snapshot ) {
panic ( "need non-empty snapshot" )
}
m . Snapshot = snapshot
sindex , sterm := snapshot . Metadata . Index , snapshot . Metadata . Term
r . logger . Debugf ( "%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]" ,
r . id , r . raftLog . firstIndex ( ) , r . raftLog . committed , sindex , sterm , to , pr )
2020-01-29 14:16:38 +00:00
pr . BecomeSnapshot ( sindex )
2018-07-06 16:09:34 +00:00
r . logger . Debugf ( "%x paused sending replication messages to %x [%s]" , r . id , to , pr )
} else {
m . Type = pb . MsgApp
m . Index = pr . Next - 1
m . LogTerm = term
m . Entries = ents
m . Commit = r . raftLog . committed
if n := len ( m . Entries ) ; n != 0 {
switch pr . State {
2020-01-29 14:16:38 +00:00
// optimistically increase the next when in StateReplicate
case tracker . StateReplicate :
2018-07-06 16:09:34 +00:00
last := m . Entries [ n - 1 ] . Index
2020-01-29 14:16:38 +00:00
pr . OptimisticUpdate ( last )
pr . Inflights . Add ( last )
case tracker . StateProbe :
pr . ProbeSent = true
2018-07-06 16:09:34 +00:00
default :
r . logger . Panicf ( "%x is sending append in unhandled state %s" , r . id , pr . State )
}
}
}
r . send ( m )
2018-10-03 16:55:26 +00:00
return true
2018-07-06 16:09:34 +00:00
}
2018-10-15 21:36:55 +00:00
// sendHeartbeat sends a heartbeat RPC to the given peer.
2018-07-06 16:09:34 +00:00
func ( r * raft ) sendHeartbeat ( to uint64 , ctx [ ] byte ) {
// Attach the commit as min(to.matched, r.committed).
// When the leader sends out heartbeat message,
// the receiver(follower) might not be matched with the leader
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
2020-01-29 14:16:38 +00:00
commit := min ( r . prs . Progress [ to ] . Match , r . raftLog . committed )
2018-07-06 16:09:34 +00:00
m := pb . Message {
To : to ,
Type : pb . MsgHeartbeat ,
Commit : commit ,
Context : ctx ,
}
r . send ( m )
}
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
func ( r * raft ) bcastAppend ( ) {
2020-01-29 14:16:38 +00:00
r . prs . Visit ( func ( id uint64 , _ * tracker . Progress ) {
2018-07-06 16:09:34 +00:00
if id == r . id {
return
}
r . sendAppend ( id )
} )
}
// bcastHeartbeat sends RPC, without entries to all the peers.
func ( r * raft ) bcastHeartbeat ( ) {
lastCtx := r . readOnly . lastPendingRequestCtx ( )
if len ( lastCtx ) == 0 {
r . bcastHeartbeatWithCtx ( nil )
} else {
r . bcastHeartbeatWithCtx ( [ ] byte ( lastCtx ) )
}
}
func ( r * raft ) bcastHeartbeatWithCtx ( ctx [ ] byte ) {
2020-01-29 14:16:38 +00:00
r . prs . Visit ( func ( id uint64 , _ * tracker . Progress ) {
2018-07-06 16:09:34 +00:00
if id == r . id {
return
}
r . sendHeartbeat ( id , ctx )
} )
}
2020-01-29 14:16:38 +00:00
func ( r * raft ) advance ( rd Ready ) {
// If entries were applied (or a snapshot), update our cursor for
// the next Ready. Note that if the current HardState contains a
// new Commit index, this does not mean that we're also applying
// all of the new entries due to commit pagination by size.
if index := rd . appliedCursor ( ) ; index > 0 {
r . raftLog . appliedTo ( index )
if r . prs . Config . AutoLeave && index >= r . pendingConfIndex && r . state == StateLeader {
// If the current (and most recent, at least for this leader's term)
// configuration should be auto-left, initiate that now.
ccdata , err := ( & pb . ConfChangeV2 { } ) . Marshal ( )
if err != nil {
panic ( err )
}
ent := pb . Entry {
Type : pb . EntryConfChangeV2 ,
Data : ccdata ,
}
if ! r . appendEntry ( ent ) {
// If we could not append the entry, bump the pending conf index
// so that we'll try again later.
//
// TODO(tbg): test this case.
r . pendingConfIndex = r . raftLog . lastIndex ( )
} else {
r . logger . Infof ( "initiating automatic transition out of joint configuration %s" , r . prs . Config )
}
}
}
r . reduceUncommittedSize ( rd . CommittedEntries )
if len ( rd . Entries ) > 0 {
e := rd . Entries [ len ( rd . Entries ) - 1 ]
r . raftLog . stableTo ( e . Index , e . Term )
}
if ! IsEmptySnap ( rd . Snapshot ) {
r . raftLog . stableSnapTo ( rd . Snapshot . Metadata . Index )
}
}
2018-07-06 16:09:34 +00:00
// maybeCommit attempts to advance the commit index. Returns true if
// the commit index changed (in which case the caller should call
// r.bcastAppend).
func ( r * raft ) maybeCommit ( ) bool {
2020-01-29 14:16:38 +00:00
mci := r . prs . Committed ( )
2018-07-06 16:09:34 +00:00
return r . raftLog . maybeCommit ( mci , r . Term )
}
func ( r * raft ) reset ( term uint64 ) {
if r . Term != term {
r . Term = term
r . Vote = None
}
r . lead = None
r . electionElapsed = 0
r . heartbeatElapsed = 0
r . resetRandomizedElectionTimeout ( )
r . abortLeaderTransfer ( )
2020-01-29 14:16:38 +00:00
r . prs . ResetVotes ( )
r . prs . Visit ( func ( id uint64 , pr * tracker . Progress ) {
* pr = tracker . Progress {
Match : 0 ,
Next : r . raftLog . lastIndex ( ) + 1 ,
Inflights : tracker . NewInflights ( r . prs . MaxInflight ) ,
IsLearner : pr . IsLearner ,
}
2018-07-06 16:09:34 +00:00
if id == r . id {
pr . Match = r . raftLog . lastIndex ( )
}
} )
r . pendingConfIndex = 0
2018-10-15 21:36:55 +00:00
r . uncommittedSize = 0
2018-07-06 16:09:34 +00:00
r . readOnly = newReadOnly ( r . readOnly . option )
}
2019-01-23 19:35:03 +00:00
func ( r * raft ) appendEntry ( es ... pb . Entry ) ( accepted bool ) {
2018-07-06 16:09:34 +00:00
li := r . raftLog . lastIndex ( )
for i := range es {
es [ i ] . Term = r . Term
es [ i ] . Index = li + 1 + uint64 ( i )
}
2019-01-23 19:35:03 +00:00
// Track the size of this uncommitted proposal.
if ! r . increaseUncommittedSize ( es ) {
r . logger . Debugf (
"%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal" ,
r . id ,
)
// Drop the proposal.
return false
}
2018-07-06 16:09:34 +00:00
// use latest "last" index after truncate/append
li = r . raftLog . append ( es ... )
2020-01-29 14:16:38 +00:00
r . prs . Progress [ r . id ] . MaybeUpdate ( li )
2018-07-06 16:09:34 +00:00
// Regardless of maybeCommit's return, our caller will call bcastAppend.
r . maybeCommit ( )
2019-01-23 19:35:03 +00:00
return true
2018-07-06 16:09:34 +00:00
}
// tickElection is run by followers and candidates after r.electionTimeout.
func ( r * raft ) tickElection ( ) {
r . electionElapsed ++
if r . promotable ( ) && r . pastElectionTimeout ( ) {
r . electionElapsed = 0
r . Step ( pb . Message { From : r . id , Type : pb . MsgHup } )
}
}
// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
func ( r * raft ) tickHeartbeat ( ) {
r . heartbeatElapsed ++
r . electionElapsed ++
if r . electionElapsed >= r . electionTimeout {
r . electionElapsed = 0
if r . checkQuorum {
r . Step ( pb . Message { From : r . id , Type : pb . MsgCheckQuorum } )
}
// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
if r . state == StateLeader && r . leadTransferee != None {
r . abortLeaderTransfer ( )
}
}
if r . state != StateLeader {
return
}
if r . heartbeatElapsed >= r . heartbeatTimeout {
r . heartbeatElapsed = 0
r . Step ( pb . Message { From : r . id , Type : pb . MsgBeat } )
}
}
func ( r * raft ) becomeFollower ( term uint64 , lead uint64 ) {
r . step = stepFollower
r . reset ( term )
r . tick = r . tickElection
r . lead = lead
r . state = StateFollower
r . logger . Infof ( "%x became follower at term %d" , r . id , r . Term )
}
func ( r * raft ) becomeCandidate ( ) {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r . state == StateLeader {
panic ( "invalid transition [leader -> candidate]" )
}
r . step = stepCandidate
r . reset ( r . Term + 1 )
r . tick = r . tickElection
r . Vote = r . id
r . state = StateCandidate
r . logger . Infof ( "%x became candidate at term %d" , r . id , r . Term )
}
func ( r * raft ) becomePreCandidate ( ) {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r . state == StateLeader {
panic ( "invalid transition [leader -> pre-candidate]" )
}
// Becoming a pre-candidate changes our step functions and state,
// but doesn't change anything else. In particular it does not increase
// r.Term or change r.Vote.
r . step = stepCandidate
2020-01-29 14:16:38 +00:00
r . prs . ResetVotes ( )
2018-07-06 16:09:34 +00:00
r . tick = r . tickElection
2019-01-23 19:35:03 +00:00
r . lead = None
2018-07-06 16:09:34 +00:00
r . state = StatePreCandidate
r . logger . Infof ( "%x became pre-candidate at term %d" , r . id , r . Term )
}
func ( r * raft ) becomeLeader ( ) {
// TODO(xiangli) remove the panic when the raft implementation is stable
if r . state == StateFollower {
panic ( "invalid transition [follower -> leader]" )
}
r . step = stepLeader
r . reset ( r . Term )
r . tick = r . tickHeartbeat
r . lead = r . id
r . state = StateLeader
2019-01-23 19:35:03 +00:00
// Followers enter replicate mode when they've been successfully probed
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
2020-01-29 14:16:38 +00:00
r . prs . Progress [ r . id ] . BecomeReplicate ( )
2018-07-06 16:09:34 +00:00
// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
// safe to delay any future proposals until we commit all our
// pending log entries, and scanning the entire tail of the log
// could be expensive.
r . pendingConfIndex = r . raftLog . lastIndex ( )
2019-01-23 19:35:03 +00:00
emptyEnt := pb . Entry { Data : nil }
if ! r . appendEntry ( emptyEnt ) {
// This won't happen because we just called reset() above.
r . logger . Panic ( "empty entry was dropped" )
}
// As a special case, don't count the initial empty entry towards the
// uncommitted log quota. This is because we want to preserve the
// behavior of allowing one entry larger than quota if the current
// usage is zero.
r . reduceUncommittedSize ( [ ] pb . Entry { emptyEnt } )
2018-07-06 16:09:34 +00:00
r . logger . Infof ( "%x became leader at term %d" , r . id , r . Term )
}
2020-01-29 14:16:38 +00:00
// campaign transitions the raft instance to candidate state. This must only be
// called after verifying that this is a legitimate transition.
2018-07-06 16:09:34 +00:00
func ( r * raft ) campaign ( t CampaignType ) {
2020-01-29 14:16:38 +00:00
if ! r . promotable ( ) {
// This path should not be hit (callers are supposed to check), but
// better safe than sorry.
r . logger . Warningf ( "%x is unpromotable; campaign() should have been called" , r . id )
}
2018-07-06 16:09:34 +00:00
var term uint64
var voteMsg pb . MessageType
if t == campaignPreElection {
r . becomePreCandidate ( )
voteMsg = pb . MsgPreVote
// PreVote RPCs are sent for the next term before we've incremented r.Term.
term = r . Term + 1
} else {
r . becomeCandidate ( )
voteMsg = pb . MsgVote
term = r . Term
}
2020-01-29 14:16:38 +00:00
if _ , _ , res := r . poll ( r . id , voteRespMsgType ( voteMsg ) , true ) ; res == quorum . VoteWon {
2018-07-06 16:09:34 +00:00
// We won the election after voting for ourselves (which must mean that
// this is a single-node cluster). Advance to the next state.
if t == campaignPreElection {
r . campaign ( campaignElection )
} else {
r . becomeLeader ( )
}
return
}
2020-01-29 14:16:38 +00:00
var ids [ ] uint64
{
idMap := r . prs . Voters . IDs ( )
ids = make ( [ ] uint64 , 0 , len ( idMap ) )
for id := range idMap {
ids = append ( ids , id )
}
sort . Slice ( ids , func ( i , j int ) bool { return ids [ i ] < ids [ j ] } )
}
for _ , id := range ids {
2018-07-06 16:09:34 +00:00
if id == r . id {
continue
}
r . logger . Infof ( "%x [logterm: %d, index: %d] sent %s request to %x at term %d" ,
r . id , r . raftLog . lastTerm ( ) , r . raftLog . lastIndex ( ) , voteMsg , id , r . Term )
var ctx [ ] byte
if t == campaignTransfer {
ctx = [ ] byte ( t )
}
r . send ( pb . Message { Term : term , To : id , Type : voteMsg , Index : r . raftLog . lastIndex ( ) , LogTerm : r . raftLog . lastTerm ( ) , Context : ctx } )
}
}
2020-01-29 14:16:38 +00:00
func ( r * raft ) poll ( id uint64 , t pb . MessageType , v bool ) ( granted int , rejected int , result quorum . VoteResult ) {
2018-07-06 16:09:34 +00:00
if v {
r . logger . Infof ( "%x received %s from %x at term %d" , r . id , t , id , r . Term )
} else {
r . logger . Infof ( "%x received %s rejection from %x at term %d" , r . id , t , id , r . Term )
}
2020-01-29 14:16:38 +00:00
r . prs . RecordVote ( id , v )
return r . prs . TallyVotes ( )
2018-07-06 16:09:34 +00:00
}
func ( r * raft ) Step ( m pb . Message ) error {
// Handle the message term, which may result in our stepping down to a follower.
switch {
case m . Term == 0 :
// local message
case m . Term > r . Term :
if m . Type == pb . MsgVote || m . Type == pb . MsgPreVote {
force := bytes . Equal ( m . Context , [ ] byte ( campaignTransfer ) )
inLease := r . checkQuorum && r . lead != None && r . electionElapsed < r . electionTimeout
if ! force && inLease {
// If a server receives a RequestVote request within the minimum election timeout
// of hearing from a current leader, it does not update its term or grant its vote
r . logger . Infof ( "%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)" ,
r . id , r . raftLog . lastTerm ( ) , r . raftLog . lastIndex ( ) , r . Vote , m . Type , m . From , m . LogTerm , m . Index , r . Term , r . electionTimeout - r . electionElapsed )
return nil
}
}
switch {
case m . Type == pb . MsgPreVote :
// Never change our term in response to a PreVote
case m . Type == pb . MsgPreVoteResp && ! m . Reject :
// We send pre-vote requests with a term in our future. If the
// pre-vote is granted, we will increment our term when we get a
// quorum. If it is not, the term comes from the node that
// rejected our vote so we should become a follower at the new
// term.
default :
r . logger . Infof ( "%x [term: %d] received a %s message with higher term from %x [term: %d]" ,
r . id , r . Term , m . Type , m . From , m . Term )
if m . Type == pb . MsgApp || m . Type == pb . MsgHeartbeat || m . Type == pb . MsgSnap {
r . becomeFollower ( m . Term , m . From )
} else {
r . becomeFollower ( m . Term , None )
}
}
case m . Term < r . Term :
if ( r . checkQuorum || r . preVote ) && ( m . Type == pb . MsgHeartbeat || m . Type == pb . MsgApp ) {
// We have received messages from a leader at a lower term. It is possible
// that these messages were simply delayed in the network, but this could
// also mean that this node has advanced its term number during a network
// partition, and it is now unable to either win an election or to rejoin
// the majority on the old term. If checkQuorum is false, this will be
// handled by incrementing term numbers in response to MsgVote with a
// higher term, but if checkQuorum is true we may not advance the term on
// MsgVote and must generate other messages to advance the term. The net
// result of these two features is to minimize the disruption caused by
// nodes that have been removed from the cluster's configuration: a
// removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
// but it will not receive MsgApp or MsgHeartbeat, so it will not create
// disruptive term increases, by notifying leader of this node's activeness.
// The above comments also true for Pre-Vote
//
// When follower gets isolated, it soon starts an election ending
// up with a higher term than leader, although it won't receive enough
// votes to win the election. When it regains connectivity, this response
// with "pb.MsgAppResp" of higher term would force leader to step down.
// However, this disruption is inevitable to free this stuck node with
// fresh election. This can be prevented with Pre-Vote phase.
r . send ( pb . Message { To : m . From , Type : pb . MsgAppResp } )
} else if m . Type == pb . MsgPreVote {
// Before Pre-Vote enable, there may have candidate with higher term,
// but less log. After update to Pre-Vote, the cluster may deadlock if
// we drop messages with a lower term.
r . logger . Infof ( "%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d" ,
r . id , r . raftLog . lastTerm ( ) , r . raftLog . lastIndex ( ) , r . Vote , m . Type , m . From , m . LogTerm , m . Index , r . Term )
r . send ( pb . Message { To : m . From , Term : r . Term , Type : pb . MsgPreVoteResp , Reject : true } )
} else {
// ignore other cases
r . logger . Infof ( "%x [term: %d] ignored a %s message with lower term from %x [term: %d]" ,
r . id , r . Term , m . Type , m . From , m . Term )
}
return nil
}
switch m . Type {
case pb . MsgHup :
if r . state != StateLeader {
2020-01-29 14:16:38 +00:00
if ! r . promotable ( ) {
r . logger . Warningf ( "%x is unpromotable and can not campaign; ignoring MsgHup" , r . id )
return nil
}
2018-07-06 16:09:34 +00:00
ents , err := r . raftLog . slice ( r . raftLog . applied + 1 , r . raftLog . committed + 1 , noLimit )
if err != nil {
r . logger . Panicf ( "unexpected error getting unapplied entries (%v)" , err )
}
if n := numOfPendingConf ( ents ) ; n != 0 && r . raftLog . committed > r . raftLog . applied {
r . logger . Warningf ( "%x cannot campaign at term %d since there are still %d pending configuration changes to apply" , r . id , r . Term , n )
return nil
}
r . logger . Infof ( "%x is starting a new election at term %d" , r . id , r . Term )
if r . preVote {
r . campaign ( campaignPreElection )
} else {
r . campaign ( campaignElection )
}
} else {
r . logger . Debugf ( "%x ignoring MsgHup because already leader" , r . id )
}
case pb . MsgVote , pb . MsgPreVote :
// We can vote if this is a repeat of a vote we've already cast...
canVote := r . Vote == m . From ||
// ...we haven't voted and we don't think there's a leader yet in this term...
( r . Vote == None && r . lead == None ) ||
// ...or this is a PreVote for a future term...
( m . Type == pb . MsgPreVote && m . Term > r . Term )
// ...and we believe the candidate is up to date.
if canVote && r . raftLog . isUpToDate ( m . Index , m . LogTerm ) {
2020-01-29 14:16:38 +00:00
// Note: it turns out that that learners must be allowed to cast votes.
// This seems counter- intuitive but is necessary in the situation in which
// a learner has been promoted (i.e. is now a voter) but has not learned
// about this yet.
// For example, consider a group in which id=1 is a learner and id=2 and
// id=3 are voters. A configuration change promoting 1 can be committed on
// the quorum `{2,3}` without the config change being appended to the
// learner's log. If the leader (say 2) fails, there are de facto two
// voters remaining. Only 3 can win an election (due to its log containing
// all committed entries), but to do so it will need 1 to vote. But 1
// considers itself a learner and will continue to do so until 3 has
// stepped up as leader, replicates the conf change to 1, and 1 applies it.
// Ultimately, by receiving a request to vote, the learner realizes that
// the candidate believes it to be a voter, and that it should act
// accordingly. The candidate's config may be stale, too; but in that case
// it won't win the election, at least in the absence of the bug discussed
// in:
// https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
2018-07-06 16:09:34 +00:00
r . logger . Infof ( "%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d" ,
r . id , r . raftLog . lastTerm ( ) , r . raftLog . lastIndex ( ) , r . Vote , m . Type , m . From , m . LogTerm , m . Index , r . Term )
// When responding to Msg{Pre,}Vote messages we include the term
2019-04-12 15:51:37 +00:00
// from the message, not the local term. To see why, consider the
2018-07-06 16:09:34 +00:00
// case where a single node was previously partitioned away and
2019-04-12 15:51:37 +00:00
// it's local term is now out of date. If we include the local term
2018-07-06 16:09:34 +00:00
// (recall that for pre-votes we don't update the local term), the
// (pre-)campaigning node on the other end will proceed to ignore
// the message (it ignores all out of date messages).
// The term in the original message and current local term are the
// same in the case of regular votes, but different for pre-votes.
r . send ( pb . Message { To : m . From , Term : m . Term , Type : voteRespMsgType ( m . Type ) } )
if m . Type == pb . MsgVote {
// Only record real votes.
r . electionElapsed = 0
r . Vote = m . From
}
} else {
r . logger . Infof ( "%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d" ,
r . id , r . raftLog . lastTerm ( ) , r . raftLog . lastIndex ( ) , r . Vote , m . Type , m . From , m . LogTerm , m . Index , r . Term )
r . send ( pb . Message { To : m . From , Term : r . Term , Type : voteRespMsgType ( m . Type ) , Reject : true } )
}
default :
err := r . step ( r , m )
if err != nil {
return err
}
}
return nil
}
type stepFunc func ( r * raft , m pb . Message ) error
func stepLeader ( r * raft , m pb . Message ) error {
// These message types do not require any progress for m.From.
switch m . Type {
case pb . MsgBeat :
r . bcastHeartbeat ( )
return nil
case pb . MsgCheckQuorum :
2020-01-29 14:16:38 +00:00
// The leader should always see itself as active. As a precaution, handle
// the case in which the leader isn't in the configuration any more (for
// example if it just removed itself).
//
// TODO(tbg): I added a TODO in removeNode, it doesn't seem that the
// leader steps down when removing itself. I might be missing something.
if pr := r . prs . Progress [ r . id ] ; pr != nil {
pr . RecentActive = true
}
if ! r . prs . QuorumActive ( ) {
2018-07-06 16:09:34 +00:00
r . logger . Warningf ( "%x stepped down to follower since quorum is not active" , r . id )
r . becomeFollower ( r . Term , None )
}
2020-01-29 14:16:38 +00:00
// Mark everyone (but ourselves) as inactive in preparation for the next
// CheckQuorum.
r . prs . Visit ( func ( id uint64 , pr * tracker . Progress ) {
if id != r . id {
pr . RecentActive = false
}
} )
2018-07-06 16:09:34 +00:00
return nil
case pb . MsgProp :
if len ( m . Entries ) == 0 {
r . logger . Panicf ( "%x stepped empty MsgProp" , r . id )
}
2020-01-29 14:16:38 +00:00
if r . prs . Progress [ r . id ] == nil {
2018-07-06 16:09:34 +00:00
// If we are not currently a member of the range (i.e. this node
// was removed from the configuration while serving as leader),
// drop any new proposals.
return ErrProposalDropped
}
if r . leadTransferee != None {
r . logger . Debugf ( "%x [term %d] transfer leadership to %x is in progress; dropping proposal" , r . id , r . Term , r . leadTransferee )
return ErrProposalDropped
}
2020-01-29 14:16:38 +00:00
for i := range m . Entries {
e := & m . Entries [ i ]
var cc pb . ConfChangeI
2018-07-06 16:09:34 +00:00
if e . Type == pb . EntryConfChange {
2020-01-29 14:16:38 +00:00
var ccc pb . ConfChange
if err := ccc . Unmarshal ( e . Data ) ; err != nil {
panic ( err )
}
cc = ccc
} else if e . Type == pb . EntryConfChangeV2 {
var ccc pb . ConfChangeV2
if err := ccc . Unmarshal ( e . Data ) ; err != nil {
panic ( err )
}
cc = ccc
}
if cc != nil {
alreadyPending := r . pendingConfIndex > r . raftLog . applied
alreadyJoint := len ( r . prs . Config . Voters [ 1 ] ) > 0
wantsLeaveJoint := len ( cc . AsV2 ( ) . Changes ) == 0
var refused string
if alreadyPending {
refused = fmt . Sprintf ( "possible unapplied conf change at index %d (applied to %d)" , r . pendingConfIndex , r . raftLog . applied )
} else if alreadyJoint && ! wantsLeaveJoint {
refused = "must transition out of joint config first"
} else if ! alreadyJoint && wantsLeaveJoint {
refused = "not in joint state; refusing empty conf change"
}
if refused != "" {
r . logger . Infof ( "%x ignoring conf change %v at config %s: %s" , r . id , cc , r . prs . Config , refused )
2018-07-06 16:09:34 +00:00
m . Entries [ i ] = pb . Entry { Type : pb . EntryNormal }
} else {
r . pendingConfIndex = r . raftLog . lastIndex ( ) + uint64 ( i ) + 1
}
}
}
2019-01-23 19:35:03 +00:00
if ! r . appendEntry ( m . Entries ... ) {
return ErrProposalDropped
}
2018-07-06 16:09:34 +00:00
r . bcastAppend ( )
return nil
case pb . MsgReadIndex :
2020-01-29 14:16:38 +00:00
// If more than the local vote is needed, go through a full broadcast,
// otherwise optimize.
if ! r . prs . IsSingleton ( ) {
2018-07-06 16:09:34 +00:00
if r . raftLog . zeroTermOnErrCompacted ( r . raftLog . term ( r . raftLog . committed ) ) != r . Term {
// Reject read only request when this leader has not committed any log entry at its term.
return nil
}
// thinking: use an interally defined context instead of the user given context.
// We can express this in terms of the term and index instead of a user-supplied value.
// This would allow multiple reads to piggyback on the same message.
switch r . readOnly . option {
case ReadOnlySafe :
r . readOnly . addRequest ( r . raftLog . committed , m )
2020-01-29 14:16:38 +00:00
// The local node automatically acks the request.
r . readOnly . recvAck ( r . id , m . Entries [ 0 ] . Data )
2018-07-06 16:09:34 +00:00
r . bcastHeartbeatWithCtx ( m . Entries [ 0 ] . Data )
case ReadOnlyLeaseBased :
ri := r . raftLog . committed
if m . From == None || m . From == r . id { // from local member
2020-01-29 14:16:38 +00:00
r . readStates = append ( r . readStates , ReadState { Index : ri , RequestCtx : m . Entries [ 0 ] . Data } )
2018-07-06 16:09:34 +00:00
} else {
r . send ( pb . Message { To : m . From , Type : pb . MsgReadIndexResp , Index : ri , Entries : m . Entries } )
}
}
2020-01-29 14:16:38 +00:00
} else { // only one voting member (the leader) in the cluster
2019-04-12 15:51:37 +00:00
if m . From == None || m . From == r . id { // from leader itself
r . readStates = append ( r . readStates , ReadState { Index : r . raftLog . committed , RequestCtx : m . Entries [ 0 ] . Data } )
} else { // from learner member
r . send ( pb . Message { To : m . From , Type : pb . MsgReadIndexResp , Index : r . raftLog . committed , Entries : m . Entries } )
}
2018-07-06 16:09:34 +00:00
}
return nil
}
// All other message types require a progress for m.From (pr).
2020-01-29 14:16:38 +00:00
pr := r . prs . Progress [ m . From ]
2018-07-06 16:09:34 +00:00
if pr == nil {
r . logger . Debugf ( "%x no progress available for %x" , r . id , m . From )
return nil
}
switch m . Type {
case pb . MsgAppResp :
pr . RecentActive = true
if m . Reject {
2020-01-29 14:16:38 +00:00
r . logger . Debugf ( "%x received MsgAppResp(MsgApp was rejected, lastindex: %d) from %x for index %d" ,
2018-07-06 16:09:34 +00:00
r . id , m . RejectHint , m . From , m . Index )
2020-01-29 14:16:38 +00:00
if pr . MaybeDecrTo ( m . Index , m . RejectHint ) {
2018-07-06 16:09:34 +00:00
r . logger . Debugf ( "%x decreased progress of %x to [%s]" , r . id , m . From , pr )
2020-01-29 14:16:38 +00:00
if pr . State == tracker . StateReplicate {
pr . BecomeProbe ( )
2018-07-06 16:09:34 +00:00
}
r . sendAppend ( m . From )
}
} else {
oldPaused := pr . IsPaused ( )
2020-01-29 14:16:38 +00:00
if pr . MaybeUpdate ( m . Index ) {
2018-07-06 16:09:34 +00:00
switch {
2020-01-29 14:16:38 +00:00
case pr . State == tracker . StateProbe :
pr . BecomeReplicate ( )
case pr . State == tracker . StateSnapshot && pr . Match >= pr . PendingSnapshot :
// TODO(tbg): we should also enter this branch if a snapshot is
// received that is below pr.PendingSnapshot but which makes it
// possible to use the log again.
r . logger . Debugf ( "%x recovered from needing snapshot, resumed sending replication messages to %x [%s]" , r . id , m . From , pr )
2019-01-23 19:35:03 +00:00
// Transition back to replicating state via probing state
// (which takes the snapshot into account). If we didn't
// move to replicating state, that would only happen with
// the next round of appends (but there may not be a next
// round for a while, exposing an inconsistent RaftStatus).
2020-01-29 14:16:38 +00:00
pr . BecomeProbe ( )
pr . BecomeReplicate ( )
case pr . State == tracker . StateReplicate :
pr . Inflights . FreeLE ( m . Index )
2018-07-06 16:09:34 +00:00
}
if r . maybeCommit ( ) {
r . bcastAppend ( )
} else if oldPaused {
2018-10-03 16:55:26 +00:00
// If we were paused before, this node may be missing the
// latest commit index, so send it.
2018-07-06 16:09:34 +00:00
r . sendAppend ( m . From )
}
2018-10-03 16:55:26 +00:00
// We've updated flow control information above, which may
// allow us to send multiple (size-limited) in-flight messages
// at once (such as when transitioning from probe to
// replicate, or when freeTo() covers multiple messages). If
// we have more entries to send, send as many messages as we
// can (without sending empty messages for the commit index)
for r . maybeSendAppend ( m . From , false ) {
}
2018-07-06 16:09:34 +00:00
// Transfer leadership is in progress.
if m . From == r . leadTransferee && pr . Match == r . raftLog . lastIndex ( ) {
r . logger . Infof ( "%x sent MsgTimeoutNow to %x after received MsgAppResp" , r . id , m . From )
r . sendTimeoutNow ( m . From )
}
}
}
case pb . MsgHeartbeatResp :
pr . RecentActive = true
2020-01-29 14:16:38 +00:00
pr . ProbeSent = false
2018-07-06 16:09:34 +00:00
// free one slot for the full inflights window to allow progress.
2020-01-29 14:16:38 +00:00
if pr . State == tracker . StateReplicate && pr . Inflights . Full ( ) {
pr . Inflights . FreeFirstOne ( )
2018-07-06 16:09:34 +00:00
}
if pr . Match < r . raftLog . lastIndex ( ) {
r . sendAppend ( m . From )
}
if r . readOnly . option != ReadOnlySafe || len ( m . Context ) == 0 {
return nil
}
2020-01-29 14:16:38 +00:00
if r . prs . Voters . VoteResult ( r . readOnly . recvAck ( m . From , m . Context ) ) != quorum . VoteWon {
2018-07-06 16:09:34 +00:00
return nil
}
rss := r . readOnly . advance ( m )
for _ , rs := range rss {
req := rs . req
if req . From == None || req . From == r . id { // from local member
r . readStates = append ( r . readStates , ReadState { Index : rs . index , RequestCtx : req . Entries [ 0 ] . Data } )
} else {
r . send ( pb . Message { To : req . From , Type : pb . MsgReadIndexResp , Index : rs . index , Entries : req . Entries } )
}
}
case pb . MsgSnapStatus :
2020-01-29 14:16:38 +00:00
if pr . State != tracker . StateSnapshot {
2018-07-06 16:09:34 +00:00
return nil
}
2020-01-29 14:16:38 +00:00
// TODO(tbg): this code is very similar to the snapshot handling in
// MsgAppResp above. In fact, the code there is more correct than the
// code here and should likely be updated to match (or even better, the
// logic pulled into a newly created Progress state machine handler).
2018-07-06 16:09:34 +00:00
if ! m . Reject {
2020-01-29 14:16:38 +00:00
pr . BecomeProbe ( )
2018-07-06 16:09:34 +00:00
r . logger . Debugf ( "%x snapshot succeeded, resumed sending replication messages to %x [%s]" , r . id , m . From , pr )
} else {
2020-01-29 14:16:38 +00:00
// NB: the order here matters or we'll be probing erroneously from
// the snapshot index, but the snapshot never applied.
pr . PendingSnapshot = 0
pr . BecomeProbe ( )
2018-07-06 16:09:34 +00:00
r . logger . Debugf ( "%x snapshot failed, resumed sending replication messages to %x [%s]" , r . id , m . From , pr )
}
2020-01-29 14:16:38 +00:00
// If snapshot finish, wait for the MsgAppResp from the remote node before sending
// out the next MsgApp.
2018-07-06 16:09:34 +00:00
// If snapshot failure, wait for a heartbeat interval before next try
2020-01-29 14:16:38 +00:00
pr . ProbeSent = true
2018-07-06 16:09:34 +00:00
case pb . MsgUnreachable :
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.
2020-01-29 14:16:38 +00:00
if pr . State == tracker . StateReplicate {
pr . BecomeProbe ( )
2018-07-06 16:09:34 +00:00
}
r . logger . Debugf ( "%x failed to send message to %x because it is unreachable [%s]" , r . id , m . From , pr )
case pb . MsgTransferLeader :
if pr . IsLearner {
r . logger . Debugf ( "%x is learner. Ignored transferring leadership" , r . id )
return nil
}
leadTransferee := m . From
lastLeadTransferee := r . leadTransferee
if lastLeadTransferee != None {
if lastLeadTransferee == leadTransferee {
r . logger . Infof ( "%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x" ,
r . id , r . Term , leadTransferee , leadTransferee )
return nil
}
r . abortLeaderTransfer ( )
r . logger . Infof ( "%x [term %d] abort previous transferring leadership to %x" , r . id , r . Term , lastLeadTransferee )
}
if leadTransferee == r . id {
r . logger . Debugf ( "%x is already leader. Ignored transferring leadership to self" , r . id )
return nil
}
// Transfer leadership to third party.
r . logger . Infof ( "%x [term %d] starts to transfer leadership to %x" , r . id , r . Term , leadTransferee )
// Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
r . electionElapsed = 0
r . leadTransferee = leadTransferee
if pr . Match == r . raftLog . lastIndex ( ) {
r . sendTimeoutNow ( leadTransferee )
r . logger . Infof ( "%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log" , r . id , leadTransferee , leadTransferee )
} else {
r . sendAppend ( leadTransferee )
}
}
return nil
}
// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
// whether they respond to MsgVoteResp or MsgPreVoteResp.
func stepCandidate ( r * raft , m pb . Message ) error {
// Only handle vote responses corresponding to our candidacy (while in
// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
// our pre-candidate state).
var myVoteRespType pb . MessageType
if r . state == StatePreCandidate {
myVoteRespType = pb . MsgPreVoteResp
} else {
myVoteRespType = pb . MsgVoteResp
}
switch m . Type {
case pb . MsgProp :
r . logger . Infof ( "%x no leader at term %d; dropping proposal" , r . id , r . Term )
return ErrProposalDropped
case pb . MsgApp :
r . becomeFollower ( m . Term , m . From ) // always m.Term == r.Term
r . handleAppendEntries ( m )
case pb . MsgHeartbeat :
r . becomeFollower ( m . Term , m . From ) // always m.Term == r.Term
r . handleHeartbeat ( m )
case pb . MsgSnap :
r . becomeFollower ( m . Term , m . From ) // always m.Term == r.Term
r . handleSnapshot ( m )
case myVoteRespType :
2020-01-29 14:16:38 +00:00
gr , rj , res := r . poll ( m . From , m . Type , ! m . Reject )
r . logger . Infof ( "%x has received %d %s votes and %d vote rejections" , r . id , gr , m . Type , rj )
switch res {
case quorum . VoteWon :
2018-07-06 16:09:34 +00:00
if r . state == StatePreCandidate {
r . campaign ( campaignElection )
} else {
r . becomeLeader ( )
r . bcastAppend ( )
}
2020-01-29 14:16:38 +00:00
case quorum . VoteLost :
2018-07-06 16:09:34 +00:00
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.Term > r.Term; reuse r.Term
r . becomeFollower ( r . Term , None )
}
case pb . MsgTimeoutNow :
r . logger . Debugf ( "%x [term %d state %v] ignored MsgTimeoutNow from %x" , r . id , r . Term , r . state , m . From )
}
return nil
}
func stepFollower ( r * raft , m pb . Message ) error {
switch m . Type {
case pb . MsgProp :
if r . lead == None {
r . logger . Infof ( "%x no leader at term %d; dropping proposal" , r . id , r . Term )
return ErrProposalDropped
} else if r . disableProposalForwarding {
r . logger . Infof ( "%x not forwarding to leader %x at term %d; dropping proposal" , r . id , r . lead , r . Term )
return ErrProposalDropped
}
m . To = r . lead
r . send ( m )
case pb . MsgApp :
r . electionElapsed = 0
r . lead = m . From
r . handleAppendEntries ( m )
case pb . MsgHeartbeat :
r . electionElapsed = 0
r . lead = m . From
r . handleHeartbeat ( m )
case pb . MsgSnap :
r . electionElapsed = 0
r . lead = m . From
r . handleSnapshot ( m )
case pb . MsgTransferLeader :
if r . lead == None {
r . logger . Infof ( "%x no leader at term %d; dropping leader transfer msg" , r . id , r . Term )
return nil
}
m . To = r . lead
r . send ( m )
case pb . MsgTimeoutNow :
if r . promotable ( ) {
r . logger . Infof ( "%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership." , r . id , r . Term , m . From )
// Leadership transfers never use pre-vote even if r.preVote is true; we
// know we are not recovering from a partition so there is no need for the
// extra round trip.
r . campaign ( campaignTransfer )
} else {
r . logger . Infof ( "%x received MsgTimeoutNow from %x but is not promotable" , r . id , m . From )
}
case pb . MsgReadIndex :
if r . lead == None {
r . logger . Infof ( "%x no leader at term %d; dropping index reading msg" , r . id , r . Term )
return nil
}
m . To = r . lead
r . send ( m )
case pb . MsgReadIndexResp :
if len ( m . Entries ) != 1 {
r . logger . Errorf ( "%x invalid format of MsgReadIndexResp from %x, entries count: %d" , r . id , m . From , len ( m . Entries ) )
return nil
}
r . readStates = append ( r . readStates , ReadState { Index : m . Index , RequestCtx : m . Entries [ 0 ] . Data } )
}
return nil
}
func ( r * raft ) handleAppendEntries ( m pb . Message ) {
if m . Index < r . raftLog . committed {
r . send ( pb . Message { To : m . From , Type : pb . MsgAppResp , Index : r . raftLog . committed } )
return
}
if mlastIndex , ok := r . raftLog . maybeAppend ( m . Index , m . LogTerm , m . Commit , m . Entries ... ) ; ok {
r . send ( pb . Message { To : m . From , Type : pb . MsgAppResp , Index : mlastIndex } )
} else {
2020-01-29 14:16:38 +00:00
r . logger . Debugf ( "%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x" ,
2018-07-06 16:09:34 +00:00
r . id , r . raftLog . zeroTermOnErrCompacted ( r . raftLog . term ( m . Index ) ) , m . Index , m . LogTerm , m . Index , m . From )
r . send ( pb . Message { To : m . From , Type : pb . MsgAppResp , Index : m . Index , Reject : true , RejectHint : r . raftLog . lastIndex ( ) } )
}
}
func ( r * raft ) handleHeartbeat ( m pb . Message ) {
r . raftLog . commitTo ( m . Commit )
r . send ( pb . Message { To : m . From , Type : pb . MsgHeartbeatResp , Context : m . Context } )
}
func ( r * raft ) handleSnapshot ( m pb . Message ) {
sindex , sterm := m . Snapshot . Metadata . Index , m . Snapshot . Metadata . Term
if r . restore ( m . Snapshot ) {
r . logger . Infof ( "%x [commit: %d] restored snapshot [index: %d, term: %d]" ,
r . id , r . raftLog . committed , sindex , sterm )
r . send ( pb . Message { To : m . From , Type : pb . MsgAppResp , Index : r . raftLog . lastIndex ( ) } )
} else {
r . logger . Infof ( "%x [commit: %d] ignored snapshot [index: %d, term: %d]" ,
r . id , r . raftLog . committed , sindex , sterm )
r . send ( pb . Message { To : m . From , Type : pb . MsgAppResp , Index : r . raftLog . committed } )
}
}
// restore recovers the state machine from a snapshot. It restores the log and the
2020-01-29 14:16:38 +00:00
// configuration of state machine. If this method returns false, the snapshot was
// ignored, either because it was obsolete or because of an error.
2018-07-06 16:09:34 +00:00
func ( r * raft ) restore ( s pb . Snapshot ) bool {
if s . Metadata . Index <= r . raftLog . committed {
return false
}
2020-01-29 14:16:38 +00:00
if r . state != StateFollower {
// This is defense-in-depth: if the leader somehow ended up applying a
// snapshot, it could move into a new term without moving into a
// follower state. This should never fire, but if it did, we'd have
// prevented damage by returning early, so log only a loud warning.
//
// At the time of writing, the instance is guaranteed to be in follower
// state when this method is called.
r . logger . Warningf ( "%x attempted to restore snapshot as leader; should never happen" , r . id )
r . becomeFollower ( r . Term + 1 , None )
2018-07-06 16:09:34 +00:00
return false
}
2020-01-29 14:16:38 +00:00
// More defense-in-depth: throw away snapshot if recipient is not in the
// config. This shouuldn't ever happen (at the time of writing) but lots of
// code here and there assumes that r.id is in the progress tracker.
found := false
cs := s . Metadata . ConfState
for _ , set := range [ ] [ ] uint64 {
cs . Voters ,
cs . Learners ,
} {
for _ , id := range set {
2018-07-06 16:09:34 +00:00
if id == r . id {
2020-01-29 14:16:38 +00:00
found = true
break
2018-07-06 16:09:34 +00:00
}
}
}
2020-01-29 14:16:38 +00:00
if ! found {
r . logger . Warningf (
"%x attempted to restore snapshot but it is not in the ConfState %v; should never happen" ,
r . id , cs ,
)
return false
}
2018-07-06 16:09:34 +00:00
2020-01-29 14:16:38 +00:00
// Now go ahead and actually restore.
if r . raftLog . matchTerm ( s . Metadata . Index , s . Metadata . Term ) {
r . logger . Infof ( "%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]" ,
r . id , r . raftLog . committed , r . raftLog . lastIndex ( ) , r . raftLog . lastTerm ( ) , s . Metadata . Index , s . Metadata . Term )
r . raftLog . commitTo ( s . Metadata . Index )
return false
}
2018-07-06 16:09:34 +00:00
r . raftLog . restore ( s )
2020-01-29 14:16:38 +00:00
// Reset the configuration and add the (potentially updated) peers in anew.
r . prs = tracker . MakeProgressTracker ( r . prs . MaxInflight )
cfg , prs , err := confchange . Restore ( confchange . Changer {
Tracker : r . prs ,
LastIndex : r . raftLog . lastIndex ( ) ,
} , cs )
if err != nil {
// This should never happen. Either there's a bug in our config change
// handling or the client corrupted the conf change.
panic ( fmt . Sprintf ( "unable to restore config %+v: %s" , cs , err ) )
2018-07-06 16:09:34 +00:00
}
2020-01-29 14:16:38 +00:00
assertConfStatesEquivalent ( r . logger , cs , r . switchToConfig ( cfg , prs ) )
pr := r . prs . Progress [ r . id ]
pr . MaybeUpdate ( pr . Next - 1 ) // TODO(tbg): this is untested and likely unneeded
r . logger . Infof ( "%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]" ,
r . id , r . raftLog . committed , r . raftLog . lastIndex ( ) , r . raftLog . lastTerm ( ) , s . Metadata . Index , s . Metadata . Term )
return true
2018-07-06 16:09:34 +00:00
}
// promotable indicates whether state machine can be promoted to leader,
// which is true when its own id is in progress list.
func ( r * raft ) promotable ( ) bool {
2020-01-29 14:16:38 +00:00
pr := r . prs . Progress [ r . id ]
return pr != nil && ! pr . IsLearner
2018-07-06 16:09:34 +00:00
}
2020-01-29 14:16:38 +00:00
func ( r * raft ) applyConfChange ( cc pb . ConfChangeV2 ) pb . ConfState {
cfg , prs , err := func ( ) ( tracker . Config , tracker . ProgressMap , error ) {
changer := confchange . Changer {
Tracker : r . prs ,
LastIndex : r . raftLog . lastIndex ( ) ,
2018-07-06 16:09:34 +00:00
}
2020-01-29 14:16:38 +00:00
if cc . LeaveJoint ( ) {
return changer . LeaveJoint ( )
} else if autoLeave , ok := cc . EnterJoint ( ) ; ok {
return changer . EnterJoint ( autoLeave , cc . Changes ... )
2018-07-06 16:09:34 +00:00
}
2020-01-29 14:16:38 +00:00
return changer . Simple ( cc . Changes ... )
} ( )
2018-07-06 16:09:34 +00:00
2020-01-29 14:16:38 +00:00
if err != nil {
// TODO(tbg): return the error to the caller.
panic ( err )
2018-07-06 16:09:34 +00:00
}
2020-01-29 14:16:38 +00:00
return r . switchToConfig ( cfg , prs )
2018-07-06 16:09:34 +00:00
}
2020-01-29 14:16:38 +00:00
// switchToConfig reconfigures this node to use the provided configuration. It
// updates the in-memory state and, when necessary, carries out additional
// actions such as reacting to the removal of nodes or changed quorum
// requirements.
//
// The inputs usually result from restoring a ConfState or applying a ConfChange.
func ( r * raft ) switchToConfig ( cfg tracker . Config , prs tracker . ProgressMap ) pb . ConfState {
r . prs . Config = cfg
r . prs . Progress = prs
r . logger . Infof ( "%x switched to configuration %s" , r . id , r . prs . Config )
cs := r . prs . ConfState ( )
pr , ok := r . prs . Progress [ r . id ]
// Update whether the node itself is a learner, resetting to false when the
// node is removed.
r . isLearner = ok && pr . IsLearner
if ( ! ok || r . isLearner ) && r . state == StateLeader {
// This node is leader and was removed or demoted. We prevent demotions
// at the time writing but hypothetically we handle them the same way as
// removing the leader: stepping down into the next Term.
//
// TODO(tbg): step down (for sanity) and ask follower with largest Match
// to TimeoutNow (to avoid interruption). This might still drop some
// proposals but it's better than nothing.
//
// TODO(tbg): test this branch. It is untested at the time of writing.
return cs
}
// The remaining steps only make sense if this node is the leader and there
// are other nodes.
if r . state != StateLeader || len ( cs . Voters ) == 0 {
return cs
2018-07-06 16:09:34 +00:00
}
if r . maybeCommit ( ) {
2020-01-29 14:16:38 +00:00
// If the configuration change means that more entries are committed now,
// broadcast/append to everyone in the updated config.
2018-07-06 16:09:34 +00:00
r . bcastAppend ( )
2020-01-29 14:16:38 +00:00
} else {
// Otherwise, still probe the newly added replicas; there's no reason to
// let them wait out a heartbeat interval (or the next incoming
// proposal).
r . prs . Visit ( func ( id uint64 , pr * tracker . Progress ) {
r . maybeSendAppend ( id , false /* sendIfEmpty */ )
} )
}
// If the the leadTransferee was removed, abort the leadership transfer.
if _ , tOK := r . prs . Progress [ r . leadTransferee ] ; ! tOK && r . leadTransferee != 0 {
2018-07-06 16:09:34 +00:00
r . abortLeaderTransfer ( )
}
2020-01-29 14:16:38 +00:00
return cs
2018-07-06 16:09:34 +00:00
}
func ( r * raft ) loadState ( state pb . HardState ) {
if state . Commit < r . raftLog . committed || state . Commit > r . raftLog . lastIndex ( ) {
r . logger . Panicf ( "%x state.commit %d is out of range [%d, %d]" , r . id , state . Commit , r . raftLog . committed , r . raftLog . lastIndex ( ) )
}
r . raftLog . committed = state . Commit
r . Term = state . Term
r . Vote = state . Vote
}
// pastElectionTimeout returns true iff r.electionElapsed is greater
// than or equal to the randomized election timeout in
// [electiontimeout, 2 * electiontimeout - 1].
func ( r * raft ) pastElectionTimeout ( ) bool {
return r . electionElapsed >= r . randomizedElectionTimeout
}
func ( r * raft ) resetRandomizedElectionTimeout ( ) {
r . randomizedElectionTimeout = r . electionTimeout + globalRand . Intn ( r . electionTimeout )
}
func ( r * raft ) sendTimeoutNow ( to uint64 ) {
r . send ( pb . Message { To : to , Type : pb . MsgTimeoutNow } )
}
func ( r * raft ) abortLeaderTransfer ( ) {
r . leadTransferee = None
}
2018-10-15 21:36:55 +00:00
// increaseUncommittedSize computes the size of the proposed entries and
// determines whether they would push leader over its maxUncommittedSize limit.
// If the new entries would exceed the limit, the method returns false. If not,
// the increase in uncommitted entry size is recorded and the method returns
// true.
func ( r * raft ) increaseUncommittedSize ( ents [ ] pb . Entry ) bool {
var s uint64
for _ , e := range ents {
2019-01-23 19:35:03 +00:00
s += uint64 ( PayloadSize ( e ) )
2018-10-15 21:36:55 +00:00
}
if r . uncommittedSize > 0 && r . uncommittedSize + s > r . maxUncommittedSize {
// If the uncommitted tail of the Raft log is empty, allow any size
// proposal. Otherwise, limit the size of the uncommitted tail of the
// log and drop any proposal that would push the size over the limit.
return false
}
r . uncommittedSize += s
return true
}
// reduceUncommittedSize accounts for the newly committed entries by decreasing
// the uncommitted entry size limit.
func ( r * raft ) reduceUncommittedSize ( ents [ ] pb . Entry ) {
if r . uncommittedSize == 0 {
// Fast-path for followers, who do not track or enforce the limit.
return
}
var s uint64
for _ , e := range ents {
2019-01-23 19:35:03 +00:00
s += uint64 ( PayloadSize ( e ) )
2018-10-15 21:36:55 +00:00
}
if s > r . uncommittedSize {
// uncommittedSize may underestimate the size of the uncommitted Raft
// log tail but will never overestimate it. Saturate at 0 instead of
// allowing overflow.
r . uncommittedSize = 0
} else {
r . uncommittedSize -= s
}
}
2018-07-06 16:09:34 +00:00
func numOfPendingConf ( ents [ ] pb . Entry ) int {
n := 0
for i := range ents {
if ents [ i ] . Type == pb . EntryConfChange {
n ++
}
}
return n
}