Rework `server_auto_join` to use a timer instead of the peer count.
It is perfectly viable for an admin to downsize a Nomad Server cluster to 1, 2, or an even number of servers (`num % 2 == 0`), however ill-advised that may be. Instead of keying the Consul fallback off of `bootstrap_expect` and the current peer count, use a timeout-based strategy: if `bootstrapFn` hasn't observed a Raft leader within 15s (`maxStaleLeadership`), it falls back to Consul for a server list and then polls roughly every 60s (45s plus jitter) until it sees a leader again.
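For reference, the "~60s" figure follows from the new `peersPollInterval` and `peersPollJitterFactor` constants in the diff below. A minimal sketch of the retry-interval math, assuming `lib.RandomStagger` returns a uniformly random duration in `[0, d)` as the helper library does:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	peersPollInterval     = 45 * time.Second
	peersPollJitterFactor = 2
)

// randomStagger is a stand-in for lib.RandomStagger: a uniformly random
// duration in [0, intv).
func randomStagger(intv time.Duration) time.Duration {
	return time.Duration(rand.Int63n(int64(intv)))
}

func main() {
	// Each failed Consul poll reschedules itself 45s plus up to 22.5s of
	// jitter later, i.e. somewhere between 45s and 67.5s -- roughly once
	// a minute, which is where the "~60s" in the commit message comes from.
	next := peersPollInterval + randomStagger(peersPollInterval/peersPollJitterFactor)
	fmt.Println("next Consul poll in", next)
}
```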
Parent: b0fecbefc1
Commit: 65319252b9

nomad/server.go (119 changed lines)
@@ -30,12 +30,21 @@ import (
 )
 
 const (
-	// datacenterQueryFactor sets the max number of DCs that a Nomad
-	// Server will query to find bootstrap_expect servers. If
-	// bootstrap_expect is 5, then the Nomad Server bootstrapFn handler
-	// will search through up to 15 Consul DCs to find possible Serf
-	// peers.
-	datacenterQueryFactor = 5
+	// datacenterQueryLimit sets the max number of DCs that a Nomad
+	// Server will query to find bootstrap_expect servers.
+	datacenterQueryLimit = 25
+
+	// maxStaleLeadership is the maximum time we will permit this Nomad
+	// Server to go without seeing a valid Raft leader.
+	maxStaleLeadership = 15 * time.Second
+
+	// peersPollInterval is used as the polling interval between attempts
+	// to query Consul for Nomad Servers.
+	peersPollInterval = 45 * time.Second
+
+	// peersPollJitter is used to provide a slight amount of variance to
+	// the retry interval when querying Consul Servers
+	peersPollJitterFactor = 2
 
 	raftState    = "raft/"
 	serfSnapshot = "serf/snapshot"
@@ -377,9 +386,31 @@ func (s *Server) Leave() error {
 	return nil
 }
 
-// setupConsulSyncer creates Server-mode consul.Syncer which periodically
-// executes callbacks on a fixed interval.
-func (s *Server) setupConsulSyncer() error {
+// setupBootstrapHandler() creates the closure necessary to support a Consul
+// fallback handler.
+func (s *Server) setupBootstrapHandler() error {
+	// peersTimeout is used to indicate to the Consul Syncer that the
+	// current Nomad Server has a stale peer set. peersTimeout will time
+	// out if the Consul Syncer bootstrapFn has not observed a Raft
+	// leader in maxStaleLeadership. If peersTimeout has been triggered,
+	// the Consul Syncer will begin querying Consul for other Nomad
+	// Servers.
+	//
+	// NOTE: time.Timer is used vs time.Time in order to handle clock
+	// drift because time.Timer is implemented as a monotonic clock.
+	var peersTimeout *time.Timer = time.NewTimer(0)
+
+	// leadershipTimedOut is a helper method that returns true if the
+	// peersTimeout timer has expired.
+	leadershipTimedOut := func() bool {
+		select {
+		case <-peersTimeout.C:
+			return true
+		default:
+			return false
+		}
+	}
+
 	// The bootstrapFn callback handler is used to periodically poll
 	// Consul to look up the Nomad Servers in Consul. In the event the
 	// server has been brought up without a `retry-join` configuration
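The `leadershipTimedOut` closure added above is a non-blocking check on the timer's channel: if `peersTimeout` has fired, a value is waiting on `peersTimeout.C` and the `case` wins; otherwise `default` reports that leadership is not yet considered stale. A standalone sketch of the same pattern (the names here are illustrative, not part of the commit):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical stand-in for peersTimeout: expires 100ms from now.
	staleTimer := time.NewTimer(100 * time.Millisecond)

	// expired performs a non-blocking receive from the timer channel,
	// mirroring the leadershipTimedOut helper in the diff.
	expired := func() bool {
		select {
		case <-staleTimer.C:
			return true
		default:
			return false
		}
	}

	fmt.Println(expired()) // false: the timer has not fired yet
	time.Sleep(150 * time.Millisecond)
	fmt.Println(expired()) // true: the timer fired while we slept
}
```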
@@ -390,22 +421,46 @@ func (s *Server) setupConsulSyncer() error {
 	bootstrapFn := func() error {
 		// If there is a raft leader, do nothing
 		if s.raft.Leader() != "" {
+			peersTimeout.Reset(maxStaleLeadership)
 			return nil
 		}
 
-		// If the the number of Raft peers is more than the min
-		// quorum, do nothing.
-		raftPeers, err := s.raftPeers.Peers()
-		minQuorum := (s.config.BootstrapExpect / 2) + 1
-		if err == nil && len(raftPeers) >= minQuorum {
-			return nil
+		// (ab)use serf.go's behavior of setting BootstrapExpect to
+		// zero if we have bootstrapped. If we have bootstrapped
+		bootstrapExpect := atomic.LoadInt32(&s.config.BootstrapExpect)
+		if bootstrapExpect == 0 {
+			// This Nomad Server has been bootstrapped. Rely on
+			// timeouts to determine health.
+
+			if !leadershipTimedOut() {
+				return nil
+			}
+		} else {
+			// This Nomad Server has not been bootstrapped, reach
+			// out to Consul if our peer list is less than
+			// `bootstrap_expect`.
+			raftPeers, err := s.raftPeers.Peers()
+			if err != nil {
+				peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+				return nil
+			}
+
+			// The necessary number of Nomad Servers required for
+			// quorum has been reached, we do not need to poll
+			// Consul. Let the normal timeout-based strategy
+			// take over.
+			if len(raftPeers) >= int(bootstrapExpect) {
+				peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+				return nil
+			}
 		}
 
-		s.logger.Printf("[TRACE] server.consul: lost contact with Nomad quorum, falling back to Consul for server list")
+		s.logger.Printf("[DEBUG] server.consul: lost contact with Nomad quorum, falling back to Consul for server list")
 
 		consulCatalog := s.consulSyncer.ConsulClient().Catalog()
 		dcs, err := consulCatalog.Datacenters()
 		if err != nil {
+			peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
 			return fmt.Errorf("server.consul: unable to query Consul datacenters: %v", err)
 		}
 		if len(dcs) > 2 {
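The `bootstrapExpect == 0` check leans on the fact that serf.go atomically zeroes `BootstrapExpect` once the cluster has bootstrapped, so the value doubles as a cross-goroutine flag. A rough standalone sketch of that pattern, with illustrative names rather than the actual Nomad types:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	// Illustrative stand-in for Config.BootstrapExpect: one goroutine
	// (the Serf handler, once the cluster bootstraps) zeroes it while
	// another (the periodic bootstrapFn) reads it.
	var bootstrapExpect int32 = 3

	go func() {
		time.Sleep(50 * time.Millisecond)
		// Equivalent of serf.go clearing the value after bootstrap.
		atomic.StoreInt32(&bootstrapExpect, 0)
	}()

	for i := 0; i < 3; i++ {
		if atomic.LoadInt32(&bootstrapExpect) == 0 {
			fmt.Println("bootstrapped: rely on the leadership timeout")
		} else {
			fmt.Println("not bootstrapped: compare peers against bootstrap_expect")
		}
		time.Sleep(40 * time.Millisecond)
	}
}
```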
@@ -417,7 +472,7 @@ func (s *Server) setupConsulSyncer() error {
 			nearestDC := dcs[0]
 			otherDCs := make([]string, 0, len(dcs))
 			shuffleStrings(otherDCs)
-			otherDCs = dcs[1:lib.MinInt(len(dcs), s.config.BootstrapExpect*datacenterQueryFactor)]
+			otherDCs = dcs[1:lib.MinInt(len(dcs), datacenterQueryLimit)]
 
 			dcs = append([]string{nearestDC}, otherDCs...)
 		}
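The slice expression above keeps the nearest DC first and caps the list at `datacenterQueryLimit` entries in total (the nearest DC plus up to `datacenterQueryLimit - 1` others). A small sketch of that capping logic, assuming `lib.MinInt` is the obvious integer minimum:

```go
package main

import "fmt"

// minInt is a stand-in for lib.MinInt.
func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func main() {
	const datacenterQueryLimit = 25

	// Pretend Consul returned 40 datacenters, already ordered so the
	// first entry is the nearest one, which is what the diff assumes
	// about dcs[0].
	dcs := make([]string, 40)
	for i := range dcs {
		dcs[i] = fmt.Sprintf("dc%d", i+1)
	}

	nearestDC := dcs[0]
	otherDCs := dcs[1:minInt(len(dcs), datacenterQueryLimit)]
	dcs = append([]string{nearestDC}, otherDCs...)

	fmt.Println(len(dcs), dcs[0], dcs[len(dcs)-1]) // 25 dc1 dc25
}
```

Note that in the context lines above, `shuffleStrings` is invoked on the still-empty `otherDCs` slice, so the capped list ends up in the order Consul returned it.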
@@ -427,16 +482,16 @@ func (s *Server) setupConsulSyncer() error {
 		const defaultMaxNumNomadServers = 8
 		nomadServerServices := make([]string, 0, defaultMaxNumNomadServers)
 		for _, dc := range dcs {
-			opts := &consulapi.QueryOptions{
+			consulOpts := &consulapi.QueryOptions{
 				AllowStale: true,
 				Datacenter: dc,
 				Near:       "_agent",
 				WaitTime:   consul.DefaultQueryWaitDuration,
 			}
-			consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, opts)
+			consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, consulOpts)
 			if err != nil {
-				s.logger.Printf("[TRACE] server.consul: failed to query dc %+q's service %+q: %v", dc, nomadServerServiceName, err)
-				mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", nomadServerServiceName, dc, err))
+				s.logger.Printf("[WARN] server.consul: failed to query service %+q in Consul datacenter %+q: %v", nomadServerServiceName, dc, err)
+				mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %q from Consul datacenter %q: %v", nomadServerServiceName, dc, err))
 				continue
 			}
 
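The per-datacenter lookup above uses the standard Consul API client. A self-contained sketch of an equivalent query, assuming the Nomad servers register a service named "nomad" tagged "serf" (the values behind `nomadServerServiceName` and `consul.ServiceTagSerf`); the DC name is illustrative:

```go
package main

import (
	"fmt"
	"log"

	consulapi "github.com/hashicorp/consul/api"
)

func main() {
	client, err := consulapi.NewClient(consulapi.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Same shape of query bootstrapFn issues for each datacenter: stale
	// reads are acceptable and results are sorted by round-trip time
	// from the local agent.
	consulOpts := &consulapi.QueryOptions{
		AllowStale: true,
		Datacenter: "dc1", // illustrative DC name
		Near:       "_agent",
	}
	services, _, err := client.Catalog().Service("nomad", "serf", consulOpts)
	if err != nil {
		log.Fatal(err)
	}
	for _, svc := range services {
		fmt.Printf("serf endpoint: %s:%d\n", svc.ServiceAddress, svc.ServicePort)
	}
}
```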
@@ -453,28 +508,44 @@ func (s *Server) setupConsulSyncer() error {
 		if len(nomadServerServices) == 0 {
 			if len(mErr.Errors) > 0 {
+				peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
 				return mErr.ErrorOrNil()
 			}
 
 			// Log the error and return nil so future handlers
 			// can attempt to register the `nomad` service.
-			s.logger.Printf("[TRACE] server.consul: no Nomad Servers advertising service %+q in Consul datacenters: %+q", nomadServerServiceName, dcs)
+			s.logger.Printf("[TRACE] server.consul: no Nomad Servers advertising service %+q in Consul datacenters %+q", nomadServerServiceName, dcs)
+			peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
 			return nil
 		}
 
 		numServersContacted, err := s.Join(nomadServerServices)
 		if err != nil {
+			peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
 			return fmt.Errorf("contacted %d Nomad Servers: %v", numServersContacted, err)
 		}
 
+		peersTimeout.Reset(maxStaleLeadership)
 		s.logger.Printf("[INFO] server.consul: successfully contacted %d Nomad Servers", numServersContacted)
 
 		return nil
 	}
 
+	s.consulSyncer.AddPeriodicHandler("Nomad Server Fallback Server Handler", bootstrapFn)
+	return nil
+}
+
+// setupConsulSyncer creates Server-mode consul.Syncer which periodically
+// executes callbacks on a fixed interval.
+func (s *Server) setupConsulSyncer() error {
+	var mErr multierror.Error
 	if s.config.ConsulConfig.ServerAutoJoin {
-		s.consulSyncer.AddPeriodicHandler("Nomad Server Fallback Server Handler", bootstrapFn)
+		if err := s.setupBootstrapHandler(); err != nil {
+			mErr.Errors = append(mErr.Errors, err)
+		}
 	}
 
-	return nil
+	return mErr.ErrorOrNil()
 }
 
 // setupRPC is used to setup the RPC listener
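The refactored `setupConsulSyncer` collects failures with hashicorp/go-multierror rather than returning on the first one. A minimal sketch of that aggregation pattern:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/hashicorp/go-multierror"
)

func main() {
	var mErr multierror.Error

	// Each setup step can fail independently; collect everything instead
	// of aborting on the first error.
	for i, err := range []error{nil, errors.New("bootstrap handler failed"), nil} {
		if err != nil {
			mErr.Errors = append(mErr.Errors, fmt.Errorf("step %d: %v", i, err))
		}
	}

	// ErrorOrNil returns nil when nothing was appended, matching the
	// `return mErr.ErrorOrNil()` at the end of setupConsulSyncer.
	if err := mErr.ErrorOrNil(); err != nil {
		fmt.Println(err)
	}
}
```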