2015-06-01 15:49:10 +00:00
package nomad
2015-06-04 11:38:41 +00:00
import (
"fmt"
"time"
"github.com/armon/go-metrics"
2015-08-05 23:53:54 +00:00
"github.com/hashicorp/nomad/nomad/structs"
2015-06-04 11:38:41 +00:00
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)
2015-06-01 15:49:10 +00:00
// monitorLeadership is used to monitor if we acquire or lose our role
// as the leader in the Raft cluster. There is some work the leader is
// expected to do, so we must react to changes
func ( s * Server ) monitorLeadership ( ) {
var stopCh chan struct { }
for {
select {
2015-09-07 17:46:41 +00:00
case isLeader := <- s . leaderCh :
2015-06-01 15:49:10 +00:00
if isLeader {
stopCh = make ( chan struct { } )
go s . leaderLoop ( stopCh )
s . logger . Printf ( "[INFO] nomad: cluster leadership acquired" )
} else if stopCh != nil {
close ( stopCh )
stopCh = nil
s . logger . Printf ( "[INFO] nomad: cluster leadership lost" )
}
case <- s . shutdownCh :
return
}
}
}
// leaderLoop runs as long as we are the leader to run various
// maintence activities
func ( s * Server ) leaderLoop ( stopCh chan struct { } ) {
2015-07-24 04:58:38 +00:00
// Ensure we revoke leadership on stepdown
defer s . revokeLeadership ( )
2015-06-05 21:54:45 +00:00
var reconcileCh chan serf . Member
2015-07-24 04:58:38 +00:00
establishedLeader := false
2015-06-05 21:54:45 +00:00
RECONCILE :
// Setup a reconciliation timer
reconcileCh = nil
interval := time . After ( s . config . ReconcileInterval )
// Apply a raft barrier to ensure our FSM is caught up
start := time . Now ( )
barrier := s . raft . Barrier ( 0 )
if err := barrier . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to wait for barrier: %v" , err )
goto WAIT
}
metrics . MeasureSince ( [ ] string { "nomad" , "leader" , "barrier" } , start )
2015-07-24 04:58:38 +00:00
// Check if we need to handle initial leadership actions
if ! establishedLeader {
2015-08-15 22:15:00 +00:00
if err := s . establishLeadership ( stopCh ) ; err != nil {
2015-07-24 04:58:38 +00:00
s . logger . Printf ( "[ERR] nomad: failed to establish leadership: %v" ,
err )
goto WAIT
}
establishedLeader = true
}
2015-06-05 21:54:45 +00:00
// Reconcile any missing data
if err := s . reconcile ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to reconcile: %v" , err )
goto WAIT
}
// Initial reconcile worked, now we can process the channel
// updates
reconcileCh = s . reconcileCh
WAIT :
2015-06-01 15:49:10 +00:00
// Wait until leadership is lost
for {
select {
case <- stopCh :
return
case <- s . shutdownCh :
return
2015-06-05 21:54:45 +00:00
case <- interval :
goto RECONCILE
case member := <- reconcileCh :
2015-06-04 11:38:41 +00:00
s . reconcileMember ( member )
}
}
}
2015-07-24 04:58:38 +00:00
// establishLeadership is invoked once we become leader and are able
// to invoke an initial barrier. The barrier is used to ensure any
// previously inflight transactions have been commited and that our
// state is up-to-date.
2015-08-15 22:15:00 +00:00
func ( s * Server ) establishLeadership ( stopCh chan struct { } ) error {
2015-08-23 20:59:13 +00:00
// If we have multiple workers, disable one to free processing
// for the plan queue and evaluation broker
if len ( s . workers ) > 1 {
s . workers [ 0 ] . SetPause ( true )
}
2015-07-27 22:11:42 +00:00
// Enable the plan queue, since we are now the leader
s . planQueue . SetEnabled ( true )
2015-07-27 22:31:09 +00:00
// Start the plan evaluator
go s . planApply ( )
2015-07-27 22:11:42 +00:00
2015-07-24 04:58:38 +00:00
// Enable the eval broker, since we are now the leader
s . evalBroker . SetEnabled ( true )
2015-08-05 23:53:54 +00:00
// Restore the eval broker state
if err := s . restoreEvalBroker ( ) ; err != nil {
return err
}
2015-08-15 22:15:00 +00:00
// Scheduler periodic jobs
go s . schedulePeriodic ( stopCh )
2015-08-16 18:10:18 +00:00
// Reap any failed evaluations
go s . reapFailedEvaluations ( stopCh )
2015-08-23 00:17:13 +00:00
// Setup the heartbeat timers. This is done both when starting up or when
// a leader fail over happens. Since the timers are maintained by the leader
// node, effectively this means all the timers are renewed at the time of failover.
// The TTL contract is that the session will not be expired before the TTL,
// so expiring it later is allowable.
//
// This MUST be done after the initial barrier to ensure the latest Nodes
// are available to be initialized. Otherwise initialization may use stale
// data.
if err := s . initializeHeartbeatTimers ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: heartbeat timer setup failed: %v" , err )
return err
}
2015-08-05 23:53:54 +00:00
return nil
}
2015-07-27 22:11:42 +00:00
2015-08-05 23:53:54 +00:00
// restoreEvalBroker is used to restore all pending evaluations
// into the eval broker. The broker is maintained only by the leader,
// so it must be restored anytime a leadership transition takes place.
func ( s * Server ) restoreEvalBroker ( ) error {
// Get an iterator over every evaluation
iter , err := s . fsm . State ( ) . Evals ( )
if err != nil {
return fmt . Errorf ( "failed to get evaluations: %v" , err )
}
for {
raw := iter . Next ( )
if raw == nil {
break
}
eval := raw . ( * structs . Evaluation )
2015-08-06 18:28:55 +00:00
if ! eval . ShouldEnqueue ( ) {
continue
}
if err := s . evalBroker . Enqueue ( eval ) ; err != nil {
return fmt . Errorf ( "failed to enqueue evaluation %s: %v" , eval . ID , err )
2015-08-05 23:53:54 +00:00
}
}
2015-07-24 04:58:38 +00:00
return nil
}
2015-08-15 22:15:00 +00:00
// schedulePeriodic is used to do periodic job dispatch while we are leader
func ( s * Server ) schedulePeriodic ( stopCh chan struct { } ) {
evalGC := time . NewTicker ( s . config . EvalGCInterval )
defer evalGC . Stop ( )
2015-09-07 18:01:29 +00:00
nodeGC := time . NewTicker ( s . config . NodeGCInterval )
defer nodeGC . Stop ( )
2015-08-15 22:15:00 +00:00
for {
select {
case <- evalGC . C :
2015-08-15 23:07:50 +00:00
s . evalBroker . Enqueue ( s . coreJobEval ( structs . CoreJobEvalGC ) )
2015-09-07 18:01:29 +00:00
case <- nodeGC . C :
s . evalBroker . Enqueue ( s . coreJobEval ( structs . CoreJobNodeGC ) )
2015-08-15 22:15:00 +00:00
case <- stopCh :
return
}
}
}
2015-08-15 23:07:50 +00:00
// coreJobEval returns an evaluation for a core job
func ( s * Server ) coreJobEval ( job string ) * structs . Evaluation {
return & structs . Evaluation {
2015-09-07 22:23:03 +00:00
ID : structs . GenerateUUID ( ) ,
2015-08-15 22:15:00 +00:00
Priority : structs . CoreJobPriority ,
Type : structs . JobTypeCore ,
TriggeredBy : structs . EvalTriggerScheduled ,
JobID : job ,
Status : structs . EvalStatusPending ,
ModifyIndex : s . raft . AppliedIndex ( ) ,
}
}
2015-08-16 18:10:18 +00:00
// reapFailedEvaluations is used to reap evaluations that
// have reached their delivery limit and should be failed
func ( s * Server ) reapFailedEvaluations ( stopCh chan struct { } ) {
for {
select {
case <- stopCh :
return
default :
// Scan for a failed evaluation
eval , token , err := s . evalBroker . Dequeue ( [ ] string { failedQueue } , time . Second )
if err != nil {
return
}
if eval == nil {
continue
}
// Update the status to failed
newEval := eval . Copy ( )
newEval . Status = structs . EvalStatusFailed
newEval . StatusDescription = fmt . Sprintf ( "evaluation reached delivery limit (%d)" , s . config . EvalDeliveryLimit )
s . logger . Printf ( "[WARN] nomad: eval %#v reached delivery limit, marking as failed" , newEval )
// Update via Raft
req := structs . EvalUpdateRequest {
Evals : [ ] * structs . Evaluation { newEval } ,
}
if _ , _ , err := s . raftApply ( structs . EvalUpdateRequestType , & req ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to update failed eval %#v: %v" , newEval , err )
continue
}
// Ack completion
s . evalBroker . Ack ( eval . ID , token )
}
}
}
2015-07-24 04:58:38 +00:00
// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func ( s * Server ) revokeLeadership ( ) error {
2015-07-27 22:11:42 +00:00
// Disable the plan queue, since we are no longer leader
s . planQueue . SetEnabled ( false )
2015-07-24 04:58:38 +00:00
// Disable the eval broker, since it is only useful as a leader
s . evalBroker . SetEnabled ( false )
2015-08-23 00:17:13 +00:00
// Clear the heartbeat timers on either shutdown or step down,
// since we are no longer responsible for TTL expirations.
if err := s . clearAllHeartbeatTimers ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: clearing heartbeat timers failed: %v" , err )
return err
}
2015-08-23 20:59:13 +00:00
// Unpause our worker if we paused previously
if len ( s . workers ) > 1 {
s . workers [ 0 ] . SetPause ( false )
}
2015-07-24 04:58:38 +00:00
return nil
}
2015-06-05 21:54:45 +00:00
// reconcile is used to reconcile the differences between Serf
// membership and what is reflected in our strongly consistent store.
func ( s * Server ) reconcile ( ) error {
defer metrics . MeasureSince ( [ ] string { "nomad" , "leader" , "reconcile" } , time . Now ( ) )
members := s . serf . Members ( )
for _ , member := range members {
if err := s . reconcileMember ( member ) ; err != nil {
return err
}
}
return nil
}
2015-06-04 11:38:41 +00:00
// reconcileMember is used to do an async reconcile of a single serf member
func ( s * Server ) reconcileMember ( member serf . Member ) error {
// Check if this is a member we should handle
valid , parts := isNomadServer ( member )
if ! valid || parts . Region != s . config . Region {
return nil
}
defer metrics . MeasureSince ( [ ] string { "nomad" , "leader" , "reconcileMember" } , time . Now ( ) )
// Do not reconcile ourself
if member . Name == fmt . Sprintf ( "%s.%s" , s . config . NodeName , s . config . Region ) {
return nil
}
var err error
switch member . Status {
case serf . StatusAlive :
err = s . addRaftPeer ( member , parts )
case serf . StatusLeft , StatusReap :
err = s . removeRaftPeer ( member , parts )
}
if err != nil {
s . logger . Printf ( "[ERR] nomad: failed to reconcile member: %v: %v" ,
member , err )
return err
}
return nil
}
// addRaftPeer is used to add a new Raft peer when a Nomad server joins
func ( s * Server ) addRaftPeer ( m serf . Member , parts * serverParts ) error {
// Check for possibility of multiple bootstrap nodes
if parts . Bootstrap {
members := s . serf . Members ( )
for _ , member := range members {
valid , p := isNomadServer ( member )
if valid && member . Name != m . Name && p . Bootstrap {
s . logger . Printf ( "[ERR] nomad: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer." , m . Name , member . Name )
return nil
}
2015-06-01 15:49:10 +00:00
}
}
2015-06-04 11:38:41 +00:00
// Attempt to add as a peer
future := s . raft . AddPeer ( parts . Addr . String ( ) )
if err := future . Error ( ) ; err != nil && err != raft . ErrKnownPeer {
s . logger . Printf ( "[ERR] nomad: failed to add raft peer: %v" , err )
return err
2015-06-05 21:54:45 +00:00
} else if err == nil {
s . logger . Printf ( "[INFO] nomad: added raft peer: %v" , parts )
2015-06-04 11:38:41 +00:00
}
return nil
}
// removeRaftPeer is used to remove a Raft peer when a Nomad server leaves
// or is reaped
func ( s * Server ) removeRaftPeer ( m serf . Member , parts * serverParts ) error {
// Attempt to remove as peer
future := s . raft . RemovePeer ( parts . Addr . String ( ) )
if err := future . Error ( ) ; err != nil && err != raft . ErrUnknownPeer {
s . logger . Printf ( "[ERR] nomad: failed to remove raft peer '%v': %v" ,
parts , err )
return err
} else if err == nil {
s . logger . Printf ( "[INFO] nomad: removed server '%s' as peer" , m . Name )
}
return nil
2015-06-01 15:49:10 +00:00
}