open-nomad/nomad/leader.go

652 lines
19 KiB
Go
Raw Normal View History

2015-06-01 15:49:10 +00:00
package nomad
import (
"context"
"errors"
"fmt"
"math/rand"
2017-02-02 23:49:06 +00:00
"net"
"time"
"github.com/armon/go-metrics"
2017-02-08 04:31:23 +00:00
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
)
2016-05-25 17:28:25 +00:00
const (
// failedEvalUnblockInterval is the interval at which failed evaluations are
// unblocked to re-enter the scheduler. A failed evaluation occurs under
// high contention when the schedulers plan does not make progress.
failedEvalUnblockInterval = 1 * time.Minute
)
2015-06-01 15:49:10 +00:00
// monitorLeadership is used to monitor if we acquire or lose our role
// as the leader in the Raft cluster. There is some work the leader is
// expected to do, so we must react to changes
func (s *Server) monitorLeadership() {
var stopCh chan struct{}
for {
select {
case isLeader := <-s.leaderCh:
2015-06-01 15:49:10 +00:00
if isLeader {
stopCh = make(chan struct{})
go s.leaderLoop(stopCh)
s.logger.Printf("[INFO] nomad: cluster leadership acquired")
} else if stopCh != nil {
close(stopCh)
stopCh = nil
s.logger.Printf("[INFO] nomad: cluster leadership lost")
}
case <-s.shutdownCh:
return
}
}
}
// leaderLoop runs as long as we are the leader to run various
// maintence activities
func (s *Server) leaderLoop(stopCh chan struct{}) {
// Ensure we revoke leadership on stepdown
defer s.revokeLeadership()
2015-06-05 21:54:45 +00:00
var reconcileCh chan serf.Member
establishedLeader := false
2015-06-05 21:54:45 +00:00
RECONCILE:
// Setup a reconciliation timer
reconcileCh = nil
interval := time.After(s.config.ReconcileInterval)
// Apply a raft barrier to ensure our FSM is caught up
start := time.Now()
barrier := s.raft.Barrier(0)
if err := barrier.Error(); err != nil {
s.logger.Printf("[ERR] nomad: failed to wait for barrier: %v", err)
goto WAIT
}
metrics.MeasureSince([]string{"nomad", "leader", "barrier"}, start)
// Check if we need to handle initial leadership actions
if !establishedLeader {
2015-08-15 22:15:00 +00:00
if err := s.establishLeadership(stopCh); err != nil {
s.logger.Printf("[ERR] nomad: failed to establish leadership: %v",
err)
goto WAIT
}
establishedLeader = true
}
2015-06-05 21:54:45 +00:00
// Reconcile any missing data
if err := s.reconcile(); err != nil {
s.logger.Printf("[ERR] nomad: failed to reconcile: %v", err)
goto WAIT
}
// Initial reconcile worked, now we can process the channel
// updates
reconcileCh = s.reconcileCh
WAIT:
2015-06-01 15:49:10 +00:00
// Wait until leadership is lost
for {
select {
case <-stopCh:
return
case <-s.shutdownCh:
return
2015-06-05 21:54:45 +00:00
case <-interval:
goto RECONCILE
case member := <-reconcileCh:
s.reconcileMember(member)
}
}
}
// establishLeadership is invoked once we become leader and are able
// to invoke an initial barrier. The barrier is used to ensure any
2016-05-15 16:41:34 +00:00
// previously inflight transactions have been committed and that our
// state is up-to-date.
2015-08-15 22:15:00 +00:00
func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Disable workers to free half the cores for use in the plan queue and
// evaluation broker
if numWorkers := len(s.workers); numWorkers > 1 {
// Disabling 3/4 of the workers frees CPU for raft and the
// plan applier which uses 1/2 the cores.
for i := 0; i < (3 * numWorkers / 4); i++ {
s.workers[i].SetPause(true)
}
2015-08-23 20:59:13 +00:00
}
2015-07-27 22:11:42 +00:00
// Enable the plan queue, since we are now the leader
s.planQueue.SetEnabled(true)
// Start the plan evaluator
go s.planApply()
2015-07-27 22:11:42 +00:00
// Enable the eval broker, since we are now the leader
s.evalBroker.SetEnabled(true)
2016-01-29 23:31:32 +00:00
// Enable the blocked eval tracker, since we are now the leader
s.blockedEvals.SetEnabled(true)
2017-06-28 22:35:52 +00:00
// Enable the deployment watcher, since we are now the leader
if err := s.deploymentWatcher.SetEnabled(true); err != nil {
return err
}
// Restore the eval broker state
2016-01-29 23:31:32 +00:00
if err := s.restoreEvals(); err != nil {
return err
}
2015-08-15 22:15:00 +00:00
// Activate the vault client
s.vault.SetActive(true)
if err := s.restoreRevokingAccessors(); err != nil {
return err
}
// Enable the periodic dispatcher, since we are now the leader.
2015-12-18 20:26:28 +00:00
s.periodicDispatcher.SetEnabled(true)
s.periodicDispatcher.Start()
2015-12-18 20:26:28 +00:00
// Restore the periodic dispatcher state
if err := s.restorePeriodicDispatcher(); err != nil {
return err
}
2015-08-15 22:15:00 +00:00
// Scheduler periodic jobs
go s.schedulePeriodic(stopCh)
2015-08-16 18:10:18 +00:00
// Reap any failed evaluations
go s.reapFailedEvaluations(stopCh)
// Reap any duplicate blocked evaluations
go s.reapDupBlockedEvaluations(stopCh)
// Periodically unblock failed allocations
go s.periodicUnblockFailedEvals(stopCh)
// Setup the heartbeat timers. This is done both when starting up or when
// a leader fail over happens. Since the timers are maintained by the leader
// node, effectively this means all the timers are renewed at the time of failover.
// The TTL contract is that the session will not be expired before the TTL,
// so expiring it later is allowable.
//
// This MUST be done after the initial barrier to ensure the latest Nodes
// are available to be initialized. Otherwise initialization may use stale
// data.
if err := s.initializeHeartbeatTimers(); err != nil {
s.logger.Printf("[ERR] nomad: heartbeat timer setup failed: %v", err)
return err
}
// COMPAT 0.4 - 0.4.1
// Reconcile the summaries of the registered jobs. We reconcile summaries
// only if the server is 0.4.1 since summaries are not present in 0.4 they
// might be incorrect after upgrading to 0.4.1 the summaries might not be
// correct
if err := s.reconcileJobSummaries(); err != nil {
return fmt.Errorf("unable to reconcile job summaries: %v", err)
}
return nil
}
2015-07-27 22:11:42 +00:00
2016-01-29 23:31:32 +00:00
// restoreEvals is used to restore pending evaluations into the eval broker and
// blocked evaluations into the blocked eval tracker. The broker and blocked
// eval tracker is maintained only by the leader, so it must be restored anytime
// a leadership transition takes place.
func (s *Server) restoreEvals() error {
// Get an iterator over every evaluation
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
iter, err := s.fsm.State().Evals(ws)
if err != nil {
return fmt.Errorf("failed to get evaluations: %v", err)
}
for {
raw := iter.Next()
if raw == nil {
break
}
eval := raw.(*structs.Evaluation)
2016-01-29 23:31:32 +00:00
if eval.ShouldEnqueue() {
s.evalBroker.Enqueue(eval)
2016-01-29 23:31:32 +00:00
} else if eval.ShouldBlock() {
s.blockedEvals.Block(eval)
}
}
return nil
}
// restoreRevokingAccessors is used to restore Vault accessors that should be
// revoked.
func (s *Server) restoreRevokingAccessors() error {
// An accessor should be revoked if its allocation or node is terminal
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
state := s.fsm.State()
2017-02-08 04:31:23 +00:00
iter, err := state.VaultAccessors(ws)
if err != nil {
return fmt.Errorf("failed to get vault accessors: %v", err)
}
var revoke []*structs.VaultAccessor
for {
raw := iter.Next()
if raw == nil {
break
}
va := raw.(*structs.VaultAccessor)
// Check the allocation
2017-02-08 04:31:23 +00:00
alloc, err := state.AllocByID(ws, va.AllocID)
if err != nil {
return fmt.Errorf("failed to lookup allocation %q: %v", va.AllocID, err)
}
if alloc == nil || alloc.Terminated() {
// No longer running and should be revoked
revoke = append(revoke, va)
continue
}
// Check the node
2017-02-08 04:31:23 +00:00
node, err := state.NodeByID(ws, va.NodeID)
if err != nil {
return fmt.Errorf("failed to lookup node %q: %v", va.NodeID, err)
}
if node == nil || node.TerminalStatus() {
// Node is terminal so any accessor from it should be revoked
revoke = append(revoke, va)
continue
}
}
if len(revoke) != 0 {
if err := s.vault.RevokeTokens(context.Background(), revoke, true); err != nil {
return fmt.Errorf("failed to revoke tokens: %v", err)
}
}
return nil
}
// restorePeriodicDispatcher is used to restore all periodic jobs into the
// periodic dispatcher. It also determines if a periodic job should have been
// created during the leadership transition and force runs them. The periodic
// dispatcher is maintained only by the leader, so it must be restored anytime a
// leadership transition takes place.
2015-12-18 20:26:28 +00:00
func (s *Server) restorePeriodicDispatcher() error {
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
iter, err := s.fsm.State().JobsByPeriodic(ws, true)
if err != nil {
return fmt.Errorf("failed to get periodic jobs: %v", err)
}
now := time.Now()
for i := iter.Next(); i != nil; i = iter.Next() {
job := i.(*structs.Job)
s.periodicDispatcher.Add(job)
// If the periodic job has never been launched before, launch will hold
// the time the periodic job was added. Otherwise it has the last launch
// time of the periodic job.
2017-02-08 04:31:23 +00:00
launch, err := s.fsm.State().PeriodicLaunchByID(ws, job.ID)
2015-12-19 01:51:30 +00:00
if err != nil || launch == nil {
return fmt.Errorf("failed to get periodic launch time: %v", err)
}
// nextLaunch is the next launch that should occur.
nextLaunch := job.Periodic.Next(launch.Launch.In(job.Periodic.GetLocation()))
// We skip force launching the job if there should be no next launch
// (the zero case) or if the next launch time is in the future. If it is
// in the future, it will be handled by the periodic dispatcher.
if nextLaunch.IsZero() || !nextLaunch.Before(now) {
continue
}
2016-01-13 18:19:53 +00:00
if _, err := s.periodicDispatcher.ForceRun(job.ID); err != nil {
msg := fmt.Sprintf("force run of periodic job %q failed: %v", job.ID, err)
s.logger.Printf("[ERR] nomad.periodic: %s", msg)
return errors.New(msg)
}
s.logger.Printf("[DEBUG] nomad.periodic: periodic job %q force"+
" run during leadership establishment", job.ID)
}
2015-12-18 20:26:28 +00:00
return nil
}
2015-08-15 22:15:00 +00:00
// schedulePeriodic is used to do periodic job dispatch while we are leader
func (s *Server) schedulePeriodic(stopCh chan struct{}) {
evalGC := time.NewTicker(s.config.EvalGCInterval)
defer evalGC.Stop()
2015-09-07 18:01:29 +00:00
nodeGC := time.NewTicker(s.config.NodeGCInterval)
defer nodeGC.Stop()
2015-12-15 03:20:57 +00:00
jobGC := time.NewTicker(s.config.JobGCInterval)
defer jobGC.Stop()
2015-08-15 22:15:00 +00:00
// getLatest grabs the latest index from the state store. It returns true if
// the index was retrieved successfully.
getLatest := func() (uint64, bool) {
2016-06-22 16:33:15 +00:00
snapshotIndex, err := s.fsm.State().LatestIndex()
if err != nil {
2016-06-22 16:33:15 +00:00
s.logger.Printf("[ERR] nomad: failed to determine state store's index: %v", err)
return 0, false
}
return snapshotIndex, true
}
for {
2015-08-15 22:15:00 +00:00
select {
case <-evalGC.C:
if index, ok := getLatest(); ok {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC, index))
}
2015-09-07 18:01:29 +00:00
case <-nodeGC.C:
if index, ok := getLatest(); ok {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC, index))
}
2015-12-15 03:20:57 +00:00
case <-jobGC.C:
if index, ok := getLatest(); ok {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC, index))
}
2015-08-15 22:15:00 +00:00
case <-stopCh:
return
}
}
}
// coreJobEval returns an evaluation for a core job
func (s *Server) coreJobEval(job string, modifyIndex uint64) *structs.Evaluation {
return &structs.Evaluation{
ID: structs.GenerateUUID(),
2015-08-15 22:15:00 +00:00
Priority: structs.CoreJobPriority,
Type: structs.JobTypeCore,
TriggeredBy: structs.EvalTriggerScheduled,
JobID: job,
Status: structs.EvalStatusPending,
ModifyIndex: modifyIndex,
2015-08-15 22:15:00 +00:00
}
}
2015-08-16 18:10:18 +00:00
// reapFailedEvaluations is used to reap evaluations that
// have reached their delivery limit and should be failed
func (s *Server) reapFailedEvaluations(stopCh chan struct{}) {
for {
select {
case <-stopCh:
return
default:
// Scan for a failed evaluation
eval, token, err := s.evalBroker.Dequeue([]string{failedQueue}, time.Second)
if err != nil {
return
}
if eval == nil {
continue
}
// Update the status to failed
2017-04-14 20:19:14 +00:00
updateEval := eval.Copy()
updateEval.Status = structs.EvalStatusFailed
updateEval.StatusDescription = fmt.Sprintf("evaluation reached delivery limit (%d)", s.config.EvalDeliveryLimit)
s.logger.Printf("[WARN] nomad: eval %#v reached delivery limit, marking as failed", updateEval)
2015-08-16 18:10:18 +00:00
// Create a follow-up evaluation that will be used to retry the
// scheduling for the job after the cluster is hopefully more stable
// due to the fairly large backoff.
2017-04-14 22:24:55 +00:00
followupEvalWait := s.config.EvalFailedFollowupBaselineDelay +
time.Duration(rand.Int63n(int64(s.config.EvalFailedFollowupDelayRange)))
followupEval := eval.CreateFailedFollowUpEval(followupEvalWait)
2015-08-16 18:10:18 +00:00
// Update via Raft
req := structs.EvalUpdateRequest{
2017-04-14 20:19:14 +00:00
Evals: []*structs.Evaluation{updateEval, followupEval},
2015-08-16 18:10:18 +00:00
}
if _, _, err := s.raftApply(structs.EvalUpdateRequestType, &req); err != nil {
2017-04-14 20:19:14 +00:00
s.logger.Printf("[ERR] nomad: failed to update failed eval %#v and create a follow-up: %v", updateEval, err)
2015-08-16 18:10:18 +00:00
continue
}
// Ack completion
s.evalBroker.Ack(eval.ID, token)
}
}
}
// reapDupBlockedEvaluations is used to reap duplicate blocked evaluations and
// should be cancelled.
func (s *Server) reapDupBlockedEvaluations(stopCh chan struct{}) {
for {
select {
case <-stopCh:
return
default:
// Scan for duplicate blocked evals.
dups := s.blockedEvals.GetDuplicates(time.Second)
if dups == nil {
continue
}
cancel := make([]*structs.Evaluation, len(dups))
for i, dup := range dups {
// Update the status to cancelled
newEval := dup.Copy()
newEval.Status = structs.EvalStatusCancelled
newEval.StatusDescription = fmt.Sprintf("existing blocked evaluation exists for job %q", newEval.JobID)
cancel[i] = newEval
}
// Update via Raft
req := structs.EvalUpdateRequest{
Evals: cancel,
}
if _, _, err := s.raftApply(structs.EvalUpdateRequestType, &req); err != nil {
s.logger.Printf("[ERR] nomad: failed to update duplicate evals %#v: %v", cancel, err)
continue
}
}
}
}
// periodicUnblockFailedEvals periodically unblocks failed, blocked evaluations.
func (s *Server) periodicUnblockFailedEvals(stopCh chan struct{}) {
2016-07-06 00:08:58 +00:00
ticker := time.NewTicker(failedEvalUnblockInterval)
2016-05-25 17:28:25 +00:00
defer ticker.Stop()
for {
select {
case <-stopCh:
return
case <-ticker.C:
// Unblock the failed allocations
s.blockedEvals.UnblockFailed()
}
}
}
// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func (s *Server) revokeLeadership() error {
2015-07-27 22:11:42 +00:00
// Disable the plan queue, since we are no longer leader
s.planQueue.SetEnabled(false)
// Disable the eval broker, since it is only useful as a leader
s.evalBroker.SetEnabled(false)
// Disable the blocked eval tracker, since it is only useful as a leader
s.blockedEvals.SetEnabled(false)
2015-12-18 20:26:28 +00:00
// Disable the periodic dispatcher, since it is only useful as a leader
s.periodicDispatcher.SetEnabled(false)
// Disable the Vault client as it is only useful as a leader.
s.vault.SetActive(false)
2017-06-28 22:35:52 +00:00
// Disable the deployment watcher as it is only useful as a leader.
if err := s.deploymentWatcher.SetEnabled(false); err != nil {
return err
}
// Clear the heartbeat timers on either shutdown or step down,
// since we are no longer responsible for TTL expirations.
if err := s.clearAllHeartbeatTimers(); err != nil {
s.logger.Printf("[ERR] nomad: clearing heartbeat timers failed: %v", err)
return err
}
2015-08-23 20:59:13 +00:00
// Unpause our worker if we paused previously
if len(s.workers) > 1 {
2016-02-19 22:49:43 +00:00
for i := 0; i < len(s.workers)/2; i++ {
s.workers[i].SetPause(false)
}
2015-08-23 20:59:13 +00:00
}
return nil
}
2015-06-05 21:54:45 +00:00
// reconcile is used to reconcile the differences between Serf
// membership and what is reflected in our strongly consistent store.
func (s *Server) reconcile() error {
defer metrics.MeasureSince([]string{"nomad", "leader", "reconcile"}, time.Now())
members := s.serf.Members()
for _, member := range members {
if err := s.reconcileMember(member); err != nil {
return err
}
}
return nil
}
// reconcileMember is used to do an async reconcile of a single serf member
func (s *Server) reconcileMember(member serf.Member) error {
// Check if this is a member we should handle
valid, parts := isNomadServer(member)
if !valid || parts.Region != s.config.Region {
return nil
}
defer metrics.MeasureSince([]string{"nomad", "leader", "reconcileMember"}, time.Now())
// Do not reconcile ourself
if member.Name == fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Region) {
return nil
}
var err error
switch member.Status {
case serf.StatusAlive:
err = s.addRaftPeer(member, parts)
case serf.StatusLeft, StatusReap:
err = s.removeRaftPeer(member, parts)
}
if err != nil {
s.logger.Printf("[ERR] nomad: failed to reconcile member: %v: %v",
member, err)
return err
}
return nil
}
// reconcileJobSummaries reconciles the summaries of all the jobs registered in
// the system
// COMPAT 0.4 -> 0.4.1
func (s *Server) reconcileJobSummaries() error {
index, err := s.fsm.state.LatestIndex()
if err != nil {
return fmt.Errorf("unable to read latest index: %v", err)
}
s.logger.Printf("[DEBUG] leader: reconciling job summaries at index: %v", index)
args := &structs.GenericResponse{}
msg := structs.ReconcileJobSummariesRequestType | structs.IgnoreUnknownTypeFlag
if _, _, err = s.raftApply(msg, args); err != nil {
return fmt.Errorf("reconciliation of job summaries failed: %v", err)
}
return nil
}
// addRaftPeer is used to add a new Raft peer when a Nomad server joins
func (s *Server) addRaftPeer(m serf.Member, parts *serverParts) error {
2017-02-02 23:49:06 +00:00
// Do not join ourselfs
if m.Name == s.config.NodeName {
s.logger.Printf("[DEBUG] nomad: adding self (%q) as raft peer skipped", m.Name)
return nil
}
// Check for possibility of multiple bootstrap nodes
if parts.Bootstrap {
members := s.serf.Members()
for _, member := range members {
valid, p := isNomadServer(member)
if valid && member.Name != m.Name && p.Bootstrap {
s.logger.Printf("[ERR] nomad: '%v' and '%v' are both in bootstrap mode. Only one node should be in bootstrap mode, not adding Raft peer.", m.Name, member.Name)
return nil
}
2015-06-01 15:49:10 +00:00
}
}
2017-02-02 23:49:06 +00:00
// TODO (alexdadgar) - This will need to be changed once we support node IDs.
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
// See if it's already in the configuration. It's harmless to re-add it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries.
configFuture := s.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
2017-02-03 00:07:15 +00:00
s.logger.Printf("[ERR] nomad: failed to get raft configuration: %v", err)
2017-02-02 23:49:06 +00:00
return err
}
for _, server := range configFuture.Configuration().Servers {
if server.Address == raft.ServerAddress(addr) {
return nil
}
}
// Attempt to add as a peer
2017-02-02 23:49:06 +00:00
addFuture := s.raft.AddPeer(raft.ServerAddress(addr))
if err := addFuture.Error(); err != nil {
s.logger.Printf("[ERR] nomad: failed to add raft peer: %v", err)
return err
2015-06-05 21:54:45 +00:00
} else if err == nil {
s.logger.Printf("[INFO] nomad: added raft peer: %v", parts)
}
return nil
}
// removeRaftPeer is used to remove a Raft peer when a Nomad server leaves
// or is reaped
func (s *Server) removeRaftPeer(m serf.Member, parts *serverParts) error {
2017-02-02 23:49:06 +00:00
// TODO (alexdadgar) - This will need to be changed once we support node IDs.
addr := (&net.TCPAddr{IP: m.Addr, Port: parts.Port}).String()
// See if it's already in the configuration. It's harmless to re-remove it
// but we want to avoid doing that if possible to prevent useless Raft
// log entries.
configFuture := s.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
2017-02-03 00:07:15 +00:00
s.logger.Printf("[ERR] nomad: failed to get raft configuration: %v", err)
2017-02-02 23:49:06 +00:00
return err
}
for _, server := range configFuture.Configuration().Servers {
if server.Address == raft.ServerAddress(addr) {
goto REMOVE
}
}
return nil
REMOVE:
// Attempt to remove as a peer.
future := s.raft.RemovePeer(raft.ServerAddress(addr))
if err := future.Error(); err != nil {
s.logger.Printf("[ERR] nomad: failed to remove raft peer '%v': %v",
parts, err)
return err
}
return nil
2015-06-01 15:49:10 +00:00
}