Kick the leader loop on the proper thread after a snapshot restore, and

only if leadership is already established.
This commit is contained in:
James Phillips 2017-05-03 20:31:14 -07:00 committed by Frank Schroeder
parent 953347a6fe
commit f3c1f516b4
No known key found for this signature in database
GPG Key ID: 4D65C6EAEC87DECD
3 changed files with 20 additions and 6 deletions

View File

@ -124,6 +124,17 @@ WAIT:
goto RECONCILE
case member := <-reconcileCh:
s.reconcileMember(member)
case <-s.reassertLeaderCh:
if establishedLeader {
if err := s.revokeLeadership(); err != nil {
s.logger.Printf("[ERR] consul: failed to revoke leadership: %v", err)
goto WAIT
}
if err := s.establishLeadership(); err != nil {
s.logger.Printf("[ERR] consul: failed to re-establish leadership: %v", err)
goto WAIT
}
}
case index := <-s.tombstoneGC.ExpireCh():
go s.reapTombstones(index)
}

View File

@ -174,6 +174,10 @@ type Server struct {
// Consul servers.
statsFetcher *StatsFetcher
// reassertLeaderCh is used to signal the leader loop should re-run
// leadership actions after a snapshot restore.
reassertLeaderCh chan struct{}
// tombstoneGC is used to track the pending GC invocations
// for the KV tombstones
tombstoneGC *state.TombstoneGC
@ -266,6 +270,7 @@ func NewServer(config *Config) (*Server, error) {
router: servers.NewRouter(logger, shutdownCh, config.Datacenter),
rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS,
reassertLeaderCh: make(chan struct{}),
tombstoneGC: gc,
shutdownCh: make(chan struct{}),
}

View File

@ -100,12 +100,10 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re
if err := barrier.Error(); err != nil {
return nil, err
}
if err := s.revokeLeadership(); err != nil {
return nil, err
}
if err := s.establishLeadership(); err != nil {
return nil, err
}
// Tell the leader loop to reassert leader actions since we just
// replaced the state store contents.
s.reassertLeaderCh <- struct{}{}
// Give the caller back an empty reader since there's nothing to
// stream back.