Merge pull request #1276 from hashicorp/f-consul-server-autojoin

Teach Nomad servers how to fall back to Consul.
Sean Chittenden 2016-06-16 14:40:45 -07:00 committed by GitHub
commit af55b74114
6 changed files with 244 additions and 19 deletions

client/client.go

@@ -39,7 +39,7 @@ const (
// datacenterQueryLimit searches through up to this many adjacent
// datacenters looking for the Nomad server service.
datacenterQueryLimit = 5
datacenterQueryLimit = 9
// registerRetryIntv is the minimum interval on which we retry
// registration. We pick a value between this and 2x this.
@@ -1253,8 +1253,8 @@ func (c *Client) setupConsulSyncer() error {
// a new set of servers so it's okay.
nearestDC := dcs[0]
otherDCs := make([]string, 0, len(dcs))
otherDCs = dcs[1:lib.MinInt(len(dcs), datacenterQueryLimit)]
shuffleStrings(otherDCs)
otherDCs = dcs[1:lib.MinInt(len(dcs), datacenterQueryLimit)]
dcs = append([]string{nearestDC}, otherDCs...)
}
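The reordering above is the actual bug fix: otherDCs must be assigned before it is shuffled. In the old order, shuffleStrings ran against the freshly allocated empty slice, a silent no-op, so the fallback probed candidate datacenters in the same order every time. A minimal, self-contained sketch of the intended nearest-first, shuffled-remainder behavior, using the standard library in place of Nomad's shuffleStrings and Consul's lib.MinInt helpers:

```go
package main

import (
	"fmt"
	"math/rand"
)

const datacenterQueryLimit = 9

// orderDCs keeps the nearest datacenter (dcs[0]) first and shuffles a
// bounded window of the remaining datacenters so repeated queries
// spread load across DCs instead of always walking them in order.
func orderDCs(dcs []string) []string {
	if len(dcs) <= 2 {
		return dcs
	}
	nearestDC := dcs[0]
	limit := len(dcs)
	if limit > datacenterQueryLimit {
		limit = datacenterQueryLimit
	}
	otherDCs := make([]string, limit-1)
	copy(otherDCs, dcs[1:limit])
	// Shuffle only after the slice is populated; shuffling it while
	// still empty is a no-op.
	rand.Shuffle(len(otherDCs), func(i, j int) {
		otherDCs[i], otherDCs[j] = otherDCs[j], otherDCs[i]
	})
	return append([]string{nearestDC}, otherDCs...)
}

func main() {
	fmt.Println(orderDCs([]string{"dc1", "dc2", "dc3", "dc4"}))
}
```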
@@ -1270,7 +1270,7 @@ func (c *Client) setupConsulSyncer() error {
var mErr multierror.Error
const defaultMaxNumNomadServers = 8
nomadServerServices := make([]string, 0, defaultMaxNumNomadServers)
c.logger.Printf("[DEBUG] client.consul: bootstrap contacting following Consul DCs: %q", dcs)
c.logger.Printf("[DEBUG] client.consul: bootstrap contacting following Consul DCs: %+q", dcs)
for _, dc := range dcs {
consulOpts := &consulapi.QueryOptions{
AllowStale: true,

command/agent/agent.go

@@ -11,6 +11,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/nomad/client"
@@ -110,7 +111,7 @@ func (a *Agent) serverConfig() (*nomad.Config, error) {
if a.config.Server.BootstrapExpect == 1 {
conf.Bootstrap = true
} else {
conf.BootstrapExpect = a.config.Server.BootstrapExpect
atomic.StoreInt32(&conf.BootstrapExpect, int32(a.config.Server.BootstrapExpect))
}
}
if a.config.DataDir != "" {
@@ -235,7 +236,7 @@ func (a *Agent) serverConfig() (*nomad.Config, error) {
return nil, fmt.Errorf("server_service_name must be set when auto_advertise is enabled")
}
// conf.ConsulConfig = a.config.Consul
conf.ConsulConfig = a.config.Consul
return conf, nil
}
@@ -377,7 +378,7 @@ func (a *Agent) setupServer() error {
}
// Create the server
server, err := nomad.NewServer(conf)
server, err := nomad.NewServer(conf, a.consulSyncer)
if err != nil {
return fmt.Errorf("server setup failed: %v", err)
}

nomad/config.go

@@ -10,6 +10,7 @@ import (
"github.com/hashicorp/memberlist"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/scheduler"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
@@ -51,8 +52,9 @@ type Config struct {
// BootstrapExpect mode is used to automatically bring up a
// collection of Nomad servers. This can be used to automatically
// bring up a collection of nodes.
BootstrapExpect int
// bring up a collection of nodes. All operations on BootstrapExpect
// must be handled via `atomic.*Int32()` calls.
BootstrapExpect int32
// DataDir is the directory to store our state in
DataDir string
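BootstrapExpect changes from int to int32 here because the new Consul-fallback goroutine reads it while Serf event handlers may concurrently zero it; per the new comment, every access has to go through sync/atomic. A minimal sketch of the resulting access pattern (the field mirrors the diff; the surrounding program is illustrative):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Config mirrors the relevant field from nomad/config.go: all reads
// and writes of BootstrapExpect must go through atomic.*Int32 calls.
type Config struct {
	BootstrapExpect int32
}

func main() {
	conf := &Config{}

	// Writer side (e.g. agent startup): store the configured value.
	atomic.StoreInt32(&conf.BootstrapExpect, 3)

	// Reader side (e.g. the bootstrap handler): load, never read the
	// field directly.
	if expect := atomic.LoadInt32(&conf.BootstrapExpect); expect != 0 {
		fmt.Printf("still expecting %d servers\n", expect)
	}

	// Once bootstrapping completes, zero the value so the fallback
	// handler stops trying to assemble a new cluster.
	atomic.StoreInt32(&conf.BootstrapExpect, 0)
}
```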
@@ -176,6 +178,9 @@ type Config struct {
// a new leader is elected, since we no longer know the status
// of all the heartbeats.
FailoverHeartbeatTTL time.Duration
// ConsulConfig is this Agent's Consul configuration
ConsulConfig *config.ConsulConfig
}
// CheckVersion is used to check if the ProtocolVersion is valid

nomad/serf.go

@@ -1,6 +1,10 @@
package nomad
import "github.com/hashicorp/serf/serf"
import (
"sync/atomic"
"github.com/hashicorp/serf/serf"
)
const (
// StatusReap is used to update the status of a node if we
@@ -66,7 +70,7 @@ func (s *Server) nodeJoin(me serf.MemberEvent) {
s.peerLock.Unlock()
// If we are still expecting to bootstrap, we may need to handle this
if s.config.BootstrapExpect != 0 {
if atomic.LoadInt32(&s.config.BootstrapExpect) != 0 {
s.maybeBootstrap()
}
}
@@ -91,7 +95,7 @@ func (s *Server) maybeBootstrap() {
// Bootstrap can only be done if there are no committed logs;
// otherwise, remove our expectation of bootstrapping
if index != 0 {
s.config.BootstrapExpect = 0
atomic.StoreInt32(&s.config.BootstrapExpect, 0)
return
}
@@ -106,7 +110,7 @@
if p.Region != s.config.Region {
continue
}
if p.Expect != 0 && p.Expect != s.config.BootstrapExpect {
if p.Expect != 0 && p.Expect != int(atomic.LoadInt32(&s.config.BootstrapExpect)) {
s.logger.Printf("[ERR] nomad: peer %v has a conflicting expect value. All nodes should expect the same number.", member)
return
}
@@ -118,7 +122,7 @@
}
// Skip if we haven't met the minimum expect count
if len(addrs) < s.config.BootstrapExpect {
if len(addrs) < int(atomic.LoadInt32(&s.config.BootstrapExpect)) {
return
}
@@ -128,8 +132,8 @@
s.logger.Printf("[ERR] nomad: failed to bootstrap peers: %v", err)
}
// Bootstrapping comlete, don't enter this again
s.config.BootstrapExpect = 0
// Bootstrapping complete, don't enter this again
atomic.StoreInt32(&s.config.BootstrapExpect, 0)
}
// nodeFailed is used to handle fail events on the serf cluster
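Taken together, the nomad/serf.go changes gate bootstrapping on three conditions: no committed Raft logs, agreement among same-region peers on a non-zero expect value, and at least expect observed peers. Once bootstrapping succeeds, BootstrapExpect is atomically zeroed so the path is never re-entered. A condensed sketch of that decision logic; the peer type and addresses below are hypothetical stand-ins, not Nomad's structures:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type peer struct {
	Region string
	Expect int
	Addr   string
}

// shouldBootstrap reproduces the gating logic of maybeBootstrap: bail
// out on conflicting expect values, and only proceed once the minimum
// number of peers for the region has been observed.
func shouldBootstrap(bootstrapExpect *int32, region string, peers []peer) ([]string, bool) {
	expect := int(atomic.LoadInt32(bootstrapExpect))
	if expect == 0 {
		return nil, false // already bootstrapped
	}
	var addrs []string
	for _, p := range peers {
		if p.Region != region {
			continue
		}
		if p.Expect != 0 && p.Expect != expect {
			return nil, false // conflicting expect values: refuse to bootstrap
		}
		addrs = append(addrs, p.Addr)
	}
	if len(addrs) < expect {
		return nil, false // not enough peers seen yet
	}
	return addrs, true
}

func main() {
	expect := int32(3)
	peers := []peer{
		{"global", 3, "10.0.0.1:4648"},
		{"global", 3, "10.0.0.2:4648"},
		{"global", 3, "10.0.0.3:4648"},
	}
	if addrs, ok := shouldBootstrap(&expect, "global", peers); ok {
		fmt.Println("bootstrapping with peers:", addrs)
		// Bootstrapping complete; don't enter this path again.
		atomic.StoreInt32(&expect, 0)
	}
}
```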

nomad/server.go

@@ -14,9 +14,14 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
@@ -25,6 +30,22 @@
)
const (
// datacenterQueryLimit sets the max number of DCs that a Nomad
// Server will query to find bootstrap_expect servers.
datacenterQueryLimit = 25
// maxStaleLeadership is the maximum time we will permit this Nomad
// Server to go without seeing a valid Raft leader.
maxStaleLeadership = 15 * time.Second
// peersPollInterval is used as the polling interval between attempts
// to query Consul for Nomad Servers.
peersPollInterval = 45 * time.Second
// peersPollJitterFactor is used to provide a slight amount of
// variance to the retry interval when querying Consul Servers
peersPollJitterFactor = 2
raftState = "raft/"
serfSnapshot = "serf/snapshot"
snapshotsRetained = 2
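Every retry path in the new bootstrap handler re-arms its timer with peersPollInterval plus a random stagger of peersPollInterval/peersPollJitterFactor, i.e. 45s plus up to 22.5s, so each server's next Consul poll lands uniformly in [45s, 67.5s) and a fleet of servers does not hammer Consul in lockstep. A standalone sketch of the computation; randomStagger below approximates what Consul's lib.RandomStagger is assumed to do:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	peersPollInterval     = 45 * time.Second
	peersPollJitterFactor = 2
)

// randomStagger mirrors what Consul's lib.RandomStagger is assumed to
// do: return a random duration in [0, intv).
func randomStagger(intv time.Duration) time.Duration {
	return time.Duration(rand.Int63n(int64(intv)))
}

func main() {
	// The retry arm used throughout setupBootstrapHandler: a base
	// interval plus up to half that interval of jitter.
	wait := peersPollInterval + randomStagger(peersPollInterval/peersPollJitterFactor)
	fmt.Printf("next Consul poll in %v (within [45s, 67.5s))\n", wait)
}
```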
@@ -116,6 +137,9 @@
heartbeatTimers map[string]*time.Timer
heartbeatTimersLock sync.Mutex
// consulSyncer advertises this Nomad Agent with Consul
consulSyncer *consul.Syncer
// Worker used for processing
workers []*Worker
@@ -140,7 +164,7 @@ type endpoints struct {
// NewServer is used to construct a new Nomad server from the
// configuration, potentially returning an error
func NewServer(config *Config) (*Server, error) {
func NewServer(config *Config, consulSyncer *consul.Syncer) (*Server, error) {
// Check the protocol version
if err := config.CheckVersion(); err != nil {
return nil, err
@@ -172,6 +196,7 @@ func NewServer(config *Config) (*Server, error) {
// Create the server
s := &Server{
config: config,
consulSyncer: consulSyncer,
connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, nil),
logger: logger,
rpcServer: rpc.NewServer(),
@@ -218,6 +243,11 @@
return nil, fmt.Errorf("Failed to start workers: %v", err)
}
// Setup the Consul syncer
if err := s.setupConsulSyncer(); err != nil {
return nil, fmt.Errorf("failed to create server Consul syncer: %v")
}
// Monitor leadership changes
go s.monitorLeadership()
@@ -356,6 +386,182 @@ func (s *Server) Leave() error {
return nil
}
// setupBootstrapHandler() creates the closure necessary to support a Consul
// fallback handler.
func (s *Server) setupBootstrapHandler() error {
// peersTimeout is used to indicate to the Consul Syncer that the
// current Nomad Server has a stale peer set. peersTimeout will time
// out if the Consul Syncer bootstrapFn has not observed a Raft
// leader in maxStaleLeadership. If peersTimeout has been triggered,
// the Consul Syncer will begin querying Consul for other Nomad
// Servers.
//
// NOTE: a time.Timer is used instead of a time.Time in order to
// handle clock drift, because time.Timer is driven by a monotonic
// clock.
var peersTimeout *time.Timer = time.NewTimer(0)
// consulQueryCount is the number of times the bootstrapFn has been
// called, regardless of success.
var consulQueryCount uint64
// leadershipTimedOut is a helper method that returns true if the
// peersTimeout timer has expired.
leadershipTimedOut := func() bool {
select {
case <-peersTimeout.C:
return true
default:
return false
}
}
// The bootstrapFn callback handler is used to periodically poll
// Consul to look up the Nomad Servers in Consul. In the event the
// server has been brought up without a `retry-join` configuration
// and this Server is partitioned from the rest of the cluster,
// periodically poll Consul to reattach this Server to other servers
// in the same region and automatically reform a quorum (assuming the
// correct number of servers required for quorum are present).
bootstrapFn := func() error {
// If there is a raft leader, do nothing
if s.raft.Leader() != "" {
peersTimeout.Reset(maxStaleLeadership)
return nil
}
// (ab)use serf.go's behavior of setting BootstrapExpect to
// zero if we have bootstrapped.
bootstrapExpect := atomic.LoadInt32(&s.config.BootstrapExpect)
if bootstrapExpect == 0 {
// This Nomad Server has been bootstrapped. Rely on
// the peersTimeout firing as a guard to prevent
// aggressive querying of Consul.
if !leadershipTimedOut() {
return nil
}
} else {
if consulQueryCount > 0 && !leadershipTimedOut() {
return nil
}
// This Nomad Server has not been bootstrapped, reach
// out to Consul if our peer list is less than
// `bootstrap_expect`.
raftPeers, err := s.raftPeers.Peers()
if err != nil {
peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
return nil
}
// The number of Nomad Servers required for quorum has
// been reached; we do not need to poll Consul. Let the
// normal timeout-based strategy take over.
if len(raftPeers) >= int(bootstrapExpect) {
peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
return nil
}
}
consulQueryCount++
s.logger.Printf("[DEBUG] server.consul: lost contact with Nomad quorum, falling back to Consul for server list")
consulCatalog := s.consulSyncer.ConsulClient().Catalog()
dcs, err := consulCatalog.Datacenters()
if err != nil {
peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
return fmt.Errorf("server.consul: unable to query Consul datacenters: %v", err)
}
if len(dcs) > 2 {
// Query the local DC first, then shuffle the
// remaining DCs. If additional calls to bootstrapFn
// are necessary, this Nomad Server will eventually
// walk all datacenters until it finds enough hosts to
// form a quorum.
nearestDC := dcs[0]
otherDCs := make([]string, 0, len(dcs))
otherDCs = dcs[1:lib.MinInt(len(dcs), datacenterQueryLimit)]
shuffleStrings(otherDCs)
dcs = append([]string{nearestDC}, otherDCs...)
}
nomadServerServiceName := s.config.ConsulConfig.ServerServiceName
var mErr multierror.Error
const defaultMaxNumNomadServers = 8
nomadServerServices := make([]string, 0, defaultMaxNumNomadServers)
localNode := s.serf.Memberlist().LocalNode()
for _, dc := range dcs {
consulOpts := &consulapi.QueryOptions{
AllowStale: true,
Datacenter: dc,
Near: "_agent",
WaitTime: consul.DefaultQueryWaitDuration,
}
consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, consulOpts)
if err != nil {
err := fmt.Errorf("failed to query service %q in Consul datacenter %q: %v", nomadServerServiceName, dc, err)
s.logger.Printf("[WARN] server.consul: %v", err)
mErr.Errors = append(mErr.Errors, err)
continue
}
for _, cs := range consulServices {
port := strconv.FormatInt(int64(cs.ServicePort), 10)
addr := cs.ServiceAddress
if addr == "" {
addr = cs.Address
}
if localNode.Addr.String() == addr && int(localNode.Port) == cs.ServicePort {
continue
}
serverAddr := net.JoinHostPort(addr, port)
nomadServerServices = append(nomadServerServices, serverAddr)
}
}
if len(nomadServerServices) == 0 {
if len(mErr.Errors) > 0 {
peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
return mErr.ErrorOrNil()
}
// Log the error and return nil so future handlers
// can attempt to register the `nomad` service.
pollInterval := peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)
s.logger.Printf("[TRACE] server.consul: no Nomad Servers advertising service %+q in Consul datacenters %+q, sleeping for %v", nomadServerServiceName, dcs, pollInterval)
peersTimeout.Reset(pollInterval)
return nil
}
numServersContacted, err := s.Join(nomadServerServices)
if err != nil {
peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
return fmt.Errorf("contacted %d Nomad Servers: %v", numServersContacted, err)
}
peersTimeout.Reset(maxStaleLeadership)
s.logger.Printf("[INFO] server.consul: successfully contacted %d Nomad Servers", numServersContacted)
return nil
}
s.consulSyncer.AddPeriodicHandler("Nomad Server Fallback Server Handler", bootstrapFn)
return nil
}
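The leadershipTimedOut helper defined near the top of setupBootstrapHandler is a non-blocking poll of the timer channel: a select with a default case consumes a fired timer and reports staleness, otherwise it returns immediately. Arming the timer with a zero duration makes the very first check report a timeout, which forces an initial Consul query. A self-contained sketch of the same pattern:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// A timer armed with 0 fires immediately; this is the same trick
	// setupBootstrapHandler uses to force an initial Consul query.
	peersTimeout := time.NewTimer(0)
	// Give the zero-duration timer a moment to fire so the first
	// check below is deterministic.
	time.Sleep(10 * time.Millisecond)

	timedOut := func() bool {
		select {
		case <-peersTimeout.C:
			return true // timer fired: leadership is considered stale
		default:
			return false // timer still pending: nothing to do yet
		}
	}

	fmt.Println("first check:", timedOut()) // true
	peersTimeout.Reset(time.Hour)
	fmt.Println("after reset:", timedOut()) // false
}
```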
// setupConsulSyncer creates a Server-mode consul.Syncer which executes
// callbacks on a fixed interval.
func (s *Server) setupConsulSyncer() error {
if s.config.ConsulConfig.ServerAutoJoin {
if err := s.setupBootstrapHandler(); err != nil {
return err
}
}
return nil
}
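setupConsulSyncer itself only wires the closure in: AddPeriodicHandler registers bootstrapFn with the consul.Syncer, which invokes every registered handler on its own sync interval. The sketch below is a deliberately simplified, hypothetical runner that shows the shape of that contract; it is not the real Syncer implementation:

```go
package main

import (
	"fmt"
	"log"
	"time"
)

// periodicRunner is a toy stand-in for consul.Syncer's periodic-handler
// machinery: named callbacks invoked on a fixed interval until shutdown.
type periodicRunner struct {
	handlers map[string]func() error
}

func (r *periodicRunner) AddPeriodicHandler(name string, fn func() error) {
	r.handlers[name] = fn
}

func (r *periodicRunner) Run(interval time.Duration, shutdownCh <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			for name, fn := range r.handlers {
				if err := fn(); err != nil {
					// Handler errors are logged, not fatal: the
					// next tick simply tries again.
					log.Printf("[WARN] periodic handler %q: %v", name, err)
				}
			}
		case <-shutdownCh:
			return
		}
	}
}

func main() {
	r := &periodicRunner{handlers: map[string]func() error{}}
	r.AddPeriodicHandler("Nomad Server Fallback Server Handler", func() error {
		fmt.Println("polling Consul for Nomad servers...")
		return nil
	})
	shutdownCh := make(chan struct{})
	go r.Run(100*time.Millisecond, shutdownCh)
	time.Sleep(350 * time.Millisecond)
	close(shutdownCh)
}
```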
// setupRPC is used to setup the RPC listener
func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
// Create endpoints
@@ -531,8 +737,9 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
if s.config.Bootstrap || (s.config.DevMode && !s.config.DevDisableBootstrap) {
conf.Tags["bootstrap"] = "1"
}
if s.config.BootstrapExpect != 0 {
conf.Tags["expect"] = fmt.Sprintf("%d", s.config.BootstrapExpect)
bootstrapExpect := atomic.LoadInt32(&s.config.BootstrapExpect)
if bootstrapExpect != 0 {
conf.Tags["expect"] = fmt.Sprintf("%d", bootstrapExpect)
}
conf.MemberlistConfig.LogOutput = s.config.LogOutput
conf.LogOutput = s.config.LogOutput
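Back in bootstrapFn's catalog walk (the loop over consulServices above), each result contributes one join target: the service-level address is preferred, the agent address is the fallback, the server's own Serf endpoint is skipped, and net.JoinHostPort keeps IPv6 addresses valid. A standalone sketch of that selection; the catalogService fields mirror Consul's api.CatalogService, and the local address values are hypothetical:

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// catalogService carries the two address fields used from Consul's
// api.CatalogService: the service registration address and, as a
// fallback, the agent's own address.
type catalogService struct {
	Address        string // agent address
	ServiceAddress string // service address, may be empty
	ServicePort    int
}

// serverAddrs converts catalog entries into join targets, excluding
// this node's own advertised Serf endpoint.
func serverAddrs(services []catalogService, localAddr string, localPort int) []string {
	var out []string
	for _, cs := range services {
		addr := cs.ServiceAddress
		if addr == "" {
			addr = cs.Address
		}
		// Don't try to join ourselves.
		if addr == localAddr && cs.ServicePort == localPort {
			continue
		}
		out = append(out, net.JoinHostPort(addr, strconv.Itoa(cs.ServicePort)))
	}
	return out
}

func main() {
	services := []catalogService{
		{Address: "10.0.0.1", ServiceAddress: "", ServicePort: 4648},
		{Address: "10.0.0.2", ServiceAddress: "10.1.0.2", ServicePort: 4648},
		{Address: "10.0.0.3", ServiceAddress: "", ServicePort: 4648}, // self
	}
	fmt.Println(serverAddrs(services, "10.0.0.3", 4648))
}
```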

nomad/server_test.go

@@ -3,11 +3,13 @@ package nomad
import (
"fmt"
"io/ioutil"
"log"
"net"
"sync/atomic"
"testing"
"time"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/testutil"
)
@@ -63,8 +65,14 @@ func testServer(t *testing.T, cb func(*Config)) *Server {
// Enable raft as leader if we have bootstrap on
config.RaftConfig.StartAsLeader = !config.DevDisableBootstrap
shutdownCh := make(chan struct{})
consulSyncer, err := consul.NewSyncer(config.ConsulConfig, shutdownCh, log.New(config.LogOutput, "", log.LstdFlags))
if err != nil {
t.Fatalf("err: %v", err)
}
// Create server
server, err := NewServer(config)
server, err := NewServer(config, consulSyncer)
if err != nil {
t.Fatalf("err: %v", err)
}