2015-06-01 15:49:10 +00:00
package nomad
import (
2015-06-03 10:26:50 +00:00
"crypto/tls"
"errors"
2015-06-01 15:49:10 +00:00
"fmt"
2017-02-02 23:31:36 +00:00
"io/ioutil"
2015-06-01 15:49:10 +00:00
"log"
2015-06-03 10:26:50 +00:00
"net"
"net/rpc"
2017-02-02 23:31:36 +00:00
"os"
2015-06-01 15:49:10 +00:00
"path/filepath"
2015-06-03 10:26:50 +00:00
"reflect"
2015-11-24 21:15:01 +00:00
"sort"
2015-06-03 10:26:50 +00:00
"strconv"
2015-06-01 15:49:10 +00:00
"sync"
2016-06-16 19:00:15 +00:00
"sync/atomic"
2015-06-01 15:49:10 +00:00
"time"
2016-06-14 05:58:39 +00:00
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
2017-11-15 01:53:23 +00:00
multierror "github.com/hashicorp/go-multierror"
2017-08-19 23:49:53 +00:00
lru "github.com/hashicorp/golang-lru"
2016-06-14 05:58:39 +00:00
"github.com/hashicorp/nomad/command/agent/consul"
2016-10-25 23:05:37 +00:00
"github.com/hashicorp/nomad/helper/tlsutil"
2017-06-28 22:35:52 +00:00
"github.com/hashicorp/nomad/nomad/deploymentwatcher"
2015-08-29 21:22:24 +00:00
"github.com/hashicorp/nomad/nomad/state"
2016-05-28 01:14:34 +00:00
"github.com/hashicorp/nomad/nomad/structs"
2015-06-01 15:49:10 +00:00
"github.com/hashicorp/raft"
2017-11-15 01:53:23 +00:00
raftboltdb "github.com/hashicorp/raft-boltdb"
2015-06-03 10:58:00 +00:00
"github.com/hashicorp/serf/serf"
2015-06-01 15:49:10 +00:00
)
const (
	// datacenterQueryLimit sets the max number of DCs that a Nomad
	// Server will query to find bootstrap_expect servers.
	datacenterQueryLimit = 25

	// maxStaleLeadership is the maximum time we will permit this Nomad
	// Server to go without seeing a valid Raft leader.
	maxStaleLeadership = 15 * time.Second

	// peersPollInterval is used as the polling interval between attempts
	// to query Consul for Nomad Servers.
	peersPollInterval = 45 * time.Second

	// peersPollJitterFactor is used to provide a slight amount of variance to
	// the retry interval when querying Consul Servers: the interval is padded
	// with a random stagger derived from peersPollInterval/peersPollJitterFactor.
	peersPollJitterFactor = 2

	// raftState is the directory, relative to the data dir, that holds Raft
	// state.
	raftState = "raft/"

	// serfSnapshot is the path of the Serf snapshot passed to setupSerf.
	serfSnapshot = "serf/snapshot"

	// snapshotsRetained is presumably the number of Raft snapshots kept on
	// disk — confirm against setupRaft.
	snapshotsRetained = 2

	// serverRPCCache controls how long we keep an idle connection open to a server
	serverRPCCache = 2 * time.Minute

	// serverMaxStreams controls how many idle streams we keep open to a server
	serverMaxStreams = 64

	// raftLogCacheSize is the maximum number of logs to cache in-memory.
	// This is used to reduce disk I/O for the recently committed entries.
	raftLogCacheSize = 512

	// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
	// to replicate to gracefully leave the cluster.
	raftRemoveGracePeriod = 5 * time.Second

	// defaultConsulDiscoveryInterval is how often to poll Consul for new
	// servers if there is no leader.
	defaultConsulDiscoveryInterval time.Duration = 3 * time.Second

	// defaultConsulDiscoveryIntervalRetry is how often to poll Consul for
	// new servers if there is no leader and the last Consul query failed.
	defaultConsulDiscoveryIntervalRetry time.Duration = 9 * time.Second

	// aclCacheSize is the number of ACL objects to keep cached. ACLs have a parsing and
	// construction cost, so we keep the hot objects cached to reduce the ACL token resolution time.
	aclCacheSize = 512
)
// Server is Nomad server which manages the job queues,
// schedulers, and notification bus for agents.
type Server struct {
config * Config
logger * log . Logger
2015-06-07 18:50:29 +00:00
// Connection pool to other Nomad servers
connPool * ConnPool
2015-06-03 10:26:50 +00:00
// Endpoints holds our RPC endpoints
endpoints endpoints
2015-06-03 11:25:50 +00:00
// The raft instance is used among Nomad nodes within the
// region to protect operations that require strong consistency
2015-09-07 17:46:41 +00:00
leaderCh <- chan bool
2015-06-01 15:49:10 +00:00
raft * raft . Raft
raftLayer * RaftLayer
raftStore * raftboltdb . BoltStore
2015-06-01 19:11:40 +00:00
raftInmem * raft . InmemStore
2015-06-01 15:49:10 +00:00
raftTransport * raft . NetworkTransport
// fsm is the state machine used with Raft
fsm * nomadFSM
2015-06-03 10:26:50 +00:00
// rpcListener is used to listen for incoming connections
2015-06-03 10:58:00 +00:00
rpcListener net . Listener
rpcServer * rpc . Server
rpcAdvertise net . Addr
2015-06-03 10:26:50 +00:00
// rpcTLS is the TLS config for incoming TLS requests
rpcTLS * tls . Config
2015-06-04 10:33:12 +00:00
// peers is used to track the known Nomad servers. This is
// used for region forwarding and clustering.
2015-06-07 18:32:01 +00:00
peers map [ string ] [ ] * serverParts
2017-02-02 23:49:06 +00:00
localPeers map [ raft . ServerAddress ] * serverParts
2015-06-07 18:32:01 +00:00
peerLock sync . RWMutex
2015-06-04 10:33:12 +00:00
2015-06-03 10:58:00 +00:00
// serf is the Serf cluster containing only Nomad
// servers. This is used for multi-region federation
// and automatic clustering within regions.
serf * serf . Serf
2015-06-04 10:42:56 +00:00
// reconcileCh is used to pass events from the serf handler
// into the leader manager. Mostly used to handle when servers
// join/leave from the region.
reconcileCh chan serf . Member
2015-06-03 10:58:00 +00:00
// eventCh is used to receive events from the serf cluster
eventCh chan serf . Event
2017-06-28 22:35:52 +00:00
// BlockedEvals is used to manage evaluations that are blocked on node
// capacity changes.
blockedEvals * BlockedEvals
// deploymentWatcher is used to watch deployments and their allocations and
2017-08-07 21:13:05 +00:00
// make the required calls to continue to transition the deployment.
2017-06-28 22:35:52 +00:00
deploymentWatcher * deploymentwatcher . Watcher
2015-07-24 04:44:17 +00:00
// evalBroker is used to manage the in-progress evaluations
// that are waiting to be brokered to a sub-scheduler
evalBroker * EvalBroker
2017-06-28 22:35:52 +00:00
// periodicDispatcher is used to track and create evaluations for periodic jobs.
periodicDispatcher * PeriodicDispatch
2016-01-29 23:31:32 +00:00
2015-07-27 22:11:42 +00:00
// planQueue is used to manage the submitted allocation
// plans that are waiting to be assessed by the leader
planQueue * PlanQueue
2015-08-23 00:17:13 +00:00
// heartbeatTimers track the expiration time of each heartbeat that has
// a TTL. On expiration, the node status is updated to be 'down'.
heartbeatTimers map [ string ] * time . Timer
heartbeatTimersLock sync . Mutex
2017-02-01 00:43:57 +00:00
// consulCatalog is used for discovering other Nomad Servers via Consul
consulCatalog consul . CatalogAPI
2016-06-14 05:58:39 +00:00
2016-08-10 03:15:13 +00:00
// vault is the client for communicating with Vault.
2016-08-10 20:20:13 +00:00
vault VaultClient
2016-08-10 03:15:13 +00:00
2015-08-23 17:53:53 +00:00
// Worker used for processing
workers [ ] * Worker
2017-08-19 23:49:53 +00:00
// aclCache is used to maintain the parsed ACL objects
aclCache * lru . TwoQueueCache
2017-10-23 19:50:37 +00:00
// leaderAcl is the management ACL token that is valid when resolved by the
// current leader.
leaderAcl string
leaderAclLock sync . Mutex
2017-09-19 14:47:10 +00:00
// EnterpriseState is used to fill in state for Pro/Ent builds
EnterpriseState
2015-06-03 11:25:50 +00:00
left bool
2015-06-01 15:49:10 +00:00
shutdown bool
shutdownCh chan struct { }
shutdownLock sync . Mutex
}
2015-06-03 10:26:50 +00:00
// Holds the RPC endpoints
type endpoints struct {
2017-08-10 19:24:11 +00:00
Status * Status
Node * Node
Job * Job
Eval * Eval
Plan * Plan
Alloc * Alloc
Deployment * Deployment
Region * Region
Search * Search
Periodic * Periodic
System * System
Operator * Operator
2017-08-08 00:10:04 +00:00
ACL * ACL
2017-09-07 23:56:15 +00:00
Enterprise * EnterpriseEndpoints
2015-06-03 10:26:50 +00:00
}
2015-06-01 15:49:10 +00:00
// NewServer is used to construct a new Nomad server from the
// configuration, potentially returning an error
2017-02-01 00:43:57 +00:00
func NewServer ( config * Config , consulCatalog consul . CatalogAPI , logger * log . Logger ) ( * Server , error ) {
2015-06-01 15:49:10 +00:00
// Check the protocol version
if err := config . CheckVersion ( ) ; err != nil {
return nil , err
}
2015-07-24 04:44:17 +00:00
// Create an eval broker
2017-04-14 22:24:55 +00:00
evalBroker , err := NewEvalBroker (
config . EvalNackTimeout ,
config . EvalNackInitialReenqueueDelay ,
config . EvalNackSubsequentReenqueueDelay ,
config . EvalDeliveryLimit )
2015-07-24 04:44:17 +00:00
if err != nil {
return nil , err
}
2016-01-29 23:31:32 +00:00
// Create a new blocked eval tracker.
blockedEvals := NewBlockedEvals ( evalBroker )
2015-07-27 22:11:42 +00:00
// Create a plan queue
planQueue , err := NewPlanQueue ( )
if err != nil {
return nil , err
}
2016-10-24 05:22:00 +00:00
// Configure TLS
2016-11-01 18:55:29 +00:00
var tlsWrap tlsutil . RegionWrapper
2016-10-24 05:22:00 +00:00
var incomingTLS * tls . Config
2016-10-25 22:57:38 +00:00
if config . TLSConfig . EnableRPC {
2016-10-24 05:22:00 +00:00
tlsConf := config . tlsConfig ( )
tw , err := tlsConf . OutgoingTLSWrapper ( )
if err != nil {
return nil , err
}
tlsWrap = tw
itls , err := tlsConf . IncomingTLSConfig ( )
if err != nil {
return nil , err
}
incomingTLS = itls
}
2017-08-19 23:49:53 +00:00
// Create the ACL object cache
aclCache , err := lru . New2Q ( aclCacheSize )
if err != nil {
return nil , err
}
2015-06-01 15:49:10 +00:00
// Create the server
s := & Server {
2017-07-03 18:26:45 +00:00
config : config ,
consulCatalog : consulCatalog ,
connPool : NewPool ( config . LogOutput , serverRPCCache , serverMaxStreams , tlsWrap ) ,
logger : logger ,
rpcServer : rpc . NewServer ( ) ,
peers : make ( map [ string ] [ ] * serverParts ) ,
localPeers : make ( map [ raft . ServerAddress ] * serverParts ) ,
reconcileCh : make ( chan serf . Member , 32 ) ,
eventCh : make ( chan serf . Event , 256 ) ,
evalBroker : evalBroker ,
blockedEvals : blockedEvals ,
planQueue : planQueue ,
rpcTLS : incomingTLS ,
2017-08-19 23:49:53 +00:00
aclCache : aclCache ,
2017-07-03 18:26:45 +00:00
shutdownCh : make ( chan struct { } ) ,
2015-06-01 15:49:10 +00:00
}
2015-12-18 20:26:28 +00:00
// Create the periodic dispatcher for launching periodic jobs.
2015-12-19 01:26:05 +00:00
s . periodicDispatcher = NewPeriodicDispatch ( s . logger , s )
2015-12-18 20:26:28 +00:00
2016-08-10 20:20:13 +00:00
// Setup Vault
if err := s . setupVaultClient ( ) ; err != nil {
s . Shutdown ( )
s . logger . Printf ( "[ERR] nomad: failed to setup Vault client: %v" , err )
return nil , fmt . Errorf ( "Failed to setup Vault client: %v" , err )
}
2015-06-03 10:26:50 +00:00
// Initialize the RPC layer
2016-10-24 05:22:00 +00:00
if err := s . setupRPC ( tlsWrap ) ; err != nil {
2015-06-03 10:26:50 +00:00
s . Shutdown ( )
2016-06-17 13:44:10 +00:00
s . logger . Printf ( "[ERR] nomad: failed to start RPC layer: %s" , err )
2015-06-03 10:26:50 +00:00
return nil , fmt . Errorf ( "Failed to start RPC layer: %v" , err )
}
2015-06-01 15:49:10 +00:00
// Initialize the Raft server
if err := s . setupRaft ( ) ; err != nil {
s . Shutdown ( )
2016-06-17 13:44:10 +00:00
s . logger . Printf ( "[ERR] nomad: failed to start Raft: %s" , err )
2015-06-01 15:49:10 +00:00
return nil , fmt . Errorf ( "Failed to start Raft: %v" , err )
}
2015-06-03 10:58:00 +00:00
// Initialize the wan Serf
s . serf , err = s . setupSerf ( config . SerfConfig , s . eventCh , serfSnapshot )
if err != nil {
s . Shutdown ( )
2016-06-17 13:44:10 +00:00
s . logger . Printf ( "[ERR] nomad: failed to start serf WAN: %s" , err )
2015-06-03 10:58:00 +00:00
return nil , fmt . Errorf ( "Failed to start serf: %v" , err )
}
2016-05-15 16:41:34 +00:00
// Initialize the scheduling workers
2015-07-28 22:12:08 +00:00
if err := s . setupWorkers ( ) ; err != nil {
s . Shutdown ( )
2016-06-17 13:44:10 +00:00
s . logger . Printf ( "[ERR] nomad: failed to start workers: %s" , err )
2015-07-28 22:12:08 +00:00
return nil , fmt . Errorf ( "Failed to start workers: %v" , err )
}
2016-06-14 05:58:39 +00:00
// Setup the Consul syncer
if err := s . setupConsulSyncer ( ) ; err != nil {
2017-02-28 00:00:19 +00:00
return nil , fmt . Errorf ( "failed to create server Consul syncer: %v" , err )
2016-06-14 05:58:39 +00:00
}
2017-06-28 22:35:52 +00:00
// Setup the deployment watcher.
if err := s . setupDeploymentWatcher ( ) ; err != nil {
return nil , fmt . Errorf ( "failed to create deployment watcher: %v" , err )
}
2017-09-19 14:47:10 +00:00
// Setup the enterprise state
if err := s . setupEnterprise ( config ) ; err != nil {
return nil , err
}
2015-09-07 17:46:41 +00:00
// Monitor leadership changes
go s . monitorLeadership ( )
// Start ingesting events for Serf
go s . serfEventHandler ( )
2015-06-03 10:26:50 +00:00
// Start the RPC listeners
go s . listen ( )
2015-07-27 22:11:42 +00:00
// Emit metrics for the eval broker
2015-08-05 23:45:50 +00:00
go evalBroker . EmitStats ( time . Second , s . shutdownCh )
2015-07-24 05:17:37 +00:00
2015-07-27 22:11:42 +00:00
// Emit metrics for the plan queue
2015-08-05 23:45:50 +00:00
go planQueue . EmitStats ( time . Second , s . shutdownCh )
2015-07-27 22:11:42 +00:00
2016-02-01 02:46:45 +00:00
// Emit metrics for the blocked eval tracker.
go blockedEvals . EmitStats ( time . Second , s . shutdownCh )
2017-02-15 00:02:18 +00:00
// Emit metrics for the Vault client.
go s . vault . EmitStats ( time . Second , s . shutdownCh )
2015-08-23 00:17:13 +00:00
// Emit metrics
go s . heartbeatStats ( )
2017-09-19 14:47:10 +00:00
// Start enterprise background workers
s . startEnterpriseBackground ( )
2015-06-01 15:49:10 +00:00
// Done
return s , nil
}
// Shutdown is used to shutdown the server
func ( s * Server ) Shutdown ( ) error {
s . logger . Printf ( "[INFO] nomad: shutting down server" )
s . shutdownLock . Lock ( )
defer s . shutdownLock . Unlock ( )
if s . shutdown {
return nil
}
s . shutdown = true
close ( s . shutdownCh )
2015-06-05 21:54:45 +00:00
if s . serf != nil {
s . serf . Shutdown ( )
}
2015-06-01 15:49:10 +00:00
if s . raft != nil {
s . raftTransport . Close ( )
s . raftLayer . Close ( )
future := s . raft . Shutdown ( )
if err := future . Error ( ) ; err != nil {
s . logger . Printf ( "[WARN] nomad: Error shutting down raft: %s" , err )
}
2015-06-03 09:26:49 +00:00
if s . raftStore != nil {
s . raftStore . Close ( )
}
2015-06-01 15:49:10 +00:00
}
2015-06-03 10:26:50 +00:00
// Shutdown the RPC listener
if s . rpcListener != nil {
s . rpcListener . Close ( )
}
2015-06-07 18:50:29 +00:00
// Close the connection pool
s . connPool . Shutdown ( )
2015-06-01 15:49:10 +00:00
// Close the fsm
if s . fsm != nil {
s . fsm . Close ( )
}
2016-08-10 20:20:13 +00:00
// Stop Vault token renewal
2016-08-11 20:04:56 +00:00
if s . vault != nil {
s . vault . Stop ( )
}
2016-08-10 20:20:13 +00:00
2015-06-01 15:49:10 +00:00
return nil
}
2015-07-28 22:12:08 +00:00
// IsShutdown reports, without blocking, whether shutdownCh has been closed,
// i.e. whether Shutdown has run.
func (s *Server) IsShutdown() bool {
	select {
	case <-s.shutdownCh:
		return true
	default:
	}
	return false
}
2015-06-03 11:25:50 +00:00
// Leave is used to prepare for a graceful shutdown of the server
func ( s * Server ) Leave ( ) error {
s . logger . Printf ( "[INFO] nomad: server starting leave" )
s . left = true
// Check the number of known peers
2017-02-02 21:52:31 +00:00
numPeers , err := s . numPeers ( )
2015-06-03 11:25:50 +00:00
if err != nil {
s . logger . Printf ( "[ERR] nomad: failed to check raft peers: %v" , err )
return err
}
2017-02-03 00:07:15 +00:00
// TODO (alexdadgar) - This will need to be updated once we support node
// IDs.
addr := s . raftTransport . LocalAddr ( )
2015-06-03 11:25:50 +00:00
// If we are the current leader, and we have any other peers (cluster has multiple
// servers), we should do a RemovePeer to safely reduce the quorum size. If we are
// not the leader, then we should issue our leave intention and wait to be removed
// for some sane period of time.
isLeader := s . IsLeader ( )
2017-02-02 21:52:31 +00:00
if isLeader && numPeers > 1 {
2017-02-03 00:07:15 +00:00
future := s . raft . RemovePeer ( addr )
if err := future . Error ( ) ; err != nil {
2015-06-03 11:25:50 +00:00
s . logger . Printf ( "[ERR] nomad: failed to remove ourself as raft peer: %v" , err )
}
}
// Leave the gossip pool
if s . serf != nil {
if err := s . serf . Leave ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to leave Serf cluster: %v" , err )
}
}
// If we were not leader, wait to be safely removed from the cluster.
// We must wait to allow the raft replication to take place, otherwise
// an immediate shutdown could cause a loss of quorum.
if ! isLeader {
2017-02-03 00:07:15 +00:00
left := false
2015-06-03 11:25:50 +00:00
limit := time . Now ( ) . Add ( raftRemoveGracePeriod )
2017-02-03 00:07:15 +00:00
for ! left && time . Now ( ) . Before ( limit ) {
// Sleep a while before we check.
time . Sleep ( 50 * time . Millisecond )
2015-06-03 11:25:50 +00:00
2017-02-03 00:07:15 +00:00
// Get the latest configuration.
future := s . raft . GetConfiguration ( )
if err := future . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to get raft configuration: %v" , err )
2015-06-03 11:25:50 +00:00
break
}
2017-02-03 00:07:15 +00:00
// See if we are no longer included.
left = true
for _ , server := range future . Configuration ( ) . Servers {
if server . Address == addr {
left = false
break
}
}
2015-06-03 11:25:50 +00:00
}
2017-02-03 00:07:15 +00:00
// TODO (alexdadgar) With the old Raft library we used to force the
// peers set to empty when a graceful leave occurred. This would
// keep voting spam down if the server was restarted, but it was
// dangerous because the peers was inconsistent with the logs and
// snapshots, so it wasn't really safe in all cases for the server
// to become leader. This is now safe, but the log spam is noisy.
// The next new version of the library will have a "you are not a
// peer stop it" behavior that should address this. We will have
// to evaluate during the RC period if this interim situation is
// not too confusing for operators.
// TODO (alexdadgar) When we take a later new version of the Raft
// library it won't try to complete replication, so this peer
// may not realize that it has been removed. Need to revisit this
// and the warning here.
if ! left {
s . logger . Printf ( "[WARN] nomad: failed to leave raft configuration gracefully, timeout" )
2015-06-03 11:25:50 +00:00
}
}
return nil
}
2017-02-01 22:20:14 +00:00
// Reload applies the reloadable subset of config to a running server. Not
// all config fields can be reloaded; today only the Vault configuration is.
// Errors from reloaded components are collected and returned together, and a
// nil config is rejected.
func (s *Server) Reload(config *Config) error {
	if config == nil {
		return fmt.Errorf("Reload given a nil config")
	}

	var mErr multierror.Error

	// Vault should never be nil, but guard anyway.
	if v := s.vault; v != nil {
		if err := v.SetConfig(config.VaultConfig); err != nil {
			multierror.Append(&mErr, err)
		}
	}

	return mErr.ErrorOrNil()
}
2016-06-16 19:14:03 +00:00
// setupBootstrapHandler() creates the closure necessary to support a Consul
// fallback handler.
func ( s * Server ) setupBootstrapHandler ( ) error {
// peersTimeout is used to indicate to the Consul Syncer that the
// current Nomad Server has a stale peer set. peersTimeout will time
// out if the Consul Syncer bootstrapFn has not observed a Raft
// leader in maxStaleLeadership. If peersTimeout has been triggered,
// the Consul Syncer will begin querying Consul for other Nomad
// Servers.
//
// NOTE: time.Timer is used vs time.Time in order to handle clock
// drift because time.Timer is implemented as a monotonic clock.
var peersTimeout * time . Timer = time . NewTimer ( 0 )
2016-06-16 21:27:10 +00:00
// consulQueryCount is the number of times the bootstrapFn has been
// called, regardless of success.
var consulQueryCount uint64
2016-06-16 19:14:03 +00:00
// leadershipTimedOut is a helper method that returns true if the
// peersTimeout timer has expired.
leadershipTimedOut := func ( ) bool {
select {
case <- peersTimeout . C :
return true
default :
return false
}
}
2016-06-14 05:58:39 +00:00
// The bootstrapFn callback handler is used to periodically poll
// Consul to look up the Nomad Servers in Consul. In the event the
// server has been brought up without a `retry-join` configuration
// and this Server is partitioned from the rest of the cluster,
// periodically poll Consul to reattach this Server to other servers
// in the same region and automatically reform a quorum (assuming the
// correct number of servers required for quorum are present).
bootstrapFn := func ( ) error {
2016-06-14 07:07:04 +00:00
// If there is a raft leader, do nothing
if s . raft . Leader ( ) != "" {
2016-06-16 19:14:03 +00:00
peersTimeout . Reset ( maxStaleLeadership )
2016-06-14 07:07:04 +00:00
return nil
}
2016-06-16 19:14:03 +00:00
// (ab)use serf.go's behavior of setting BootstrapExpect to
// zero if we have bootstrapped. If we have bootstrapped
bootstrapExpect := atomic . LoadInt32 ( & s . config . BootstrapExpect )
if bootstrapExpect == 0 {
// This Nomad Server has been bootstrapped. Rely on
2016-06-16 21:27:10 +00:00
// the peersTimeout firing as a guard to prevent
// aggressive querying of Consul.
2016-06-16 19:14:03 +00:00
if ! leadershipTimedOut ( ) {
return nil
}
} else {
2016-06-16 21:27:10 +00:00
if consulQueryCount > 0 && ! leadershipTimedOut ( ) {
return nil
}
2016-06-16 19:14:03 +00:00
// This Nomad Server has not been bootstrapped, reach
// out to Consul if our peer list is less than
// `bootstrap_expect`.
2017-02-02 21:52:31 +00:00
raftPeers , err := s . numPeers ( )
2016-06-16 19:14:03 +00:00
if err != nil {
peersTimeout . Reset ( peersPollInterval + lib . RandomStagger ( peersPollInterval / peersPollJitterFactor ) )
return nil
}
// The necessary number of Nomad Servers required for
// quorum has been reached, we do not need to poll
// Consul. Let the normal timeout-based strategy
// take over.
2017-02-02 23:31:36 +00:00
if raftPeers >= int ( bootstrapExpect ) {
2016-06-16 19:14:03 +00:00
peersTimeout . Reset ( peersPollInterval + lib . RandomStagger ( peersPollInterval / peersPollJitterFactor ) )
return nil
}
2016-06-14 05:58:39 +00:00
}
2016-06-16 21:27:10 +00:00
consulQueryCount ++
2016-06-14 05:58:39 +00:00
2017-02-03 00:07:15 +00:00
s . logger . Printf ( "[DEBUG] server.nomad: lost contact with Nomad quorum, falling back to Consul for server list" )
2016-06-14 05:58:39 +00:00
2017-02-01 00:43:57 +00:00
dcs , err := s . consulCatalog . Datacenters ( )
2016-06-14 05:58:39 +00:00
if err != nil {
2016-06-16 19:14:03 +00:00
peersTimeout . Reset ( peersPollInterval + lib . RandomStagger ( peersPollInterval / peersPollJitterFactor ) )
2017-02-03 00:07:15 +00:00
return fmt . Errorf ( "server.nomad: unable to query Consul datacenters: %v" , err )
2016-06-14 05:58:39 +00:00
}
if len ( dcs ) > 2 {
// Query the local DC first, then shuffle the
// remaining DCs. If additional calls to bootstrapFn
// are necessary, this Nomad Server will eventually
// walk all datacenter until it finds enough hosts to
// form a quorum.
2016-06-21 21:26:01 +00:00
shuffleStrings ( dcs [ 1 : ] )
dcs = dcs [ 0 : lib . MinInt ( len ( dcs ) , datacenterQueryLimit ) ]
2016-06-14 05:58:39 +00:00
}
nomadServerServiceName := s . config . ConsulConfig . ServerServiceName
var mErr multierror . Error
const defaultMaxNumNomadServers = 8
nomadServerServices := make ( [ ] string , 0 , defaultMaxNumNomadServers )
2016-06-16 21:27:10 +00:00
localNode := s . serf . Memberlist ( ) . LocalNode ( )
2016-06-14 05:58:39 +00:00
for _ , dc := range dcs {
2016-06-16 19:14:03 +00:00
consulOpts := & consulapi . QueryOptions {
2016-06-14 05:58:39 +00:00
AllowStale : true ,
Datacenter : dc ,
Near : "_agent" ,
WaitTime : consul . DefaultQueryWaitDuration ,
}
2017-02-01 00:43:57 +00:00
consulServices , _ , err := s . consulCatalog . Service ( nomadServerServiceName , consul . ServiceTagSerf , consulOpts )
2016-06-14 05:58:39 +00:00
if err != nil {
2016-06-16 21:40:09 +00:00
err := fmt . Errorf ( "failed to query service %q in Consul datacenter %q: %v" , nomadServerServiceName , dc , err )
2017-02-03 00:07:15 +00:00
s . logger . Printf ( "[WARN] server.nomad: %v" , err )
2016-06-16 21:40:09 +00:00
mErr . Errors = append ( mErr . Errors , err )
2016-06-14 05:58:39 +00:00
continue
}
for _ , cs := range consulServices {
port := strconv . FormatInt ( int64 ( cs . ServicePort ) , 10 )
addr := cs . ServiceAddress
if addr == "" {
addr = cs . Address
}
2016-06-16 21:27:10 +00:00
if localNode . Addr . String ( ) == addr && int ( localNode . Port ) == cs . ServicePort {
continue
}
2016-06-14 05:58:39 +00:00
serverAddr := net . JoinHostPort ( addr , port )
nomadServerServices = append ( nomadServerServices , serverAddr )
}
}
2016-06-14 07:07:04 +00:00
2016-06-14 05:58:39 +00:00
if len ( nomadServerServices ) == 0 {
if len ( mErr . Errors ) > 0 {
2016-06-16 19:14:03 +00:00
peersTimeout . Reset ( peersPollInterval + lib . RandomStagger ( peersPollInterval / peersPollJitterFactor ) )
2016-06-14 05:58:39 +00:00
return mErr . ErrorOrNil ( )
}
2016-06-14 07:07:04 +00:00
// Log the error and return nil so future handlers
// can attempt to register the `nomad` service.
2016-06-16 21:27:10 +00:00
pollInterval := peersPollInterval + lib . RandomStagger ( peersPollInterval / peersPollJitterFactor )
2017-02-03 00:07:15 +00:00
s . logger . Printf ( "[TRACE] server.nomad: no Nomad Servers advertising service %+q in Consul datacenters %+q, sleeping for %v" , nomadServerServiceName , dcs , pollInterval )
2016-06-16 21:27:10 +00:00
peersTimeout . Reset ( pollInterval )
2016-06-14 07:07:04 +00:00
return nil
2016-06-14 05:58:39 +00:00
}
2016-06-14 07:07:04 +00:00
2016-06-14 05:58:39 +00:00
numServersContacted , err := s . Join ( nomadServerServices )
if err != nil {
2016-06-16 19:14:03 +00:00
peersTimeout . Reset ( peersPollInterval + lib . RandomStagger ( peersPollInterval / peersPollJitterFactor ) )
2016-06-14 05:58:39 +00:00
return fmt . Errorf ( "contacted %d Nomad Servers: %v" , numServersContacted , err )
}
2016-06-16 19:14:03 +00:00
peersTimeout . Reset ( maxStaleLeadership )
2017-02-03 00:07:15 +00:00
s . logger . Printf ( "[INFO] server.nomad: successfully contacted %d Nomad Servers" , numServersContacted )
2016-06-14 05:58:39 +00:00
return nil
}
2016-06-16 19:14:03 +00:00
2017-02-01 00:43:57 +00:00
// Hacky replacement for old ConsulSyncer Periodic Handler.
go func ( ) {
lastOk := true
sync := time . NewTimer ( 0 )
for {
select {
case <- sync . C :
d := defaultConsulDiscoveryInterval
if err := bootstrapFn ( ) ; err != nil {
// Only log if it worked last time
if lastOk {
lastOk = false
s . logger . Printf ( "[ERR] consul: error looking up Nomad servers: %v" , err )
}
d = defaultConsulDiscoveryIntervalRetry
}
sync . Reset ( d )
case <- s . shutdownCh :
return
}
}
} ( )
2016-06-16 19:14:03 +00:00
return nil
}
// setupConsulSyncer creates Server-mode consul.Syncer which periodically
// executes callbacks on a fixed interval.
func ( s * Server ) setupConsulSyncer ( ) error {
2017-01-18 23:55:14 +00:00
if s . config . ConsulConfig . ServerAutoJoin != nil && * s . config . ConsulConfig . ServerAutoJoin {
2016-06-16 19:14:03 +00:00
if err := s . setupBootstrapHandler ( ) ; err != nil {
2016-06-16 21:40:09 +00:00
return err
2016-06-16 19:14:03 +00:00
}
2016-06-14 22:27:15 +00:00
}
2016-06-14 05:58:39 +00:00
2016-06-16 21:40:09 +00:00
return nil
2016-08-10 20:20:13 +00:00
}
2017-06-28 22:35:52 +00:00
// setupDeploymentWatcher creates a deployment watcher that consumes the RPC
// endpoints for state information and makes transistions via Raft through a
// shim that provides the appropriate methods.
func ( s * Server ) setupDeploymentWatcher ( ) error {
2017-08-31 00:45:32 +00:00
// Create the raft shim type to restrict the set of raft methods that can be
// made
2017-06-28 22:35:52 +00:00
raftShim := & deploymentWatcherRaftShim {
apply : s . raftApply ,
}
2017-07-03 18:26:45 +00:00
// Create the deployment watcher
s . deploymentWatcher = deploymentwatcher . NewDeploymentsWatcher (
2017-08-31 00:45:32 +00:00
s . logger , raftShim ,
2017-07-03 18:26:45 +00:00
deploymentwatcher . LimitStateQueriesPerSecond ,
2017-07-03 19:05:01 +00:00
deploymentwatcher . CrossDeploymentEvalBatchDuration )
2017-07-03 18:26:45 +00:00
2017-06-28 22:35:52 +00:00
return nil
}
2016-08-10 20:20:13 +00:00
// setupVaultClient is used to set up the Vault API client.
func ( s * Server ) setupVaultClient ( ) error {
2016-08-22 20:57:27 +00:00
v , err := NewVaultClient ( s . config . VaultConfig , s . logger , s . purgeVaultAccessors )
2016-08-10 20:20:13 +00:00
if err != nil {
return err
}
s . vault = v
return nil
2016-06-14 05:58:39 +00:00
}
2015-06-03 10:26:50 +00:00
// setupRPC is used to setup the RPC listener
2016-11-01 18:55:29 +00:00
func ( s * Server ) setupRPC ( tlsWrap tlsutil . RegionWrapper ) error {
2015-06-03 10:26:50 +00:00
// Create endpoints
2017-08-08 00:10:04 +00:00
s . endpoints . ACL = & ACL { s }
2017-02-10 01:58:20 +00:00
s . endpoints . Alloc = & Alloc { s }
2015-07-23 23:00:19 +00:00
s . endpoints . Eval = & Eval { s }
2017-02-10 01:58:20 +00:00
s . endpoints . Job = & Job { s }
s . endpoints . Node = & Node { srv : s }
2017-06-26 21:23:52 +00:00
s . endpoints . Deployment = & Deployment { srv : s }
2017-02-10 01:58:20 +00:00
s . endpoints . Operator = & Operator { s }
s . endpoints . Periodic = & Periodic { s }
2015-07-27 22:31:49 +00:00
s . endpoints . Plan = & Plan { s }
2015-11-24 05:47:11 +00:00
s . endpoints . Region = & Region { s }
2017-02-10 01:58:20 +00:00
s . endpoints . Status = & Status { s }
2016-02-20 23:50:41 +00:00
s . endpoints . System = & System { s }
2017-08-10 19:24:11 +00:00
s . endpoints . Search = & Search { s }
2017-09-07 23:56:15 +00:00
s . endpoints . Enterprise = NewEnterpriseEndpoints ( s )
2015-06-03 10:26:50 +00:00
// Register the handlers
2017-08-08 00:10:04 +00:00
s . rpcServer . Register ( s . endpoints . ACL )
2017-02-10 01:58:20 +00:00
s . rpcServer . Register ( s . endpoints . Alloc )
2015-07-23 23:00:19 +00:00
s . rpcServer . Register ( s . endpoints . Eval )
2017-02-10 01:58:20 +00:00
s . rpcServer . Register ( s . endpoints . Job )
s . rpcServer . Register ( s . endpoints . Node )
2017-06-26 21:23:52 +00:00
s . rpcServer . Register ( s . endpoints . Deployment )
2017-02-10 01:58:20 +00:00
s . rpcServer . Register ( s . endpoints . Operator )
s . rpcServer . Register ( s . endpoints . Periodic )
2015-07-27 22:31:49 +00:00
s . rpcServer . Register ( s . endpoints . Plan )
2015-11-24 05:47:11 +00:00
s . rpcServer . Register ( s . endpoints . Region )
2017-02-10 01:58:20 +00:00
s . rpcServer . Register ( s . endpoints . Status )
2016-02-20 23:50:41 +00:00
s . rpcServer . Register ( s . endpoints . System )
2017-08-10 19:24:11 +00:00
s . rpcServer . Register ( s . endpoints . Search )
2017-09-07 23:56:15 +00:00
s . endpoints . Enterprise . Register ( s )
2015-06-03 10:26:50 +00:00
list , err := net . ListenTCP ( "tcp" , s . config . RPCAddr )
if err != nil {
return err
}
s . rpcListener = list
if s . config . RPCAdvertise != nil {
2015-06-03 10:58:00 +00:00
s . rpcAdvertise = s . config . RPCAdvertise
2015-06-03 10:26:50 +00:00
} else {
2015-06-03 10:58:00 +00:00
s . rpcAdvertise = s . rpcListener . Addr ( )
2015-06-03 10:26:50 +00:00
}
// Verify that we have a usable advertise address
2015-06-03 10:58:00 +00:00
addr , ok := s . rpcAdvertise . ( * net . TCPAddr )
2015-06-03 10:26:50 +00:00
if ! ok {
list . Close ( )
return fmt . Errorf ( "RPC advertise address is not a TCP Address: %v" , addr )
}
if addr . IP . IsUnspecified ( ) {
list . Close ( )
return fmt . Errorf ( "RPC advertise address is not advertisable: %v" , addr )
}
2016-11-01 18:55:29 +00:00
wrapper := tlsutil . RegionSpecificWrapper ( s . config . Region , tlsWrap )
s . raftLayer = NewRaftLayer ( s . rpcAdvertise , wrapper )
2015-06-03 10:26:50 +00:00
return nil
}
2015-06-01 15:49:10 +00:00
// setupRaft is used to setup and initialize Raft
func ( s * Server ) setupRaft ( ) error {
2017-02-02 23:31:36 +00:00
// If we have an unclean exit then attempt to close the Raft store.
defer func ( ) {
if s . raft == nil && s . raftStore != nil {
if err := s . raftStore . Close ( ) ; err != nil {
2017-02-03 00:07:15 +00:00
s . logger . Printf ( "[ERR] nomad: failed to close Raft store: %v" , err )
2017-02-02 23:31:36 +00:00
}
}
} ( )
2015-06-01 15:49:10 +00:00
// Create the FSM
2017-10-13 21:36:02 +00:00
fsmConfig := & FSMConfig {
EvalBroker : s . evalBroker ,
Periodic : s . periodicDispatcher ,
Blocked : s . blockedEvals ,
LogOutput : s . config . LogOutput ,
Region : s . Region ( ) ,
}
2015-06-01 15:49:10 +00:00
var err error
2017-10-13 21:36:02 +00:00
s . fsm , err = NewFSM ( fsmConfig )
2015-06-01 15:49:10 +00:00
if err != nil {
return err
}
2015-06-01 19:11:40 +00:00
// Create a transport layer
2015-08-26 00:36:52 +00:00
trans := raft . NewNetworkTransport ( s . raftLayer , 3 , s . config . RaftTimeout ,
2015-06-01 19:11:40 +00:00
s . config . LogOutput )
s . raftTransport = trans
2015-06-01 15:49:10 +00:00
2017-02-02 23:31:36 +00:00
// Make sure we set the LogOutput.
s . config . RaftConfig . LogOutput = s . config . LogOutput
// Our version of Raft protocol requires the LocalID to match the network
// address of the transport.
s . config . RaftConfig . LocalID = raft . ServerID ( trans . LocalAddr ( ) )
// Build an all in-memory setup for dev mode, otherwise prepare a full
// disk-based setup.
2015-06-01 19:11:40 +00:00
var log raft . LogStore
var stable raft . StableStore
var snap raft . SnapshotStore
if s . config . DevMode {
store := raft . NewInmemStore ( )
s . raftInmem = store
stable = store
log = store
snap = raft . NewDiscardSnapshotStore ( )
} else {
// Create the base raft path
path := filepath . Join ( s . config . DataDir , raftState )
if err := ensurePath ( path , true ) ; err != nil {
return err
}
2015-06-01 15:49:10 +00:00
2015-06-01 19:11:40 +00:00
// Create the BoltDB backend
store , err := raftboltdb . NewBoltStore ( filepath . Join ( path , "raft.db" ) )
if err != nil {
return err
}
s . raftStore = store
stable = store
2015-06-01 15:49:10 +00:00
2015-06-01 19:11:40 +00:00
// Wrap the store in a LogCache to improve performance
cacheStore , err := raft . NewLogCache ( raftLogCacheSize , store )
if err != nil {
store . Close ( )
return err
}
log = cacheStore
2015-06-01 15:49:10 +00:00
2015-06-01 19:11:40 +00:00
// Create the snapshot store
snapshots , err := raft . NewFileSnapshotStore ( path , snapshotsRetained , s . config . LogOutput )
if err != nil {
if s . raftStore != nil {
s . raftStore . Close ( )
}
return err
}
snap = snapshots
2015-06-01 15:49:10 +00:00
2017-02-02 23:31:36 +00:00
// For an existing cluster being upgraded to the new version of
// Raft, we almost never want to run recovery based on the old
// peers.json file. We create a peers.info file with a helpful
// note about where peers.json went, and use that as a sentinel
// to avoid ingesting the old one that first time (if we have to
// create the peers.info file because it's not there, we also
// blow away any existing peers.json file).
peersFile := filepath . Join ( path , "peers.json" )
peersInfoFile := filepath . Join ( path , "peers.info" )
if _ , err := os . Stat ( peersInfoFile ) ; os . IsNotExist ( err ) {
if err := ioutil . WriteFile ( peersInfoFile , [ ] byte ( peersInfoContent ) , 0755 ) ; err != nil {
return fmt . Errorf ( "failed to write peers.info file: %v" , err )
}
// Blow away the peers.json file if present, since the
// peers.info sentinel wasn't there.
if _ , err := os . Stat ( peersFile ) ; err == nil {
if err := os . Remove ( peersFile ) ; err != nil {
return fmt . Errorf ( "failed to delete peers.json, please delete manually (see peers.info for details): %v" , err )
}
2017-02-03 00:07:15 +00:00
s . logger . Printf ( "[INFO] nomad: deleted peers.json file (see peers.info for details)" )
2017-02-02 23:31:36 +00:00
}
} else if _ , err := os . Stat ( peersFile ) ; err == nil {
2017-02-03 00:07:15 +00:00
s . logger . Printf ( "[INFO] nomad: found peers.json file, recovering Raft configuration..." )
2017-02-02 23:31:36 +00:00
configuration , err := raft . ReadPeersJSON ( peersFile )
if err != nil {
return fmt . Errorf ( "recovery failed to parse peers.json: %v" , err )
}
2017-10-13 21:36:02 +00:00
tmpFsm , err := NewFSM ( fsmConfig )
2017-02-02 23:31:36 +00:00
if err != nil {
return fmt . Errorf ( "recovery failed to make temp FSM: %v" , err )
}
if err := raft . RecoverCluster ( s . config . RaftConfig , tmpFsm ,
log , stable , snap , trans , configuration ) ; err != nil {
return fmt . Errorf ( "recovery failed: %v" , err )
}
if err := os . Remove ( peersFile ) ; err != nil {
return fmt . Errorf ( "recovery failed to delete peers.json, please delete manually (see peers.info for details): %v" , err )
}
2017-02-03 00:07:15 +00:00
s . logger . Printf ( "[INFO] nomad: deleted peers.json file after successful recovery" )
2017-02-02 23:31:36 +00:00
}
2015-06-01 19:11:40 +00:00
}
2015-06-01 15:49:10 +00:00
2017-02-02 23:31:36 +00:00
// If we are in bootstrap or dev mode and the state is clean then we can
// bootstrap now.
if s . config . Bootstrap || s . config . DevMode {
hasState , err := raft . HasExistingState ( log , stable , snap )
2015-06-01 15:49:10 +00:00
if err != nil {
return err
}
2017-02-02 23:31:36 +00:00
if ! hasState {
// TODO (alexdadgar) - This will need to be updated when
// we add support for node IDs.
configuration := raft . Configuration {
Servers : [ ] raft . Server {
2017-09-26 22:26:33 +00:00
{
2017-02-02 23:31:36 +00:00
ID : raft . ServerID ( trans . LocalAddr ( ) ) ,
Address : trans . LocalAddr ( ) ,
} ,
} ,
}
if err := raft . BootstrapCluster ( s . config . RaftConfig ,
log , stable , snap , trans , configuration ) ; err != nil {
return err
}
2015-06-01 15:49:10 +00:00
}
}
2015-09-07 17:46:41 +00:00
// Setup the leader channel
leaderCh := make ( chan bool , 1 )
s . config . RaftConfig . NotifyCh = leaderCh
s . leaderCh = leaderCh
2015-06-01 15:49:10 +00:00
// Setup the Raft store
2017-02-02 23:31:36 +00:00
s . raft , err = raft . NewRaft ( s . config . RaftConfig , s . fsm , log , stable , snap , trans )
2015-06-01 15:49:10 +00:00
if err != nil {
return err
}
return nil
}
2015-06-03 10:26:50 +00:00
2015-06-03 10:58:00 +00:00
// setupSerf is used to setup and initialize a Serf
func ( s * Server ) setupSerf ( conf * serf . Config , ch chan serf . Event , path string ) ( * serf . Serf , error ) {
conf . Init ( )
conf . NodeName = fmt . Sprintf ( "%s.%s" , s . config . NodeName , s . config . Region )
conf . Tags [ "role" ] = "nomad"
conf . Tags [ "region" ] = s . config . Region
conf . Tags [ "dc" ] = s . config . Datacenter
2016-05-28 01:14:34 +00:00
conf . Tags [ "vsn" ] = fmt . Sprintf ( "%d" , structs . ApiMajorVersion )
conf . Tags [ "mvn" ] = fmt . Sprintf ( "%d" , structs . ApiMinorVersion )
2015-06-03 10:58:00 +00:00
conf . Tags [ "build" ] = s . config . Build
2015-06-03 11:35:48 +00:00
conf . Tags [ "port" ] = fmt . Sprintf ( "%d" , s . rpcAdvertise . ( * net . TCPAddr ) . Port )
2015-09-07 17:46:41 +00:00
if s . config . Bootstrap || ( s . config . DevMode && ! s . config . DevDisableBootstrap ) {
2015-06-03 10:58:00 +00:00
conf . Tags [ "bootstrap" ] = "1"
}
2016-06-16 19:00:15 +00:00
bootstrapExpect := atomic . LoadInt32 ( & s . config . BootstrapExpect )
if bootstrapExpect != 0 {
conf . Tags [ "expect" ] = fmt . Sprintf ( "%d" , bootstrapExpect )
2015-06-03 10:58:00 +00:00
}
conf . MemberlistConfig . LogOutput = s . config . LogOutput
conf . LogOutput = s . config . LogOutput
conf . EventCh = ch
2015-06-04 10:33:12 +00:00
if ! s . config . DevMode {
conf . SnapshotPath = filepath . Join ( s . config . DataDir , path )
if err := ensurePath ( conf . SnapshotPath , false ) ; err != nil {
return nil , err
}
}
2015-06-03 10:58:00 +00:00
conf . ProtocolVersion = protocolVersionMap [ s . config . ProtocolVersion ]
conf . RejoinAfterLeave = true
conf . Merge = & serfMergeDelegate { }
// Until Nomad supports this fully, we disable automatic resolution.
// When enabled, the Serf gossip may just turn off if we are the minority
// node which is rather unexpected.
conf . EnableNameConflictResolution = false
return serf . Create ( conf )
}
2015-07-28 22:12:08 +00:00
// setupWorkers is used to start the scheduling workers
func ( s * Server ) setupWorkers ( ) error {
// Check if all the schedulers are disabled
if len ( s . config . EnabledSchedulers ) == 0 || s . config . NumSchedulers == 0 {
s . logger . Printf ( "[WARN] nomad: no enabled schedulers" )
return nil
}
// Start the workers
for i := 0 ; i < s . config . NumSchedulers ; i ++ {
2015-08-23 17:53:53 +00:00
if w , err := NewWorker ( s ) ; err != nil {
2015-07-28 22:12:08 +00:00
return err
2015-08-23 17:53:53 +00:00
} else {
s . workers = append ( s . workers , w )
2015-07-28 22:12:08 +00:00
}
}
s . logger . Printf ( "[INFO] nomad: starting %d scheduling worker(s) for %v" ,
s . config . NumSchedulers , s . config . EnabledSchedulers )
return nil
}
2017-02-02 21:52:31 +00:00
// numPeers is used to check on the number of known peers, including the local
// node.
func ( s * Server ) numPeers ( ) ( int , error ) {
future := s . raft . GetConfiguration ( )
if err := future . Error ( ) ; err != nil {
2015-06-03 11:25:50 +00:00
return 0 , err
}
2017-02-02 21:52:31 +00:00
configuration := future . Configuration ( )
return len ( configuration . Servers ) , nil
2015-06-03 11:25:50 +00:00
}
2015-06-03 10:26:50 +00:00
// IsLeader reports whether the local Raft instance is currently in the
// Leader state.
func (s *Server) IsLeader() bool {
	current := s.raft.State()
	return current == raft.Leader
}
2015-06-03 11:25:50 +00:00
// Join asks this server's Serf instance to join the gossip ring through the
// given addresses, each of which should be another node's Serf address. It
// passes through the count and error returned by serf.Join.
func (s *Server) Join(addrs []string) (int, error) {
	const ignoreOld = true
	return s.serf.Join(addrs, ignoreOld)
}
// LocalMember returns the Serf member entry describing this server.
//
// The receiver is named s, consistent with every other *Server method.
func (s *Server) LocalMember() serf.Member {
	return s.serf.LocalMember()
}
// Members lists the members known to this server's Serf cluster.
func (s *Server) Members() []serf.Member {
	members := s.serf.Members()
	return members
}
// RemoveFailedNode asks Serf to remove the named failed node from the cluster
// and returns Serf's error, if any.
func (s *Server) RemoveFailedNode(node string) error {
	err := s.serf.RemoveFailedNode(node)
	return err
}
// KeyManager exposes the keyring manager of this server's Serf instance.
func (s *Server) KeyManager() *serf.KeyManager {
	gossip := s.serf
	return gossip.KeyManager()
}
// Encrypted reports whether encryption is enabled for this server's Serf
// gossip.
func (s *Server) Encrypted() bool {
	enabled := s.serf.EncryptionEnabled()
	return enabled
}
2015-08-29 21:22:24 +00:00
// State exposes the state store backing this server's FSM. Callers must
// treat it as read-only; state is *not* meant to be modified through it
// directly.
func (s *Server) State() *state.StateStore {
	fsm := s.fsm
	return fsm.State()
}
2017-10-23 19:50:37 +00:00
// setLeaderAcl records token as the current leader's ACL token, holding
// leaderAclLock for the write.
func (s *Server) setLeaderAcl(token string) {
	s.leaderAclLock.Lock()
	defer s.leaderAclLock.Unlock()
	s.leaderAcl = token
}
// getLeaderAcl returns the leader's ACL token, read while holding
// leaderAclLock.
func (s *Server) getLeaderAcl() string {
	s.leaderAclLock.Lock()
	token := s.leaderAcl
	s.leaderAclLock.Unlock()
	return token
}
2015-11-24 05:47:11 +00:00
// Regions returns the known regions in the cluster.
func ( s * Server ) Regions ( ) [ ] string {
2015-11-24 05:49:03 +00:00
s . peerLock . RLock ( )
defer s . peerLock . RUnlock ( )
2015-11-24 05:47:11 +00:00
regions := make ( [ ] string , 0 , len ( s . peers ) )
2017-09-26 22:26:33 +00:00
for region := range s . peers {
2015-11-24 05:47:11 +00:00
regions = append ( regions , region )
}
2015-11-24 21:15:01 +00:00
sort . Strings ( regions )
2015-11-24 05:47:11 +00:00
return regions
}
2015-06-03 10:26:50 +00:00
// inmemCodec is an rpc.ServerCodec that serves a single RPC in-process,
// without a network connection or serialization: request arguments and the
// reply are copied between the caller's values and net/rpc's values using
// reflection.
type inmemCodec struct {
	method string      // "Service.Method" reported as the request header
	args   interface{} // caller's arguments, copied into net/rpc's argument value
	reply  interface{} // caller's reply target, filled from net/rpc's reply value
	err    error       // error carried in the RPC response, if any
}

// ReadRequestHeader reports the configured method as the request's
// ServiceMethod. Only ServiceMethod is set.
func (i *inmemCodec) ReadRequestHeader(req *rpc.Request) error {
	req.ServiceMethod = i.method
	return nil
}

// ReadRequestBody copies the caller's arguments into args, dereferencing up
// to two levels of pointers on each side.
//
// net/rpc passes a nil args when it wants the body discarded, e.g. when the
// service or method named in the header does not exist. There is no body to
// discard in-process, so that case is a no-op. Without this check the
// reflective Set below panics on a zero reflect.Value, and an RPC to an
// unknown method would crash instead of returning an error.
func (i *inmemCodec) ReadRequestBody(args interface{}) error {
	if args == nil {
		return nil
	}
	sourceValue := reflect.Indirect(reflect.Indirect(reflect.ValueOf(i.args)))
	dst := reflect.Indirect(reflect.Indirect(reflect.ValueOf(args)))
	dst.Set(sourceValue)
	return nil
}

// WriteResponse delivers the result to the caller. A non-empty resp.Error is
// stored in i.err and the reply is not copied. Otherwise reply is copied into
// i.reply, dereferencing up to two levels of pointers on each side.
func (i *inmemCodec) WriteResponse(resp *rpc.Response, reply interface{}) error {
	if resp.Error != "" {
		i.err = errors.New(resp.Error)
		return nil
	}
	sourceValue := reflect.Indirect(reflect.Indirect(reflect.ValueOf(reply)))
	dst := reflect.Indirect(reflect.Indirect(reflect.ValueOf(i.reply)))
	dst.Set(sourceValue)
	return nil
}

// Close is a no-op; the codec holds no resources.
func (i *inmemCodec) Close() error {
	return nil
}
// RPC invokes method on this server's own RPC endpoints in-process, using an
// inmemCodec in place of a network connection. An error from net/rpc itself
// takes precedence; otherwise the error carried in the RPC response (nil on
// success) is returned.
func (s *Server) RPC(method string, args interface{}, reply interface{}) error {
	codec := &inmemCodec{method: method, args: args, reply: reply}
	err := s.rpcServer.ServeRequest(codec)
	if err == nil {
		err = codec.err
	}
	return err
}
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func ( s * Server ) Stats ( ) map [ string ] map [ string ] string {
toString := func ( v uint64 ) string {
return strconv . FormatUint ( v , 10 )
}
stats := map [ string ] map [ string ] string {
2017-09-26 22:26:33 +00:00
"nomad" : {
2015-06-03 10:26:50 +00:00
"server" : "true" ,
"leader" : fmt . Sprintf ( "%v" , s . IsLeader ( ) ) ,
2017-02-03 00:07:15 +00:00
"leader_addr" : string ( s . raft . Leader ( ) ) ,
2015-06-03 10:26:50 +00:00
"bootstrap" : fmt . Sprintf ( "%v" , s . config . Bootstrap ) ,
2015-08-17 00:40:35 +00:00
"known_regions" : toString ( uint64 ( len ( s . peers ) ) ) ,
2015-06-03 10:26:50 +00:00
} ,
"raft" : s . raft . Stats ( ) ,
2015-06-03 11:08:04 +00:00
"serf" : s . serf . Stats ( ) ,
2015-08-20 22:29:30 +00:00
"runtime" : RuntimeStats ( ) ,
2015-06-03 10:26:50 +00:00
}
2017-02-02 21:52:31 +00:00
2015-06-03 10:26:50 +00:00
return stats
}
2016-10-17 17:48:04 +00:00
2017-08-07 21:13:05 +00:00
// Region returns the region of the server
2016-10-17 17:48:04 +00:00
func ( s * Server ) Region ( ) string {
return s . config . Region
}
// Datacenter reports the datacenter this server was configured with.
func (s *Server) Datacenter() string {
	cfg := s.config
	return cfg.Datacenter
}
// GetConfig exposes the server's configuration and is intended for tests
// only. Callers receive the server's live config pointer, not a copy.
func (s *Server) GetConfig() *Config {
	cfg := s.config
	return cfg
}
2017-02-02 23:31:36 +00:00
2017-08-21 03:51:30 +00:00
// ReplicationToken reports the token used for replication. It is exposed as
// a method rather than a plain field so the value can later be reloaded
// dynamically.
func (s *Server) ReplicationToken() string {
	token := s.config.ReplicationToken
	return token
}
2017-02-02 23:31:36 +00:00
// peersInfoContent is used to help operators understand what happened to the
// peers.json file. setupRaft writes it to a file called peers.info in the
// Raft data directory, next to where peers.json is looked for. The presence of
// peers.info is also the sentinel that allows a later peers.json to be
// ingested for recovery. This text is written to disk verbatim, so edits here
// change what operators see.
const peersInfoContent = `
2017-02-08 22:50:19 +00:00
As of Nomad 0.5 .5 , the peers . json file is only used for recovery
2017-02-02 23:31:36 +00:00
after an outage . It should be formatted as a JSON array containing the address
2017-08-31 13:51:04 +00:00
and port ( RPC ) of each Nomad server in the cluster , like this :
2017-02-02 23:31:36 +00:00
2017-02-08 22:50:19 +00:00
[ "10.1.0.1:4647" , "10.1.0.2:4647" , "10.1.0.3:4647" ]
2017-02-02 23:31:36 +00:00
Under normal operation , the peers . json file will not be present .
2017-02-08 22:50:19 +00:00
When Nomad starts for the first time , it will create this peers . info file and
2017-02-02 23:31:36 +00:00
delete any existing peers . json file so that recovery doesn ' t occur on the first
startup .
Once this peers . info file is present , any peers . json file will be ingested at
startup , and will set the Raft peer configuration manually to recover from an
outage . It ' s crucial that all servers in the cluster are shut down before
creating the peers . json file , and that all servers receive the same
configuration . Once the peers . json file is successfully ingested and applied , it
will be deleted .
2017-02-10 23:37:13 +00:00
Please see https : //www.nomadproject.io/guides/outage.html for more information.
2017-02-02 23:31:36 +00:00
`