2015-06-01 15:49:10 +00:00
package nomad
2015-06-04 11:38:41 +00:00
import (
2015-12-16 22:14:55 +00:00
"errors"
2015-06-04 11:38:41 +00:00
"fmt"
"time"
"github.com/armon/go-metrics"
2015-08-05 23:53:54 +00:00
"github.com/hashicorp/nomad/nomad/structs"
2015-06-04 11:38:41 +00:00
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)
2016-05-25 17:28:25 +00:00
// failedEvalUnblockInterval is the interval at which failed evaluations are
// unblocked so they can re-enter the scheduler. A failed evaluation occurs
// under high contention, when the scheduler's plan does not make progress.
const failedEvalUnblockInterval = time.Minute
2015-06-01 15:49:10 +00:00
// monitorLeadership is used to monitor if we acquire or lose our role
// as the leader in the Raft cluster. There is some work the leader is
// expected to do, so we must react to changes
func ( s * Server ) monitorLeadership ( ) {
var stopCh chan struct { }
for {
select {
2015-09-07 17:46:41 +00:00
case isLeader := <- s . leaderCh :
2015-06-01 15:49:10 +00:00
if isLeader {
stopCh = make ( chan struct { } )
go s . leaderLoop ( stopCh )
s . logger . Printf ( "[INFO] nomad: cluster leadership acquired" )
} else if stopCh != nil {
close ( stopCh )
stopCh = nil
s . logger . Printf ( "[INFO] nomad: cluster leadership lost" )
}
case <- s . shutdownCh :
return
}
}
}
// leaderLoop runs as long as we are the leader to run various
// maintence activities
func ( s * Server ) leaderLoop ( stopCh chan struct { } ) {
2015-07-24 04:58:38 +00:00
// Ensure we revoke leadership on stepdown
defer s . revokeLeadership ( )
2015-06-05 21:54:45 +00:00
var reconcileCh chan serf . Member
2015-07-24 04:58:38 +00:00
establishedLeader := false
2015-06-05 21:54:45 +00:00
RECONCILE :
// Setup a reconciliation timer
reconcileCh = nil
interval := time . After ( s . config . ReconcileInterval )
// Apply a raft barrier to ensure our FSM is caught up
start := time . Now ( )
barrier := s . raft . Barrier ( 0 )
if err := barrier . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to wait for barrier: %v" , err )
goto WAIT
}
metrics . MeasureSince ( [ ] string { "nomad" , "leader" , "barrier" } , start )
2015-07-24 04:58:38 +00:00
// Check if we need to handle initial leadership actions
if ! establishedLeader {
2015-08-15 22:15:00 +00:00
if err := s . establishLeadership ( stopCh ) ; err != nil {
2015-07-24 04:58:38 +00:00
s . logger . Printf ( "[ERR] nomad: failed to establish leadership: %v" ,
err )
goto WAIT
}
establishedLeader = true
}
2015-06-05 21:54:45 +00:00
// Reconcile any missing data
if err := s . reconcile ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to reconcile: %v" , err )
goto WAIT
}
// Initial reconcile worked, now we can process the channel
// updates
reconcileCh = s . reconcileCh
WAIT :
2015-06-01 15:49:10 +00:00
// Wait until leadership is lost
for {
select {
case <- stopCh :
return
case <- s . shutdownCh :
return
2015-06-05 21:54:45 +00:00
case <- interval :
goto RECONCILE
case member := <- reconcileCh :
2015-06-04 11:38:41 +00:00
s . reconcileMember ( member )
}
}
}
2015-07-24 04:58:38 +00:00
// establishLeadership is invoked once we become leader and are able
// to invoke an initial barrier. The barrier is used to ensure any
2016-05-15 16:41:34 +00:00
// previously inflight transactions have been committed and that our
2015-07-24 04:58:38 +00:00
// state is up-to-date.
2015-08-15 22:15:00 +00:00
func ( s * Server ) establishLeadership ( stopCh chan struct { } ) error {
2016-02-17 21:50:06 +00:00
// Disable workers to free half the cores for use in the plan queue and
// evaluation broker
if numWorkers := len ( s . workers ) ; numWorkers > 1 {
2016-02-20 21:38:46 +00:00
// Disabling 3/4 of the workers frees CPU for raft and the
// plan applier which uses 1/2 the cores.
for i := 0 ; i < ( 3 * numWorkers / 4 ) ; i ++ {
2016-02-17 21:50:06 +00:00
s . workers [ i ] . SetPause ( true )
}
2015-08-23 20:59:13 +00:00
}
2015-07-27 22:11:42 +00:00
// Enable the plan queue, since we are now the leader
s . planQueue . SetEnabled ( true )
2015-07-27 22:31:09 +00:00
// Start the plan evaluator
go s . planApply ( )
2015-07-27 22:11:42 +00:00
2015-07-24 04:58:38 +00:00
// Enable the eval broker, since we are now the leader
s . evalBroker . SetEnabled ( true )
2016-01-29 23:31:32 +00:00
// Enable the blocked eval tracker, since we are now the leader
s . blockedEvals . SetEnabled ( true )
2015-08-05 23:53:54 +00:00
// Restore the eval broker state
2016-01-29 23:31:32 +00:00
if err := s . restoreEvals ( ) ; err != nil {
2015-08-05 23:53:54 +00:00
return err
}
2015-08-15 22:15:00 +00:00
2015-12-04 23:10:08 +00:00
// Enable the periodic dispatcher, since we are now the leader.
2015-12-18 20:26:28 +00:00
s . periodicDispatcher . SetEnabled ( true )
2015-12-04 23:10:08 +00:00
s . periodicDispatcher . Start ( )
2015-12-18 20:26:28 +00:00
// Restore the periodic dispatcher state
if err := s . restorePeriodicDispatcher ( ) ; err != nil {
return err
}
2015-08-15 22:15:00 +00:00
// Scheduler periodic jobs
go s . schedulePeriodic ( stopCh )
2015-08-16 18:10:18 +00:00
// Reap any failed evaluations
go s . reapFailedEvaluations ( stopCh )
2015-08-23 00:17:13 +00:00
2016-01-31 00:16:13 +00:00
// Reap any duplicate blocked evaluations
go s . reapDupBlockedEvaluations ( stopCh )
2016-05-23 23:27:26 +00:00
// Periodically unblock failed allocations
go s . periodicUnblockFailedEvals ( stopCh )
2015-08-23 00:17:13 +00:00
// Setup the heartbeat timers. This is done both when starting up or when
// a leader fail over happens. Since the timers are maintained by the leader
// node, effectively this means all the timers are renewed at the time of failover.
// The TTL contract is that the session will not be expired before the TTL,
// so expiring it later is allowable.
//
// This MUST be done after the initial barrier to ensure the latest Nodes
// are available to be initialized. Otherwise initialization may use stale
// data.
if err := s . initializeHeartbeatTimers ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: heartbeat timer setup failed: %v" , err )
return err
}
2016-08-05 21:40:35 +00:00
// COMPAT 0.4 - 0.4.1
// Reconcile the summaries of the registered jobs. We reconcile summaries
// only if the server is 0.4.1 since summaries are not present in 0.4 they
// might be incorrect after upgrading to 0.4.1 the summaries might not be
// correct
2016-08-06 00:48:37 +00:00
if err := s . reconcileJobSummaries ( ) ; err != nil {
return fmt . Errorf ( "unable to reconcile job summaries: %v" , err )
2016-08-05 21:40:35 +00:00
}
2015-08-05 23:53:54 +00:00
return nil
}
2015-07-27 22:11:42 +00:00
2016-01-29 23:31:32 +00:00
// restoreEvals is used to restore pending evaluations into the eval broker and
// blocked evaluations into the blocked eval tracker. The broker and blocked
// eval tracker is maintained only by the leader, so it must be restored anytime
// a leadership transition takes place.
func ( s * Server ) restoreEvals ( ) error {
2015-08-05 23:53:54 +00:00
// Get an iterator over every evaluation
iter , err := s . fsm . State ( ) . Evals ( )
if err != nil {
return fmt . Errorf ( "failed to get evaluations: %v" , err )
}
for {
raw := iter . Next ( )
if raw == nil {
break
}
eval := raw . ( * structs . Evaluation )
2016-01-29 23:31:32 +00:00
if eval . ShouldEnqueue ( ) {
2016-05-18 18:35:15 +00:00
s . evalBroker . Enqueue ( eval )
2016-01-29 23:31:32 +00:00
} else if eval . ShouldBlock ( ) {
s . blockedEvals . Block ( eval )
2015-08-05 23:53:54 +00:00
}
}
2015-07-24 04:58:38 +00:00
return nil
}
2015-12-04 23:10:08 +00:00
// restorePeriodicDispatcher is used to restore all periodic jobs into the
// periodic dispatcher. It also determines if a periodic job should have been
// created during the leadership transition and force runs them. The periodic
// dispatcher is maintained only by the leader, so it must be restored anytime a
// leadership transition takes place.
2015-12-18 20:26:28 +00:00
func ( s * Server ) restorePeriodicDispatcher ( ) error {
2015-12-04 23:10:08 +00:00
iter , err := s . fsm . State ( ) . JobsByPeriodic ( true )
if err != nil {
return fmt . Errorf ( "failed to get periodic jobs: %v" , err )
}
now := time . Now ( )
for i := iter . Next ( ) ; i != nil ; i = iter . Next ( ) {
job := i . ( * structs . Job )
s . periodicDispatcher . Add ( job )
2015-12-16 22:14:55 +00:00
// If the periodic job has never been launched before, launch will hold
// the time the periodic job was added. Otherwise it has the last launch
// time of the periodic job.
2015-12-19 01:51:30 +00:00
launch , err := s . fsm . State ( ) . PeriodicLaunchByID ( job . ID )
if err != nil || launch == nil {
return fmt . Errorf ( "failed to get periodic launch time: %v" , err )
2015-12-04 23:10:08 +00:00
}
2015-12-16 22:14:55 +00:00
// nextLaunch is the next launch that should occur.
nextLaunch := job . Periodic . Next ( launch . Launch )
// We skip force launching the job if there should be no next launch
// (the zero case) or if the next launch time is in the future. If it is
// in the future, it will be handled by the periodic dispatcher.
if nextLaunch . IsZero ( ) || ! nextLaunch . Before ( now ) {
2015-12-04 23:10:08 +00:00
continue
}
2016-01-13 18:19:53 +00:00
if _ , err := s . periodicDispatcher . ForceRun ( job . ID ) ; err != nil {
2015-12-16 22:14:55 +00:00
msg := fmt . Sprintf ( "force run of periodic job %q failed: %v" , job . ID , err )
s . logger . Printf ( "[ERR] nomad.periodic: %s" , msg )
return errors . New ( msg )
2015-12-04 23:10:08 +00:00
}
s . logger . Printf ( "[DEBUG] nomad.periodic: periodic job %q force" +
" run during leadership establishment" , job . ID )
}
2015-12-18 20:26:28 +00:00
return nil
}
2015-08-15 22:15:00 +00:00
// schedulePeriodic is used to do periodic job dispatch while we are leader
func ( s * Server ) schedulePeriodic ( stopCh chan struct { } ) {
evalGC := time . NewTicker ( s . config . EvalGCInterval )
defer evalGC . Stop ( )
2015-09-07 18:01:29 +00:00
nodeGC := time . NewTicker ( s . config . NodeGCInterval )
defer nodeGC . Stop ( )
2015-12-15 03:20:57 +00:00
jobGC := time . NewTicker ( s . config . JobGCInterval )
defer jobGC . Stop ( )
2015-08-15 22:15:00 +00:00
2016-06-22 16:11:25 +00:00
// getLatest grabs the latest index from the state store. It returns true if
// the index was retrieved successfully.
getLatest := func ( ) ( uint64 , bool ) {
2016-06-22 16:33:15 +00:00
snapshotIndex , err := s . fsm . State ( ) . LatestIndex ( )
2016-06-22 16:04:22 +00:00
if err != nil {
2016-06-22 16:33:15 +00:00
s . logger . Printf ( "[ERR] nomad: failed to determine state store's index: %v" , err )
2016-06-22 16:11:25 +00:00
return 0 , false
2016-06-22 16:04:22 +00:00
}
2016-06-22 16:11:25 +00:00
return snapshotIndex , true
}
for {
2015-08-15 22:15:00 +00:00
select {
case <- evalGC . C :
2016-06-22 16:11:25 +00:00
if index , ok := getLatest ( ) ; ok {
s . evalBroker . Enqueue ( s . coreJobEval ( structs . CoreJobEvalGC , index ) )
}
2015-09-07 18:01:29 +00:00
case <- nodeGC . C :
2016-06-22 16:11:25 +00:00
if index , ok := getLatest ( ) ; ok {
s . evalBroker . Enqueue ( s . coreJobEval ( structs . CoreJobNodeGC , index ) )
}
2015-12-15 03:20:57 +00:00
case <- jobGC . C :
2016-06-22 16:11:25 +00:00
if index , ok := getLatest ( ) ; ok {
s . evalBroker . Enqueue ( s . coreJobEval ( structs . CoreJobJobGC , index ) )
}
2015-08-15 22:15:00 +00:00
case <- stopCh :
return
}
}
}
2015-08-15 23:07:50 +00:00
// coreJobEval returns an evaluation for a core job
2016-06-22 16:04:22 +00:00
func ( s * Server ) coreJobEval ( job string , modifyIndex uint64 ) * structs . Evaluation {
2015-08-15 23:07:50 +00:00
return & structs . Evaluation {
2015-09-07 22:23:03 +00:00
ID : structs . GenerateUUID ( ) ,
2015-08-15 22:15:00 +00:00
Priority : structs . CoreJobPriority ,
Type : structs . JobTypeCore ,
TriggeredBy : structs . EvalTriggerScheduled ,
JobID : job ,
Status : structs . EvalStatusPending ,
2016-06-22 16:04:22 +00:00
ModifyIndex : modifyIndex ,
2015-08-15 22:15:00 +00:00
}
}
2015-08-16 18:10:18 +00:00
// reapFailedEvaluations is used to reap evaluations that
// have reached their delivery limit and should be failed. Each such
// evaluation is dequeued from the failed queue, marked failed via Raft and
// then acknowledged. The loop ends when stopCh fires or a dequeue returns
// an error.
func (s *Server) reapFailedEvaluations(stopCh chan struct{}) {
	for {
		// Non-blocking check for the stop signal before each scan
		select {
		case <-stopCh:
			return
		default:
		}

		// Scan for a failed evaluation
		eval, token, err := s.evalBroker.Dequeue([]string{failedQueue}, time.Second)
		if err != nil {
			return
		}
		if eval == nil {
			continue
		}

		// Work on a copy with the status updated to failed
		failed := eval.Copy()
		failed.Status = structs.EvalStatusFailed
		failed.StatusDescription = fmt.Sprintf("evaluation reached delivery limit (%d)", s.config.EvalDeliveryLimit)
		s.logger.Printf("[WARN] nomad: eval %#v reached delivery limit, marking as failed", failed)

		// Update via Raft; on failure the eval is left unacknowledged
		update := structs.EvalUpdateRequest{
			Evals: []*structs.Evaluation{failed},
		}
		if _, _, applyErr := s.raftApply(structs.EvalUpdateRequestType, &update); applyErr != nil {
			s.logger.Printf("[ERR] nomad: failed to update failed eval %#v: %v", failed, applyErr)
			continue
		}

		// Ack completion
		s.evalBroker.Ack(eval.ID, token)
	}
}
2016-01-31 00:16:13 +00:00
// reapDupBlockedEvaluations is used to reap duplicate blocked evaluations,
// which should be cancelled. Each batch of duplicates reported by the blocked
// eval tracker is marked cancelled via Raft. The loop ends when stopCh fires.
func (s *Server) reapDupBlockedEvaluations(stopCh chan struct{}) {
	for {
		// Non-blocking check for the stop signal before each scan
		select {
		case <-stopCh:
			return
		default:
		}

		// Scan for duplicate blocked evals.
		dups := s.blockedEvals.GetDuplicates(time.Second)
		if dups == nil {
			continue
		}

		// Build cancelled copies of every duplicate
		cancelled := make([]*structs.Evaluation, 0, len(dups))
		for _, dup := range dups {
			replacement := dup.Copy()
			replacement.Status = structs.EvalStatusCancelled
			replacement.StatusDescription = fmt.Sprintf("existing blocked evaluation exists for job %q", replacement.JobID)
			cancelled = append(cancelled, replacement)
		}

		// Update via Raft
		update := structs.EvalUpdateRequest{Evals: cancelled}
		if _, _, err := s.raftApply(structs.EvalUpdateRequestType, &update); err != nil {
			s.logger.Printf("[ERR] nomad: failed to update duplicate evals %#v: %v", cancelled, err)
		}
	}
}
2016-05-23 23:27:26 +00:00
// periodicUnblockFailedEvals periodically unblocks failed, blocked evaluations.
func ( s * Server ) periodicUnblockFailedEvals ( stopCh chan struct { } ) {
2016-07-06 00:08:58 +00:00
ticker := time . NewTicker ( failedEvalUnblockInterval )
2016-05-25 17:28:25 +00:00
defer ticker . Stop ( )
2016-05-23 23:27:26 +00:00
for {
select {
case <- stopCh :
return
case <- ticker . C :
// Unblock the failed allocations
s . blockedEvals . UnblockFailed ( )
}
}
}
2015-07-24 04:58:38 +00:00
// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func ( s * Server ) revokeLeadership ( ) error {
2015-07-27 22:11:42 +00:00
// Disable the plan queue, since we are no longer leader
s . planQueue . SetEnabled ( false )
2015-07-24 04:58:38 +00:00
// Disable the eval broker, since it is only useful as a leader
s . evalBroker . SetEnabled ( false )
2015-08-23 00:17:13 +00:00
2016-01-31 00:21:37 +00:00
// Disable the blocked eval tracker, since it is only useful as a leader
s . blockedEvals . SetEnabled ( false )
2015-12-18 20:26:28 +00:00
// Disable the periodic dispatcher, since it is only useful as a leader
s . periodicDispatcher . SetEnabled ( false )
2015-08-23 00:17:13 +00:00
// Clear the heartbeat timers on either shutdown or step down,
// since we are no longer responsible for TTL expirations.
if err := s . clearAllHeartbeatTimers ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: clearing heartbeat timers failed: %v" , err )
return err
}
2015-08-23 20:59:13 +00:00
// Unpause our worker if we paused previously
if len ( s . workers ) > 1 {
2016-02-19 22:49:43 +00:00
for i := 0 ; i < len ( s . workers ) / 2 ; i ++ {
2016-02-17 21:50:06 +00:00
s . workers [ i ] . SetPause ( false )
}
2015-08-23 20:59:13 +00:00
}
2015-07-24 04:58:38 +00:00
return nil
}
2015-06-05 21:54:45 +00:00
// reconcile is used to reconcile the differences between Serf
// membership and what is reflected in our strongly consistent store.
// Every current Serf member is reconciled in turn; the first failure aborts
// the pass and is returned. The total duration is recorded as a metric.
func (s *Server) reconcile() error {
	began := time.Now()
	defer metrics.MeasureSince([]string{"nomad", "leader", "reconcile"}, began)

	for _, peer := range s.serf.Members() {
		err := s.reconcileMember(peer)
		if err != nil {
			return err
		}
	}
	return nil
}
2015-06-04 11:38:41 +00:00
// reconcileMember is used to do an async reconcile of a single serf member.
// Members that are not Nomad servers, belong to another region, or are this
// server itself are ignored. Alive members are added as Raft peers; members
// that left or were reaped are removed. Any failure is logged and returned.
func (s *Server) reconcileMember(member serf.Member) error {
	// Check if this is a member we should handle
	isServer, parts := isNomadServer(member)
	if !isServer || parts.Region != s.config.Region {
		return nil
	}
	defer metrics.MeasureSince([]string{"nomad", "leader", "reconcileMember"}, time.Now())

	// Do not reconcile ourself
	self := fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Region)
	if member.Name == self {
		return nil
	}

	// Any other status leaves the Raft peer set untouched
	var err error
	if member.Status == serf.StatusAlive {
		err = s.addRaftPeer(member, parts)
	} else if member.Status == serf.StatusLeft || member.Status == StatusReap {
		err = s.removeRaftPeer(member, parts)
	}

	if err == nil {
		return nil
	}
	s.logger.Printf("[ERR] nomad: failed to reconcile member: %v: %v",
		member, err)
	return err
}
2016-08-05 21:40:35 +00:00
// reconcileJobSummaries reconciles the summaries of all the jobs registered in
// the system by applying a reconcile request through Raft. The request type
// carries structs.IgnoreUnknownTypeFlag.
// COMPAT 0.4 -> 0.4.1
func (s *Server) reconcileJobSummaries() error {
	// The latest index is only read for the debug log line below
	latest, err := s.fsm.state.LatestIndex()
	if err != nil {
		return fmt.Errorf("unable to read latest index: %v", err)
	}
	s.logger.Printf("[DEBUG] leader: reconciling job summaries at index: %v", latest)

	reqType := structs.ReconcileJobSummariesRequestType | structs.IgnoreUnknownTypeFlag
	payload := &structs.GenericResponse{}
	_, _, err = s.raftApply(reqType, payload)
	if err != nil {
		return fmt.Errorf("reconciliation of job summaries failed: %v", err)
	}
	return nil
}
2015-06-04 11:38:41 +00:00
// addRaftPeer is used to add a new Raft peer when a Nomad server joins
func ( s * Server ) addRaftPeer ( m serf . Member , parts * serverParts ) error {
// Check for possibility of multiple bootstrap nodes
if parts . Bootstrap {
members := s . serf . Members ( )
for _ , member := range members {
valid , p := isNomadServer ( member )
if valid && member . Name != m . Name && p . Bootstrap {
s . logger . Printf ( "[ERR] nomad: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer." , m . Name , member . Name )
return nil
}
2015-06-01 15:49:10 +00:00
}
}
2015-06-04 11:38:41 +00:00
// Attempt to add as a peer
future := s . raft . AddPeer ( parts . Addr . String ( ) )
if err := future . Error ( ) ; err != nil && err != raft . ErrKnownPeer {
s . logger . Printf ( "[ERR] nomad: failed to add raft peer: %v" , err )
return err
2015-06-05 21:54:45 +00:00
} else if err == nil {
s . logger . Printf ( "[INFO] nomad: added raft peer: %v" , parts )
2015-06-04 11:38:41 +00:00
}
return nil
}
// removeRaftPeer is used to remove a Raft peer when a Nomad server leaves
// or is reaped
func ( s * Server ) removeRaftPeer ( m serf . Member , parts * serverParts ) error {
// Attempt to remove as peer
future := s . raft . RemovePeer ( parts . Addr . String ( ) )
if err := future . Error ( ) ; err != nil && err != raft . ErrUnknownPeer {
s . logger . Printf ( "[ERR] nomad: failed to remove raft peer '%v': %v" ,
parts , err )
return err
} else if err == nil {
s . logger . Printf ( "[INFO] nomad: removed server '%s' as peer" , m . Name )
}
return nil
2015-06-01 15:49:10 +00:00
}