package nomad

import (
	"crypto/tls"
	"errors"
	"fmt"
	"log"
	"net"
	"net/rpc"
	"path/filepath"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	consulapi "github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/lib"
	"github.com/hashicorp/consul/tlsutil"
	"github.com/hashicorp/go-multierror"
	"github.com/hashicorp/nomad/command/agent/consul"
	"github.com/hashicorp/nomad/nomad/state"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/raft"
	"github.com/hashicorp/raft-boltdb"
	"github.com/hashicorp/serf/serf"
	vaultapi "github.com/hashicorp/vault/api"
)
const (
	// datacenterQueryLimit sets the max number of DCs that a Nomad
	// Server will query to find bootstrap_expect servers.
	datacenterQueryLimit = 25

	// maxStaleLeadership is the maximum time we will permit this Nomad
	// Server to go without seeing a valid Raft leader.
	maxStaleLeadership = 15 * time.Second

	// peersPollInterval is used as the polling interval between attempts
	// to query Consul for Nomad Servers.
	peersPollInterval = 45 * time.Second

	// peersPollJitterFactor is used to provide a slight amount of variance to
	// the retry interval when querying Consul Servers.
	peersPollJitterFactor = 2
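	// Note: the Consul polling code below combines these two values as
	//   peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)
	// which with the values above works out to roughly 45s-67.5s between
	// retries (lib.RandomStagger returns a random duration between 0 and its
	// argument).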

	raftState         = "raft/"
	serfSnapshot      = "serf/snapshot"
	snapshotsRetained = 2

	// serverRPCCache controls how long we keep an idle connection open to a server
	serverRPCCache = 2 * time.Minute

	// serverMaxStreams controls how many idle streams we keep open to a server
	serverMaxStreams = 64

	// raftLogCacheSize is the maximum number of logs to cache in-memory.
	// This is used to reduce disk I/O for the recently committed entries.
	raftLogCacheSize = 512

	// raftRemoveGracePeriod is how long we wait to allow a RemovePeer
	// to replicate to gracefully leave the cluster.
	raftRemoveGracePeriod = 5 * time.Second
)
// Server is the Nomad server which manages the job queues,
// schedulers, and notification bus for agents.
type Server struct {
	config *Config
	logger *log.Logger

	// Connection pool to other Nomad servers
	connPool *ConnPool

	// Endpoints holds our RPC endpoints
	endpoints endpoints

	// The raft instance is used among Nomad nodes within the
	// region to protect operations that require strong consistency
	leaderCh      <-chan bool
	raft          *raft.Raft
	raftLayer     *RaftLayer
	raftPeers     raft.PeerStore
	raftStore     *raftboltdb.BoltStore
	raftInmem     *raft.InmemStore
	raftTransport *raft.NetworkTransport

	// fsm is the state machine used with Raft
	fsm *nomadFSM

	// rpcListener is used to listen for incoming connections
	rpcListener  net.Listener
	rpcServer    *rpc.Server
	rpcAdvertise net.Addr

	// rpcTLS is the TLS config for incoming TLS requests
	rpcTLS *tls.Config

	// peers is used to track the known Nomad servers. This is
	// used for region forwarding and clustering.
	peers      map[string][]*serverParts
	localPeers map[string]*serverParts
	peerLock   sync.RWMutex

	// serf is the Serf cluster containing only Nomad
	// servers. This is used for multi-region federation
	// and automatic clustering within regions.
	serf *serf.Serf

	// reconcileCh is used to pass events from the serf handler
	// into the leader manager. Mostly used to handle when servers
	// join/leave from the region.
	reconcileCh chan serf.Member

	// eventCh is used to receive events from the serf cluster
	eventCh chan serf.Event

	// evalBroker is used to manage the in-progress evaluations
	// that are waiting to be brokered to a sub-scheduler
	evalBroker *EvalBroker

	// blockedEvals is used to manage evaluations that are blocked on node
	// capacity changes.
	blockedEvals *BlockedEvals

	// planQueue is used to manage the submitted allocation
	// plans that are waiting to be assessed by the leader
	planQueue *PlanQueue

	// periodicDispatcher is used to track and create evaluations for periodic jobs.
	periodicDispatcher *PeriodicDispatch

	// heartbeatTimers track the expiration time of each heartbeat that has
	// a TTL. On expiration, the node status is updated to be 'down'.
	heartbeatTimers     map[string]*time.Timer
	heartbeatTimersLock sync.Mutex

	// consulSyncer advertises this Nomad Agent with Consul
	consulSyncer *consul.Syncer

	// vault is the client for communicating with Vault.
	vault *vaultapi.Client

	// Worker used for processing
	workers []*Worker

	// left tracks whether this server has gracefully left the cluster
	left bool

	shutdown     bool
	shutdownCh   chan struct{}
	shutdownLock sync.Mutex
}

// Holds the RPC endpoints
type endpoints struct {
	Status   *Status
	Node     *Node
	Job      *Job
	Eval     *Eval
	Plan     *Plan
	Alloc    *Alloc
	Region   *Region
	Periodic *Periodic
	System   *System
}

// NewServer is used to construct a new Nomad server from the
// configuration, potentially returning an error
func NewServer(config *Config, consulSyncer *consul.Syncer, logger *log.Logger) (*Server, error) {
	// Check the protocol version
	if err := config.CheckVersion(); err != nil {
		return nil, err
	}

	// Create an eval broker
	evalBroker, err := NewEvalBroker(config.EvalNackTimeout, config.EvalDeliveryLimit)
	if err != nil {
		return nil, err
	}

	// Create a new blocked eval tracker.
	blockedEvals := NewBlockedEvals(evalBroker)

	// Create a plan queue
	planQueue, err := NewPlanQueue()
	if err != nil {
		return nil, err
	}

	// Create the server
	s := &Server{
		config:       config,
		consulSyncer: consulSyncer,
		connPool:     NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, nil),
		logger:       logger,
		rpcServer:    rpc.NewServer(),
		peers:        make(map[string][]*serverParts),
		localPeers:   make(map[string]*serverParts),
		reconcileCh:  make(chan serf.Member, 32),
		eventCh:      make(chan serf.Event, 256),
		evalBroker:   evalBroker,
		blockedEvals: blockedEvals,
		planQueue:    planQueue,
		shutdownCh:   make(chan struct{}),
	}

	// Get the Vault API configuration
	c, err := config.VaultConfig.ApiConfig(true)
	if err != nil {
		s.logger.Printf("[ERR] nomad: failed to create Vault API config: %v", err)
		return nil, fmt.Errorf("Failed to create Vault API config: %v", err)
	}

	// Create the Vault API client
	v, err := vaultapi.NewClient(c)
	if err != nil {
		s.logger.Printf("[ERR] nomad: failed to create Vault API client: %v", err)
		return nil, fmt.Errorf("Failed to create Vault API client: %v", err)
	}

	// Set the wrapping function such that token creation is wrapped
	v.SetWrappingLookupFunc(config.VaultConfig.GetWrappingFn())

	// Set the token and store the client
	v.SetToken(config.VaultConfig.PeriodicToken)
	s.vault = v

	// Create the periodic dispatcher for launching periodic jobs.
	s.periodicDispatcher = NewPeriodicDispatch(s.logger, s)

	// Initialize the RPC layer
	// TODO: TLS...
	if err := s.setupRPC(nil); err != nil {
		s.Shutdown()
		s.logger.Printf("[ERR] nomad: failed to start RPC layer: %s", err)
		return nil, fmt.Errorf("Failed to start RPC layer: %v", err)
	}

	// Initialize the Raft server
	if err := s.setupRaft(); err != nil {
		s.Shutdown()
		s.logger.Printf("[ERR] nomad: failed to start Raft: %s", err)
		return nil, fmt.Errorf("Failed to start Raft: %v", err)
	}

	// Initialize the wan Serf
	s.serf, err = s.setupSerf(config.SerfConfig, s.eventCh, serfSnapshot)
	if err != nil {
		s.Shutdown()
		s.logger.Printf("[ERR] nomad: failed to start serf WAN: %s", err)
		return nil, fmt.Errorf("Failed to start serf: %v", err)
	}

	// Initialize the scheduling workers
	if err := s.setupWorkers(); err != nil {
		s.Shutdown()
		s.logger.Printf("[ERR] nomad: failed to start workers: %s", err)
		return nil, fmt.Errorf("Failed to start workers: %v", err)
	}

	// Setup the Consul syncer
	if err := s.setupConsulSyncer(); err != nil {
		return nil, fmt.Errorf("failed to create server Consul syncer: %v", err)
	}

	// Monitor leadership changes
	go s.monitorLeadership()

	// Start ingesting events for Serf
	go s.serfEventHandler()

	// Start the RPC listeners
	go s.listen()

	// Emit metrics for the eval broker
	go evalBroker.EmitStats(time.Second, s.shutdownCh)

	// Emit metrics for the plan queue
	go planQueue.EmitStats(time.Second, s.shutdownCh)

	// Emit metrics for the blocked eval tracker.
	go blockedEvals.EmitStats(time.Second, s.shutdownCh)

	// Emit metrics
	go s.heartbeatStats()

	// Done
	return s, nil
}

// Shutdown is used to shutdown the server
func (s *Server) Shutdown() error {
	s.logger.Printf("[INFO] nomad: shutting down server")
	s.shutdownLock.Lock()
	defer s.shutdownLock.Unlock()

	if s.shutdown {
		return nil
	}

	s.shutdown = true
	close(s.shutdownCh)

	if s.serf != nil {
		s.serf.Shutdown()
	}

	if s.raft != nil {
		s.raftTransport.Close()
		s.raftLayer.Close()
		future := s.raft.Shutdown()
		if err := future.Error(); err != nil {
			s.logger.Printf("[WARN] nomad: Error shutting down raft: %s", err)
		}
		if s.raftStore != nil {
			s.raftStore.Close()
		}
	}

	// Shutdown the RPC listener
	if s.rpcListener != nil {
		s.rpcListener.Close()
	}

	// Close the connection pool
	s.connPool.Shutdown()

	// Close the fsm
	if s.fsm != nil {
		s.fsm.Close()
	}

	return nil
}

// IsShutdown checks if the server is shutdown
func (s *Server) IsShutdown() bool {
	select {
	case <-s.shutdownCh:
		return true
	default:
		return false
	}
}

// Leave is used to prepare for a graceful shutdown of the server
func (s *Server) Leave() error {
	s.logger.Printf("[INFO] nomad: server starting leave")
	s.left = true

	// Check the number of known peers
	numPeers, err := s.numOtherPeers()
	if err != nil {
		s.logger.Printf("[ERR] nomad: failed to check raft peers: %v", err)
		return err
	}

	// If we are the current leader, and we have any other peers (cluster has multiple
	// servers), we should do a RemovePeer to safely reduce the quorum size. If we are
	// not the leader, then we should issue our leave intention and wait to be removed
	// for some sane period of time.
	isLeader := s.IsLeader()
	if isLeader && numPeers > 0 {
		future := s.raft.RemovePeer(s.raftTransport.LocalAddr())
		if err := future.Error(); err != nil && err != raft.ErrUnknownPeer {
			s.logger.Printf("[ERR] nomad: failed to remove ourself as raft peer: %v", err)
		}
	}

	// Leave the gossip pool
	if s.serf != nil {
		if err := s.serf.Leave(); err != nil {
			s.logger.Printf("[ERR] nomad: failed to leave Serf cluster: %v", err)
		}
	}

	// If we were not leader, wait to be safely removed from the cluster.
	// We must wait to allow the raft replication to take place, otherwise
	// an immediate shutdown could cause a loss of quorum.
	if !isLeader {
		limit := time.Now().Add(raftRemoveGracePeriod)
		for numPeers > 0 && time.Now().Before(limit) {
			// Update the number of peers
			numPeers, err = s.numOtherPeers()
			if err != nil {
				s.logger.Printf("[ERR] nomad: failed to check raft peers: %v", err)
				break
			}

			// Avoid the sleep if we are done
			if numPeers == 0 {
				break
			}

			// Sleep a while and check again
			time.Sleep(50 * time.Millisecond)
		}
		if numPeers != 0 {
			s.logger.Printf("[WARN] nomad: failed to leave raft peer set gracefully, timeout")
		}
	}

	return nil
}

// setupBootstrapHandler() creates the closure necessary to support a Consul
// fallback handler.
func (s *Server) setupBootstrapHandler() error {
	// peersTimeout is used to indicate to the Consul Syncer that the
	// current Nomad Server has a stale peer set. peersTimeout will time
	// out if the Consul Syncer bootstrapFn has not observed a Raft
	// leader in maxStaleLeadership. If peersTimeout has been triggered,
	// the Consul Syncer will begin querying Consul for other Nomad
	// Servers.
	//
	// NOTE: time.Timer is used vs time.Time in order to handle clock
	// drift because time.Timer is implemented as a monotonic clock.
	var peersTimeout *time.Timer = time.NewTimer(0)

	// consulQueryCount is the number of times the bootstrapFn has been
	// called, regardless of success.
	var consulQueryCount uint64

	// leadershipTimedOut is a helper method that returns true if the
	// peersTimeout timer has expired.
	leadershipTimedOut := func() bool {
		select {
		case <-peersTimeout.C:
			return true
		default:
			return false
		}
	}

	// The bootstrapFn callback handler is used to periodically poll
	// Consul to look up the Nomad Servers in Consul. In the event the
	// server has been brought up without a `retry-join` configuration
	// and this Server is partitioned from the rest of the cluster,
	// periodically poll Consul to reattach this Server to other servers
	// in the same region and automatically reform a quorum (assuming the
	// correct number of servers required for quorum are present).
	bootstrapFn := func() error {
		// If there is a raft leader, do nothing
		if s.raft.Leader() != "" {
			peersTimeout.Reset(maxStaleLeadership)
			return nil
		}

		// (ab)use serf.go's behavior of setting BootstrapExpect to
		// zero if we have bootstrapped.
		bootstrapExpect := atomic.LoadInt32(&s.config.BootstrapExpect)
		if bootstrapExpect == 0 {
			// This Nomad Server has been bootstrapped. Rely on
			// the peersTimeout firing as a guard to prevent
			// aggressive querying of Consul.
			if !leadershipTimedOut() {
				return nil
			}
		} else {
			if consulQueryCount > 0 && !leadershipTimedOut() {
				return nil
			}

			// This Nomad Server has not been bootstrapped, reach
			// out to Consul if our peer list is less than
			// `bootstrap_expect`.
			raftPeers, err := s.raftPeers.Peers()
			if err != nil {
				peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
				return nil
			}

			// The necessary number of Nomad Servers required for
			// quorum has been reached, we do not need to poll
			// Consul. Let the normal timeout-based strategy
			// take over.
			if len(raftPeers) >= int(bootstrapExpect) {
				peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
				return nil
			}
		}
		consulQueryCount++

		s.logger.Printf("[DEBUG] server.consul: lost contact with Nomad quorum, falling back to Consul for server list")

		consulCatalog := s.consulSyncer.ConsulClient().Catalog()
		dcs, err := consulCatalog.Datacenters()
		if err != nil {
			peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
			return fmt.Errorf("server.consul: unable to query Consul datacenters: %v", err)
		}
		if len(dcs) > 2 {
			// Query the local DC first, then shuffle the
			// remaining DCs. If additional calls to bootstrapFn
			// are necessary, this Nomad Server will eventually
			// walk all datacenters until it finds enough hosts to
			// form a quorum.
			shuffleStrings(dcs[1:])
			dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)]
		}

		nomadServerServiceName := s.config.ConsulConfig.ServerServiceName
		var mErr multierror.Error
		const defaultMaxNumNomadServers = 8
		nomadServerServices := make([]string, 0, defaultMaxNumNomadServers)
		localNode := s.serf.Memberlist().LocalNode()
		for _, dc := range dcs {
			consulOpts := &consulapi.QueryOptions{
				AllowStale: true,
				Datacenter: dc,
				Near:       "_agent",
				WaitTime:   consul.DefaultQueryWaitDuration,
			}
			consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagSerf, consulOpts)
			if err != nil {
				err := fmt.Errorf("failed to query service %q in Consul datacenter %q: %v", nomadServerServiceName, dc, err)
				s.logger.Printf("[WARN] server.consul: %v", err)
				mErr.Errors = append(mErr.Errors, err)
				continue
			}

			for _, cs := range consulServices {
				port := strconv.FormatInt(int64(cs.ServicePort), 10)
				addr := cs.ServiceAddress
				if addr == "" {
					addr = cs.Address
				}
				// Skip ourselves
				if localNode.Addr.String() == addr && int(localNode.Port) == cs.ServicePort {
					continue
				}
				serverAddr := net.JoinHostPort(addr, port)
				nomadServerServices = append(nomadServerServices, serverAddr)
			}
		}

		if len(nomadServerServices) == 0 {
			if len(mErr.Errors) > 0 {
				peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
				return mErr.ErrorOrNil()
			}

			// Log the error and return nil so future handlers
			// can attempt to register the `nomad` service.
			pollInterval := peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor)
			s.logger.Printf("[TRACE] server.consul: no Nomad Servers advertising service %+q in Consul datacenters %+q, sleeping for %v", nomadServerServiceName, dcs, pollInterval)
			peersTimeout.Reset(pollInterval)
			return nil
		}

		numServersContacted, err := s.Join(nomadServerServices)
		if err != nil {
			peersTimeout.Reset(peersPollInterval + lib.RandomStagger(peersPollInterval/peersPollJitterFactor))
			return fmt.Errorf("contacted %d Nomad Servers: %v", numServersContacted, err)
		}

		peersTimeout.Reset(maxStaleLeadership)
		s.logger.Printf("[INFO] server.consul: successfully contacted %d Nomad Servers", numServersContacted)
		return nil
	}

	s.consulSyncer.AddPeriodicHandler("Nomad Server Fallback Server Handler", bootstrapFn)
	return nil
}

// setupConsulSyncer creates a Server-mode consul.Syncer which periodically
// executes callbacks on a fixed interval.
func (s *Server) setupConsulSyncer() error {
	if s.config.ConsulConfig.ServerAutoJoin {
		if err := s.setupBootstrapHandler(); err != nil {
			return err
		}
	}

	return nil
}

// setupRPC is used to setup the RPC listener
func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
	// Create endpoints
	s.endpoints.Status = &Status{s}
	s.endpoints.Node = &Node{srv: s}
	s.endpoints.Job = &Job{s}
	s.endpoints.Eval = &Eval{s}
	s.endpoints.Plan = &Plan{s}
	s.endpoints.Alloc = &Alloc{s}
	s.endpoints.Region = &Region{s}
	s.endpoints.Periodic = &Periodic{s}
	s.endpoints.System = &System{s}

	// Register the handlers
	s.rpcServer.Register(s.endpoints.Status)
	s.rpcServer.Register(s.endpoints.Node)
	s.rpcServer.Register(s.endpoints.Job)
	s.rpcServer.Register(s.endpoints.Eval)
	s.rpcServer.Register(s.endpoints.Plan)
	s.rpcServer.Register(s.endpoints.Alloc)
	s.rpcServer.Register(s.endpoints.Region)
	s.rpcServer.Register(s.endpoints.Periodic)
	s.rpcServer.Register(s.endpoints.System)

	list, err := net.ListenTCP("tcp", s.config.RPCAddr)
	if err != nil {
		return err
	}
	s.rpcListener = list

	if s.config.RPCAdvertise != nil {
		s.rpcAdvertise = s.config.RPCAdvertise
	} else {
		s.rpcAdvertise = s.rpcListener.Addr()
	}

	// Verify that we have a usable advertise address
	addr, ok := s.rpcAdvertise.(*net.TCPAddr)
	if !ok {
		list.Close()
		return fmt.Errorf("RPC advertise address is not a TCP Address: %v", addr)
	}
	if addr.IP.IsUnspecified() {
		list.Close()
		return fmt.Errorf("RPC advertise address is not advertisable: %v", addr)
	}

	// Provide a DC specific wrapper. Raft replication is only
	// ever done in the same datacenter, so we can provide it as a constant.
	// wrapper := tlsutil.SpecificDC(s.config.Datacenter, tlsWrap)
	// TODO: TLS...
	s.raftLayer = NewRaftLayer(s.rpcAdvertise, nil)
	return nil
}

// setupRaft is used to setup and initialize Raft
func (s *Server) setupRaft() error {
	// If we are in bootstrap mode, enable a single node cluster
	if s.config.Bootstrap || (s.config.DevMode && !s.config.DevDisableBootstrap) {
		s.config.RaftConfig.EnableSingleNode = true
	}

	// Create the FSM
	var err error
	s.fsm, err = NewFSM(s.evalBroker, s.periodicDispatcher, s.blockedEvals, s.config.LogOutput)
	if err != nil {
		return err
	}

	// Create a transport layer
	trans := raft.NewNetworkTransport(s.raftLayer, 3, s.config.RaftTimeout,
		s.config.LogOutput)
	s.raftTransport = trans

	// Create the backend raft store for logs and stable storage
	var log raft.LogStore
	var stable raft.StableStore
	var snap raft.SnapshotStore
	var peers raft.PeerStore
	if s.config.DevMode {
		store := raft.NewInmemStore()
		s.raftInmem = store
		stable = store
		log = store
		snap = raft.NewDiscardSnapshotStore()
		peers = &raft.StaticPeers{}
		s.raftPeers = peers
	} else {
		// Create the base raft path
		path := filepath.Join(s.config.DataDir, raftState)
		if err := ensurePath(path, true); err != nil {
			return err
		}

		// Create the BoltDB backend
		store, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
		if err != nil {
			return err
		}
		s.raftStore = store
		stable = store

		// Wrap the store in a LogCache to improve performance
		cacheStore, err := raft.NewLogCache(raftLogCacheSize, store)
		if err != nil {
			store.Close()
			return err
		}
		log = cacheStore

		// Create the snapshot store
		snapshots, err := raft.NewFileSnapshotStore(path, snapshotsRetained, s.config.LogOutput)
		if err != nil {
			if s.raftStore != nil {
				s.raftStore.Close()
			}
			return err
		}
		snap = snapshots

		// Setup the peer store
		s.raftPeers = raft.NewJSONPeers(path, trans)
		peers = s.raftPeers
	}

	// Ensure local host is always included if we are in bootstrap mode
	if s.config.RaftConfig.EnableSingleNode {
		p, err := peers.Peers()
		if err != nil {
			if s.raftStore != nil {
				s.raftStore.Close()
			}
			return err
		}
		if !raft.PeerContained(p, trans.LocalAddr()) {
			peers.SetPeers(raft.AddUniquePeer(p, trans.LocalAddr()))
		}
	}

	// Make sure we set the LogOutput
	s.config.RaftConfig.LogOutput = s.config.LogOutput

	// Setup the leader channel
	leaderCh := make(chan bool, 1)
	s.config.RaftConfig.NotifyCh = leaderCh
	s.leaderCh = leaderCh

	// Setup the Raft store
	s.raft, err = raft.NewRaft(s.config.RaftConfig, s.fsm, log, stable,
		snap, peers, trans)
	if err != nil {
		if s.raftStore != nil {
			s.raftStore.Close()
		}
		trans.Close()
		return err
	}
	return nil
}

// setupSerf is used to setup and initialize a Serf
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
	conf.Init()
	conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Region)
	conf.Tags["role"] = "nomad"
	conf.Tags["region"] = s.config.Region
	conf.Tags["dc"] = s.config.Datacenter
	conf.Tags["vsn"] = fmt.Sprintf("%d", structs.ApiMajorVersion)
	conf.Tags["mvn"] = fmt.Sprintf("%d", structs.ApiMinorVersion)
	conf.Tags["build"] = s.config.Build
	conf.Tags["port"] = fmt.Sprintf("%d", s.rpcAdvertise.(*net.TCPAddr).Port)
	if s.config.Bootstrap || (s.config.DevMode && !s.config.DevDisableBootstrap) {
		conf.Tags["bootstrap"] = "1"
	}
	bootstrapExpect := atomic.LoadInt32(&s.config.BootstrapExpect)
	if bootstrapExpect != 0 {
		conf.Tags["expect"] = fmt.Sprintf("%d", bootstrapExpect)
	}
	conf.MemberlistConfig.LogOutput = s.config.LogOutput
	conf.LogOutput = s.config.LogOutput
	conf.EventCh = ch
	if !s.config.DevMode {
		conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
		if err := ensurePath(conf.SnapshotPath, false); err != nil {
			return nil, err
		}
	}
	conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
	conf.RejoinAfterLeave = true
	conf.Merge = &serfMergeDelegate{}

	// Until Nomad supports this fully, we disable automatic resolution.
	// When enabled, the Serf gossip may just turn off if we are the minority
	// node which is rather unexpected.
	conf.EnableNameConflictResolution = false
	return serf.Create(conf)
}
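
// Note: the Serf tags set above ("role", "region", "dc", "port", "vsn",
// "bootstrap", "expect", ...) are how peer Nomad servers describe themselves
// to one another; the serf member-event handling code in this package parses
// them back out when deciding whether a member is a usable Nomad server and
// which region it belongs to.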

// setupWorkers is used to start the scheduling workers
func (s *Server) setupWorkers() error {
	// Check if all the schedulers are disabled
	if len(s.config.EnabledSchedulers) == 0 || s.config.NumSchedulers == 0 {
		s.logger.Printf("[WARN] nomad: no enabled schedulers")
		return nil
	}

	// Start the workers
	for i := 0; i < s.config.NumSchedulers; i++ {
		if w, err := NewWorker(s); err != nil {
			return err
		} else {
			s.workers = append(s.workers, w)
		}
	}
	s.logger.Printf("[INFO] nomad: starting %d scheduling worker(s) for %v",
		s.config.NumSchedulers, s.config.EnabledSchedulers)
	return nil
}

// numOtherPeers is used to check on the number of known peers
// excluding the local node
func (s *Server) numOtherPeers() (int, error) {
	peers, err := s.raftPeers.Peers()
	if err != nil {
		return 0, err
	}
	otherPeers := raft.ExcludePeer(peers, s.raftTransport.LocalAddr())
	return len(otherPeers), nil
}

// IsLeader checks if this server is the cluster leader
func (s *Server) IsLeader() bool {
	return s.raft.State() == raft.Leader
}

// Join is used to have Nomad join the gossip ring.
// The target address should be another node listening on the
// Serf address
func (s *Server) Join(addrs []string) (int, error) {
	return s.serf.Join(addrs, true)
}

// LocalMember is used to return the local node
func (c *Server) LocalMember() serf.Member {
	return c.serf.LocalMember()
}

// Members is used to return the members of the serf cluster
func (s *Server) Members() []serf.Member {
	return s.serf.Members()
}

// RemoveFailedNode is used to remove a failed node from the cluster
func (s *Server) RemoveFailedNode(node string) error {
	return s.serf.RemoveFailedNode(node)
}

// KeyManager returns the Serf keyring manager
func (s *Server) KeyManager() *serf.KeyManager {
	return s.serf.KeyManager()
}

// Encrypted determines if gossip is encrypted
func (s *Server) Encrypted() bool {
	return s.serf.EncryptionEnabled()
}

// State returns the underlying state store. This should *not*
// be used to modify state directly.
func (s *Server) State() *state.StateStore {
	return s.fsm.State()
}

// Regions returns the known regions in the cluster.
func (s *Server) Regions() []string {
	s.peerLock.RLock()
	defer s.peerLock.RUnlock()

	regions := make([]string, 0, len(s.peers))
	for region, _ := range s.peers {
		regions = append(regions, region)
	}
	sort.Strings(regions)
	return regions
}

// inmemCodec is used to do an RPC call without going over a network
type inmemCodec struct {
	method string
	args   interface{}
	reply  interface{}
	err    error
}

func (i *inmemCodec) ReadRequestHeader(req *rpc.Request) error {
	req.ServiceMethod = i.method
	return nil
}

// ReadRequestBody copies the in-memory args into the endpoint's argument
// struct using reflection, avoiding any serialization.
func (i *inmemCodec) ReadRequestBody(args interface{}) error {
	sourceValue := reflect.Indirect(reflect.Indirect(reflect.ValueOf(i.args)))
	dst := reflect.Indirect(reflect.Indirect(reflect.ValueOf(args)))
	dst.Set(sourceValue)
	return nil
}

// WriteResponse records any error returned by the endpoint, or copies the
// endpoint's reply back into the caller's reply struct.
func (i *inmemCodec) WriteResponse(resp *rpc.Response, reply interface{}) error {
	if resp.Error != "" {
		i.err = errors.New(resp.Error)
		return nil
	}
	sourceValue := reflect.Indirect(reflect.Indirect(reflect.ValueOf(reply)))
	dst := reflect.Indirect(reflect.Indirect(reflect.ValueOf(i.reply)))
	dst.Set(sourceValue)
	return nil
}

func (i *inmemCodec) Close() error {
	return nil
}

// RPC is used to make a local RPC call
func (s *Server) RPC(method string, args interface{}, reply interface{}) error {
	codec := &inmemCodec{
		method: method,
		args:   args,
		reply:  reply,
	}
	if err := s.rpcServer.ServeRequest(codec); err != nil {
		return err
	}
	return codec.err
}
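
// A rough sketch of how a caller might use RPC to reach a registered endpoint
// in-process, without a network round trip (the endpoint, request types, and
// region value here are illustrative):
//
//	args := &structs.JobListRequest{
//		QueryOptions: structs.QueryOptions{Region: "global"},
//	}
//	var reply structs.JobListResponse
//	if err := s.RPC("Job.List", args, &reply); err != nil {
//		// handle error
//	}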

// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (s *Server) Stats() map[string]map[string]string {
	toString := func(v uint64) string {
		return strconv.FormatUint(v, 10)
	}
	stats := map[string]map[string]string{
		"nomad": map[string]string{
			"server":        "true",
			"leader":        fmt.Sprintf("%v", s.IsLeader()),
			"leader_addr":   s.raft.Leader(),
			"bootstrap":     fmt.Sprintf("%v", s.config.Bootstrap),
			"known_regions": toString(uint64(len(s.peers))),
		},
		"raft":    s.raft.Stats(),
		"serf":    s.serf.Stats(),
		"runtime": RuntimeStats(),
	}
	if peers, err := s.raftPeers.Peers(); err == nil {
		stats["raft"]["raft_peers"] = strings.Join(peers, ",")
	} else {
		s.logger.Printf("[DEBUG] server: error getting raft peers: %v", err)
	}
	return stats
}