Merge pull request #5900 from hashicorp/f-system-sched-blocked-evals

System scheduler blocked evals
This commit is contained in:
Lang Martin 2019-07-18 16:13:15 -04:00 committed by GitHub
commit ee64b00141
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 255 additions and 127 deletions

View File

@ -47,6 +47,10 @@ type BlockedEvals struct {
// classes.
escaped map[string]wrappedEval
// system is the set of system evaluations that failed to start on nodes because of
// resource constraints.
system *systemEvals
// capacityChangeCh is used to buffer unblocking of evaluations.
capacityChangeCh chan *capacityUpdate
@ -113,6 +117,7 @@ func NewBlockedEvals(evalBroker *EvalBroker, logger log.Logger) *BlockedEvals {
evalBroker: evalBroker,
captured: make(map[string]wrappedEval),
escaped: make(map[string]wrappedEval),
system: newSystemEvals(),
jobs: make(map[structs.NamespacedID]string),
unblockIndexes: make(map[string]uint64),
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
@ -227,6 +232,12 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
return
}
// System evals are indexed by node and re-processed on utilization changes in
// existing nodes
if eval.Type == structs.JobTypeSystem {
b.system.Add(eval, token)
}
// Add the eval to the set of blocked evals whose jobs constraints are
// captured by computed node class.
b.captured[eval.ID] = wrapped
@ -365,6 +376,14 @@ func (b *BlockedEvals) Untrack(jobID, namespace string) {
nsID := structs.NewNamespacedID(jobID, namespace)
if evals, ok := b.system.JobEvals(nsID); ok {
for _, e := range evals {
b.system.Remove(e)
b.stats.TotalBlocked--
}
return
}
// Get the evaluation ID to cancel
evalID, ok := b.jobs[nsID]
if !ok {
@ -477,6 +496,27 @@ func (b *BlockedEvals) UnblockClassAndQuota(class, quota string, index uint64) {
}
}
// UnblockNode finds any blocked evaluation that is node specific (system
// jobs) for the given node and enqueues it on the eval broker. Every eval
// handed to the broker is first dropped from the system eval index and
// subtracted from the TotalBlocked stat. The index argument is accepted for
// symmetry with the other unblock entry points but is not read here.
func (b *BlockedEvals) UnblockNode(nodeID string, index uint64) {
	b.l.Lock()
	defer b.l.Unlock()

	// Do nothing if not enabled
	if !b.enabled {
		return
	}

	// Nothing is blocked on this node
	pending, found := b.system.NodeEvals(nodeID)
	if !found || len(pending) == 0 {
		return
	}

	// pending is a private copy built by NodeEvals, so removing from the
	// system index while ranging over it is safe.
	for blockedEval := range pending {
		b.system.Remove(blockedEval)
		b.stats.TotalBlocked--
	}

	b.evalBroker.EnqueueAll(pending)
}
// watchCapacity is a long lived function that watches for capacity changes in
// nodes and unblocks the correct set of evals.
func (b *BlockedEvals) watchCapacity(stopCh <-chan struct{}, changeCh <-chan *capacityUpdate) {
@ -652,6 +692,7 @@ func (b *BlockedEvals) Flush() {
b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer)
b.stopCh = make(chan struct{})
b.duplicateCh = make(chan struct{}, 1)
b.system = newSystemEvals()
}
// Stats is used to query the state of the blocked eval tracker.

View File

@ -0,0 +1,94 @@
package nomad
import "github.com/hashicorp/nomad/nomad/structs"
// systemEvals are handled specially, each job may have a blocked eval on each node
type systemEvals struct {
// byJob maps a jobID to a nodeID to that job's single blocked evalID on that node
byJob map[structs.NamespacedID]map[string]string
// byNode maps a nodeID to a set of evalIDs
byNode map[string]map[string]bool
// evals maps evalIDs to an eval and token
evals map[string]*wrappedEval
}
// newSystemEvals returns an empty systemEvals with all three indexes
// allocated and ready for writes.
func newSystemEvals() *systemEvals {
	s := new(systemEvals)
	s.evals = make(map[string]*wrappedEval)
	s.byJob = make(map[structs.NamespacedID]map[string]string)
	s.byNode = make(map[string]map[string]bool)
	return s
}
// Add tracks eval (and the token it was blocked with) as the blocked eval for
// its job on its node. A job has at most one blocked eval per node, so any
// eval previously recorded for the same job+node is removed from every index
// before the new one is stored.
//
// Adding an eval whose ID is already tracked replaces the stored entry rather
// than corrupting the indexes.
func (s *systemEvals) Add(eval *structs.Evaluation, token string) {
	// If this eval ID is already tracked, drop the stale entry using the
	// eval that was stored with it, so no index keeps a dangling reference.
	if existing, ok := s.evals[eval.ID]; ok {
		s.Remove(existing.eval)
	}

	jobID := structs.NewNamespacedID(eval.JobID, eval.Namespace)

	// If we're displacing the old blocked eval for this job+node, delete it
	// before inserting the new one. The lookup is guarded so that a byJob
	// entry with no matching eval cannot cause a nil dereference; the entry
	// is simply overwritten below.
	if prevID, ok := s.byJob[jobID][eval.NodeID]; ok {
		if prev, found := s.evals[prevID]; found {
			s.Remove(prev.eval)
		}
	}

	// Store the eval by node id. The per-node set is (re)created here
	// because removing the displaced eval may have emptied and deleted it.
	if s.byNode[eval.NodeID] == nil {
		s.byNode[eval.NodeID] = make(map[string]bool)
	}
	s.byNode[eval.NodeID][eval.ID] = true
	s.evals[eval.ID] = &wrappedEval{eval: eval, token: token}

	// Link the job to the node for cleanup, and set this eval as the new
	// eval for this job on this node.
	if s.byJob[jobID] == nil {
		s.byJob[jobID] = make(map[string]string)
	}
	s.byJob[jobID][eval.NodeID] = eval.ID
}
// Get looks up a tracked eval by ID. The boolean reports whether the ID is
// present; when it is not, the returned pointer is nil.
func (s *systemEvals) Get(evalID string) (*wrappedEval, bool) {
	wrapped, found := s.evals[evalID]
	return wrapped, found
}
// Remove drops eval from all three indexes. The job index entry is only
// cleared when it still points at this eval, so removing a displaced eval
// does not disturb its replacement.
func (s *systemEvals) Remove(eval *structs.Evaluation) {
	nodeID, evalID := eval.NodeID, eval.ID

	// Job index: unlink only if this eval is the one currently listed for
	// the job on this node.
	nsID := structs.NewNamespacedID(eval.JobID, eval.Namespace)
	if current, listed := s.byJob[nsID][nodeID]; listed && current == evalID {
		delete(s.byJob[nsID], nodeID)
	}

	// Node index: drop the eval from the node's set, and drop the set itself
	// once nothing is left in it.
	onNode := s.byNode[nodeID]
	delete(onNode, evalID)
	if len(onNode) == 0 {
		delete(s.byNode, nodeID)
	}

	// Primary store
	delete(s.evals, evalID)
}
// NodeEvals returns a freshly built map of eval to token for every tracked
// eval blocked on nodeID. The boolean is true only when at least one eval was
// found; the returned map is never nil.
func (s *systemEvals) NodeEvals(nodeID string) (map[*structs.Evaluation]string, bool) {
	tokens := make(map[*structs.Evaluation]string)
	for evalID := range s.byNode[nodeID] {
		wrapped, found := s.evals[evalID]
		if !found {
			continue
		}
		tokens[wrapped.eval] = wrapped.token
	}
	return tokens, len(tokens) > 0
}
// JobEvals returns the tracked blocked evals for jobID across all nodes. The
// boolean reports whether the job has an entry in the job index at all, which
// can be true even when the returned slice is empty. The slice is never nil.
func (s *systemEvals) JobEvals(jobID structs.NamespacedID) ([]*structs.Evaluation, bool) {
	perNode, tracked := s.byJob[jobID]

	found := make([]*structs.Evaluation, 0)
	for _, evalID := range perNode {
		wrapped, present := s.evals[evalID]
		if !present {
			continue
		}
		found = append(found, wrapped.eval)
	}
	return found, tracked
}

View File

@ -10,6 +10,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
func testBlockedEvals(t *testing.T) (*BlockedEvals, *EvalBroker) {
@ -174,18 +175,21 @@ func TestBlockedEvals_UnblockEscaped(t *testing.T) {
}
blocked.Unblock("v1:123", 1000)
requireBlockedEvalsEnqueued(t, blocked, broker, 1)
}
func requireBlockedEvalsEnqueued(t *testing.T, blocked *BlockedEvals, broker *EvalBroker, enqueued int) {
testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", brokerStats)
if brokerStats.TotalReady != enqueued {
return false, fmt.Errorf("missing enqueued evals: %#v", brokerStats)
}
// Verify Unblock updates the stats
bStats := blocked.Stats()
if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 {
return false, fmt.Errorf("bad: %#v", bStats)
return false, fmt.Errorf("evals still blocked: %#v", bStats)
}
return true, nil
}, func(err error) {
@ -211,23 +215,7 @@ func TestBlockedEvals_UnblockEligible(t *testing.T) {
}
blocked.Unblock("v1:123", 1000)
testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}
// Verify Unblock updates the stats
bStats := blocked.Stats()
if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 {
return false, fmt.Errorf("bad: %#v", bStats)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
requireBlockedEvalsEnqueued(t, blocked, broker, 1)
}
func TestBlockedEvals_UnblockIneligible(t *testing.T) {
@ -286,23 +274,7 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) {
// Should unblock because the eval hasn't seen this node class.
blocked.Unblock("v1:789", 1000)
testutil.WaitForResult(func() (bool, error) {
// Verify Unblock causes an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}
// Verify Unblock updates the stats
bStats := blocked.Stats()
if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 {
return false, fmt.Errorf("bad: %#v", bStats)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
requireBlockedEvalsEnqueued(t, blocked, broker, 1)
}
func TestBlockedEvals_UnblockEligible_Quota(t *testing.T) {
@ -322,23 +294,7 @@ func TestBlockedEvals_UnblockEligible_Quota(t *testing.T) {
}
blocked.UnblockQuota("foo", 1000)
testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}
// Verify Unblock updates the stats
bs := blocked.Stats()
if bs.TotalBlocked != 0 || bs.TotalEscaped != 0 || bs.TotalQuotaLimit != 0 {
return false, fmt.Errorf("bad: %#v", bs)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
requireBlockedEvalsEnqueued(t, blocked, broker, 1)
}
func TestBlockedEvals_UnblockIneligible_Quota(t *testing.T) {
@ -416,22 +372,7 @@ func TestBlockedEvals_Reblock(t *testing.T) {
t.Fatalf("err: %v", err)
}
testutil.WaitForResult(func() (bool, error) {
// Verify Unblock causes an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}
// Verify Unblock updates the stats
bStats := blocked.Stats()
if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 {
return false, fmt.Errorf("bad: %#v", bStats)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
requireBlockedEvalsEnqueued(t, blocked, broker, 1)
}
// Test the block case in which the eval should be immediately unblocked since
@ -457,17 +398,7 @@ func TestBlockedEvals_Block_ImmediateUnblock_Escaped(t *testing.T) {
t.Fatalf("bad: %#v", blockedStats)
}
testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
requireBlockedEvalsEnqueued(t, blocked, broker, 1)
}
// Test the block case in which the eval should be immediately unblocked since
@ -494,17 +425,7 @@ func TestBlockedEvals_Block_ImmediateUnblock_UnseenClass_After(t *testing.T) {
t.Fatalf("bad: %#v", blockedStats)
}
testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
requireBlockedEvalsEnqueued(t, blocked, broker, 1)
}
// Test the block case in which the eval should not immediately unblock since
@ -555,17 +476,7 @@ func TestBlockedEvals_Block_ImmediateUnblock_SeenClass(t *testing.T) {
t.Fatalf("bad: %#v", blockedStats)
}
testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
requireBlockedEvalsEnqueued(t, blocked, broker, 1)
}
// Test the block case in which the eval should be immediately unblocked since
@ -591,17 +502,7 @@ func TestBlockedEvals_Block_ImmediateUnblock_Quota(t *testing.T) {
t.Fatalf("bad: %#v", bs)
}
testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
requireBlockedEvalsEnqueued(t, blocked, broker, 1)
}
func TestBlockedEvals_UnblockFailed(t *testing.T) {
@ -636,16 +537,7 @@ func TestBlockedEvals_UnblockFailed(t *testing.T) {
t.Fatalf("bad: %#v", bs)
}
testutil.WaitForResult(func() (bool, error) {
// Verify Unblock caused an enqueue
brokerStats := broker.Stats()
if brokerStats.TotalReady != 3 {
return false, fmt.Errorf("bad: %#v", brokerStats)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
requireBlockedEvalsEnqueued(t, blocked, broker, 3)
// Reblock an eval for the same job and check that it gets tracked.
blocked.Block(e)
@ -704,3 +596,78 @@ func TestBlockedEvals_Untrack_Quota(t *testing.T) {
t.Fatalf("bad: %#v", bs)
}
}
// TestBlockedEvals_UnblockNode checks that a blocked system eval pinned to a
// node is enqueued on the broker, and removed from the tracker, when that
// node is unblocked.
func TestBlockedEvals_UnblockNode(t *testing.T) {
	t.Parallel()
	must := require.New(t)

	blocked, broker := testBlockedEvals(t)
	must.NotNil(broker)

	// Create a blocked system eval for node "foo" and add it to the tracker.
	sysEval := mock.Eval()
	sysEval.Type = structs.JobTypeSystem
	sysEval.NodeID = "foo"
	sysEval.SnapshotIndex = 999
	blocked.Block(sysEval)

	// Verify block did track
	before := blocked.Stats()
	must.Equal(1, before.TotalBlocked)

	// Unblocking the node must hand the eval to the broker
	blocked.UnblockNode("foo", 1000)
	requireBlockedEvalsEnqueued(t, blocked, broker, 1)

	// ...and leave nothing behind in the node index or the stats
	after := blocked.Stats()
	must.Empty(blocked.system.byNode)
	must.Equal(0, after.TotalBlocked)
}
// TestBlockedEvals_SystemUntrack checks that untracking a job clears its
// blocked system eval from the tracker's stats.
func TestBlockedEvals_SystemUntrack(t *testing.T) {
	t.Parallel()
	must := require.New(t)

	blocked, _ := testBlockedEvals(t)

	// Create a blocked system eval for node "foo" and add it to the tracker.
	sysEval := mock.Eval()
	sysEval.Type = structs.JobTypeSystem
	sysEval.NodeID = "foo"
	blocked.Block(sysEval)

	// Verify block did track
	before := blocked.Stats()
	must.Equal(1, before.TotalBlocked)
	must.Equal(0, before.TotalEscaped)
	must.Equal(0, before.TotalQuotaLimit)

	// Untrack and verify
	blocked.Untrack(sysEval.JobID, sysEval.Namespace)
	after := blocked.Stats()
	must.Equal(0, after.TotalBlocked)
	must.Equal(0, after.TotalEscaped)
	must.Equal(0, after.TotalQuotaLimit)
}
// TestBlockedEvals_SystemDisableFlush checks that disabling the tracker
// resets the stats and empties every system eval index.
func TestBlockedEvals_SystemDisableFlush(t *testing.T) {
	t.Parallel()
	must := require.New(t)

	blocked, _ := testBlockedEvals(t)

	// Create a blocked system eval for node "foo" and add it to the tracker.
	sysEval := mock.Eval()
	sysEval.Type = structs.JobTypeSystem
	sysEval.NodeID = "foo"
	blocked.Block(sysEval)

	// Verify block did track
	before := blocked.Stats()
	must.Equal(1, before.TotalBlocked)
	must.Equal(0, before.TotalEscaped)
	must.Equal(0, before.TotalQuotaLimit)

	// Disable empties
	blocked.SetEnabled(false)
	after := blocked.Stats()
	must.Equal(0, after.TotalBlocked)
	must.Equal(0, after.TotalEscaped)
	must.Equal(0, after.TotalQuotaLimit)
	must.Empty(blocked.system.evals)
	must.Empty(blocked.system.byJob)
	must.Empty(blocked.system.byNode)
}

View File

@ -344,6 +344,7 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
}
n.blockedEvals.Unblock(node.ComputedClass, index)
n.blockedEvals.UnblockNode(req.NodeID, index)
}
return nil
@ -415,6 +416,7 @@ func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interfac
if node != nil && node.SchedulingEligibility == structs.NodeSchedulingIneligible &&
req.Eligibility == structs.NodeSchedulingEligible {
n.blockedEvals.Unblock(node.ComputedClass, index)
n.blockedEvals.UnblockNode(req.NodeID, index)
}
return nil
@ -760,6 +762,7 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
}
n.blockedEvals.UnblockClassAndQuota(node.ComputedClass, quota, index)
n.blockedEvals.UnblockNode(node.ID, index)
}
}

View File

@ -253,7 +253,7 @@ func (w *Worker) invokeScheduler(snap *state.StateSnapshot, eval *structs.Evalua
return fmt.Errorf("failed to determine snapshot's index: %v", err)
}
// Create the scheduler, or use the special system scheduler
// Create the scheduler, or use the special core scheduler
var sched scheduler.Scheduler
if eval.Type == structs.JobTypeCore {
sched = NewCoreScheduler(w.srv, snap)

View File

@ -257,7 +257,8 @@ func (s *GenericScheduler) process() (bool, error) {
// If there are failed allocations, we need to create a blocked evaluation
// to place the failed allocations when resources become available. If the
// current evaluation is already a blocked eval, we reuse it.
// current evaluation is already a blocked eval, we reuse it by submitting
// a new eval to the planner in createBlockedEval
if s.eval.Status != structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 && s.blocked == nil {
if err := s.createBlockedEval(false); err != nil {
s.logger.Error("failed to make blocked eval", "error", err)

View File

@ -18,6 +18,7 @@ const (
// SystemScheduler is used for 'system' jobs. This scheduler is
// designed for services that should be run on every client.
// One for each job, containing an allocation for each node
type SystemScheduler struct {
logger log.Logger
state State
@ -61,7 +62,8 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
switch eval.TriggeredBy {
case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate, structs.EvalTriggerFailedFollowUp,
structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate, structs.EvalTriggerPreemption,
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain, structs.EvalTriggerAllocStop:
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain, structs.EvalTriggerAllocStop,
structs.EvalTriggerQueuedAllocs:
default:
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)
@ -324,6 +326,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
// Actual failure to start this task on this candidate node, report it individually
s.failedTGAllocs[missing.TaskGroup.Name] = s.ctx.Metrics()
s.addBlocked(node)
continue
}
@ -390,3 +393,22 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
return nil
}
// addBlocked creates a new blocked eval for this job on the given node and
// submits it to the planner, returning whatever error the planner reports.
// The blocked eval is derived from the scheduler's current eval and carries
// the context's class eligibility, escaped flag and quota-limit state.
func (s *SystemScheduler) addBlocked(node *structs.Node) error {
	elig := s.ctx.Eligibility()

	// Only store the eligible classes if the eval hasn't escaped; otherwise
	// the map is left nil.
	var classes map[string]bool
	hasEscaped := elig.HasEscaped()
	if !hasEscaped {
		classes = elig.GetClasses()
	}
	quotaReached := elig.QuotaLimitReached()

	blockedEval := s.eval.CreateBlockedEval(classes, hasEscaped, quotaReached)
	blockedEval.StatusDescription = blockedEvalFailedPlacements
	// Pin the blocked eval to the node that could not be placed on
	blockedEval.NodeID = node.ID

	return s.planner.CreateEval(blockedEval)
}