2015-06-01 15:49:10 +00:00
package nomad
2015-06-04 11:38:41 +00:00
import (
2016-08-22 20:57:27 +00:00
"context"
2015-12-16 22:14:55 +00:00
"errors"
2015-06-04 11:38:41 +00:00
"fmt"
2017-04-12 21:47:59 +00:00
"math/rand"
2017-02-02 23:49:06 +00:00
"net"
2015-06-04 11:38:41 +00:00
"time"
2017-08-13 23:16:59 +00:00
"golang.org/x/time/rate"
2015-06-04 11:38:41 +00:00
"github.com/armon/go-metrics"
2017-02-08 04:31:23 +00:00
memdb "github.com/hashicorp/go-memdb"
2017-08-13 23:16:59 +00:00
"github.com/hashicorp/nomad/nomad/state"
2015-08-05 23:53:54 +00:00
"github.com/hashicorp/nomad/nomad/structs"
2015-06-04 11:38:41 +00:00
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)
2016-05-25 17:28:25 +00:00
const (
// failedEvalUnblockInterval is the interval at which failed evaluations are
// unblocked to re-enter the scheduler. A failed evaluation occurs under
// high contention when the schedulers plan does not make progress.
failedEvalUnblockInterval = 1 * time . Minute
2017-08-13 23:16:59 +00:00
// replicationRateLimit is used to rate limit how often data is replicated
// between the authoritative region and the local region
replicationRateLimit rate . Limit = 10.0
2016-05-25 17:28:25 +00:00
)
2015-06-01 15:49:10 +00:00
// monitorLeadership is used to monitor if we acquire or lose our role
// as the leader in the Raft cluster. There is some work the leader is
// expected to do, so we must react to changes
func ( s * Server ) monitorLeadership ( ) {
var stopCh chan struct { }
for {
select {
2015-09-07 17:46:41 +00:00
case isLeader := <- s . leaderCh :
2015-06-01 15:49:10 +00:00
if isLeader {
stopCh = make ( chan struct { } )
go s . leaderLoop ( stopCh )
s . logger . Printf ( "[INFO] nomad: cluster leadership acquired" )
} else if stopCh != nil {
close ( stopCh )
stopCh = nil
s . logger . Printf ( "[INFO] nomad: cluster leadership lost" )
}
case <- s . shutdownCh :
return
}
}
}
// leaderLoop runs as long as we are the leader to run various
// maintence activities
func ( s * Server ) leaderLoop ( stopCh chan struct { } ) {
2015-07-24 04:58:38 +00:00
// Ensure we revoke leadership on stepdown
defer s . revokeLeadership ( )
2015-06-05 21:54:45 +00:00
var reconcileCh chan serf . Member
2015-07-24 04:58:38 +00:00
establishedLeader := false
2015-06-05 21:54:45 +00:00
RECONCILE :
// Setup a reconciliation timer
reconcileCh = nil
interval := time . After ( s . config . ReconcileInterval )
// Apply a raft barrier to ensure our FSM is caught up
start := time . Now ( )
barrier := s . raft . Barrier ( 0 )
if err := barrier . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to wait for barrier: %v" , err )
goto WAIT
}
metrics . MeasureSince ( [ ] string { "nomad" , "leader" , "barrier" } , start )
2015-07-24 04:58:38 +00:00
// Check if we need to handle initial leadership actions
if ! establishedLeader {
2015-08-15 22:15:00 +00:00
if err := s . establishLeadership ( stopCh ) ; err != nil {
2017-08-03 19:37:58 +00:00
s . logger . Printf ( "[ERR] nomad: failed to establish leadership: %v" , err )
2015-07-24 04:58:38 +00:00
goto WAIT
}
establishedLeader = true
}
2015-06-05 21:54:45 +00:00
// Reconcile any missing data
if err := s . reconcile ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to reconcile: %v" , err )
goto WAIT
}
// Initial reconcile worked, now we can process the channel
// updates
reconcileCh = s . reconcileCh
WAIT :
2015-06-01 15:49:10 +00:00
// Wait until leadership is lost
for {
select {
case <- stopCh :
return
case <- s . shutdownCh :
return
2015-06-05 21:54:45 +00:00
case <- interval :
goto RECONCILE
case member := <- reconcileCh :
2015-06-04 11:38:41 +00:00
s . reconcileMember ( member )
}
}
}
2015-07-24 04:58:38 +00:00
// establishLeadership is invoked once we become leader and are able
// to invoke an initial barrier. The barrier is used to ensure any
2016-05-15 16:41:34 +00:00
// previously inflight transactions have been committed and that our
2015-07-24 04:58:38 +00:00
// state is up-to-date.
2015-08-15 22:15:00 +00:00
func ( s * Server ) establishLeadership ( stopCh chan struct { } ) error {
2016-02-17 21:50:06 +00:00
// Disable workers to free half the cores for use in the plan queue and
// evaluation broker
if numWorkers := len ( s . workers ) ; numWorkers > 1 {
2016-02-20 21:38:46 +00:00
// Disabling 3/4 of the workers frees CPU for raft and the
// plan applier which uses 1/2 the cores.
for i := 0 ; i < ( 3 * numWorkers / 4 ) ; i ++ {
2016-02-17 21:50:06 +00:00
s . workers [ i ] . SetPause ( true )
}
2015-08-23 20:59:13 +00:00
}
2015-07-27 22:11:42 +00:00
// Enable the plan queue, since we are now the leader
s . planQueue . SetEnabled ( true )
2015-07-27 22:31:09 +00:00
// Start the plan evaluator
go s . planApply ( )
2015-07-27 22:11:42 +00:00
2015-07-24 04:58:38 +00:00
// Enable the eval broker, since we are now the leader
s . evalBroker . SetEnabled ( true )
2016-01-29 23:31:32 +00:00
// Enable the blocked eval tracker, since we are now the leader
s . blockedEvals . SetEnabled ( true )
2017-06-28 22:35:52 +00:00
// Enable the deployment watcher, since we are now the leader
2017-08-31 00:45:32 +00:00
if err := s . deploymentWatcher . SetEnabled ( true , s . State ( ) ) ; err != nil {
2017-06-28 22:35:52 +00:00
return err
}
2015-08-05 23:53:54 +00:00
// Restore the eval broker state
2016-01-29 23:31:32 +00:00
if err := s . restoreEvals ( ) ; err != nil {
2015-08-05 23:53:54 +00:00
return err
}
2015-08-15 22:15:00 +00:00
2016-08-22 20:57:27 +00:00
// Activate the vault client
s . vault . SetActive ( true )
if err := s . restoreRevokingAccessors ( ) ; err != nil {
return err
}
2015-12-04 23:10:08 +00:00
// Enable the periodic dispatcher, since we are now the leader.
2015-12-18 20:26:28 +00:00
s . periodicDispatcher . SetEnabled ( true )
// Restore the periodic dispatcher state
if err := s . restorePeriodicDispatcher ( ) ; err != nil {
return err
}
2015-08-15 22:15:00 +00:00
// Scheduler periodic jobs
go s . schedulePeriodic ( stopCh )
2015-08-16 18:10:18 +00:00
// Reap any failed evaluations
go s . reapFailedEvaluations ( stopCh )
2015-08-23 00:17:13 +00:00
2016-01-31 00:16:13 +00:00
// Reap any duplicate blocked evaluations
go s . reapDupBlockedEvaluations ( stopCh )
2016-05-23 23:27:26 +00:00
// Periodically unblock failed allocations
go s . periodicUnblockFailedEvals ( stopCh )
2015-08-23 00:17:13 +00:00
// Setup the heartbeat timers. This is done both when starting up or when
// a leader fail over happens. Since the timers are maintained by the leader
// node, effectively this means all the timers are renewed at the time of failover.
// The TTL contract is that the session will not be expired before the TTL,
// so expiring it later is allowable.
//
// This MUST be done after the initial barrier to ensure the latest Nodes
// are available to be initialized. Otherwise initialization may use stale
// data.
if err := s . initializeHeartbeatTimers ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: heartbeat timer setup failed: %v" , err )
return err
}
2016-08-05 21:40:35 +00:00
// COMPAT 0.4 - 0.4.1
// Reconcile the summaries of the registered jobs. We reconcile summaries
// only if the server is 0.4.1 since summaries are not present in 0.4 they
// might be incorrect after upgrading to 0.4.1 the summaries might not be
// correct
2016-08-06 00:48:37 +00:00
if err := s . reconcileJobSummaries ( ) ; err != nil {
return fmt . Errorf ( "unable to reconcile job summaries: %v" , err )
2016-08-05 21:40:35 +00:00
}
2017-08-13 23:16:59 +00:00
// Start replication of ACLs and Policies if they are enabled,
// and we are not the authoritative region.
if s . config . ACLEnabled && s . config . Region != s . config . AuthoritativeRegion {
go s . replicateACLPolicies ( stopCh )
go s . replicateACLTokens ( stopCh )
}
2015-08-05 23:53:54 +00:00
return nil
}
2015-07-27 22:11:42 +00:00
2016-01-29 23:31:32 +00:00
// restoreEvals is used to restore pending evaluations into the eval broker and
// blocked evaluations into the blocked eval tracker. The broker and blocked
// eval tracker is maintained only by the leader, so it must be restored anytime
// a leadership transition takes place.
func ( s * Server ) restoreEvals ( ) error {
2015-08-05 23:53:54 +00:00
// Get an iterator over every evaluation
2017-02-08 04:31:23 +00:00
ws := memdb . NewWatchSet ( )
iter , err := s . fsm . State ( ) . Evals ( ws )
2015-08-05 23:53:54 +00:00
if err != nil {
return fmt . Errorf ( "failed to get evaluations: %v" , err )
}
for {
raw := iter . Next ( )
if raw == nil {
break
}
eval := raw . ( * structs . Evaluation )
2016-01-29 23:31:32 +00:00
if eval . ShouldEnqueue ( ) {
2016-05-18 18:35:15 +00:00
s . evalBroker . Enqueue ( eval )
2016-01-29 23:31:32 +00:00
} else if eval . ShouldBlock ( ) {
s . blockedEvals . Block ( eval )
2015-08-05 23:53:54 +00:00
}
}
2015-07-24 04:58:38 +00:00
return nil
}
2016-08-22 20:57:27 +00:00
// restoreRevokingAccessors is used to restore Vault accessors that should be
// revoked.
func ( s * Server ) restoreRevokingAccessors ( ) error {
// An accessor should be revoked if its allocation or node is terminal
2017-02-08 04:31:23 +00:00
ws := memdb . NewWatchSet ( )
2016-08-22 20:57:27 +00:00
state := s . fsm . State ( )
2017-02-08 04:31:23 +00:00
iter , err := state . VaultAccessors ( ws )
2016-08-22 20:57:27 +00:00
if err != nil {
return fmt . Errorf ( "failed to get vault accessors: %v" , err )
}
var revoke [ ] * structs . VaultAccessor
for {
raw := iter . Next ( )
if raw == nil {
break
}
va := raw . ( * structs . VaultAccessor )
// Check the allocation
2017-02-08 04:31:23 +00:00
alloc , err := state . AllocByID ( ws , va . AllocID )
2016-08-22 20:57:27 +00:00
if err != nil {
2017-02-28 00:00:19 +00:00
return fmt . Errorf ( "failed to lookup allocation %q: %v" , va . AllocID , err )
2016-08-22 20:57:27 +00:00
}
if alloc == nil || alloc . Terminated ( ) {
// No longer running and should be revoked
revoke = append ( revoke , va )
continue
}
// Check the node
2017-02-08 04:31:23 +00:00
node , err := state . NodeByID ( ws , va . NodeID )
2016-08-22 20:57:27 +00:00
if err != nil {
return fmt . Errorf ( "failed to lookup node %q: %v" , va . NodeID , err )
}
if node == nil || node . TerminalStatus ( ) {
// Node is terminal so any accessor from it should be revoked
revoke = append ( revoke , va )
continue
}
}
if len ( revoke ) != 0 {
if err := s . vault . RevokeTokens ( context . Background ( ) , revoke , true ) ; err != nil {
return fmt . Errorf ( "failed to revoke tokens: %v" , err )
}
}
return nil
}
2015-12-04 23:10:08 +00:00
// restorePeriodicDispatcher is used to restore all periodic jobs into the
// periodic dispatcher. It also determines if a periodic job should have been
// created during the leadership transition and force runs them. The periodic
// dispatcher is maintained only by the leader, so it must be restored anytime a
// leadership transition takes place.
2015-12-18 20:26:28 +00:00
func ( s * Server ) restorePeriodicDispatcher ( ) error {
2017-02-08 04:31:23 +00:00
ws := memdb . NewWatchSet ( )
iter , err := s . fsm . State ( ) . JobsByPeriodic ( ws , true )
2015-12-04 23:10:08 +00:00
if err != nil {
return fmt . Errorf ( "failed to get periodic jobs: %v" , err )
}
now := time . Now ( )
for i := iter . Next ( ) ; i != nil ; i = iter . Next ( ) {
job := i . ( * structs . Job )
2017-08-03 19:37:58 +00:00
// We skip adding parameterized jobs because they themselves aren't
// tracked, only the dispatched children are.
if job . IsParameterized ( ) {
continue
}
2015-12-04 23:10:08 +00:00
s . periodicDispatcher . Add ( job )
2015-12-16 22:14:55 +00:00
// If the periodic job has never been launched before, launch will hold
// the time the periodic job was added. Otherwise it has the last launch
// time of the periodic job.
2017-02-08 04:31:23 +00:00
launch , err := s . fsm . State ( ) . PeriodicLaunchByID ( ws , job . ID )
2015-12-19 01:51:30 +00:00
if err != nil || launch == nil {
return fmt . Errorf ( "failed to get periodic launch time: %v" , err )
2015-12-04 23:10:08 +00:00
}
2015-12-16 22:14:55 +00:00
// nextLaunch is the next launch that should occur.
2017-07-07 23:18:12 +00:00
nextLaunch := job . Periodic . Next ( launch . Launch . In ( job . Periodic . GetLocation ( ) ) )
2015-12-16 22:14:55 +00:00
// We skip force launching the job if there should be no next launch
// (the zero case) or if the next launch time is in the future. If it is
// in the future, it will be handled by the periodic dispatcher.
if nextLaunch . IsZero ( ) || ! nextLaunch . Before ( now ) {
2015-12-04 23:10:08 +00:00
continue
}
2016-01-13 18:19:53 +00:00
if _ , err := s . periodicDispatcher . ForceRun ( job . ID ) ; err != nil {
2015-12-16 22:14:55 +00:00
msg := fmt . Sprintf ( "force run of periodic job %q failed: %v" , job . ID , err )
s . logger . Printf ( "[ERR] nomad.periodic: %s" , msg )
return errors . New ( msg )
2015-12-04 23:10:08 +00:00
}
s . logger . Printf ( "[DEBUG] nomad.periodic: periodic job %q force" +
" run during leadership establishment" , job . ID )
}
2015-12-18 20:26:28 +00:00
return nil
}
2015-08-15 22:15:00 +00:00
// schedulePeriodic is used to do periodic job dispatch while we are leader
func ( s * Server ) schedulePeriodic ( stopCh chan struct { } ) {
evalGC := time . NewTicker ( s . config . EvalGCInterval )
defer evalGC . Stop ( )
2015-09-07 18:01:29 +00:00
nodeGC := time . NewTicker ( s . config . NodeGCInterval )
defer nodeGC . Stop ( )
2015-12-15 03:20:57 +00:00
jobGC := time . NewTicker ( s . config . JobGCInterval )
defer jobGC . Stop ( )
2015-08-15 22:15:00 +00:00
2016-06-22 16:11:25 +00:00
// getLatest grabs the latest index from the state store. It returns true if
// the index was retrieved successfully.
getLatest := func ( ) ( uint64 , bool ) {
2016-06-22 16:33:15 +00:00
snapshotIndex , err := s . fsm . State ( ) . LatestIndex ( )
2016-06-22 16:04:22 +00:00
if err != nil {
2016-06-22 16:33:15 +00:00
s . logger . Printf ( "[ERR] nomad: failed to determine state store's index: %v" , err )
2016-06-22 16:11:25 +00:00
return 0 , false
2016-06-22 16:04:22 +00:00
}
2016-06-22 16:11:25 +00:00
return snapshotIndex , true
}
for {
2015-08-15 22:15:00 +00:00
select {
case <- evalGC . C :
2016-06-22 16:11:25 +00:00
if index , ok := getLatest ( ) ; ok {
s . evalBroker . Enqueue ( s . coreJobEval ( structs . CoreJobEvalGC , index ) )
}
2015-09-07 18:01:29 +00:00
case <- nodeGC . C :
2016-06-22 16:11:25 +00:00
if index , ok := getLatest ( ) ; ok {
s . evalBroker . Enqueue ( s . coreJobEval ( structs . CoreJobNodeGC , index ) )
}
2015-12-15 03:20:57 +00:00
case <- jobGC . C :
2016-06-22 16:11:25 +00:00
if index , ok := getLatest ( ) ; ok {
s . evalBroker . Enqueue ( s . coreJobEval ( structs . CoreJobJobGC , index ) )
}
2015-08-15 22:15:00 +00:00
case <- stopCh :
return
}
}
}
2015-08-15 23:07:50 +00:00
// coreJobEval returns an evaluation for a core job
2016-06-22 16:04:22 +00:00
func ( s * Server ) coreJobEval ( job string , modifyIndex uint64 ) * structs . Evaluation {
2015-08-15 23:07:50 +00:00
return & structs . Evaluation {
2015-09-07 22:23:03 +00:00
ID : structs . GenerateUUID ( ) ,
2015-08-15 22:15:00 +00:00
Priority : structs . CoreJobPriority ,
Type : structs . JobTypeCore ,
TriggeredBy : structs . EvalTriggerScheduled ,
JobID : job ,
Status : structs . EvalStatusPending ,
2016-06-22 16:04:22 +00:00
ModifyIndex : modifyIndex ,
2015-08-15 22:15:00 +00:00
}
}
2015-08-16 18:10:18 +00:00
// reapFailedEvaluations is used to reap evaluations that
// have reached their delivery limit and should be failed
func ( s * Server ) reapFailedEvaluations ( stopCh chan struct { } ) {
for {
select {
case <- stopCh :
return
default :
// Scan for a failed evaluation
eval , token , err := s . evalBroker . Dequeue ( [ ] string { failedQueue } , time . Second )
if err != nil {
return
}
if eval == nil {
continue
}
// Update the status to failed
2017-04-14 20:19:14 +00:00
updateEval := eval . Copy ( )
updateEval . Status = structs . EvalStatusFailed
updateEval . StatusDescription = fmt . Sprintf ( "evaluation reached delivery limit (%d)" , s . config . EvalDeliveryLimit )
s . logger . Printf ( "[WARN] nomad: eval %#v reached delivery limit, marking as failed" , updateEval )
2015-08-16 18:10:18 +00:00
2017-04-12 21:47:59 +00:00
// Create a follow-up evaluation that will be used to retry the
// scheduling for the job after the cluster is hopefully more stable
// due to the fairly large backoff.
2017-04-14 22:24:55 +00:00
followupEvalWait := s . config . EvalFailedFollowupBaselineDelay +
time . Duration ( rand . Int63n ( int64 ( s . config . EvalFailedFollowupDelayRange ) ) )
2017-04-12 21:47:59 +00:00
followupEval := eval . CreateFailedFollowUpEval ( followupEvalWait )
2015-08-16 18:10:18 +00:00
// Update via Raft
req := structs . EvalUpdateRequest {
2017-04-14 20:19:14 +00:00
Evals : [ ] * structs . Evaluation { updateEval , followupEval } ,
2015-08-16 18:10:18 +00:00
}
if _ , _ , err := s . raftApply ( structs . EvalUpdateRequestType , & req ) ; err != nil {
2017-04-14 20:19:14 +00:00
s . logger . Printf ( "[ERR] nomad: failed to update failed eval %#v and create a follow-up: %v" , updateEval , err )
2015-08-16 18:10:18 +00:00
continue
}
// Ack completion
s . evalBroker . Ack ( eval . ID , token )
}
}
}
2016-01-31 00:16:13 +00:00
// reapDupBlockedEvaluations is used to reap duplicate blocked evaluations and
// should be cancelled.
func ( s * Server ) reapDupBlockedEvaluations ( stopCh chan struct { } ) {
for {
select {
case <- stopCh :
return
default :
// Scan for duplicate blocked evals.
dups := s . blockedEvals . GetDuplicates ( time . Second )
if dups == nil {
continue
}
cancel := make ( [ ] * structs . Evaluation , len ( dups ) )
for i , dup := range dups {
// Update the status to cancelled
newEval := dup . Copy ( )
newEval . Status = structs . EvalStatusCancelled
newEval . StatusDescription = fmt . Sprintf ( "existing blocked evaluation exists for job %q" , newEval . JobID )
cancel [ i ] = newEval
}
// Update via Raft
req := structs . EvalUpdateRequest {
Evals : cancel ,
}
if _ , _ , err := s . raftApply ( structs . EvalUpdateRequestType , & req ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to update duplicate evals %#v: %v" , cancel , err )
continue
}
}
}
}
2016-05-23 23:27:26 +00:00
// periodicUnblockFailedEvals periodically unblocks failed, blocked evaluations.
func ( s * Server ) periodicUnblockFailedEvals ( stopCh chan struct { } ) {
2016-07-06 00:08:58 +00:00
ticker := time . NewTicker ( failedEvalUnblockInterval )
2016-05-25 17:28:25 +00:00
defer ticker . Stop ( )
2016-05-23 23:27:26 +00:00
for {
select {
case <- stopCh :
return
case <- ticker . C :
// Unblock the failed allocations
s . blockedEvals . UnblockFailed ( )
}
}
}
2015-07-24 04:58:38 +00:00
// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func ( s * Server ) revokeLeadership ( ) error {
2015-07-27 22:11:42 +00:00
// Disable the plan queue, since we are no longer leader
s . planQueue . SetEnabled ( false )
2015-07-24 04:58:38 +00:00
// Disable the eval broker, since it is only useful as a leader
s . evalBroker . SetEnabled ( false )
2015-08-23 00:17:13 +00:00
2016-01-31 00:21:37 +00:00
// Disable the blocked eval tracker, since it is only useful as a leader
s . blockedEvals . SetEnabled ( false )
2015-12-18 20:26:28 +00:00
// Disable the periodic dispatcher, since it is only useful as a leader
s . periodicDispatcher . SetEnabled ( false )
2016-08-22 20:57:27 +00:00
// Disable the Vault client as it is only useful as a leader.
s . vault . SetActive ( false )
2017-06-28 22:35:52 +00:00
// Disable the deployment watcher as it is only useful as a leader.
2017-08-31 00:45:32 +00:00
if err := s . deploymentWatcher . SetEnabled ( false , nil ) ; err != nil {
2017-06-28 22:35:52 +00:00
return err
}
2015-08-23 00:17:13 +00:00
// Clear the heartbeat timers on either shutdown or step down,
// since we are no longer responsible for TTL expirations.
if err := s . clearAllHeartbeatTimers ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: clearing heartbeat timers failed: %v" , err )
return err
}
2015-08-23 20:59:13 +00:00
// Unpause our worker if we paused previously
if len ( s . workers ) > 1 {
2016-02-19 22:49:43 +00:00
for i := 0 ; i < len ( s . workers ) / 2 ; i ++ {
2016-02-17 21:50:06 +00:00
s . workers [ i ] . SetPause ( false )
}
2015-08-23 20:59:13 +00:00
}
2015-07-24 04:58:38 +00:00
return nil
}
2015-06-05 21:54:45 +00:00
// reconcile is used to reconcile the differences between Serf
// membership and what is reflected in our strongly consistent store.
func ( s * Server ) reconcile ( ) error {
defer metrics . MeasureSince ( [ ] string { "nomad" , "leader" , "reconcile" } , time . Now ( ) )
members := s . serf . Members ( )
for _ , member := range members {
if err := s . reconcileMember ( member ) ; err != nil {
return err
}
}
return nil
}
2015-06-04 11:38:41 +00:00
// reconcileMember is used to do an async reconcile of a single serf member
func ( s * Server ) reconcileMember ( member serf . Member ) error {
// Check if this is a member we should handle
valid , parts := isNomadServer ( member )
if ! valid || parts . Region != s . config . Region {
return nil
}
defer metrics . MeasureSince ( [ ] string { "nomad" , "leader" , "reconcileMember" } , time . Now ( ) )
// Do not reconcile ourself
if member . Name == fmt . Sprintf ( "%s.%s" , s . config . NodeName , s . config . Region ) {
return nil
}
var err error
switch member . Status {
case serf . StatusAlive :
err = s . addRaftPeer ( member , parts )
case serf . StatusLeft , StatusReap :
err = s . removeRaftPeer ( member , parts )
}
if err != nil {
s . logger . Printf ( "[ERR] nomad: failed to reconcile member: %v: %v" ,
member , err )
return err
}
return nil
}
2016-08-05 21:40:35 +00:00
// reconcileJobSummaries reconciles the summaries of all the jobs registered in
// the system
// COMPAT 0.4 -> 0.4.1
func ( s * Server ) reconcileJobSummaries ( ) error {
index , err := s . fsm . state . LatestIndex ( )
if err != nil {
return fmt . Errorf ( "unable to read latest index: %v" , err )
}
s . logger . Printf ( "[DEBUG] leader: reconciling job summaries at index: %v" , index )
args := & structs . GenericResponse { }
msg := structs . ReconcileJobSummariesRequestType | structs . IgnoreUnknownTypeFlag
if _ , _ , err = s . raftApply ( msg , args ) ; err != nil {
return fmt . Errorf ( "reconciliation of job summaries failed: %v" , err )
}
return nil
}
2015-06-04 11:38:41 +00:00
// addRaftPeer is used to add a new Raft peer when a Nomad server joins
func ( s * Server ) addRaftPeer ( m serf . Member , parts * serverParts ) error {
2017-02-02 23:49:06 +00:00
// Do not join ourselfs
if m . Name == s . config . NodeName {
s . logger . Printf ( "[DEBUG] nomad: adding self (%q) as raft peer skipped" , m . Name )
return nil
}
2015-06-04 11:38:41 +00:00
// Check for possibility of multiple bootstrap nodes
if parts . Bootstrap {
members := s . serf . Members ( )
for _ , member := range members {
valid , p := isNomadServer ( member )
if valid && member . Name != m . Name && p . Bootstrap {
s . logger . Printf ( "[ERR] nomad: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer." , m . Name , member . Name )
return nil
}
2015-06-01 15:49:10 +00:00
}
}
2015-06-04 11:38:41 +00:00
2017-02-02 23:49:06 +00:00
// TODO (alexdadgar) - This will need to be changed once we support node IDs.
addr := ( & net . TCPAddr { IP : m . Addr , Port : parts . Port } ) . String ( )
// See if it's already in the configuration. It's harmless to re-add it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries.
configFuture := s . raft . GetConfiguration ( )
if err := configFuture . Error ( ) ; err != nil {
2017-02-03 00:07:15 +00:00
s . logger . Printf ( "[ERR] nomad: failed to get raft configuration: %v" , err )
2017-02-02 23:49:06 +00:00
return err
}
for _ , server := range configFuture . Configuration ( ) . Servers {
if server . Address == raft . ServerAddress ( addr ) {
return nil
}
}
2015-06-04 11:38:41 +00:00
// Attempt to add as a peer
2017-02-02 23:49:06 +00:00
addFuture := s . raft . AddPeer ( raft . ServerAddress ( addr ) )
if err := addFuture . Error ( ) ; err != nil {
2015-06-04 11:38:41 +00:00
s . logger . Printf ( "[ERR] nomad: failed to add raft peer: %v" , err )
return err
2015-06-05 21:54:45 +00:00
} else if err == nil {
s . logger . Printf ( "[INFO] nomad: added raft peer: %v" , parts )
2015-06-04 11:38:41 +00:00
}
return nil
}
// removeRaftPeer is used to remove a Raft peer when a Nomad server leaves
// or is reaped
func ( s * Server ) removeRaftPeer ( m serf . Member , parts * serverParts ) error {
2017-02-02 23:49:06 +00:00
// TODO (alexdadgar) - This will need to be changed once we support node IDs.
addr := ( & net . TCPAddr { IP : m . Addr , Port : parts . Port } ) . String ( )
// See if it's already in the configuration. It's harmless to re-remove it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries.
configFuture := s . raft . GetConfiguration ( )
if err := configFuture . Error ( ) ; err != nil {
2017-02-03 00:07:15 +00:00
s . logger . Printf ( "[ERR] nomad: failed to get raft configuration: %v" , err )
2017-02-02 23:49:06 +00:00
return err
}
for _ , server := range configFuture . Configuration ( ) . Servers {
if server . Address == raft . ServerAddress ( addr ) {
goto REMOVE
}
}
return nil
REMOVE :
// Attempt to remove as a peer.
future := s . raft . RemovePeer ( raft . ServerAddress ( addr ) )
if err := future . Error ( ) ; err != nil {
2015-06-04 11:38:41 +00:00
s . logger . Printf ( "[ERR] nomad: failed to remove raft peer '%v': %v" ,
parts , err )
return err
}
return nil
2015-06-01 15:49:10 +00:00
}
2017-08-13 23:16:59 +00:00
// replicateACLPolicies is used to replicate ACL policies from
// the authoritative region to this region.
func ( s * Server ) replicateACLPolicies ( stopCh chan struct { } ) {
2017-08-19 22:30:01 +00:00
req := structs . ACLPolicyListRequest {
QueryOptions : structs . QueryOptions {
2017-08-24 16:53:30 +00:00
Region : s . config . AuthoritativeRegion ,
AllowStale : true ,
2017-08-19 22:30:01 +00:00
} ,
}
2017-08-13 23:16:59 +00:00
limiter := rate . NewLimiter ( replicationRateLimit , int ( replicationRateLimit ) )
2017-08-19 22:30:01 +00:00
s . logger . Printf ( "[DEBUG] nomad: starting ACL policy replication from authoritative region %q" , req . Region )
2017-08-13 23:16:59 +00:00
START :
for {
select {
case <- stopCh :
return
default :
// Rate limit how often we attempt replication
limiter . Wait ( context . Background ( ) )
// Fetch the list of policies
var resp structs . ACLPolicyListResponse
2017-08-21 03:51:30 +00:00
req . SecretID = s . ReplicationToken ( )
2017-08-13 23:16:59 +00:00
err := s . forwardRegion ( s . config . AuthoritativeRegion ,
"ACL.ListPolicies" , & req , & resp )
if err != nil {
s . logger . Printf ( "[ERR] nomad: failed to fetch policies from authoritative region: %v" , err )
goto ERR_WAIT
}
// Perform a two-way diff
delete , update := diffACLPolicies ( s . State ( ) , req . MinQueryIndex , resp . Policies )
// Delete policies that should not exist
if len ( delete ) > 0 {
args := & structs . ACLPolicyDeleteRequest {
Names : delete ,
}
_ , _ , err := s . raftApply ( structs . ACLPolicyDeleteRequestType , args )
if err != nil {
s . logger . Printf ( "[ERR] nomad: failed to delete policies: %v" , err )
goto ERR_WAIT
}
}
// Fetch any outdated policies
var fetched [ ] * structs . ACLPolicy
2017-08-20 22:30:18 +00:00
if len ( update ) > 0 {
req := structs . ACLPolicySetRequest {
Names : update ,
QueryOptions : structs . QueryOptions {
2017-08-24 16:57:14 +00:00
Region : s . config . AuthoritativeRegion ,
SecretID : s . ReplicationToken ( ) ,
AllowStale : true ,
MinQueryIndex : resp . Index - 1 ,
2017-08-20 22:30:18 +00:00
} ,
2017-08-13 23:16:59 +00:00
}
2017-08-20 22:30:18 +00:00
var reply structs . ACLPolicySetResponse
if err := s . forwardRegion ( s . config . AuthoritativeRegion ,
"ACL.GetPolicies" , & req , & reply ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to fetch policies from authoritative region: %v" , err )
2017-08-13 23:16:59 +00:00
goto ERR_WAIT
}
2017-08-20 22:30:18 +00:00
for _ , policy := range reply . Policies {
fetched = append ( fetched , policy )
2017-08-13 23:16:59 +00:00
}
}
// Update local policies
if len ( fetched ) > 0 {
args := & structs . ACLPolicyUpsertRequest {
Policies : fetched ,
}
_ , _ , err := s . raftApply ( structs . ACLPolicyUpsertRequestType , args )
if err != nil {
s . logger . Printf ( "[ERR] nomad: failed to update policies: %v" , err )
goto ERR_WAIT
}
}
// Update the minimum query index, blocks until there
// is a change.
req . MinQueryIndex = resp . Index
}
}
ERR_WAIT :
select {
case <- time . After ( s . config . ReplicationBackoff ) :
goto START
case <- stopCh :
return
}
}
// diffACLPolicies is used to perform a two-way diff between the local
// policies and the remote policies to determine which policies need to
// be deleted or updated.
func diffACLPolicies ( state * state . StateStore , minIndex uint64 , remoteList [ ] * structs . ACLPolicyListStub ) ( delete [ ] string , update [ ] string ) {
// Construct a set of the local and remote policies
local := make ( map [ string ] struct { } )
remote := make ( map [ string ] struct { } )
// Add all the local policies
iter , err := state . ACLPolicies ( nil )
if err != nil {
panic ( "failed to iterate local policies" )
}
for {
raw := iter . Next ( )
if raw == nil {
break
}
policy := raw . ( * structs . ACLPolicy )
local [ policy . Name ] = struct { } { }
}
// Iterate over the remote policies
for _ , rp := range remoteList {
remote [ rp . Name ] = struct { } { }
// Check if the policy is missing locally
if _ , ok := local [ rp . Name ] ; ! ok {
update = append ( update , rp . Name )
// Check if policy is newer remotely
// TODO: Eventually would be nice to use a policy
// hash or something to avoid fetching policies that
// are unchanged.
} else if rp . ModifyIndex > minIndex {
update = append ( update , rp . Name )
}
}
// Check if policy should be deleted
for lp := range local {
if _ , ok := remote [ lp ] ; ! ok {
delete = append ( delete , lp )
}
}
return
}
// replicateACLTokens is used to replicate global ACL tokens from
// the authoritative region to this region.
func ( s * Server ) replicateACLTokens ( stopCh chan struct { } ) {
2017-08-13 23:45:13 +00:00
req := structs . ACLTokenListRequest {
GlobalOnly : true ,
2017-08-19 22:30:01 +00:00
QueryOptions : structs . QueryOptions {
2017-08-24 16:53:30 +00:00
Region : s . config . AuthoritativeRegion ,
AllowStale : true ,
2017-08-19 22:30:01 +00:00
} ,
2017-08-13 23:45:13 +00:00
}
limiter := rate . NewLimiter ( replicationRateLimit , int ( replicationRateLimit ) )
2017-08-19 22:30:01 +00:00
s . logger . Printf ( "[DEBUG] nomad: starting ACL token replication from authoritative region %q" , req . Region )
2017-08-13 23:45:13 +00:00
START :
2017-08-13 23:16:59 +00:00
for {
select {
case <- stopCh :
return
2017-08-13 23:45:13 +00:00
default :
// Rate limit how often we attempt replication
limiter . Wait ( context . Background ( ) )
// Fetch the list of tokens
var resp structs . ACLTokenListResponse
2017-08-21 03:51:30 +00:00
req . SecretID = s . ReplicationToken ( )
2017-08-13 23:45:13 +00:00
err := s . forwardRegion ( s . config . AuthoritativeRegion ,
"ACL.ListTokens" , & req , & resp )
if err != nil {
s . logger . Printf ( "[ERR] nomad: failed to fetch tokens from authoritative region: %v" , err )
goto ERR_WAIT
}
// Perform a two-way diff
delete , update := diffACLTokens ( s . State ( ) , req . MinQueryIndex , resp . Tokens )
// Delete tokens that should not exist
if len ( delete ) > 0 {
args := & structs . ACLTokenDeleteRequest {
AccessorIDs : delete ,
}
_ , _ , err := s . raftApply ( structs . ACLTokenDeleteRequestType , args )
if err != nil {
s . logger . Printf ( "[ERR] nomad: failed to delete tokens: %v" , err )
goto ERR_WAIT
}
}
2017-08-19 22:30:01 +00:00
// Fetch any outdated policies.
2017-08-13 23:45:13 +00:00
var fetched [ ] * structs . ACLToken
2017-08-20 22:30:18 +00:00
if len ( update ) > 0 {
req := structs . ACLTokenSetRequest {
AccessorIDS : update ,
QueryOptions : structs . QueryOptions {
2017-08-24 16:57:14 +00:00
Region : s . config . AuthoritativeRegion ,
SecretID : s . ReplicationToken ( ) ,
AllowStale : true ,
MinQueryIndex : resp . Index - 1 ,
2017-08-20 22:30:18 +00:00
} ,
2017-08-13 23:45:13 +00:00
}
2017-08-20 22:30:18 +00:00
var reply structs . ACLTokenSetResponse
if err := s . forwardRegion ( s . config . AuthoritativeRegion ,
"ACL.GetTokens" , & req , & reply ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to fetch tokens from authoritative region: %v" , err )
2017-08-13 23:45:13 +00:00
goto ERR_WAIT
}
2017-08-20 22:30:18 +00:00
for _ , token := range reply . Tokens {
fetched = append ( fetched , token )
2017-08-13 23:45:13 +00:00
}
}
2017-08-19 22:30:01 +00:00
// Update local tokens
2017-08-13 23:45:13 +00:00
if len ( fetched ) > 0 {
args := & structs . ACLTokenUpsertRequest {
Tokens : fetched ,
}
_ , _ , err := s . raftApply ( structs . ACLTokenUpsertRequestType , args )
if err != nil {
s . logger . Printf ( "[ERR] nomad: failed to update tokens: %v" , err )
goto ERR_WAIT
}
}
// Update the minimum query index, blocks until there
// is a change.
req . MinQueryIndex = resp . Index
}
}
ERR_WAIT :
select {
case <- time . After ( s . config . ReplicationBackoff ) :
goto START
case <- stopCh :
return
}
}
// diffACLTokens is used to perform a two-way diff between the local
// tokens and the remote tokens to determine which tokens need to
// be deleted or updated.
func diffACLTokens ( state * state . StateStore , minIndex uint64 , remoteList [ ] * structs . ACLTokenListStub ) ( delete [ ] string , update [ ] string ) {
// Construct a set of the local and remote policies
local := make ( map [ string ] struct { } )
remote := make ( map [ string ] struct { } )
// Add all the local global tokens
iter , err := state . ACLTokensByGlobal ( nil , true )
if err != nil {
panic ( "failed to iterate local tokens" )
}
for {
raw := iter . Next ( )
if raw == nil {
break
}
token := raw . ( * structs . ACLToken )
local [ token . AccessorID ] = struct { } { }
}
// Iterate over the remote tokens
for _ , rp := range remoteList {
remote [ rp . AccessorID ] = struct { } { }
// Check if the token is missing locally
if _ , ok := local [ rp . AccessorID ] ; ! ok {
update = append ( update , rp . AccessorID )
// Check if token is newer remotely
// TODO: Eventually would be nice to use an object
// hash or something to avoid fetching tokens that
// are unchanged.
} else if rp . ModifyIndex > minIndex {
update = append ( update , rp . AccessorID )
}
}
// Check if local token should be deleted
for lp := range local {
if _ , ok := remote [ lp ] ; ! ok {
delete = append ( delete , lp )
2017-08-13 23:16:59 +00:00
}
}
2017-08-13 23:45:13 +00:00
return
2017-08-13 23:16:59 +00:00
}