Handle Nomad leadership flapping

Fixes a deadlock in leadership handling if leadership flapped.

Raft propagates leadership transition to Nomad through a NotifyCh channel.
Raft blocks when writing to this channel, so channel must be buffered or
aggressively consumed[1]. Otherwise, Raft blocks indefinitely in `raft.runLeader`
until the channel is consumed[1] and does not move on to executing follower
related logic (in `raft.runFollower`).

While Raft `runLeader` defer function blocks, raft cannot process any other
raft operations.  For example, `run{Leader|Follower}` methods consume
`raft.applyCh`, and while runLeader defer is blocked, all raft log applications
or config lookup will block indefinitely.

Sadly, `leaderLoop` and `establishLeader` makes few Raft calls!
`establishLeader` attempts to auto-create autopilot/scheduler config [3]; and
`leaderLoop` attempts to check raft configuration [4].  All of these calls occur
without a timeout.

Thus, if leadership flapped quickly while `leaderLoop/establishLeadership` is
invoked and hit any of these Raft calls, Raft handler _deadlock_ forever.

Depending on how many times it flapped and where exactly we get stuck, I suspect
it's possible to get in the following case:

* Agent metrics/stats http and RPC calls hang as they check raft.Configurations
* raft.State remains in Leader state, and server attempts to handle RPC calls
  (e.g. node/alloc updates) and these hang as well

As we create goroutines per RPC call, the number of goroutines grow over time
and may trigger a out of memory errors in addition to missed updates.

[1] d90d6d6bda/config.go (L190-L193)
[2] d90d6d6bda/raft.go (L425-L436)
[3] 2a89e47746/nomad/leader.go (L198-L202)
[4] 2a89e47746/nomad/leader.go (L877)
This commit is contained in:
Mahmood Ali 2020-01-22 10:55:44 -05:00
parent 129c884105
commit e436d2701a
4 changed files with 159 additions and 3 deletions

View File

@ -90,10 +90,29 @@ func (s *Server) monitorLeadership() {
}
}
wasLeader := false
for {
select {
case isLeader := <-s.leaderCh:
leaderStep(isLeader)
if wasLeader != isLeader {
wasLeader = isLeader
// normal case where we went through a transition
leaderStep(isLeader)
} else if wasLeader && isLeader {
// Server lost but then gained leadership immediately.
// During this time, this server may have received
// Raft transitions that haven't been applied to the FSM
// yet.
// Ensure that that FSM caught up and eval queues are refreshed
s.logger.Error("cluster leadership flapped, lost and gained leadership immediately. Leadership flaps indicate a cluster wide problems (e.g. networking).")
leaderStep(false)
leaderStep(true)
} else {
// Server gained but lost leadership immediately
// before it reacted; nothing to do, move on
s.logger.Error("cluster leadership flapped, gained and lost leadership immediately. Leadership flaps indicate a cluster wide problems (e.g. networking).")
}
case <-s.shutdownCh:
return
}

View File

@ -1234,10 +1234,10 @@ func (s *Server) setupRaft() error {
}
}
// Setup the leader channel
// Setup the leader channel; that keeps the latest leadership alone
leaderCh := make(chan bool, 1)
s.config.RaftConfig.NotifyCh = leaderCh
s.leaderCh = leaderCh
s.leaderCh = dropButLastChannel(leaderCh, s.shutdownCh)
// Setup the Raft store
s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable, snap, trans)

View File

@ -301,3 +301,68 @@ func getAlloc(state AllocGetter, allocID string) (*structs.Allocation, error) {
return alloc, nil
}
// dropButLastChannel returns a channel that drops all but last value from sourceCh.
//
// Useful for aggressively consuming sourceCh when intermediate values aren't relevant.
//
// This function propagates values to result quickly and drops intermediate messages
// in best effort basis. Golang scheduler may delay delivery or result in extra
// deliveries.
//
// Consider this function for example:
//
// ```
// src := make(chan bool)
// dst := dropButLastChannel(src, nil)
//
// go func() {
// src <- true
// src <- false
// }()
//
// // v can be `true` here but is very unlikely
// v := <-dst
// ```
//
func dropButLastChannel(sourceCh <-chan bool, shutdownCh <-chan struct{}) chan bool {
// buffer the most recent result
dst := make(chan bool)
go func() {
lv := false
DEQUE_SOURCE:
// wait for first message
select {
case lv = <-sourceCh:
goto ENQUEUE_DST
case <-shutdownCh:
return
}
ENQUEUE_DST:
// prioritize draining source first dequeue without blocking
for {
select {
case lv = <-sourceCh:
default:
break ENQUEUE_DST
}
}
// attempt to enqueue but keep monitoring source channel
select {
case lv = <-sourceCh:
goto ENQUEUE_DST
case dst <- lv:
// enqueued value; back to dequeing from source
goto DEQUE_SOURCE
case <-shutdownCh:
return
}
}()
return dst
}

View File

@ -4,6 +4,7 @@ import (
"net"
"reflect"
"testing"
"time"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/uuid"
@ -258,3 +259,74 @@ func TestMaxUint64(t *testing.T) {
t.Fatalf("bad")
}
}
func TestDropButLastChannelDropsValues(t *testing.T) {
sourceCh := make(chan bool)
shutdownCh := make(chan struct{})
dstCh := dropButLastChannel(sourceCh, shutdownCh)
// timeout duration for any channel propagation delay
timeoutDuration := 5 * time.Millisecond
// test that dstCh doesn't emit anything initially
select {
case <-dstCh:
require.Fail(t, "received a message unexpectedly")
case <-time.After(timeoutDuration):
// yay no message - it could have been a default: but
// checking for goroutine effect
}
sourceCh <- false
select {
case v := <-dstCh:
require.False(t, v, "unexpected value from dstCh Ch")
case <-time.After(timeoutDuration):
require.Fail(t, "timed out waiting for source->dstCh propagation")
}
// channel is drained now
select {
case v := <-dstCh:
require.Failf(t, "received a message unexpectedly", "value: %v", v)
case <-time.After(timeoutDuration):
// yay no message - it could have been a default: but
// checking for goroutine effect
}
// now enqueue many messages and ensure only last one is received
// enqueueing should be fast!
sourceCh <- false
sourceCh <- false
sourceCh <- false
sourceCh <- false
sourceCh <- true
// I suspect that dstCh may contain a stale (i.e. `false`) value if golang executes
// this select before the implementation goroutine dequeues last value.
//
// However, never got it to fail in test - so leaving it now to see if it ever fails;
// and if/when test fails, we can learn of how much of an issue it is and adjust
select {
case v := <-dstCh:
require.True(t, v, "unexpected value from dstCh Ch")
case <-time.After(timeoutDuration):
require.Fail(t, "timed out waiting for source->dstCh propagation")
}
sourceCh <- true
sourceCh <- true
sourceCh <- true
sourceCh <- true
sourceCh <- true
sourceCh <- false
select {
case v := <-dstCh:
require.False(t, v, "unexpected value from dstCh Ch")
case <-time.After(timeoutDuration):
require.Fail(t, "timed out waiting for source->dstCh propagation")
}
close(shutdownCh)
}