Teach Nomad servers how to fall back to Consul.

This commit is contained in:
Sean Chittenden 2016-06-13 22:58:39 -07:00
parent c592020e2d
commit f05514335b
No known key found for this signature in database
GPG Key ID: 4EBC9DC16C2E5E16
4 changed files with 121 additions and 4 deletions

View File

@ -235,7 +235,7 @@ func (a *Agent) serverConfig() (*nomad.Config, error) {
return nil, fmt.Errorf("server_service_name must be set when auto_advertise is enabled")
}
// conf.ConsulConfig = a.config.Consul
conf.ConsulConfig = a.config.Consul
return conf, nil
}
@ -379,7 +379,7 @@ func (a *Agent) setupServer() error {
}
// Create the server
server, err := nomad.NewServer(conf)
server, err := nomad.NewServer(conf, a.consulSyncer)
if err != nil {
return fmt.Errorf("server setup failed: %v", err)
}

View File

@ -10,6 +10,7 @@ import (
"github.com/hashicorp/memberlist"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/scheduler"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
@ -176,6 +177,9 @@ type Config struct {
// a new leader is elected, since we no longer know the status
// of all the heartbeats.
FailoverHeartbeatTTL time.Duration
// ConsulConfig is this Agent's Consul configuration
ConsulConfig *config.ConsulConfig
}
// CheckVersion is used to check if the ProtocolVersion is valid

View File

@ -16,7 +16,11 @@ import (
"sync"
"time"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
@ -25,6 +29,12 @@ import (
)
const (
// datacenterQueryFactor sets the max number of DCs that a Nomad
// Server will query to find bootstrap_expect servers. If
// bootstrap_expect is 3, then the Nomad Server bootstrapFn handler
// will search through up to 9 Consul DCs to find its quorum.
datacenterQueryFactor = 3
raftState = "raft/"
serfSnapshot = "serf/snapshot"
snapshotsRetained = 2
@ -116,6 +126,9 @@ type Server struct {
heartbeatTimers map[string]*time.Timer
heartbeatTimersLock sync.Mutex
// consulSyncer advertises this Nomad Agent with Consul
consulSyncer *consul.Syncer
// Worker used for processing
workers []*Worker
@ -140,7 +153,7 @@ type endpoints struct {
// NewServer is used to construct a new Nomad server from the
// configuration, potentially returning an error
func NewServer(config *Config) (*Server, error) {
func NewServer(config *Config, consulSyncer *consul.Syncer) (*Server, error) {
// Check the protocol version
if err := config.CheckVersion(); err != nil {
return nil, err
@ -172,6 +185,7 @@ func NewServer(config *Config) (*Server, error) {
// Create the server
s := &Server{
config: config,
consulSyncer: consulSyncer,
connPool: NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, nil),
logger: logger,
rpcServer: rpc.NewServer(),
@ -218,6 +232,11 @@ func NewServer(config *Config) (*Server, error) {
return nil, fmt.Errorf("Failed to start workers: %v", err)
}
// Setup the Consul syncer. The underlying error is included in the
// returned message so callers can see why setup failed.
if err := s.setupConsulSyncer(); err != nil {
	return nil, fmt.Errorf("failed to create server Consul syncer: %v", err)
}
// Monitor leadership changes
go s.monitorLeadership()
@ -356,6 +375,91 @@ func (s *Server) Leave() error {
return nil
}
// setupConsulSyncer registers a Server-mode handler with the Server's
// consul.Syncer, which periodically executes its callbacks on a fixed
// interval. The handler lets this Server fall back to Consul to discover
// other Nomad Servers. It currently always returns nil; errors from the
// handler itself are returned to the syncer that invokes it.
func (s *Server) setupConsulSyncer() error {
	// The bootstrapFn callback handler is used to periodically poll
	// Consul to look up the Nomad Servers in Consul. In the event the
	// server has been brought up without a `retry-join` configuration
	// and this Server is partitioned from the rest of the cluster,
	// periodically poll Consul to reattach this Server to other servers
	// in the same region and automatically reform a quorum (assuming the
	// correct number of servers required for quorum are present).
	bootstrapFn := func() error {
		// If the number of Members in Serf already meets the
		// bootstrap quorum, do nothing: the fallback is only needed
		// while this Server knows about too few peers.
		if len(s.Members()) >= s.config.BootstrapExpect {
			return nil
		}

		s.logger.Printf("[TRACE] server.consul: lost contact with Nomad quorum, falling back to Consul for server list")

		consulCatalog := s.consulSyncer.ConsulClient().Catalog()
		dcs, err := consulCatalog.Datacenters()
		if err != nil {
			return fmt.Errorf("server.consul: unable to query Consul datacenters: %v", err)
		}
		if len(dcs) > 2 {
			// Query the local DC first, then shuffle the
			// remaining DCs. The shuffle happens before the list
			// is truncated so that, if additional calls to
			// bootstrapFn are necessary, this Nomad Server will
			// eventually walk all datacenters until it finds
			// enough hosts to form a quorum.
			nearestDC := dcs[0]
			otherDCs := dcs[1:]
			shuffleStrings(otherDCs)

			// Cap the total number of DCs queried (including the
			// nearest one), never going below one so the slice
			// expression below stays in bounds.
			maxDCs := lib.MinInt(len(dcs), s.config.BootstrapExpect*datacenterQueryFactor)
			if maxDCs < 1 {
				maxDCs = 1
			}
			dcs = append([]string{nearestDC}, otherDCs[:maxDCs-1]...)
		}

		nomadServerServiceName := s.config.ConsulConfig.ServerServiceName
		var mErr multierror.Error
		const defaultMaxNumNomadServers = 8
		nomadServerServices := make([]string, 0, defaultMaxNumNomadServers)
		for _, dc := range dcs {
			opts := &consulapi.QueryOptions{
				AllowStale: true,
				Datacenter: dc,
				Near:       "_agent",
				WaitTime:   consul.DefaultQueryWaitDuration,
			}
			consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, opts)
			if err != nil {
				// Remember the failure but keep trying the
				// remaining datacenters.
				mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", nomadServerServiceName, dc, err))
				continue
			}

			for _, cs := range consulServices {
				port := strconv.FormatInt(int64(cs.ServicePort), 10)
				// Prefer the service's own address and fall
				// back to the catalog entry's Address when the
				// service did not set one.
				addr := cs.ServiceAddress
				if addr == "" {
					addr = cs.Address
				}
				serverAddr := net.JoinHostPort(addr, port)
				nomadServerServices = append(nomadServerServices, serverAddr)
			}
		}

		if len(nomadServerServices) == 0 {
			// Surface query failures if there were any; otherwise
			// report that no servers are advertised at all.
			if len(mErr.Errors) > 0 {
				return mErr.ErrorOrNil()
			}

			return fmt.Errorf("no Nomad Servers advertising service %+q in Consul datacenters: %+q", nomadServerServiceName, dcs)
		}

		numServersContacted, err := s.Join(nomadServerServices)
		if err != nil {
			return fmt.Errorf("contacted %d Nomad Servers: %v", numServersContacted, err)
		}

		s.logger.Printf("[INFO] successfully contacted %d Nomad Servers", numServersContacted)
		return nil
	}

	s.consulSyncer.AddPeriodicHandler("Nomad Server Fallback Server Handler", bootstrapFn)
	return nil
}
// setupRPC is used to setup the RPC listener
func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
// Create endpoints

View File

@ -3,11 +3,14 @@ package nomad
import (
"fmt"
"io/ioutil"
"log"
"net"
"os"
"sync/atomic"
"testing"
"time"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/testutil"
)
@ -63,8 +66,14 @@ func testServer(t *testing.T, cb func(*Config)) *Server {
// Enable raft as leader if we have bootstrap on
config.RaftConfig.StartAsLeader = !config.DevDisableBootstrap
shutdownCh := make(chan struct{})
consulSyncer, err := consul.NewSyncer(config.ConsulConfig, shutdownCh, log.New(os.Stderr, "", log.LstdFlags))
if err != nil {
t.Fatalf("err: %v", err)
}
// Create server
server, err := NewServer(config)
server, err := NewServer(config, consulSyncer)
if err != nil {
t.Fatalf("err: %v", err)
}