2015-06-01 15:49:10 +00:00
package nomad
import (
2017-11-20 15:38:46 +00:00
"context"
2015-06-03 10:26:50 +00:00
"crypto/tls"
2015-06-01 15:49:10 +00:00
"fmt"
2017-02-02 23:31:36 +00:00
"io/ioutil"
2015-06-01 15:49:10 +00:00
"log"
2015-06-03 10:26:50 +00:00
"net"
"net/rpc"
2017-02-02 23:31:36 +00:00
"os"
2015-06-01 15:49:10 +00:00
"path/filepath"
2015-11-24 21:15:01 +00:00
"sort"
2015-06-03 10:26:50 +00:00
"strconv"
2015-06-01 15:49:10 +00:00
"sync"
2016-06-16 19:00:15 +00:00
"sync/atomic"
2015-06-01 15:49:10 +00:00
"time"
2017-12-18 21:16:23 +00:00
"github.com/hashicorp/consul/agent/consul/autopilot"
2016-06-14 05:58:39 +00:00
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
2017-11-15 01:53:23 +00:00
multierror "github.com/hashicorp/go-multierror"
2017-08-19 23:49:53 +00:00
lru "github.com/hashicorp/golang-lru"
2016-06-14 05:58:39 +00:00
"github.com/hashicorp/nomad/command/agent/consul"
2018-01-11 18:17:23 +00:00
"github.com/hashicorp/nomad/helper/codec"
2018-01-12 21:58:44 +00:00
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/helper/stats"
2016-10-25 23:05:37 +00:00
"github.com/hashicorp/nomad/helper/tlsutil"
2017-06-28 22:35:52 +00:00
"github.com/hashicorp/nomad/nomad/deploymentwatcher"
2015-08-29 21:22:24 +00:00
"github.com/hashicorp/nomad/nomad/state"
2016-05-28 01:14:34 +00:00
"github.com/hashicorp/nomad/nomad/structs"
2017-11-20 15:38:46 +00:00
"github.com/hashicorp/nomad/nomad/structs/config"
2015-06-01 15:49:10 +00:00
"github.com/hashicorp/raft"
2017-11-15 01:53:23 +00:00
raftboltdb "github.com/hashicorp/raft-boltdb"
2015-06-03 10:58:00 +00:00
"github.com/hashicorp/serf/serf"
2015-06-01 15:49:10 +00:00
)
const (
2016-06-16 19:14:03 +00:00
// datacenterQueryLimit sets the max number of DCs that a Nomad
// Server will query to find bootstrap_expect servers.
datacenterQueryLimit = 25
// maxStaleLeadership is the maximum time we will permit this Nomad
// Server to go without seeing a valid Raft leader.
maxStaleLeadership = 15 * time . Second
// peersPollInterval is used as the polling interval between attempts
// to query Consul for Nomad Servers.
peersPollInterval = 45 * time . Second
// peersPollJitter is used to provide a slight amount of variance to
// the retry interval when querying Consul Servers
peersPollJitterFactor = 2
2016-06-14 05:58:39 +00:00
2015-06-01 15:49:10 +00:00
raftState = "raft/"
2015-06-03 10:58:00 +00:00
serfSnapshot = "serf/snapshot"
2015-06-01 15:49:10 +00:00
snapshotsRetained = 2
2015-06-07 18:50:29 +00:00
// serverRPCCache controls how long we keep an idle connection open to a server
serverRPCCache = 2 * time . Minute
2018-03-11 17:50:39 +00:00
// serverMaxStreams controls how many idle streams we keep open to a server
2015-06-07 18:50:29 +00:00
serverMaxStreams = 64
2015-06-01 15:49:10 +00:00
// raftLogCacheSize is the maximum number of logs to cache in-memory.
2016-05-15 16:41:34 +00:00
// This is used to reduce disk I/O for the recently committed entries.
2015-06-01 15:49:10 +00:00
raftLogCacheSize = 512
2015-06-03 11:25:50 +00:00
// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
// to replicate to gracefully leave the cluster.
raftRemoveGracePeriod = 5 * time . Second
2017-02-01 00:43:57 +00:00
// defaultConsulDiscoveryInterval is how often to poll Consul for new
// servers if there is no leader.
2017-04-14 00:03:07 +00:00
defaultConsulDiscoveryInterval time . Duration = 3 * time . Second
2017-02-01 00:43:57 +00:00
// defaultConsulDiscoveryIntervalRetry is how often to poll Consul for
// new servers if there is no leader and the last Consul query failed.
2017-04-14 00:03:07 +00:00
defaultConsulDiscoveryIntervalRetry time . Duration = 9 * time . Second
2017-08-19 23:49:53 +00:00
// aclCacheSize is the number of ACL objects to keep cached. ACLs have a parsing and
// construction cost, so we keep the hot objects cached to reduce the ACL token resolution time.
aclCacheSize = 512
2015-06-01 15:49:10 +00:00
)
// Server is Nomad server which manages the job queues,
// schedulers, and notification bus for agents.
type Server struct {
config * Config
2017-12-07 17:07:00 +00:00
2015-06-01 15:49:10 +00:00
logger * log . Logger
2015-06-07 18:50:29 +00:00
// Connection pool to other Nomad servers
2018-01-12 21:58:44 +00:00
connPool * pool . ConnPool
2015-06-07 18:50:29 +00:00
2015-06-03 11:25:50 +00:00
// The raft instance is used among Nomad nodes within the
// region to protect operations that require strong consistency
2015-09-07 17:46:41 +00:00
leaderCh <- chan bool
2015-06-01 15:49:10 +00:00
raft * raft . Raft
raftLayer * RaftLayer
raftStore * raftboltdb . BoltStore
2015-06-01 19:11:40 +00:00
raftInmem * raft . InmemStore
2015-06-01 15:49:10 +00:00
raftTransport * raft . NetworkTransport
2017-12-18 21:16:23 +00:00
// autopilot is the Autopilot instance for this server.
autopilot * autopilot . Autopilot
2015-06-01 15:49:10 +00:00
// fsm is the state machine used with Raft
fsm * nomadFSM
2015-06-03 10:26:50 +00:00
// rpcListener is used to listen for incoming connections
2017-12-07 17:07:00 +00:00
rpcListener net . Listener
listenerCh chan struct { }
2017-11-28 17:33:46 +00:00
2018-02-15 23:03:12 +00:00
// tlsWrap is used to wrap outbound connections using TLS. It should be
// accessed using the lock.
tlsWrap tlsutil . RegionWrapper
tlsWrapLock sync . RWMutex
2018-01-03 22:59:52 +00:00
// rpcServer is the static RPC server that is used by the local agent.
rpcServer * rpc . Server
// rpcAdvertise is the advertised address for the RPC listener.
2015-06-03 10:58:00 +00:00
rpcAdvertise net . Addr
2015-06-03 10:26:50 +00:00
// rpcTLS is the TLS config for incoming TLS requests
2017-12-07 17:07:00 +00:00
rpcTLS * tls . Config
rpcCancel context . CancelFunc
2015-06-03 10:26:50 +00:00
2018-01-03 22:59:52 +00:00
// staticEndpoints is the set of static endpoints that can be reused across
// all RPC connections
staticEndpoints endpoints
2018-02-05 21:32:39 +00:00
// streamingRpcs is the registry holding our streaming RPC handlers.
2018-03-11 18:41:13 +00:00
streamingRpcs * structs . StreamingRpcRegistry
2018-01-19 00:51:49 +00:00
2018-01-05 21:50:04 +00:00
// nodeConns is the set of multiplexed node connections we have keyed by
// NodeID
2018-01-12 23:57:07 +00:00
nodeConns map [ string ] * nodeConnState
2018-01-05 21:50:04 +00:00
nodeConnsLock sync . RWMutex
2015-06-04 10:33:12 +00:00
// peers is used to track the known Nomad servers. This is
// used for region forwarding and clustering.
2015-06-07 18:32:01 +00:00
peers map [ string ] [ ] * serverParts
2017-02-02 23:49:06 +00:00
localPeers map [ raft . ServerAddress ] * serverParts
2015-06-07 18:32:01 +00:00
peerLock sync . RWMutex
2015-06-04 10:33:12 +00:00
2015-06-03 10:58:00 +00:00
// serf is the Serf cluster containing only Nomad
// servers. This is used for multi-region federation
// and automatic clustering within regions.
serf * serf . Serf
2015-06-04 10:42:56 +00:00
// reconcileCh is used to pass events from the serf handler
// into the leader manager. Mostly used to handle when servers
// join/leave from the region.
reconcileCh chan serf . Member
2015-06-03 10:58:00 +00:00
// eventCh is used to receive events from the serf cluster
eventCh chan serf . Event
2017-06-28 22:35:52 +00:00
// BlockedEvals is used to manage evaluations that are blocked on node
// capacity changes.
blockedEvals * BlockedEvals
// deploymentWatcher is used to watch deployments and their allocations and
2017-08-07 21:13:05 +00:00
// make the required calls to continue to transition the deployment.
2017-06-28 22:35:52 +00:00
deploymentWatcher * deploymentwatcher . Watcher
2015-07-24 04:44:17 +00:00
// evalBroker is used to manage the in-progress evaluations
// that are waiting to be brokered to a sub-scheduler
evalBroker * EvalBroker
2017-06-28 22:35:52 +00:00
// periodicDispatcher is used to track and create evaluations for periodic jobs.
periodicDispatcher * PeriodicDispatch
2016-01-29 23:31:32 +00:00
2015-07-27 22:11:42 +00:00
// planQueue is used to manage the submitted allocation
// plans that are waiting to be assessed by the leader
planQueue * PlanQueue
2015-08-23 00:17:13 +00:00
// heartbeatTimers track the expiration time of each heartbeat that has
// a TTL. On expiration, the node status is updated to be 'down'.
heartbeatTimers map [ string ] * time . Timer
heartbeatTimersLock sync . Mutex
2017-02-01 00:43:57 +00:00
// consulCatalog is used for discovering other Nomad Servers via Consul
consulCatalog consul . CatalogAPI
2016-06-14 05:58:39 +00:00
2016-08-10 03:15:13 +00:00
// vault is the client for communicating with Vault.
2016-08-10 20:20:13 +00:00
vault VaultClient
2016-08-10 03:15:13 +00:00
2015-08-23 17:53:53 +00:00
// Worker used for processing
workers [ ] * Worker
2017-08-19 23:49:53 +00:00
// aclCache is used to maintain the parsed ACL objects
aclCache * lru . TwoQueueCache
2017-10-23 19:50:37 +00:00
// leaderAcl is the management ACL token that is valid when resolved by the
// current leader.
leaderAcl string
leaderAclLock sync . Mutex
2017-12-18 21:16:23 +00:00
// statsFetcher is used by autopilot to check the status of the other
// Nomad router.
statsFetcher * StatsFetcher
2017-09-19 14:47:10 +00:00
// EnterpriseState is used to fill in state for Pro/Ent builds
EnterpriseState
2015-06-03 11:25:50 +00:00
left bool
2015-06-01 15:49:10 +00:00
shutdown bool
shutdownCh chan struct { }
shutdownLock sync . Mutex
}
2015-06-03 10:26:50 +00:00
// Holds the RPC endpoints
type endpoints struct {
2017-08-10 19:24:11 +00:00
Status * Status
Node * Node
Job * Job
Eval * Eval
Plan * Plan
Alloc * Alloc
Deployment * Deployment
Region * Region
Search * Search
Periodic * Periodic
System * System
Operator * Operator
2017-08-08 00:10:04 +00:00
ACL * ACL
2017-09-07 23:56:15 +00:00
Enterprise * EnterpriseEndpoints
2018-01-11 23:58:59 +00:00
// Client endpoints
2018-02-06 01:20:42 +00:00
ClientStats * ClientStats
FileSystem * FileSystem
ClientAllocations * ClientAllocations
2015-06-03 10:26:50 +00:00
}
2015-06-01 15:49:10 +00:00
// NewServer is used to construct a new Nomad server from the
// configuration, potentially returning an error
2017-02-01 00:43:57 +00:00
func NewServer ( config * Config , consulCatalog consul . CatalogAPI , logger * log . Logger ) ( * Server , error ) {
2015-06-01 15:49:10 +00:00
// Check the protocol version
if err := config . CheckVersion ( ) ; err != nil {
return nil , err
}
2015-07-24 04:44:17 +00:00
// Create an eval broker
2017-04-14 22:24:55 +00:00
evalBroker , err := NewEvalBroker (
config . EvalNackTimeout ,
config . EvalNackInitialReenqueueDelay ,
config . EvalNackSubsequentReenqueueDelay ,
config . EvalDeliveryLimit )
2015-07-24 04:44:17 +00:00
if err != nil {
return nil , err
}
2016-01-29 23:31:32 +00:00
// Create a new blocked eval tracker.
blockedEvals := NewBlockedEvals ( evalBroker )
2015-07-27 22:11:42 +00:00
// Create a plan queue
planQueue , err := NewPlanQueue ( )
if err != nil {
return nil , err
}
2016-10-24 05:22:00 +00:00
// Configure TLS
2017-11-28 22:25:16 +00:00
tlsConf := config . tlsConfig ( )
incomingTLS , tlsWrap , err := getTLSConf ( config . TLSConfig . EnableRPC , tlsConf )
2017-11-29 17:54:05 +00:00
if err != nil {
return nil , err
2016-10-24 05:22:00 +00:00
}
2017-08-19 23:49:53 +00:00
// Create the ACL object cache
aclCache , err := lru . New2Q ( aclCacheSize )
if err != nil {
return nil , err
}
2015-06-01 15:49:10 +00:00
// Create the server
s := & Server {
2017-07-03 18:26:45 +00:00
config : config ,
consulCatalog : consulCatalog ,
2018-01-12 21:58:44 +00:00
connPool : pool . NewPool ( config . LogOutput , serverRPCCache , serverMaxStreams , tlsWrap ) ,
2017-07-03 18:26:45 +00:00
logger : logger ,
2018-02-15 23:03:12 +00:00
tlsWrap : tlsWrap ,
2017-07-03 18:26:45 +00:00
rpcServer : rpc . NewServer ( ) ,
2018-03-11 18:41:13 +00:00
streamingRpcs : structs . NewStreamingRpcRegistry ( ) ,
2018-01-12 23:57:07 +00:00
nodeConns : make ( map [ string ] * nodeConnState ) ,
2017-07-03 18:26:45 +00:00
peers : make ( map [ string ] [ ] * serverParts ) ,
localPeers : make ( map [ raft . ServerAddress ] * serverParts ) ,
reconcileCh : make ( chan serf . Member , 32 ) ,
eventCh : make ( chan serf . Event , 256 ) ,
evalBroker : evalBroker ,
blockedEvals : blockedEvals ,
planQueue : planQueue ,
rpcTLS : incomingTLS ,
2017-08-19 23:49:53 +00:00
aclCache : aclCache ,
2017-07-03 18:26:45 +00:00
shutdownCh : make ( chan struct { } ) ,
2015-06-01 15:49:10 +00:00
}
2015-12-18 20:26:28 +00:00
// Create the periodic dispatcher for launching periodic jobs.
2015-12-19 01:26:05 +00:00
s . periodicDispatcher = NewPeriodicDispatch ( s . logger , s )
2015-12-18 20:26:28 +00:00
2017-12-18 21:16:23 +00:00
// Initialize the stats fetcher that autopilot will use.
s . statsFetcher = NewStatsFetcher ( logger , s . connPool , s . config . Region )
2016-08-10 20:20:13 +00:00
// Setup Vault
if err := s . setupVaultClient ( ) ; err != nil {
s . Shutdown ( )
s . logger . Printf ( "[ERR] nomad: failed to setup Vault client: %v" , err )
return nil , fmt . Errorf ( "Failed to setup Vault client: %v" , err )
}
2015-06-03 10:26:50 +00:00
// Initialize the RPC layer
2016-10-24 05:22:00 +00:00
if err := s . setupRPC ( tlsWrap ) ; err != nil {
2015-06-03 10:26:50 +00:00
s . Shutdown ( )
2016-06-17 13:44:10 +00:00
s . logger . Printf ( "[ERR] nomad: failed to start RPC layer: %s" , err )
2015-06-03 10:26:50 +00:00
return nil , fmt . Errorf ( "Failed to start RPC layer: %v" , err )
}
2015-06-01 15:49:10 +00:00
// Initialize the Raft server
if err := s . setupRaft ( ) ; err != nil {
s . Shutdown ( )
2016-06-17 13:44:10 +00:00
s . logger . Printf ( "[ERR] nomad: failed to start Raft: %s" , err )
2015-06-01 15:49:10 +00:00
return nil , fmt . Errorf ( "Failed to start Raft: %v" , err )
}
2015-06-03 10:58:00 +00:00
// Initialize the wan Serf
s . serf , err = s . setupSerf ( config . SerfConfig , s . eventCh , serfSnapshot )
if err != nil {
s . Shutdown ( )
2016-06-17 13:44:10 +00:00
s . logger . Printf ( "[ERR] nomad: failed to start serf WAN: %s" , err )
2015-06-03 10:58:00 +00:00
return nil , fmt . Errorf ( "Failed to start serf: %v" , err )
}
2016-05-15 16:41:34 +00:00
// Initialize the scheduling workers
2015-07-28 22:12:08 +00:00
if err := s . setupWorkers ( ) ; err != nil {
s . Shutdown ( )
2016-06-17 13:44:10 +00:00
s . logger . Printf ( "[ERR] nomad: failed to start workers: %s" , err )
2015-07-28 22:12:08 +00:00
return nil , fmt . Errorf ( "Failed to start workers: %v" , err )
}
2016-06-14 05:58:39 +00:00
// Setup the Consul syncer
if err := s . setupConsulSyncer ( ) ; err != nil {
2017-02-28 00:00:19 +00:00
return nil , fmt . Errorf ( "failed to create server Consul syncer: %v" , err )
2016-06-14 05:58:39 +00:00
}
2017-06-28 22:35:52 +00:00
// Setup the deployment watcher.
if err := s . setupDeploymentWatcher ( ) ; err != nil {
return nil , fmt . Errorf ( "failed to create deployment watcher: %v" , err )
}
2017-09-19 14:47:10 +00:00
// Setup the enterprise state
if err := s . setupEnterprise ( config ) ; err != nil {
return nil , err
}
2015-09-07 17:46:41 +00:00
// Monitor leadership changes
go s . monitorLeadership ( )
// Start ingesting events for Serf
go s . serfEventHandler ( )
2017-12-07 17:07:00 +00:00
// start the RPC listener for the server
2017-11-30 19:52:13 +00:00
s . startRPCListener ( )
2015-06-03 10:26:50 +00:00
2015-07-27 22:11:42 +00:00
// Emit metrics for the eval broker
2015-08-05 23:45:50 +00:00
go evalBroker . EmitStats ( time . Second , s . shutdownCh )
2015-07-24 05:17:37 +00:00
2015-07-27 22:11:42 +00:00
// Emit metrics for the plan queue
2015-08-05 23:45:50 +00:00
go planQueue . EmitStats ( time . Second , s . shutdownCh )
2015-07-27 22:11:42 +00:00
2016-02-01 02:46:45 +00:00
// Emit metrics for the blocked eval tracker.
go blockedEvals . EmitStats ( time . Second , s . shutdownCh )
2017-02-15 00:02:18 +00:00
// Emit metrics for the Vault client.
go s . vault . EmitStats ( time . Second , s . shutdownCh )
2015-08-23 00:17:13 +00:00
// Emit metrics
go s . heartbeatStats ( )
2017-09-19 14:47:10 +00:00
// Start enterprise background workers
s . startEnterpriseBackground ( )
2015-06-01 15:49:10 +00:00
// Done
return s , nil
}
2017-12-07 17:07:00 +00:00
// startRPCListener starts the server's the RPC listener
2017-11-30 19:52:13 +00:00
func ( s * Server ) startRPCListener ( ) {
ctx , cancel := context . WithCancel ( context . Background ( ) )
s . rpcCancel = cancel
go func ( ) {
defer close ( s . listenerCh )
s . listen ( ctx )
} ( )
}
2017-12-07 17:07:00 +00:00
// createRPCListener creates the server's RPC listener
2018-01-16 16:55:11 +00:00
func ( s * Server ) createRPCListener ( ) ( * net . TCPListener , error ) {
2017-11-30 19:52:13 +00:00
s . listenerCh = make ( chan struct { } )
2018-01-16 16:55:11 +00:00
listener , err := net . ListenTCP ( "tcp" , s . config . RPCAddr )
if err != nil {
s . logger . Printf ( "[ERR] nomad: error when initializing TLS listener %s" , err )
return listener , err
2017-11-30 19:52:13 +00:00
}
2018-01-16 16:55:11 +00:00
s . rpcListener = listener
return listener , nil
2017-11-30 19:52:13 +00:00
}
2017-12-07 17:07:00 +00:00
// getTLSConf gets the server's TLS configuration based on the config supplied
// by the operator
2017-11-28 22:25:16 +00:00
func getTLSConf ( enableRPC bool , tlsConf * tlsutil . Config ) ( * tls . Config , tlsutil . RegionWrapper , error ) {
2017-11-20 15:38:46 +00:00
var tlsWrap tlsutil . RegionWrapper
var incomingTLS * tls . Config
2017-11-28 22:25:16 +00:00
if enableRPC {
2017-11-20 15:38:46 +00:00
tw , err := tlsConf . OutgoingTLSWrapper ( )
if err != nil {
2017-11-28 22:25:16 +00:00
return nil , nil , err
2017-11-20 15:38:46 +00:00
}
tlsWrap = tw
itls , err := tlsConf . IncomingTLSConfig ( )
if err != nil {
2017-11-28 22:25:16 +00:00
return nil , nil , err
2017-11-20 15:38:46 +00:00
}
incomingTLS = itls
}
2017-11-28 22:25:16 +00:00
return incomingTLS , tlsWrap , nil
}
2017-12-07 17:07:00 +00:00
// reloadTLSConnections updates a server's TLS configuration and reloads RPC
// connections.
2017-12-05 00:29:43 +00:00
func ( s * Server ) reloadTLSConnections ( newTLSConfig * config . TLSConfig ) error {
2017-11-28 22:25:16 +00:00
s . logger . Printf ( "[INFO] nomad: reloading server connections due to configuration changes" )
2018-01-16 13:02:39 +00:00
tlsConf := tlsutil . NewTLSConfiguration ( newTLSConfig )
2017-11-29 22:22:21 +00:00
incomingTLS , tlsWrap , err := getTLSConf ( newTLSConfig . EnableRPC , tlsConf )
2017-11-29 17:54:05 +00:00
if err != nil {
2018-01-16 16:55:11 +00:00
s . logger . Printf ( "[ERR] nomad: unable to reset TLS context %s" , err )
2017-11-29 17:54:05 +00:00
return err
}
2017-11-20 15:38:46 +00:00
2018-02-15 23:03:12 +00:00
// Store the new tls wrapper.
s . tlsWrapLock . Lock ( )
s . tlsWrap = tlsWrap
s . tlsWrapLock . Unlock ( )
2017-11-20 15:38:46 +00:00
if s . rpcCancel == nil {
2018-01-16 16:55:11 +00:00
err = fmt . Errorf ( "No existing RPC server to reset." )
s . logger . Printf ( "[ERR] nomad: %s" , err )
return err
2017-11-20 15:38:46 +00:00
}
2017-11-29 17:54:05 +00:00
2017-11-28 17:33:46 +00:00
s . rpcCancel ( )
2017-11-20 15:38:46 +00:00
2018-01-16 16:55:11 +00:00
// Keeping configuration in sync is important for other places that require
// access to config information, such as rpc.go, where we decide on what kind
// of network connections to accept depending on the server configuration
2017-11-29 22:22:21 +00:00
s . config . TLSConfig = newTLSConfig
2017-11-20 15:38:46 +00:00
s . rpcTLS = incomingTLS
s . connPool . ReloadTLS ( tlsWrap )
// reinitialize our rpc listener
s . rpcListener . Close ( )
2017-11-30 19:52:13 +00:00
<- s . listenerCh
2018-01-16 19:16:35 +00:00
s . startRPCListener ( )
2017-11-28 17:33:46 +00:00
2018-01-16 16:55:11 +00:00
listener , err := s . createRPCListener ( )
if err != nil {
listener . Close ( )
return err
}
2017-11-20 15:38:46 +00:00
2018-01-16 19:16:35 +00:00
// Close and reload existing Raft connections
2017-11-27 16:42:52 +00:00
wrapper := tlsutil . RegionSpecificWrapper ( s . config . Region , tlsWrap )
2018-01-19 10:12:14 +00:00
s . raftLayer . ReloadTLS ( wrapper )
s . raftTransport . CloseStreams ( )
2017-11-27 16:42:52 +00:00
2017-11-30 15:50:43 +00:00
s . logger . Printf ( "[DEBUG] nomad: finished reloading server connections" )
2017-11-20 15:38:46 +00:00
return nil
}
2015-06-01 15:49:10 +00:00
// Shutdown is used to shutdown the server
func ( s * Server ) Shutdown ( ) error {
s . logger . Printf ( "[INFO] nomad: shutting down server" )
s . shutdownLock . Lock ( )
defer s . shutdownLock . Unlock ( )
if s . shutdown {
return nil
}
s . shutdown = true
close ( s . shutdownCh )
2015-06-05 21:54:45 +00:00
if s . serf != nil {
s . serf . Shutdown ( )
}
2015-06-01 15:49:10 +00:00
if s . raft != nil {
s . raftTransport . Close ( )
s . raftLayer . Close ( )
future := s . raft . Shutdown ( )
if err := future . Error ( ) ; err != nil {
s . logger . Printf ( "[WARN] nomad: Error shutting down raft: %s" , err )
}
2015-06-03 09:26:49 +00:00
if s . raftStore != nil {
s . raftStore . Close ( )
}
2015-06-01 15:49:10 +00:00
}
2015-06-03 10:26:50 +00:00
// Shutdown the RPC listener
if s . rpcListener != nil {
s . rpcListener . Close ( )
}
2015-06-07 18:50:29 +00:00
// Close the connection pool
s . connPool . Shutdown ( )
2015-06-01 15:49:10 +00:00
// Close the fsm
if s . fsm != nil {
s . fsm . Close ( )
}
2016-08-10 20:20:13 +00:00
// Stop Vault token renewal
2016-08-11 20:04:56 +00:00
if s . vault != nil {
s . vault . Stop ( )
}
2016-08-10 20:20:13 +00:00
2015-06-01 15:49:10 +00:00
return nil
}
2015-07-28 22:12:08 +00:00
// IsShutdown checks if the server is shutdown
func ( s * Server ) IsShutdown ( ) bool {
select {
case <- s . shutdownCh :
return true
default :
return false
}
}
2015-06-03 11:25:50 +00:00
// Leave is used to prepare for a graceful shutdown of the server
func ( s * Server ) Leave ( ) error {
s . logger . Printf ( "[INFO] nomad: server starting leave" )
s . left = true
// Check the number of known peers
2017-02-02 21:52:31 +00:00
numPeers , err := s . numPeers ( )
2015-06-03 11:25:50 +00:00
if err != nil {
s . logger . Printf ( "[ERR] nomad: failed to check raft peers: %v" , err )
return err
}
2017-02-03 00:07:15 +00:00
addr := s . raftTransport . LocalAddr ( )
2015-06-03 11:25:50 +00:00
// If we are the current leader, and we have any other peers (cluster has multiple
// servers), we should do a RemovePeer to safely reduce the quorum size. If we are
// not the leader, then we should issue our leave intention and wait to be removed
// for some sane period of time.
isLeader := s . IsLeader ( )
2017-02-02 21:52:31 +00:00
if isLeader && numPeers > 1 {
2018-01-16 21:35:32 +00:00
minRaftProtocol , err := s . autopilot . MinRaftProtocol ( )
if err != nil {
return err
}
if minRaftProtocol >= 2 && s . config . RaftConfig . ProtocolVersion >= 3 {
future := s . raft . RemoveServer ( raft . ServerID ( s . config . NodeID ) , 0 , 0 )
if err := future . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to remove ourself as raft peer: %v" , err )
}
} else {
future := s . raft . RemovePeer ( addr )
if err := future . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to remove ourself as raft peer: %v" , err )
}
2015-06-03 11:25:50 +00:00
}
}
// Leave the gossip pool
if s . serf != nil {
if err := s . serf . Leave ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to leave Serf cluster: %v" , err )
}
}
// If we were not leader, wait to be safely removed from the cluster.
// We must wait to allow the raft replication to take place, otherwise
// an immediate shutdown could cause a loss of quorum.
if ! isLeader {
2017-02-03 00:07:15 +00:00
left := false
2015-06-03 11:25:50 +00:00
limit := time . Now ( ) . Add ( raftRemoveGracePeriod )
2017-02-03 00:07:15 +00:00
for ! left && time . Now ( ) . Before ( limit ) {
// Sleep a while before we check.
time . Sleep ( 50 * time . Millisecond )
2015-06-03 11:25:50 +00:00
2017-02-03 00:07:15 +00:00
// Get the latest configuration.
future := s . raft . GetConfiguration ( )
if err := future . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to get raft configuration: %v" , err )
2015-06-03 11:25:50 +00:00
break
}
2017-02-03 00:07:15 +00:00
// See if we are no longer included.
left = true
for _ , server := range future . Configuration ( ) . Servers {
if server . Address == addr {
left = false
break
}
}
2015-06-03 11:25:50 +00:00
}
2017-02-03 00:07:15 +00:00
// TODO (alexdadgar) With the old Raft library we used to force the
// peers set to empty when a graceful leave occurred. This would
// keep voting spam down if the server was restarted, but it was
// dangerous because the peers was inconsistent with the logs and
// snapshots, so it wasn't really safe in all cases for the server
// to become leader. This is now safe, but the log spam is noisy.
// The next new version of the library will have a "you are not a
// peer stop it" behavior that should address this. We will have
// to evaluate during the RC period if this interim situation is
// not too confusing for operators.
// TODO (alexdadgar) When we take a later new version of the Raft
// library it won't try to complete replication, so this peer
// may not realize that it has been removed. Need to revisit this
// and the warning here.
if ! left {
s . logger . Printf ( "[WARN] nomad: failed to leave raft configuration gracefully, timeout" )
2015-06-03 11:25:50 +00:00
}
}
return nil
}
2017-11-20 15:38:46 +00:00
// Reload handles a config reload specific to server-only configuration. Not
// all config fields can handle a reload.
func ( s * Server ) Reload ( newConfig * Config ) error {
if newConfig == nil {
2017-02-01 22:20:14 +00:00
return fmt . Errorf ( "Reload given a nil config" )
}
var mErr multierror . Error
// Handle the Vault reload. Vault should never be nil but just guard.
if s . vault != nil {
2017-11-20 15:38:46 +00:00
if err := s . vault . SetConfig ( newConfig . VaultConfig ) ; err != nil {
2017-02-01 22:20:14 +00:00
multierror . Append ( & mErr , err )
}
}
2017-12-05 00:29:43 +00:00
if ! newConfig . TLSConfig . Equals ( s . config . TLSConfig ) {
if err := s . reloadTLSConnections ( newConfig . TLSConfig ) ; err != nil {
2018-01-16 16:55:11 +00:00
s . logger . Printf ( "[ERR] nomad: error reloading server TLS configuration: %s" , err )
2017-02-01 22:20:14 +00:00
multierror . Append ( & mErr , err )
}
}
return mErr . ErrorOrNil ( )
}
2016-06-16 19:14:03 +00:00
// setupBootstrapHandler() creates the closure necessary to support a Consul
// fallback handler.
func ( s * Server ) setupBootstrapHandler ( ) error {
// peersTimeout is used to indicate to the Consul Syncer that the
// current Nomad Server has a stale peer set. peersTimeout will time
// out if the Consul Syncer bootstrapFn has not observed a Raft
// leader in maxStaleLeadership. If peersTimeout has been triggered,
// the Consul Syncer will begin querying Consul for other Nomad
// Servers.
//
// NOTE: time.Timer is used vs time.Time in order to handle clock
// drift because time.Timer is implemented as a monotonic clock.
var peersTimeout * time . Timer = time . NewTimer ( 0 )
2016-06-16 21:27:10 +00:00
// consulQueryCount is the number of times the bootstrapFn has been
// called, regardless of success.
var consulQueryCount uint64
2016-06-16 19:14:03 +00:00
// leadershipTimedOut is a helper method that returns true if the
// peersTimeout timer has expired.
leadershipTimedOut := func ( ) bool {
select {
case <- peersTimeout . C :
return true
default :
return false
}
}
2016-06-14 05:58:39 +00:00
// The bootstrapFn callback handler is used to periodically poll
// Consul to look up the Nomad Servers in Consul. In the event the
// server has been brought up without a `retry-join` configuration
// and this Server is partitioned from the rest of the cluster,
// periodically poll Consul to reattach this Server to other servers
// in the same region and automatically reform a quorum (assuming the
// correct number of servers required for quorum are present).
bootstrapFn := func ( ) error {
2016-06-14 07:07:04 +00:00
// If there is a raft leader, do nothing
if s . raft . Leader ( ) != "" {
2016-06-16 19:14:03 +00:00
peersTimeout . Reset ( maxStaleLeadership )
2016-06-14 07:07:04 +00:00
return nil
}
2016-06-16 19:14:03 +00:00
// (ab)use serf.go's behavior of setting BootstrapExpect to
// zero if we have bootstrapped. If we have bootstrapped
bootstrapExpect := atomic . LoadInt32 ( & s . config . BootstrapExpect )
if bootstrapExpect == 0 {
// This Nomad Server has been bootstrapped. Rely on
2016-06-16 21:27:10 +00:00
// the peersTimeout firing as a guard to prevent
// aggressive querying of Consul.
2016-06-16 19:14:03 +00:00
if ! leadershipTimedOut ( ) {
return nil
}
} else {
2016-06-16 21:27:10 +00:00
if consulQueryCount > 0 && ! leadershipTimedOut ( ) {
return nil
}
2016-06-16 19:14:03 +00:00
// This Nomad Server has not been bootstrapped, reach
// out to Consul if our peer list is less than
// `bootstrap_expect`.
2017-02-02 21:52:31 +00:00
raftPeers , err := s . numPeers ( )
2016-06-16 19:14:03 +00:00
if err != nil {
peersTimeout . Reset ( peersPollInterval + lib . RandomStagger ( peersPollInterval / peersPollJitterFactor ) )
return nil
}
// The necessary number of Nomad Servers required for
// quorum has been reached, we do not need to poll
// Consul. Let the normal timeout-based strategy
// take over.
2017-02-02 23:31:36 +00:00
if raftPeers >= int ( bootstrapExpect ) {
2016-06-16 19:14:03 +00:00
peersTimeout . Reset ( peersPollInterval + lib . RandomStagger ( peersPollInterval / peersPollJitterFactor ) )
return nil
}
2016-06-14 05:58:39 +00:00
}
2016-06-16 21:27:10 +00:00
consulQueryCount ++
2016-06-14 05:58:39 +00:00
2017-02-03 00:07:15 +00:00
s . logger . Printf ( "[DEBUG] server.nomad: lost contact with Nomad quorum, falling back to Consul for server list" )
2016-06-14 05:58:39 +00:00
2017-02-01 00:43:57 +00:00
dcs , err := s . consulCatalog . Datacenters ( )
2016-06-14 05:58:39 +00:00
if err != nil {
2016-06-16 19:14:03 +00:00
peersTimeout . Reset ( peersPollInterval + lib . RandomStagger ( peersPollInterval / peersPollJitterFactor ) )
2017-02-03 00:07:15 +00:00
return fmt . Errorf ( "server.nomad: unable to query Consul datacenters: %v" , err )
2016-06-14 05:58:39 +00:00
}
if len ( dcs ) > 2 {
// Query the local DC first, then shuffle the
// remaining DCs. If additional calls to bootstrapFn
// are necessary, this Nomad Server will eventually
// walk all datacenter until it finds enough hosts to
// form a quorum.
2016-06-21 21:26:01 +00:00
shuffleStrings ( dcs [ 1 : ] )
dcs = dcs [ 0 : lib . MinInt ( len ( dcs ) , datacenterQueryLimit ) ]
2016-06-14 05:58:39 +00:00
}
nomadServerServiceName := s . config . ConsulConfig . ServerServiceName
var mErr multierror . Error
const defaultMaxNumNomadServers = 8
nomadServerServices := make ( [ ] string , 0 , defaultMaxNumNomadServers )
2016-06-16 21:27:10 +00:00
localNode := s . serf . Memberlist ( ) . LocalNode ( )
2016-06-14 05:58:39 +00:00
for _ , dc := range dcs {
2016-06-16 19:14:03 +00:00
consulOpts := & consulapi . QueryOptions {
2016-06-14 05:58:39 +00:00
AllowStale : true ,
Datacenter : dc ,
Near : "_agent" ,
WaitTime : consul . DefaultQueryWaitDuration ,
}
2017-02-01 00:43:57 +00:00
consulServices , _ , err := s . consulCatalog . Service ( nomadServerServiceName , consul . ServiceTagSerf , consulOpts )
2016-06-14 05:58:39 +00:00
if err != nil {
2016-06-16 21:40:09 +00:00
err := fmt . Errorf ( "failed to query service %q in Consul datacenter %q: %v" , nomadServerServiceName , dc , err )
2017-02-03 00:07:15 +00:00
s . logger . Printf ( "[WARN] server.nomad: %v" , err )
2016-06-16 21:40:09 +00:00
mErr . Errors = append ( mErr . Errors , err )
2016-06-14 05:58:39 +00:00
continue
}
for _ , cs := range consulServices {
port := strconv . FormatInt ( int64 ( cs . ServicePort ) , 10 )
addr := cs . ServiceAddress
if addr == "" {
addr = cs . Address
}
2016-06-16 21:27:10 +00:00
if localNode . Addr . String ( ) == addr && int ( localNode . Port ) == cs . ServicePort {
continue
}
2016-06-14 05:58:39 +00:00
serverAddr := net . JoinHostPort ( addr , port )
nomadServerServices = append ( nomadServerServices , serverAddr )
}
}
2016-06-14 07:07:04 +00:00
2016-06-14 05:58:39 +00:00
if len ( nomadServerServices ) == 0 {
if len ( mErr . Errors ) > 0 {
2016-06-16 19:14:03 +00:00
peersTimeout . Reset ( peersPollInterval + lib . RandomStagger ( peersPollInterval / peersPollJitterFactor ) )
2016-06-14 05:58:39 +00:00
return mErr . ErrorOrNil ( )
}
2016-06-14 07:07:04 +00:00
// Log the error and return nil so future handlers
// can attempt to register the `nomad` service.
2016-06-16 21:27:10 +00:00
pollInterval := peersPollInterval + lib . RandomStagger ( peersPollInterval / peersPollJitterFactor )
2017-02-03 00:07:15 +00:00
s . logger . Printf ( "[TRACE] server.nomad: no Nomad Servers advertising service %+q in Consul datacenters %+q, sleeping for %v" , nomadServerServiceName , dcs , pollInterval )
2016-06-16 21:27:10 +00:00
peersTimeout . Reset ( pollInterval )
2016-06-14 07:07:04 +00:00
return nil
2016-06-14 05:58:39 +00:00
}
2016-06-14 07:07:04 +00:00
2016-06-14 05:58:39 +00:00
numServersContacted , err := s . Join ( nomadServerServices )
if err != nil {
2016-06-16 19:14:03 +00:00
peersTimeout . Reset ( peersPollInterval + lib . RandomStagger ( peersPollInterval / peersPollJitterFactor ) )
2016-06-14 05:58:39 +00:00
return fmt . Errorf ( "contacted %d Nomad Servers: %v" , numServersContacted , err )
}
2016-06-16 19:14:03 +00:00
peersTimeout . Reset ( maxStaleLeadership )
2017-02-03 00:07:15 +00:00
s . logger . Printf ( "[INFO] server.nomad: successfully contacted %d Nomad Servers" , numServersContacted )
2016-06-14 05:58:39 +00:00
return nil
}
2016-06-16 19:14:03 +00:00
2017-02-01 00:43:57 +00:00
// Hacky replacement for old ConsulSyncer Periodic Handler.
go func ( ) {
lastOk := true
sync := time . NewTimer ( 0 )
for {
select {
case <- sync . C :
d := defaultConsulDiscoveryInterval
if err := bootstrapFn ( ) ; err != nil {
// Only log if it worked last time
if lastOk {
lastOk = false
s . logger . Printf ( "[ERR] consul: error looking up Nomad servers: %v" , err )
}
d = defaultConsulDiscoveryIntervalRetry
}
sync . Reset ( d )
case <- s . shutdownCh :
return
}
}
} ( )
2016-06-16 19:14:03 +00:00
return nil
}
// setupConsulSyncer creates Server-mode consul.Syncer which periodically
// executes callbacks on a fixed interval.
func ( s * Server ) setupConsulSyncer ( ) error {
2017-01-18 23:55:14 +00:00
if s . config . ConsulConfig . ServerAutoJoin != nil && * s . config . ConsulConfig . ServerAutoJoin {
2016-06-16 19:14:03 +00:00
if err := s . setupBootstrapHandler ( ) ; err != nil {
2016-06-16 21:40:09 +00:00
return err
2016-06-16 19:14:03 +00:00
}
2016-06-14 22:27:15 +00:00
}
2016-06-14 05:58:39 +00:00
2016-06-16 21:40:09 +00:00
return nil
2016-08-10 20:20:13 +00:00
}
2017-06-28 22:35:52 +00:00
// setupDeploymentWatcher creates a deployment watcher that consumes the RPC
2018-03-11 19:06:05 +00:00
// endpoints for state information and makes transitions via Raft through a
2017-06-28 22:35:52 +00:00
// shim that provides the appropriate methods.
func ( s * Server ) setupDeploymentWatcher ( ) error {
2017-08-31 00:45:32 +00:00
// Create the raft shim type to restrict the set of raft methods that can be
// made
2017-06-28 22:35:52 +00:00
raftShim := & deploymentWatcherRaftShim {
apply : s . raftApply ,
}
2017-07-03 18:26:45 +00:00
// Create the deployment watcher
s . deploymentWatcher = deploymentwatcher . NewDeploymentsWatcher (
2017-08-31 00:45:32 +00:00
s . logger , raftShim ,
2017-07-03 18:26:45 +00:00
deploymentwatcher . LimitStateQueriesPerSecond ,
2017-07-03 19:05:01 +00:00
deploymentwatcher . CrossDeploymentEvalBatchDuration )
2017-07-03 18:26:45 +00:00
2017-06-28 22:35:52 +00:00
return nil
}
2016-08-10 20:20:13 +00:00
// setupVaultClient is used to set up the Vault API client.
func ( s * Server ) setupVaultClient ( ) error {
2016-08-22 20:57:27 +00:00
v , err := NewVaultClient ( s . config . VaultConfig , s . logger , s . purgeVaultAccessors )
2016-08-10 20:20:13 +00:00
if err != nil {
return err
}
s . vault = v
return nil
2016-06-14 05:58:39 +00:00
}
2015-06-03 10:26:50 +00:00
// setupRPC is used to setup the RPC listener
2016-11-01 18:55:29 +00:00
func ( s * Server ) setupRPC ( tlsWrap tlsutil . RegionWrapper ) error {
2018-01-03 22:59:52 +00:00
// Populate the static RPC server
2018-01-04 00:00:55 +00:00
s . setupRpcServer ( s . rpcServer , nil )
2015-06-03 10:26:50 +00:00
2018-01-16 16:55:11 +00:00
listener , err := s . createRPCListener ( )
2015-06-03 10:26:50 +00:00
if err != nil {
2018-01-16 16:55:11 +00:00
listener . Close ( )
2015-06-03 10:26:50 +00:00
return err
}
if s . config . RPCAdvertise != nil {
2015-06-03 10:58:00 +00:00
s . rpcAdvertise = s . config . RPCAdvertise
2015-06-03 10:26:50 +00:00
} else {
2015-06-03 10:58:00 +00:00
s . rpcAdvertise = s . rpcListener . Addr ( )
2015-06-03 10:26:50 +00:00
}
// Verify that we have a usable advertise address
2015-06-03 10:58:00 +00:00
addr , ok := s . rpcAdvertise . ( * net . TCPAddr )
2015-06-03 10:26:50 +00:00
if ! ok {
2018-01-16 16:55:11 +00:00
listener . Close ( )
2015-06-03 10:26:50 +00:00
return fmt . Errorf ( "RPC advertise address is not a TCP Address: %v" , addr )
}
if addr . IP . IsUnspecified ( ) {
2018-01-16 16:55:11 +00:00
listener . Close ( )
2015-06-03 10:26:50 +00:00
return fmt . Errorf ( "RPC advertise address is not advertisable: %v" , addr )
}
2016-11-01 18:55:29 +00:00
wrapper := tlsutil . RegionSpecificWrapper ( s . config . Region , tlsWrap )
s . raftLayer = NewRaftLayer ( s . rpcAdvertise , wrapper )
2015-06-03 10:26:50 +00:00
return nil
}
2018-01-03 22:59:52 +00:00
// setupRpcServer is used to populate an RPC server with endpoints
2018-01-04 00:00:55 +00:00
func ( s * Server ) setupRpcServer ( server * rpc . Server , ctx * RPCContext ) {
2018-01-03 22:59:52 +00:00
// Add the static endpoints to the RPC server.
if s . staticEndpoints . Status == nil {
// Initialize the list just once
s . staticEndpoints . ACL = & ACL { s }
s . staticEndpoints . Alloc = & Alloc { s }
s . staticEndpoints . Eval = & Eval { s }
s . staticEndpoints . Job = & Job { s }
2018-01-05 21:50:04 +00:00
s . staticEndpoints . Node = & Node { srv : s } // Add but don't register
2018-01-03 22:59:52 +00:00
s . staticEndpoints . Deployment = & Deployment { srv : s }
s . staticEndpoints . Operator = & Operator { s }
s . staticEndpoints . Periodic = & Periodic { s }
s . staticEndpoints . Plan = & Plan { s }
s . staticEndpoints . Region = & Region { s }
s . staticEndpoints . Status = & Status { s }
s . staticEndpoints . System = & System { s }
s . staticEndpoints . Search = & Search { s }
s . staticEndpoints . Enterprise = NewEnterpriseEndpoints ( s )
2018-01-11 23:58:59 +00:00
// Client endpoints
s . staticEndpoints . ClientStats = & ClientStats { s }
2018-02-06 01:20:42 +00:00
s . staticEndpoints . ClientAllocations = & ClientAllocations { s }
2018-01-22 01:09:20 +00:00
// Streaming endpoints
s . staticEndpoints . FileSystem = & FileSystem { s }
2018-02-01 19:28:52 +00:00
s . staticEndpoints . FileSystem . register ( )
2018-01-03 22:59:52 +00:00
}
// Register the static handlers
server . Register ( s . staticEndpoints . ACL )
server . Register ( s . staticEndpoints . Alloc )
server . Register ( s . staticEndpoints . Eval )
server . Register ( s . staticEndpoints . Job )
server . Register ( s . staticEndpoints . Deployment )
server . Register ( s . staticEndpoints . Operator )
server . Register ( s . staticEndpoints . Periodic )
server . Register ( s . staticEndpoints . Plan )
server . Register ( s . staticEndpoints . Region )
server . Register ( s . staticEndpoints . Status )
server . Register ( s . staticEndpoints . System )
server . Register ( s . staticEndpoints . Search )
s . staticEndpoints . Enterprise . Register ( server )
2018-01-11 23:58:59 +00:00
server . Register ( s . staticEndpoints . ClientStats )
2018-02-06 01:20:42 +00:00
server . Register ( s . staticEndpoints . ClientAllocations )
2018-02-01 19:28:52 +00:00
server . Register ( s . staticEndpoints . FileSystem )
2018-01-03 22:59:52 +00:00
// Create new dynamic endpoints and add them to the RPC server.
2018-01-05 21:50:04 +00:00
node := & Node { srv : s , ctx : ctx }
// Register the dynamic endpoints
server . Register ( node )
2018-01-03 22:59:52 +00:00
}
2015-06-01 15:49:10 +00:00
// setupRaft is used to setup and initialize Raft
func ( s * Server ) setupRaft ( ) error {
2017-02-02 23:31:36 +00:00
// If we have an unclean exit then attempt to close the Raft store.
defer func ( ) {
if s . raft == nil && s . raftStore != nil {
if err := s . raftStore . Close ( ) ; err != nil {
2017-02-03 00:07:15 +00:00
s . logger . Printf ( "[ERR] nomad: failed to close Raft store: %v" , err )
2017-02-02 23:31:36 +00:00
}
}
} ( )
2015-06-01 15:49:10 +00:00
// Create the FSM
2017-10-13 21:36:02 +00:00
fsmConfig := & FSMConfig {
EvalBroker : s . evalBroker ,
Periodic : s . periodicDispatcher ,
Blocked : s . blockedEvals ,
LogOutput : s . config . LogOutput ,
Region : s . Region ( ) ,
}
2015-06-01 15:49:10 +00:00
var err error
2017-10-13 21:36:02 +00:00
s . fsm , err = NewFSM ( fsmConfig )
2015-06-01 15:49:10 +00:00
if err != nil {
return err
}
2015-06-01 19:11:40 +00:00
// Create a transport layer
2015-08-26 00:36:52 +00:00
trans := raft . NewNetworkTransport ( s . raftLayer , 3 , s . config . RaftTimeout ,
2015-06-01 19:11:40 +00:00
s . config . LogOutput )
s . raftTransport = trans
2015-06-01 15:49:10 +00:00
2017-02-02 23:31:36 +00:00
// Make sure we set the LogOutput.
s . config . RaftConfig . LogOutput = s . config . LogOutput
// Our version of Raft protocol requires the LocalID to match the network
// address of the transport.
s . config . RaftConfig . LocalID = raft . ServerID ( trans . LocalAddr ( ) )
2017-11-22 00:29:11 +00:00
if s . config . RaftConfig . ProtocolVersion >= 3 {
s . config . RaftConfig . LocalID = raft . ServerID ( s . config . NodeID )
}
2017-02-02 23:31:36 +00:00
// Build an all in-memory setup for dev mode, otherwise prepare a full
// disk-based setup.
2015-06-01 19:11:40 +00:00
var log raft . LogStore
var stable raft . StableStore
var snap raft . SnapshotStore
if s . config . DevMode {
store := raft . NewInmemStore ( )
s . raftInmem = store
stable = store
log = store
snap = raft . NewDiscardSnapshotStore ( )
} else {
// Create the base raft path
path := filepath . Join ( s . config . DataDir , raftState )
if err := ensurePath ( path , true ) ; err != nil {
return err
}
2015-06-01 15:49:10 +00:00
2015-06-01 19:11:40 +00:00
// Create the BoltDB backend
store , err := raftboltdb . NewBoltStore ( filepath . Join ( path , "raft.db" ) )
if err != nil {
return err
}
s . raftStore = store
stable = store
2015-06-01 15:49:10 +00:00
2015-06-01 19:11:40 +00:00
// Wrap the store in a LogCache to improve performance
cacheStore , err := raft . NewLogCache ( raftLogCacheSize , store )
if err != nil {
store . Close ( )
return err
}
log = cacheStore
2015-06-01 15:49:10 +00:00
2015-06-01 19:11:40 +00:00
// Create the snapshot store
snapshots , err := raft . NewFileSnapshotStore ( path , snapshotsRetained , s . config . LogOutput )
if err != nil {
if s . raftStore != nil {
s . raftStore . Close ( )
}
return err
}
snap = snapshots
2015-06-01 15:49:10 +00:00
2017-02-02 23:31:36 +00:00
// For an existing cluster being upgraded to the new version of
// Raft, we almost never want to run recovery based on the old
// peers.json file. We create a peers.info file with a helpful
// note about where peers.json went, and use that as a sentinel
// to avoid ingesting the old one that first time (if we have to
// create the peers.info file because it's not there, we also
// blow away any existing peers.json file).
peersFile := filepath . Join ( path , "peers.json" )
peersInfoFile := filepath . Join ( path , "peers.info" )
if _ , err := os . Stat ( peersInfoFile ) ; os . IsNotExist ( err ) {
if err := ioutil . WriteFile ( peersInfoFile , [ ] byte ( peersInfoContent ) , 0755 ) ; err != nil {
return fmt . Errorf ( "failed to write peers.info file: %v" , err )
}
// Blow away the peers.json file if present, since the
// peers.info sentinel wasn't there.
if _ , err := os . Stat ( peersFile ) ; err == nil {
if err := os . Remove ( peersFile ) ; err != nil {
return fmt . Errorf ( "failed to delete peers.json, please delete manually (see peers.info for details): %v" , err )
}
2017-02-03 00:07:15 +00:00
s . logger . Printf ( "[INFO] nomad: deleted peers.json file (see peers.info for details)" )
2017-02-02 23:31:36 +00:00
}
} else if _ , err := os . Stat ( peersFile ) ; err == nil {
2017-02-03 00:07:15 +00:00
s . logger . Printf ( "[INFO] nomad: found peers.json file, recovering Raft configuration..." )
2017-02-02 23:31:36 +00:00
configuration , err := raft . ReadPeersJSON ( peersFile )
if err != nil {
return fmt . Errorf ( "recovery failed to parse peers.json: %v" , err )
}
2017-10-13 21:36:02 +00:00
tmpFsm , err := NewFSM ( fsmConfig )
2017-02-02 23:31:36 +00:00
if err != nil {
return fmt . Errorf ( "recovery failed to make temp FSM: %v" , err )
}
if err := raft . RecoverCluster ( s . config . RaftConfig , tmpFsm ,
log , stable , snap , trans , configuration ) ; err != nil {
return fmt . Errorf ( "recovery failed: %v" , err )
}
if err := os . Remove ( peersFile ) ; err != nil {
return fmt . Errorf ( "recovery failed to delete peers.json, please delete manually (see peers.info for details): %v" , err )
}
2017-02-03 00:07:15 +00:00
s . logger . Printf ( "[INFO] nomad: deleted peers.json file after successful recovery" )
2017-02-02 23:31:36 +00:00
}
2015-06-01 19:11:40 +00:00
}
2015-06-01 15:49:10 +00:00
2017-02-02 23:31:36 +00:00
// If we are in bootstrap or dev mode and the state is clean then we can
// bootstrap now.
if s . config . Bootstrap || s . config . DevMode {
hasState , err := raft . HasExistingState ( log , stable , snap )
2015-06-01 15:49:10 +00:00
if err != nil {
return err
}
2017-02-02 23:31:36 +00:00
if ! hasState {
configuration := raft . Configuration {
Servers : [ ] raft . Server {
2017-09-26 22:26:33 +00:00
{
2017-11-22 00:29:11 +00:00
ID : s . config . RaftConfig . LocalID ,
2017-02-02 23:31:36 +00:00
Address : trans . LocalAddr ( ) ,
} ,
} ,
}
if err := raft . BootstrapCluster ( s . config . RaftConfig ,
log , stable , snap , trans , configuration ) ; err != nil {
return err
}
2015-06-01 15:49:10 +00:00
}
}
2015-09-07 17:46:41 +00:00
// Setup the leader channel
leaderCh := make ( chan bool , 1 )
s . config . RaftConfig . NotifyCh = leaderCh
s . leaderCh = leaderCh
2015-06-01 15:49:10 +00:00
// Setup the Raft store
2017-02-02 23:31:36 +00:00
s . raft , err = raft . NewRaft ( s . config . RaftConfig , s . fsm , log , stable , snap , trans )
2015-06-01 15:49:10 +00:00
if err != nil {
return err
}
return nil
}
2015-06-03 10:26:50 +00:00
2015-06-03 10:58:00 +00:00
// setupSerf is used to setup and initialize a Serf
func ( s * Server ) setupSerf ( conf * serf . Config , ch chan serf . Event , path string ) ( * serf . Serf , error ) {
conf . Init ( )
conf . NodeName = fmt . Sprintf ( "%s.%s" , s . config . NodeName , s . config . Region )
conf . Tags [ "role" ] = "nomad"
conf . Tags [ "region" ] = s . config . Region
conf . Tags [ "dc" ] = s . config . Datacenter
2016-05-28 01:14:34 +00:00
conf . Tags [ "vsn" ] = fmt . Sprintf ( "%d" , structs . ApiMajorVersion )
conf . Tags [ "mvn" ] = fmt . Sprintf ( "%d" , structs . ApiMinorVersion )
2015-06-03 10:58:00 +00:00
conf . Tags [ "build" ] = s . config . Build
2017-11-22 00:29:11 +00:00
conf . Tags [ "raft_vsn" ] = fmt . Sprintf ( "%d" , s . config . RaftConfig . ProtocolVersion )
conf . Tags [ "id" ] = s . config . NodeID
2017-12-18 21:16:23 +00:00
conf . Tags [ "rpc_addr" ] = s . rpcAdvertise . ( * net . TCPAddr ) . IP . String ( )
2015-06-03 11:35:48 +00:00
conf . Tags [ "port" ] = fmt . Sprintf ( "%d" , s . rpcAdvertise . ( * net . TCPAddr ) . Port )
2015-09-07 17:46:41 +00:00
if s . config . Bootstrap || ( s . config . DevMode && ! s . config . DevDisableBootstrap ) {
2015-06-03 10:58:00 +00:00
conf . Tags [ "bootstrap" ] = "1"
}
2016-06-16 19:00:15 +00:00
bootstrapExpect := atomic . LoadInt32 ( & s . config . BootstrapExpect )
if bootstrapExpect != 0 {
conf . Tags [ "expect" ] = fmt . Sprintf ( "%d" , bootstrapExpect )
2015-06-03 10:58:00 +00:00
}
2017-12-18 21:16:23 +00:00
if s . config . NonVoter {
conf . Tags [ "nonvoter" ] = "1"
}
2018-01-30 03:53:34 +00:00
if s . config . RedundancyZone != "" {
conf . Tags [ AutopilotRZTag ] = s . config . RedundancyZone
}
if s . config . UpgradeVersion != "" {
conf . Tags [ AutopilotVersionTag ] = s . config . UpgradeVersion
}
2015-06-03 10:58:00 +00:00
conf . MemberlistConfig . LogOutput = s . config . LogOutput
conf . LogOutput = s . config . LogOutput
conf . EventCh = ch
2015-06-04 10:33:12 +00:00
if ! s . config . DevMode {
conf . SnapshotPath = filepath . Join ( s . config . DataDir , path )
if err := ensurePath ( conf . SnapshotPath , false ) ; err != nil {
return nil , err
}
}
2015-06-03 10:58:00 +00:00
conf . ProtocolVersion = protocolVersionMap [ s . config . ProtocolVersion ]
conf . RejoinAfterLeave = true
conf . Merge = & serfMergeDelegate { }
// Until Nomad supports this fully, we disable automatic resolution.
// When enabled, the Serf gossip may just turn off if we are the minority
// node which is rather unexpected.
conf . EnableNameConflictResolution = false
return serf . Create ( conf )
}
2015-07-28 22:12:08 +00:00
// setupWorkers is used to start the scheduling workers
func ( s * Server ) setupWorkers ( ) error {
// Check if all the schedulers are disabled
if len ( s . config . EnabledSchedulers ) == 0 || s . config . NumSchedulers == 0 {
s . logger . Printf ( "[WARN] nomad: no enabled schedulers" )
return nil
}
// Start the workers
for i := 0 ; i < s . config . NumSchedulers ; i ++ {
2015-08-23 17:53:53 +00:00
if w , err := NewWorker ( s ) ; err != nil {
2015-07-28 22:12:08 +00:00
return err
2015-08-23 17:53:53 +00:00
} else {
s . workers = append ( s . workers , w )
2015-07-28 22:12:08 +00:00
}
}
s . logger . Printf ( "[INFO] nomad: starting %d scheduling worker(s) for %v" ,
s . config . NumSchedulers , s . config . EnabledSchedulers )
return nil
}
2017-02-02 21:52:31 +00:00
// numPeers is used to check on the number of known peers, including the local
// node.
func ( s * Server ) numPeers ( ) ( int , error ) {
future := s . raft . GetConfiguration ( )
if err := future . Error ( ) ; err != nil {
2015-06-03 11:25:50 +00:00
return 0 , err
}
2017-02-02 21:52:31 +00:00
configuration := future . Configuration ( )
return len ( configuration . Servers ) , nil
2015-06-03 11:25:50 +00:00
}
2015-06-03 10:26:50 +00:00
// IsLeader checks if this server is the cluster leader
func ( s * Server ) IsLeader ( ) bool {
return s . raft . State ( ) == raft . Leader
}
2015-06-03 11:25:50 +00:00
// Join is used to have Nomad join the gossip ring
// The target address should be another node listening on the
// Serf address
func ( s * Server ) Join ( addrs [ ] string ) ( int , error ) {
return s . serf . Join ( addrs , true )
}
// LocalMember is used to return the local node
func ( c * Server ) LocalMember ( ) serf . Member {
return c . serf . LocalMember ( )
}
// Members is used to return the members of the serf cluster
func ( s * Server ) Members ( ) [ ] serf . Member {
return s . serf . Members ( )
}
// RemoveFailedNode is used to remove a failed node from the cluster
func ( s * Server ) RemoveFailedNode ( node string ) error {
return s . serf . RemoveFailedNode ( node )
}
// KeyManager returns the Serf keyring manager
func ( s * Server ) KeyManager ( ) * serf . KeyManager {
return s . serf . KeyManager ( )
}
// Encrypted determines if gossip is encrypted
func ( s * Server ) Encrypted ( ) bool {
return s . serf . EncryptionEnabled ( )
}
2015-08-29 21:22:24 +00:00
// State returns the underlying state store. This should *not*
// be used to modify state directly.
func ( s * Server ) State ( ) * state . StateStore {
return s . fsm . State ( )
}
2017-10-23 19:50:37 +00:00
// setLeaderAcl stores the given ACL token as the current leader's ACL token.
func ( s * Server ) setLeaderAcl ( token string ) {
s . leaderAclLock . Lock ( )
s . leaderAcl = token
s . leaderAclLock . Unlock ( )
}
// getLeaderAcl retrieves the leader's ACL token
func ( s * Server ) getLeaderAcl ( ) string {
s . leaderAclLock . Lock ( )
defer s . leaderAclLock . Unlock ( )
return s . leaderAcl
}
2015-11-24 05:47:11 +00:00
// Regions returns the known regions in the cluster.
func ( s * Server ) Regions ( ) [ ] string {
2015-11-24 05:49:03 +00:00
s . peerLock . RLock ( )
defer s . peerLock . RUnlock ( )
2015-11-24 05:47:11 +00:00
regions := make ( [ ] string , 0 , len ( s . peers ) )
2017-09-26 22:26:33 +00:00
for region := range s . peers {
2015-11-24 05:47:11 +00:00
regions = append ( regions , region )
}
2015-11-24 21:15:01 +00:00
sort . Strings ( regions )
2015-11-24 05:47:11 +00:00
return regions
}
2015-06-03 10:26:50 +00:00
// RPC is used to make a local RPC call
func ( s * Server ) RPC ( method string , args interface { } , reply interface { } ) error {
2018-01-11 18:17:23 +00:00
codec := & codec . InmemCodec {
Method : method ,
Args : args ,
Reply : reply ,
2015-06-03 10:26:50 +00:00
}
if err := s . rpcServer . ServeRequest ( codec ) ; err != nil {
return err
}
2018-01-11 18:17:23 +00:00
return codec . Err
2015-06-03 10:26:50 +00:00
}
2018-01-22 01:09:20 +00:00
// StreamingRpcHandler is used to make a streaming RPC call.
func ( s * Server ) StreamingRpcHandler ( method string ) ( structs . StreamingRpcHandler , error ) {
return s . streamingRpcs . GetHandler ( method )
}
2015-06-03 10:26:50 +00:00
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func ( s * Server ) Stats ( ) map [ string ] map [ string ] string {
toString := func ( v uint64 ) string {
return strconv . FormatUint ( v , 10 )
}
stats := map [ string ] map [ string ] string {
2017-09-26 22:26:33 +00:00
"nomad" : {
2015-06-03 10:26:50 +00:00
"server" : "true" ,
"leader" : fmt . Sprintf ( "%v" , s . IsLeader ( ) ) ,
2017-02-03 00:07:15 +00:00
"leader_addr" : string ( s . raft . Leader ( ) ) ,
2015-06-03 10:26:50 +00:00
"bootstrap" : fmt . Sprintf ( "%v" , s . config . Bootstrap ) ,
2015-08-17 00:40:35 +00:00
"known_regions" : toString ( uint64 ( len ( s . peers ) ) ) ,
2015-06-03 10:26:50 +00:00
} ,
"raft" : s . raft . Stats ( ) ,
2015-06-03 11:08:04 +00:00
"serf" : s . serf . Stats ( ) ,
2018-01-12 21:58:44 +00:00
"runtime" : stats . RuntimeStats ( ) ,
2015-06-03 10:26:50 +00:00
}
2017-02-02 21:52:31 +00:00
2015-06-03 10:26:50 +00:00
return stats
}
2016-10-17 17:48:04 +00:00
2017-08-07 21:13:05 +00:00
// Region returns the region of the server
2016-10-17 17:48:04 +00:00
func ( s * Server ) Region ( ) string {
return s . config . Region
}
// Datacenter returns the data center of the server
func ( s * Server ) Datacenter ( ) string {
return s . config . Datacenter
}
// GetConfig returns the config of the server for testing purposes only
func ( s * Server ) GetConfig ( ) * Config {
return s . config
}
2017-02-02 23:31:36 +00:00
2017-08-21 03:51:30 +00:00
// ReplicationToken returns the token used for replication. We use a method to support
// dynamic reloading of this value later.
func ( s * Server ) ReplicationToken ( ) string {
return s . config . ReplicationToken
}
2017-02-02 23:31:36 +00:00
// peersInfoContent is used to help operators understand what happened to the
// peers.json file. This is written to a file called peers.info in the same
// location.
const peersInfoContent = `
2017-02-08 22:50:19 +00:00
As of Nomad 0.5 .5 , the peers . json file is only used for recovery
2017-02-02 23:31:36 +00:00
after an outage . It should be formatted as a JSON array containing the address
2017-08-31 13:51:04 +00:00
and port ( RPC ) of each Nomad server in the cluster , like this :
2017-02-02 23:31:36 +00:00
2017-02-08 22:50:19 +00:00
[ "10.1.0.1:4647" , "10.1.0.2:4647" , "10.1.0.3:4647" ]
2017-02-02 23:31:36 +00:00
Under normal operation , the peers . json file will not be present .
2017-02-08 22:50:19 +00:00
When Nomad starts for the first time , it will create this peers . info file and
2017-02-02 23:31:36 +00:00
delete any existing peers . json file so that recovery doesn ' t occur on the first
startup .
Once this peers . info file is present , any peers . json file will be ingested at
startup , and will set the Raft peer configuration manually to recover from an
outage . It ' s crucial that all servers in the cluster are shut down before
creating the peers . json file , and that all servers receive the same
configuration . Once the peers . json file is successfully ingested and applied , it
will be deleted .
2017-02-10 23:37:13 +00:00
Please see https : //www.nomadproject.io/guides/outage.html for more information.
2017-02-02 23:31:36 +00:00
`