Adds timeout and waits for feedback when asking the leader loop to reassert.

This adds on to the fix in #3004 for issue #2980.
This commit is contained in:
James Phillips 2017-05-04 11:52:22 -07:00
parent 1be6e3cf80
commit 9c3abd33c3
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
3 changed files with 42 additions and 15 deletions

View File

@ -76,6 +76,19 @@ func (s *Server) leaderLoop(stopCh chan struct{}) {
var reconcileCh chan serf.Member var reconcileCh chan serf.Member
establishedLeader := false establishedLeader := false
reassert := func() error {
if !establishedLeader {
return fmt.Errorf("leadership has not been established")
}
if err := s.revokeLeadership(); err != nil {
return err
}
if err := s.establishLeadership(); err != nil {
return err
}
return nil
}
RECONCILE: RECONCILE:
// Setup a reconciliation timer // Setup a reconciliation timer
reconcileCh = nil reconcileCh = nil
@ -125,18 +138,8 @@ WAIT:
s.reconcileMember(member) s.reconcileMember(member)
case index := <-s.tombstoneGC.ExpireCh(): case index := <-s.tombstoneGC.ExpireCh():
go s.reapTombstones(index) go s.reapTombstones(index)
case <-s.reassertLeaderCh: case errCh := <-s.reassertLeaderCh:
if !establishedLeader { errCh <- reassert()
continue
}
if err := s.revokeLeadership(); err != nil {
s.logger.Printf("[ERR] consul: failed to revoke leadership: %v", err)
continue
}
if err := s.establishLeadership(); err != nil {
s.logger.Printf("[ERR] consul: failed to re-establish leadership: %v", err)
continue
}
} }
} }
} }

View File

@ -176,7 +176,7 @@ type Server struct {
// reassertLeaderCh is used to signal the leader loop should re-run // reassertLeaderCh is used to signal the leader loop should re-run
// leadership actions after a snapshot restore. // leadership actions after a snapshot restore.
reassertLeaderCh chan struct{} reassertLeaderCh chan chan error
// tombstoneGC is used to track the pending GC invocations // tombstoneGC is used to track the pending GC invocations
// for the KV tombstones // for the KV tombstones
@ -270,7 +270,7 @@ func NewServer(config *Config) (*Server, error) {
router: servers.NewRouter(logger, shutdownCh, config.Datacenter), router: servers.NewRouter(logger, shutdownCh, config.Datacenter),
rpcServer: rpc.NewServer(), rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS, rpcTLS: incomingTLS,
reassertLeaderCh: make(chan struct{}), reassertLeaderCh: make(chan chan error),
tombstoneGC: gc, tombstoneGC: gc,
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
} }

View File

@ -101,10 +101,34 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re
return nil, err return nil, err
} }
// This'll be used for feedback from the leader loop.
errCh := make(chan error, 1)
timeoutCh := time.After(time.Minute)
select { select {
// Tell the leader loop to reassert leader actions since we just // Tell the leader loop to reassert leader actions since we just
// replaced the state store contents. // replaced the state store contents.
case s.reassertLeaderCh <- struct{}{}: case s.reassertLeaderCh <- errCh:
// We might have lost leadership while waiting to kick the loop.
case <-timeoutCh:
return nil, fmt.Errorf("timed out waiting to re-run leader actions")
// Make sure we don't get stuck during shutdown
case <-s.shutdownCh:
}
select {
// Wait for the leader loop to finish up.
case err := <-errCh:
if err != nil {
return nil, err
}
// We might have lost leadership while the loop was doing its
// thing.
case <-timeoutCh:
return nil, fmt.Errorf("timed out waiting for re-run of leader actions")
// Make sure we don't get stuck during shutdown // Make sure we don't get stuck during shutdown
case <-s.shutdownCh: case <-s.shutdownCh: