Fixes handling of stop channel and failed barrier attempts. (#3546)
* Fixes handling of stop channel and failed barrier attempts. There were two issues here. First, we needed to not exit when there was a timeout trying to write the barrier, because Raft might not step down, so we'd be left as the leader but having run all the step down actions. Second, we didn't close over the stopCh correctly, so it was possible to nil that out and have the leaderLoop never exit. We close over it properly AND sequence the nil-ing of it AFTER the leaderLoop exits for good measure, so the code is more robust. Fixes #3545 * Cleans up based on code review feedback. * Tweaks comments. * Renames variables and removes comments.
This commit is contained in:
parent
406bad36bd
commit
a1db119d02
|
@ -33,25 +33,39 @@ func (s *Server) monitorLeadership() {
|
|||
// cleanup and to ensure we never run multiple leader loops.
|
||||
raftNotifyCh := s.raftNotifyCh
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var stopCh chan struct{}
|
||||
var weAreLeaderCh chan struct{}
|
||||
var leaderLoop sync.WaitGroup
|
||||
for {
|
||||
select {
|
||||
case isLeader := <-raftNotifyCh:
|
||||
if isLeader {
|
||||
stopCh = make(chan struct{})
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
s.leaderLoop(stopCh)
|
||||
wg.Done()
|
||||
}()
|
||||
switch {
|
||||
case isLeader:
|
||||
if weAreLeaderCh != nil {
|
||||
s.logger.Printf("[ERR] consul: attempted to start the leader loop while running")
|
||||
continue
|
||||
}
|
||||
|
||||
weAreLeaderCh = make(chan struct{})
|
||||
leaderLoop.Add(1)
|
||||
go func(ch chan struct{}) {
|
||||
defer leaderLoop.Done()
|
||||
s.leaderLoop(ch)
|
||||
}(weAreLeaderCh)
|
||||
s.logger.Printf("[INFO] consul: cluster leadership acquired")
|
||||
} else if stopCh != nil {
|
||||
close(stopCh)
|
||||
stopCh = nil
|
||||
wg.Wait()
|
||||
|
||||
default:
|
||||
if weAreLeaderCh == nil {
|
||||
s.logger.Printf("[ERR] consul: attempted to stop the leader loop while not running")
|
||||
continue
|
||||
}
|
||||
|
||||
s.logger.Printf("[DEBUG] consul: shutting down leader loop")
|
||||
close(weAreLeaderCh)
|
||||
leaderLoop.Wait()
|
||||
weAreLeaderCh = nil
|
||||
s.logger.Printf("[INFO] consul: cluster leadership lost")
|
||||
}
|
||||
|
||||
case <-s.shutdownCh:
|
||||
return
|
||||
}
|
||||
|
@ -97,7 +111,7 @@ RECONCILE:
|
|||
barrier := s.raft.Barrier(barrierWriteTimeout)
|
||||
if err := barrier.Error(); err != nil {
|
||||
s.logger.Printf("[ERR] consul: failed to wait for barrier: %v", err)
|
||||
return
|
||||
goto WAIT
|
||||
}
|
||||
metrics.MeasureSince([]string{"consul", "leader", "barrier"}, start)
|
||||
metrics.MeasureSince([]string{"leader", "barrier"}, start)
|
||||
|
@ -127,6 +141,15 @@ RECONCILE:
|
|||
reconcileCh = s.reconcileCh
|
||||
|
||||
WAIT:
|
||||
// Poll the stop channel to give it priority so we don't waste time
|
||||
// trying to perform the other operations if we have been asked to shut
|
||||
// down.
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Periodically reconcile as long as we are the leader,
|
||||
// or when Serf events arrive
|
||||
for {
|
||||
|
|
Loading…
Reference in New Issue