2015-06-01 15:49:10 +00:00
package nomad
2015-06-04 11:38:41 +00:00
import (
"fmt"
"time"
"github.com/armon/go-metrics"
2015-08-05 23:53:54 +00:00
"github.com/hashicorp/nomad/nomad/structs"
2015-06-04 11:38:41 +00:00
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)
2015-06-01 15:49:10 +00:00
// monitorLeadership watches the Raft leadership channel and starts or stops
// the leader loop as this server acquires or loses leadership. There is some
// work the leader is expected to do, so we must react to changes. It returns
// once the server's shutdown channel fires.
func (s *Server) monitorLeadership() {
	leaderCh := s.raft.LeaderCh()

	// stopCh is non-nil exactly while a leaderLoop goroutine started from
	// here has not yet been told to stop; closing it signals that goroutine.
	var stopCh chan struct{}
	for {
		select {
		case isLeader := <-leaderCh:
			switch {
			case isLeader:
				// Guard against a repeated "acquired" notification. Without
				// this, stopCh would be overwritten, a second leaderLoop
				// would start, and the first loop's channel could never be
				// closed from here.
				if stopCh != nil {
					s.logger.Printf("[ERR] nomad: attempted to start the leader loop while running")
					continue
				}
				stopCh = make(chan struct{})
				go s.leaderLoop(stopCh)
				s.logger.Printf("[INFO] nomad: cluster leadership acquired")

			case stopCh != nil:
				// Leadership lost while a leader loop is active: stop it.
				close(stopCh)
				stopCh = nil
				s.logger.Printf("[INFO] nomad: cluster leadership lost")
			}

		case <-s.shutdownCh:
			return
		}
	}
}
// leaderLoop runs as long as we are the leader to run various
// maintence activities
func ( s * Server ) leaderLoop ( stopCh chan struct { } ) {
2015-07-24 04:58:38 +00:00
// Ensure we revoke leadership on stepdown
defer s . revokeLeadership ( )
2015-06-05 21:54:45 +00:00
var reconcileCh chan serf . Member
2015-07-24 04:58:38 +00:00
establishedLeader := false
2015-06-05 21:54:45 +00:00
RECONCILE :
// Setup a reconciliation timer
reconcileCh = nil
interval := time . After ( s . config . ReconcileInterval )
// Apply a raft barrier to ensure our FSM is caught up
start := time . Now ( )
barrier := s . raft . Barrier ( 0 )
if err := barrier . Error ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to wait for barrier: %v" , err )
goto WAIT
}
metrics . MeasureSince ( [ ] string { "nomad" , "leader" , "barrier" } , start )
2015-07-24 04:58:38 +00:00
// Check if we need to handle initial leadership actions
if ! establishedLeader {
if err := s . establishLeadership ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to establish leadership: %v" ,
err )
goto WAIT
}
establishedLeader = true
}
2015-06-05 21:54:45 +00:00
// Reconcile any missing data
if err := s . reconcile ( ) ; err != nil {
s . logger . Printf ( "[ERR] nomad: failed to reconcile: %v" , err )
goto WAIT
}
// Initial reconcile worked, now we can process the channel
// updates
reconcileCh = s . reconcileCh
WAIT :
2015-06-01 15:49:10 +00:00
// Wait until leadership is lost
for {
select {
case <- stopCh :
return
case <- s . shutdownCh :
return
2015-06-05 21:54:45 +00:00
case <- interval :
goto RECONCILE
case member := <- reconcileCh :
2015-06-04 11:38:41 +00:00
s . reconcileMember ( member )
}
}
}
2015-07-24 04:58:38 +00:00
// establishLeadership is invoked once we become leader and are able
// to invoke an initial barrier. The barrier is used to ensure any
// previously inflight transactions have been commited and that our
// state is up-to-date.
func ( s * Server ) establishLeadership ( ) error {
2015-07-27 22:11:42 +00:00
// Enable the plan queue, since we are now the leader
s . planQueue . SetEnabled ( true )
2015-07-27 22:31:09 +00:00
// Start the plan evaluator
go s . planApply ( )
2015-07-27 22:11:42 +00:00
2015-07-24 04:58:38 +00:00
// Enable the eval broker, since we are now the leader
s . evalBroker . SetEnabled ( true )
2015-08-05 23:53:54 +00:00
// Restore the eval broker state
if err := s . restoreEvalBroker ( ) ; err != nil {
return err
}
return nil
}
2015-07-27 22:11:42 +00:00
2015-08-05 23:53:54 +00:00
// restoreEvalBroker is used to restore all pending evaluations
// into the eval broker. The broker is maintained only by the leader,
// so it must be restored anytime a leadership transition takes place.
func ( s * Server ) restoreEvalBroker ( ) error {
// Get an iterator over every evaluation
iter , err := s . fsm . State ( ) . Evals ( )
if err != nil {
return fmt . Errorf ( "failed to get evaluations: %v" , err )
}
for {
raw := iter . Next ( )
if raw == nil {
break
}
eval := raw . ( * structs . Evaluation )
2015-08-06 18:28:55 +00:00
if ! eval . ShouldEnqueue ( ) {
continue
}
if err := s . evalBroker . Enqueue ( eval ) ; err != nil {
return fmt . Errorf ( "failed to enqueue evaluation %s: %v" , eval . ID , err )
2015-08-05 23:53:54 +00:00
}
}
2015-07-24 04:58:38 +00:00
return nil
}
// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func ( s * Server ) revokeLeadership ( ) error {
2015-07-27 22:11:42 +00:00
// Disable the plan queue, since we are no longer leader
s . planQueue . SetEnabled ( false )
2015-07-24 04:58:38 +00:00
// Disable the eval broker, since it is only useful as a leader
s . evalBroker . SetEnabled ( false )
return nil
}
2015-06-05 21:54:45 +00:00
// reconcile walks the current Serf membership and reconciles each member
// against what is reflected in our strongly consistent store. It stops at,
// and returns, the first error reported by reconcileMember.
func (s *Server) reconcile() error {
	// Record how long the whole pass took, however it ends.
	began := time.Now()
	defer metrics.MeasureSince([]string{"nomad", "leader", "reconcile"}, began)

	for _, m := range s.serf.Members() {
		if err := s.reconcileMember(m); err != nil {
			return err
		}
	}
	return nil
}
2015-06-04 11:38:41 +00:00
// reconcileMember reconciles a single Serf member: alive members are added
// as Raft peers, while members that have left or been reaped are removed.
// Members that are not Nomad servers in our region, and this server itself,
// are ignored. Any failure is logged and returned.
func (s *Server) reconcileMember(member serf.Member) error {
	// Only Nomad servers belonging to our own region are handled here.
	valid, parts := isNomadServer(member)
	if !(valid && parts.Region == s.config.Region) {
		return nil
	}
	defer metrics.MeasureSince([]string{"nomad", "leader", "reconcileMember"}, time.Now())

	// Never reconcile ourself; our Serf name is "<node name>.<region>".
	self := fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Region)
	if member.Name == self {
		return nil
	}

	// Pick the action from the member's status; any other status is a no-op.
	var err error
	switch member.Status {
	case serf.StatusAlive:
		err = s.addRaftPeer(member, parts)
	case serf.StatusLeft, StatusReap:
		err = s.removeRaftPeer(member, parts)
	}
	if err == nil {
		return nil
	}

	s.logger.Printf("[ERR] nomad: failed to reconcile member: %v: %v",
		member, err)
	return err
}
// addRaftPeer is used to add a new Raft peer when a Nomad server joins
func ( s * Server ) addRaftPeer ( m serf . Member , parts * serverParts ) error {
// Check for possibility of multiple bootstrap nodes
if parts . Bootstrap {
members := s . serf . Members ( )
for _ , member := range members {
valid , p := isNomadServer ( member )
if valid && member . Name != m . Name && p . Bootstrap {
s . logger . Printf ( "[ERR] nomad: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer." , m . Name , member . Name )
return nil
}
2015-06-01 15:49:10 +00:00
}
}
2015-06-04 11:38:41 +00:00
// Attempt to add as a peer
future := s . raft . AddPeer ( parts . Addr . String ( ) )
if err := future . Error ( ) ; err != nil && err != raft . ErrKnownPeer {
s . logger . Printf ( "[ERR] nomad: failed to add raft peer: %v" , err )
return err
2015-06-05 21:54:45 +00:00
} else if err == nil {
s . logger . Printf ( "[INFO] nomad: added raft peer: %v" , parts )
2015-06-04 11:38:41 +00:00
}
return nil
}
// removeRaftPeer is used to remove a Raft peer when a Nomad server leaves
// or is reaped
func ( s * Server ) removeRaftPeer ( m serf . Member , parts * serverParts ) error {
// Attempt to remove as peer
future := s . raft . RemovePeer ( parts . Addr . String ( ) )
if err := future . Error ( ) ; err != nil && err != raft . ErrUnknownPeer {
s . logger . Printf ( "[ERR] nomad: failed to remove raft peer '%v': %v" ,
parts , err )
return err
} else if err == nil {
s . logger . Printf ( "[INFO] nomad: removed server '%s' as peer" , m . Name )
}
return nil
2015-06-01 15:49:10 +00:00
}