diff --git a/client/client.go b/client/client.go
index 7f2abdf40..4595a8b7b 100644
--- a/client/client.go
+++ b/client/client.go
@@ -39,7 +39,7 @@ const (
 	// datacenterQueryLimit searches through up to this many adjacent
 	// datacenters looking for the Nomad server service.
-	datacenterQueryLimit = 5
+	datacenterQueryLimit = 9
 
 	// registerRetryIntv is the minimum interval on which we retry
 	// registration. We pick a value between this and 2x this.
@@ -1253,8 +1253,8 @@ func (c *Client) setupConsulSyncer() error {
 			// a new set of servers so it's okay.
 			nearestDC := dcs[0]
 			otherDCs := make([]string, 0, len(dcs))
-			shuffleStrings(otherDCs)
 			otherDCs = dcs[1:lib.MinInt(len(dcs), datacenterQueryLimit)]
+			shuffleStrings(otherDCs)
 
 			dcs = append([]string{nearestDC}, otherDCs...)
 		}
@@ -1270,7 +1270,7 @@ func (c *Client) setupConsulSyncer() error {
 		var mErr multierror.Error
 		const defaultMaxNumNomadServers = 8
 		nomadServerServices := make([]string, 0, defaultMaxNumNomadServers)
-		c.logger.Printf("[DEBUG] client.consul: bootstrap contacting following Consul DCs: %q", dcs)
+		c.logger.Printf("[DEBUG] client.consul: bootstrap contacting following Consul DCs: %+q", dcs)
 		for _, dc := range dcs {
 			consulOpts := &consulapi.QueryOptions{
 				AllowStale: true,
diff --git a/command/agent/agent.go b/command/agent/agent.go
index c794ed2d5..cac832d53 100644
--- a/command/agent/agent.go
+++ b/command/agent/agent.go
@@ -11,6 +11,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/hashicorp/nomad/client"
@@ -110,7 +111,7 @@ func (a *Agent) serverConfig() (*nomad.Config, error) {
 		if a.config.Server.BootstrapExpect == 1 {
 			conf.Bootstrap = true
 		} else {
-			conf.BootstrapExpect = a.config.Server.BootstrapExpect
+			atomic.StoreInt32(&conf.BootstrapExpect, int32(a.config.Server.BootstrapExpect))
 		}
 	}
 	if a.config.DataDir != "" {
@@ -235,7 +236,7 @@ func (a *Agent) serverConfig() (*nomad.Config, error) {
 		return nil, fmt.Errorf("server_service_name must be set when auto_advertise is enabled")
 	}
 
-	// conf.ConsulConfig = a.config.Consul
+	conf.ConsulConfig = a.config.Consul
 
 	return conf, nil
 }
@@ -377,7 +378,7 @@ func (a *Agent) setupServer() error {
 	}
 
 	// Create the server
-	server, err := nomad.NewServer(conf, a.consulSyncer)
+	server, err := nomad.NewServer(conf, a.consulSyncer)
 	if err != nil {
 		return fmt.Errorf("server setup failed: %v", err)
 	}
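Note on the client/client.go hunk above: the shuffle is only meaningful once
otherDCs has been repointed at the dcs[1:...] sub-slice; calling
shuffleStrings on the freshly made (empty) slice is a no-op, which is why the
assignment must precede the shuffle. A minimal standalone sketch of the
intended ordering, with shuffleStrings re-implemented here purely for
illustration (the real helper lives elsewhere in the Nomad codebase):

	package main

	import (
		"fmt"
		"math/rand"
	)

	// shuffleStrings is an illustrative stand-in for Nomad's helper:
	// an in-place Fisher-Yates shuffle over a string slice.
	func shuffleStrings(list []string) {
		for i := len(list) - 1; i > 0; i-- {
			j := rand.Intn(i + 1)
			list[i], list[j] = list[j], list[i]
		}
	}

	func main() {
		dcs := []string{"dc1", "dc2", "dc3", "dc4"}
		nearestDC := dcs[0]

		// Assign first, then shuffle. Shuffling before the
		// assignment would only shuffle an empty slice.
		otherDCs := dcs[1:]
		shuffleStrings(otherDCs)

		fmt.Println(append([]string{nearestDC}, otherDCs...))
	}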
diff --git a/nomad/config.go b/nomad/config.go
index d998045d9..ff91ac3a2 100644
--- a/nomad/config.go
+++ b/nomad/config.go
@@ -10,6 +10,7 @@ import (
 
 	"github.com/hashicorp/memberlist"
 	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/nomad/structs/config"
 	"github.com/hashicorp/nomad/scheduler"
 	"github.com/hashicorp/raft"
 	"github.com/hashicorp/serf/serf"
@@ -51,8 +52,9 @@ type Config struct {
 
 	// BootstrapExpect mode is used to automatically bring up a
 	// collection of Nomad servers. This can be used to automatically
-	// bring up a collection of nodes.
-	BootstrapExpect int
+	// bring up a collection of nodes. All operations on BootstrapExpect
+	// must be handled via `atomic.*Int32()` calls.
+	BootstrapExpect int32
 
 	// DataDir is the directory to store our state in
 	DataDir string
@@ -176,6 +178,9 @@ type Config struct {
 	// a new leader is elected, since we no longer know the status
 	// of all the heartbeats.
 	FailoverHeartbeatTTL time.Duration
+
+	// ConsulConfig is this Agent's Consul configuration
+	ConsulConfig *config.ConsulConfig
 }
 
 // CheckVersion is used to check if the ProtocolVersion is valid
diff --git a/nomad/serf.go b/nomad/serf.go
index e36a5b23c..6ba60dfb6 100644
--- a/nomad/serf.go
+++ b/nomad/serf.go
@@ -1,6 +1,10 @@
 package nomad
 
-import "github.com/hashicorp/serf/serf"
+import (
+	"sync/atomic"
+
+	"github.com/hashicorp/serf/serf"
+)
 
 const (
 	// StatusReap is used to update the status of a node if we
@@ -66,7 +70,7 @@ func (s *Server) nodeJoin(me serf.MemberEvent) {
 	s.peerLock.Unlock()
 
 	// If we are still expecting to bootstrap, we may need to handle this
-	if s.config.BootstrapExpect != 0 {
+	if atomic.LoadInt32(&s.config.BootstrapExpect) != 0 {
 		s.maybeBootstrap()
 	}
 }
@@ -91,7 +95,7 @@ func (s *Server) maybeBootstrap() {
 	// Bootstrap can only be done if there are no committed logs,
 	// remove our expectations of bootstrapping if index != 0
 	if index != 0 {
-		s.config.BootstrapExpect = 0
+		atomic.StoreInt32(&s.config.BootstrapExpect, 0)
 		return
 	}
 
@@ -106,7 +110,7 @@ func (s *Server) maybeBootstrap() {
 		if p.Region != s.config.Region {
 			continue
 		}
-		if p.Expect != 0 && p.Expect != s.config.BootstrapExpect {
+		if p.Expect != 0 && p.Expect != int(atomic.LoadInt32(&s.config.BootstrapExpect)) {
 			s.logger.Printf("[ERR] nomad: peer %v has a conflicting expect value. All nodes should expect the same number.", member)
 			return
 		}
@@ -118,7 +122,7 @@ func (s *Server) maybeBootstrap() {
 	}
 
 	// Skip if we haven't met the minimum expect count
-	if len(addrs) < s.config.BootstrapExpect {
+	if len(addrs) < int(atomic.LoadInt32(&s.config.BootstrapExpect)) {
 		return
 	}
 
@@ -128,8 +132,8 @@ func (s *Server) maybeBootstrap() {
 		s.logger.Printf("[ERR] nomad: failed to bootstrap peers: %v", err)
 	}
 
-	// Bootstrapping comlete, don't enter this again
-	s.config.BootstrapExpect = 0
+	// Bootstrapping complete, don't enter this again
+	atomic.StoreInt32(&s.config.BootstrapExpect, 0)
 }
 
 // nodeFailed is used to handle fail events on the serf cluster
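Note on the BootstrapExpect change above: sync/atomic has no operations on a
plain int, so the field is widened to a fixed-width int32 and every reader
and writer goes through atomic.LoadInt32/StoreInt32, with explicit int
conversions at comparison sites. A minimal standalone sketch of the pattern;
the Config type here is an illustrative stand-in for the relevant slice of
nomad.Config, not the real struct:

	package main

	import (
		"fmt"
		"sync/atomic"
	)

	// Config mirrors only the field under discussion. int32 (not int)
	// is required for the sync/atomic helpers used in the patch.
	type Config struct {
		BootstrapExpect int32
	}

	func main() {
		conf := &Config{}

		// Writer side (as in Agent.serverConfig).
		atomic.StoreInt32(&conf.BootstrapExpect, 3)

		// Reader side (as in the serf event handlers); comparisons
		// against ints need an explicit conversion, as in
		// maybeBootstrap().
		if n := atomic.LoadInt32(&conf.BootstrapExpect); int(n) >= 3 {
			fmt.Printf("expecting %d servers\n", n)
		}
	}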
diff --git a/nomad/server.go b/nomad/server.go
index 1932d18b1..d604ef603 100644
--- a/nomad/server.go
+++ b/nomad/server.go
@@ -14,9 +14,14 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
+	consulapi "github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/consul/tlsutil"
+	"github.com/hashicorp/go-multierror"
+	"github.com/hashicorp/nomad/command/agent/consul"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/raft"
@@ -25,6 +30,22 @@ import (
 )
 
 const (
+	// datacenterQueryLimit sets the max number of DCs that a Nomad
+	// Server will query to find bootstrap_expect servers.
+	datacenterQueryLimit = 25
+
+	// maxStaleLeadership is the maximum time we will permit this Nomad
+	// Server to go without seeing a valid Raft leader.
+	maxStaleLeadership = 15 * time.Second
+
+	// peersPollInterval is used as the polling interval between attempts
+	// to query Consul for Nomad Servers.
+	peersPollInterval = 45 * time.Second
+
+	// peersPollJitterFactor is used to provide a slight amount of
+	// variance to the retry interval when querying Consul Servers.
+	peersPollJitterFactor = 2
+
 	raftState         = "raft/"
 	serfSnapshot      = "serf/snapshot"
 	snapshotsRetained = 2
@@ -116,6 +137,9 @@ type Server struct {
 	heartbeatTimers     map[string]*time.Timer
 	heartbeatTimersLock sync.Mutex
 
+	// consulSyncer advertises this Nomad Agent with Consul
+	consulSyncer *consul.Syncer
+
 	// Worker used for processing
 	workers []*Worker
 
@@ -140,7 +164,7 @@ type endpoints struct {
 
 // NewServer is used to construct a new Nomad server from the
 // configuration, potentially returning an error
-func NewServer(config *Config) (*Server, error) {
+func NewServer(config *Config, consulSyncer *consul.Syncer) (*Server, error) {
 	// Check the protocol version
 	if err := config.CheckVersion(); err != nil {
 		return nil, err
@@ -172,6 +196,7 @@ func NewServer(config *Config) (*Server, error) {
 	// Create the server
 	s := &Server{
 		config:       config,
+		consulSyncer: consulSyncer,
 		connPool:     NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, nil),
 		logger:       logger,
 		rpcServer:    rpc.NewServer(),
@@ -218,6 +243,11 @@ func NewServer(config *Config) (*Server, error) {
 		return nil, fmt.Errorf("Failed to start workers: %v", err)
 	}
 
+	// Setup the Consul syncer
+	if err := s.setupConsulSyncer(); err != nil {
+		return nil, fmt.Errorf("failed to create server Consul syncer: %v", err)
+	}
+
 	// Monitor leadership changes
 	go s.monitorLeadership()
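Note on the retry constants above: every retry path in the bootstrap handler
below sleeps for peersPollInterval plus a random stagger of up to
peersPollInterval/peersPollJitterFactor, so partitioned servers do not poll
Consul in lockstep. A sketch of the arithmetic; randomStagger here is an
illustrative stand-in for github.com/hashicorp/consul/lib.RandomStagger:

	package main

	import (
		"fmt"
		"math/rand"
		"time"
	)

	// randomStagger approximates lib.RandomStagger: a uniformly
	// random duration in [0, intv).
	func randomStagger(intv time.Duration) time.Duration {
		return time.Duration(rand.Int63n(int64(intv)))
	}

	func main() {
		const (
			peersPollInterval     = 45 * time.Second
			peersPollJitterFactor = 2
		)

		// Yields a wait in the range [45s, 67.5s).
		wait := peersPollInterval + randomStagger(peersPollInterval/peersPollJitterFactor)
		fmt.Println("next Consul poll in", wait)
	}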
@@ -356,6 +386,182 @@ func (s *Server) Leave() error {
 	return nil
 }
 
+// setupBootstrapHandler creates the closure necessary to support a Consul
+// fallback handler.
+func (s *Server) setupBootstrapHandler() error {
+	// peersTimeout is used to indicate to the Consul Syncer that the
+	// current Nomad Server has a stale peer set. peersTimeout will time
+	// out if the Consul Syncer bootstrapFn has not observed a Raft
+	// leader in maxStaleLeadership. If peersTimeout has been triggered,
+	// the Consul Syncer will begin querying Consul for other Nomad
+	// Servers.
+	//
+	// NOTE: time.Timer is used vs time.Time in order to handle clock
+	// drift because time.Timer is implemented as a monotonic clock.
+	peersTimeout := time.NewTimer(0)
+
+	// consulQueryCount is the number of times the bootstrapFn has been
+	// called, regardless of success.
+	var consulQueryCount uint64
+
+	// leadershipTimedOut is a helper method that returns true if the
+	// peersTimeout timer has expired.
+	leadershipTimedOut := func() bool {
+		select {
+		case <-peersTimeout.C:
+			return true
+		default:
+			return false
+		}
+	}
+
+	// The bootstrapFn callback handler is used to periodically poll
+	// Consul to look up the Nomad Servers in Consul. In the event the
+	// server has been brought up without a `retry-join` configuration
+	// and this Server is partitioned from the rest of the cluster,
+	// periodically poll Consul to reattach this Server to other servers
+	// in the same region and automatically reform a quorum (assuming the
+	// correct number of servers required for quorum are present).
+	bootstrapFn := func() error {
+		// If there is a raft leader, do nothing
+		if s.raft.Leader() != "" {
+			peersTimeout.Reset(maxStaleLeadership)
+			return nil
+		}
+
+		// (ab)use serf.go's behavior of setting BootstrapExpect to
+		// zero once we have bootstrapped.
+		bootstrapExpect := atomic.LoadInt32(&s.config.BootstrapExpect)
+		if bootstrapExpect == 0 {
+			// This Nomad Server has been bootstrapped.  Rely on
+			// the peersTimeout firing as a guard to prevent
+			// aggressive querying of Consul.
+			if !leadershipTimedOut() {
+				return nil
+			}
+		} else {
+			if consulQueryCount > 0 && !leadershipTimedOut() {
+				return nil
+			}
+
+			// This Nomad Server has not been bootstrapped; reach
+			// out to Consul if our peer list is less than
+			// `bootstrap_expect`.
+			raftPeers, err := s.raftPeers.Peers()
+			if err != nil {
+				peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+				return nil
+			}
+
+			// The necessary number of Nomad Servers required for
+			// quorum has been reached, we do not need to poll
+			// Consul. Let the normal timeout-based strategy
+			// take over.
+			if len(raftPeers) >= int(bootstrapExpect) {
+				peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+				return nil
+			}
+		}
+		consulQueryCount++
+
+		s.logger.Printf("[DEBUG] server.consul: lost contact with Nomad quorum, falling back to Consul for server list")
+
+		consulCatalog := s.consulSyncer.ConsulClient().Catalog()
+		dcs, err := consulCatalog.Datacenters()
+		if err != nil {
+			peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+			return fmt.Errorf("server.consul: unable to query Consul datacenters: %v", err)
+		}
+		if len(dcs) > 2 {
+			// Query the local DC first, then shuffle the
+			// remaining DCs. If additional calls to bootstrapFn
+			// are necessary, this Nomad Server will eventually
+			// walk all datacenters until it finds enough hosts to
+			// form a quorum.
+			nearestDC := dcs[0]
+			otherDCs := make([]string, 0, len(dcs))
+			otherDCs = dcs[1:lib.MinInt(len(dcs), datacenterQueryLimit)]
+			shuffleStrings(otherDCs)
+
+			dcs = append([]string{nearestDC}, otherDCs...)
+		}
+
+		nomadServerServiceName := s.config.ConsulConfig.ServerServiceName
+		var mErr multierror.Error
+		const defaultMaxNumNomadServers = 8
+		nomadServerServices := make([]string, 0, defaultMaxNumNomadServers)
+		localNode := s.serf.Memberlist().LocalNode()
+		for _, dc := range dcs {
+			consulOpts := &consulapi.QueryOptions{
+				AllowStale: true,
+				Datacenter: dc,
+				Near:       "_agent",
+				WaitTime:   consul.DefaultQueryWaitDuration,
+			}
+			consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, consulOpts)
+			if err != nil {
+				err := fmt.Errorf("failed to query service %q in Consul datacenter %q: %v", nomadServerServiceName, dc, err)
+				s.logger.Printf("[WARN] server.consul: %v", err)
+				mErr.Errors = append(mErr.Errors, err)
+				continue
+			}
+
+			for _, cs := range consulServices {
+				port := strconv.FormatInt(int64(cs.ServicePort), 10)
+				addr := cs.ServiceAddress
+				if addr == "" {
+					addr = cs.Address
+				}
+				// Skip this Nomad Server's own advertised address
+				if localNode.Addr.String() == addr && int(localNode.Port) == cs.ServicePort {
+					continue
+				}
+				serverAddr := net.JoinHostPort(addr, port)
+				nomadServerServices = append(nomadServerServices, serverAddr)
+			}
+		}
+
+		if len(nomadServerServices) == 0 {
+			if len(mErr.Errors) > 0 {
+				peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+				return mErr.ErrorOrNil()
+			}
+
+			// Log the error and return nil so future handlers
+			// can attempt to register the `nomad` service.
+			pollInterval := peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)
+			s.logger.Printf("[TRACE] server.consul: no Nomad Servers advertising service %+q in Consul datacenters %+q, sleeping for %v", nomadServerServiceName, dcs, pollInterval)
+			peersTimeout.Reset(pollInterval)
+			return nil
+		}
+
+		numServersContacted, err := s.Join(nomadServerServices)
+		if err != nil {
+			peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
+			return fmt.Errorf("contacted %d Nomad Servers: %v", numServersContacted, err)
+		}
+
+		peersTimeout.Reset(maxStaleLeadership)
+		s.logger.Printf("[INFO] server.consul: successfully contacted %d Nomad Servers", numServersContacted)
+
+		return nil
+	}
+
+	s.consulSyncer.AddPeriodicHandler("Nomad Server Fallback Server Handler", bootstrapFn)
+	return nil
+}
+
+// setupConsulSyncer creates a Server-mode consul.Syncer which periodically
+// executes callbacks on a fixed interval.
+func (s *Server) setupConsulSyncer() error {
+	if s.config.ConsulConfig.ServerAutoJoin {
+		if err := s.setupBootstrapHandler(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 // setupRPC is used to setup the RPC listener
 func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
 	// Create endpoints
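Note on the peersTimeout pattern above: the timer is created with a zero
duration so it is born expired and the first poll is allowed, and the
leadershipTimedOut closure drains it with a non-blocking receive. Because a
Timer's channel yields only one value per Reset, every code path in
bootstrapFn re-arms it. A minimal standalone sketch of just that pattern
(the sleep is only there to let the zero-duration timer fire in a toy
program; the real handler is invoked periodically by the Syncer):

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		peersTimeout := time.NewTimer(0)

		// Non-blocking check: the default case keeps the handler
		// from stalling when the timer has not fired yet.
		timedOut := func() bool {
			select {
			case <-peersTimeout.C:
				return true
			default:
				return false
			}
		}

		time.Sleep(10 * time.Millisecond)
		fmt.Println(timedOut()) // true: the zero-duration timer fired
		peersTimeout.Reset(time.Hour)
		fmt.Println(timedOut()) // false: re-armed, channel empty
	}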
@@ -531,8 +737,9 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
 	if s.config.Bootstrap || (s.config.DevMode && !s.config.DevDisableBootstrap) {
 		conf.Tags["bootstrap"] = "1"
 	}
-	if s.config.BootstrapExpect != 0 {
-		conf.Tags["expect"] = fmt.Sprintf("%d", s.config.BootstrapExpect)
+	bootstrapExpect := atomic.LoadInt32(&s.config.BootstrapExpect)
+	if bootstrapExpect != 0 {
+		conf.Tags["expect"] = fmt.Sprintf("%d", bootstrapExpect)
 	}
 	conf.MemberlistConfig.LogOutput = s.config.LogOutput
 	conf.LogOutput = s.config.LogOutput
diff --git a/nomad/server_test.go b/nomad/server_test.go
index a44cb88da..d82e5065d 100644
--- a/nomad/server_test.go
+++ b/nomad/server_test.go
@@ -3,11 +3,13 @@ package nomad
 import (
 	"fmt"
 	"io/ioutil"
+	"log"
 	"net"
 	"sync/atomic"
 	"testing"
 	"time"
 
+	"github.com/hashicorp/nomad/command/agent/consul"
 	"github.com/hashicorp/nomad/testutil"
 )
 
@@ -63,8 +65,14 @@ func testServer(t *testing.T, cb func(*Config)) *Server {
 	// Enable raft as leader if we have bootstrap on
 	config.RaftConfig.StartAsLeader = !config.DevDisableBootstrap
 
+	shutdownCh := make(chan struct{})
+	consulSyncer, err := consul.NewSyncer(config.ConsulConfig, shutdownCh, log.New(config.LogOutput, "", log.LstdFlags))
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
 	// Create server
-	server, err := NewServer(config)
+	server, err := NewServer(config, consulSyncer)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
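Note on the setupSerf hunk above: the expect count crosses the gossip layer
as a decimal string in the Serf member tags, which is what maybeBootstrap
later reads back as a peer's Expect value. A sketch of that round trip; the
parsing half is an illustrative stand-in only, as the real decoding lives in
Nomad's server-parts helpers rather than in this patch:

	package main

	import (
		"fmt"
		"strconv"
	)

	func main() {
		// Serialization side, as in setupSerf: the expect count is
		// carried as a decimal string in the Serf tags.
		tags := map[string]string{}
		var bootstrapExpect int32 = 3
		if bootstrapExpect != 0 {
			tags["expect"] = fmt.Sprintf("%d", bootstrapExpect)
		}

		// Parsing side (hypothetical stand-in): recover a peer's
		// expect value from its tag for the quorum comparison.
		expect, err := strconv.Atoi(tags["expect"])
		if err != nil {
			panic(err)
		}
		fmt.Println("peer expects", expect, "servers")
	}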