Cancel blocked evals upon successful one for job

This PR causes blocked evaluations to be cancelled if there is a
subsequent successful evaluation for the job. This fixes UX problems
showing failed placements when there are not any in reality and makes GC
possible for these jobs in certain cases.

Fixes https://github.com/hashicorp/nomad/issues/2124
This commit is contained in:
Alex Dadgar 2017-01-04 15:25:03 -08:00
parent bcf513201e
commit 751d81f242
8 changed files with 293 additions and 15 deletions

View file

@ -39,8 +39,8 @@ type BlockedEvals struct {
capacityChangeCh chan *capacityUpdate
// jobs is the map of blocked job and is used to ensure that only one
// blocked eval exists for each job.
jobs map[string]struct{}
// blocked eval exists for each job. The value is the blocked evaluation ID.
jobs map[string]string
// unblockIndexes maps computed node classes to the index in which they were
// unblocked. This is used to check if an evaluation could have been
@ -91,7 +91,7 @@ func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals {
evalBroker: evalBroker,
captured: make(map[string]wrappedEval),
escaped: make(map[string]wrappedEval),
jobs: make(map[string]struct{}),
jobs: make(map[string]string),
unblockIndexes: make(map[string]uint64),
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
duplicateCh: make(chan struct{}, 1),
@ -183,7 +183,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
// Mark the job as tracked.
b.stats.TotalBlocked++
b.jobs[eval.JobID] = struct{}{}
b.jobs[eval.JobID] = eval.ID
// Wrap the evaluation, capturing its token.
wrapped := wrappedEval{
@ -244,6 +244,40 @@ func (b *BlockedEvals) missedUnblock(eval *structs.Evaluation) bool {
return false
}
// Untrack causes any blocked evaluation for the passed job to be no longer
// tracked. Untrack is called when there is a successful evaluation for the job
// and a blocked evaluation is no longer needed.
func (b *BlockedEvals) Untrack(jobID string) {
b.l.Lock()
defer b.l.Unlock()
// Do nothing if not enabled
if !b.enabled {
return
}
// Get the evaluation ID to cancel
evalID, ok := b.jobs[jobID]
if !ok {
// No blocked evaluation so exit
return
}
// Attempt to delete the evaluation
if w, ok := b.captured[evalID]; ok {
delete(b.jobs, w.eval.JobID)
delete(b.captured, evalID)
b.stats.TotalBlocked--
}
if w, ok := b.escaped[evalID]; ok {
delete(b.jobs, w.eval.JobID)
delete(b.escaped, evalID)
b.stats.TotalEscaped--
b.stats.TotalBlocked--
}
}
// Unblock causes any evaluation that could potentially make progress on a
// capacity change on the passed computed node class to be enqueued into the
// eval broker.
@ -410,7 +444,7 @@ func (b *BlockedEvals) Flush() {
b.stats.TotalBlocked = 0
b.captured = make(map[string]wrappedEval)
b.escaped = make(map[string]wrappedEval)
b.jobs = make(map[string]struct{})
b.jobs = make(map[string]string)
b.duplicates = nil
b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer)
b.stopCh = make(chan struct{})

View file

@ -484,3 +484,27 @@ func TestBlockedEvals_UnblockFailed(t *testing.T) {
t.Fatalf("bad: %#v", blockedStats)
}
}
func TestBlockedEvals_Untrack(t *testing.T) {
blocked, _ := testBlockedEvals(t)
// Create two blocked evals and add them to the blocked tracker.
e := mock.Eval()
e.Status = structs.EvalStatusBlocked
e.ClassEligibility = map[string]bool{"v1:123": false, "v1:456": false}
e.SnapshotIndex = 1000
blocked.Block(e)
// Verify block did track
bStats := blocked.Stats()
if bStats.TotalBlocked != 1 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
}
// Untrack and verify
blocked.Untrack(e.JobID)
bStats = blocked.Stats()
if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 {
t.Fatalf("bad: %#v", bStats)
}
}

View file

@ -350,6 +350,11 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} {
n.evalBroker.Enqueue(eval)
} else if eval.ShouldBlock() {
n.blockedEvals.Block(eval)
} else if eval.Status == structs.EvalStatusComplete &&
len(eval.FailedTGAllocs) == 0 {
// If we have a successful evaluation for a node, untrack any
// blocked evaluation
n.blockedEvals.Untrack(eval.JobID)
}
}
return nil

View file

@ -445,6 +445,114 @@ func TestFSM_UpdateEval_Blocked(t *testing.T) {
}
}
func TestFSM_UpdateEval_Untrack(t *testing.T) {
fsm := testFSM(t)
fsm.evalBroker.SetEnabled(true)
fsm.blockedEvals.SetEnabled(true)
// Mark an eval as blocked.
bEval := mock.Eval()
bEval.ClassEligibility = map[string]bool{"v1:123": true}
fsm.blockedEvals.Block(bEval)
// Create a successful eval for the same job
eval := mock.Eval()
eval.JobID = bEval.JobID
eval.Status = structs.EvalStatusComplete
req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
out, err := fsm.State().EvalByID(eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("not found!")
}
if out.CreateIndex != 1 {
t.Fatalf("bad index: %d", out.CreateIndex)
}
// Verify the eval wasn't enqueued
stats := fsm.evalBroker.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v %#v", stats, out)
}
// Verify the eval was untracked in the blocked tracker.
bStats := fsm.blockedEvals.Stats()
if bStats.TotalBlocked != 0 {
t.Fatalf("bad: %#v %#v", bStats, out)
}
}
func TestFSM_UpdateEval_NoUntrack(t *testing.T) {
fsm := testFSM(t)
fsm.evalBroker.SetEnabled(true)
fsm.blockedEvals.SetEnabled(true)
// Mark an eval as blocked.
bEval := mock.Eval()
bEval.ClassEligibility = map[string]bool{"v1:123": true}
fsm.blockedEvals.Block(bEval)
// Create a successful eval for the same job but with placement failures
eval := mock.Eval()
eval.JobID = bEval.JobID
eval.Status = structs.EvalStatusComplete
eval.FailedTGAllocs = make(map[string]*structs.AllocMetric)
eval.FailedTGAllocs["test"] = new(structs.AllocMetric)
req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
out, err := fsm.State().EvalByID(eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("not found!")
}
if out.CreateIndex != 1 {
t.Fatalf("bad index: %d", out.CreateIndex)
}
// Verify the eval wasn't enqueued
stats := fsm.evalBroker.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v %#v", stats, out)
}
// Verify the eval was not untracked in the blocked tracker.
bStats := fsm.blockedEvals.Stats()
if bStats.TotalBlocked != 1 {
t.Fatalf("bad: %#v %#v", bStats, out)
}
}
func TestFSM_DeleteEval(t *testing.T) {
fsm := testFSM(t)

View file

@ -683,11 +683,11 @@ func TestClientEndpoint_Drain_Down(t *testing.T) {
// Wait for the scheduler to create an allocation
testutil.WaitForResult(func() (bool, error) {
allocs, err := s1.fsm.state.AllocsByJob(job.ID)
allocs, err := s1.fsm.state.AllocsByJob(job.ID, true)
if err != nil {
return false, err
}
allocs1, err := s1.fsm.state.AllocsByJob(job1.ID)
allocs1, err := s1.fsm.state.AllocsByJob(job1.ID, true)
if err != nil {
return false, err
}

View file

@ -214,9 +214,17 @@ func evalTableSchema() *memdb.TableSchema {
Name: "job",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "JobID",
Lowercase: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "JobID",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "Status",
Lowercase: true,
},
},
},
},
},

View file

@ -626,7 +626,7 @@ func (s *StateStore) PeriodicLaunches() (memdb.ResultIterator, error) {
return iter, nil
}
// UpsertEvaluation is used to upsert an evaluation
// UpsertEvals is used to upsert a set of evaluations
func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error {
txn := s.db.Txn(true)
defer txn.Abort()
@ -639,7 +639,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
for _, eval := range evals {
watcher.Add(watch.Item{Eval: eval.ID})
watcher.Add(watch.Item{EvalJob: eval.JobID})
if err := s.nestedUpsertEval(txn, index, eval); err != nil {
if err := s.nestedUpsertEval(txn, watcher, index, eval); err != nil {
return err
}
@ -657,7 +657,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
}
// nestedUpsertEvaluation is used to nest an evaluation upsert within a transaction
func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *structs.Evaluation) error {
func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, watcher watch.Items, index uint64, eval *structs.Evaluation) error {
// Lookup the evaluation
existing, err := txn.First("evals", "id", eval.ID)
if err != nil {
@ -705,6 +705,37 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct
}
}
// Check if the job has any blocked evaluations and cancel them
if eval.Status == structs.EvalStatusComplete && len(eval.FailedTGAllocs) == 0 {
// Get the blocked evaluation for a job if it exists
iter, err := txn.Get("evals", "job", eval.JobID, structs.EvalStatusBlocked)
if err != nil {
return fmt.Errorf("failed to get blocked evals for job %q", eval.JobID, err)
}
var blocked []*structs.Evaluation
for {
raw := iter.Next()
if raw == nil {
break
}
blocked = append(blocked, raw.(*structs.Evaluation))
}
// Go through and update the evals
for _, eval := range blocked {
newEval := eval.Copy()
newEval.Status = structs.EvalStatusCancelled
newEval.StatusDescription = fmt.Sprintf("evaluation %q successful", newEval.ID)
newEval.ModifyIndex = index
if err := txn.Insert("evals", newEval); err != nil {
return fmt.Errorf("eval insert failed: %v", err)
}
watcher.Add(watch.Item{Eval: newEval.ID})
}
}
// Insert the eval
if err := txn.Insert("evals", eval); err != nil {
return fmt.Errorf("eval insert failed: %v", err)
@ -809,7 +840,7 @@ func (s *StateStore) EvalsByJob(jobID string) ([]*structs.Evaluation, error) {
txn := s.db.Txn(false)
// Get an iterator over the node allocations
iter, err := txn.Get("evals", "job", jobID)
iter, err := txn.Get("evals", "job_prefix", jobID)
if err != nil {
return nil, err
}
@ -1490,7 +1521,7 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b
}
}
evals, err := txn.Get("evals", "job", job.ID)
evals, err := txn.Get("evals", "job_prefix", job.ID)
if err != nil {
return "", err
}

View file

@ -1255,6 +1255,74 @@ func TestStateStore_UpsertEvals_Eval(t *testing.T) {
notify.verify(t)
}
func TestStateStore_UpsertEvals_CancelBlocked(t *testing.T) {
state := testStateStore(t)
// Create two blocked evals for the same job
j := "test-job"
b1, b2 := mock.Eval(), mock.Eval()
b1.JobID = j
b1.Status = structs.EvalStatusBlocked
b2.JobID = j
b2.Status = structs.EvalStatusBlocked
err := state.UpsertEvals(999, []*structs.Evaluation{b1, b2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Create one complete and successful eval for the job
eval := mock.Eval()
eval.JobID = j
eval.Status = structs.EvalStatusComplete
notify := setupNotifyTest(
state,
watch.Item{Table: "evals"},
watch.Item{Eval: b1.ID},
watch.Item{Eval: b2.ID},
watch.Item{Eval: eval.ID},
watch.Item{EvalJob: eval.JobID})
if err := state.UpsertEvals(1000, []*structs.Evaluation{eval}); err != nil {
t.Fatalf("err: %v", err)
}
out, err := state.EvalByID(eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(eval, out) {
t.Fatalf("bad: %#v %#v", eval, out)
}
index, err := state.Index("evals")
if err != nil {
t.Fatalf("err: %v", err)
}
if index != 1000 {
t.Fatalf("bad: %d", index)
}
// Get b1/b2 and check they are cancelled
out1, err := state.EvalByID(b1.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
out2, err := state.EvalByID(b2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out1.Status != structs.EvalStatusCancelled || out2.Status != structs.EvalStatusCancelled {
t.Fatalf("bad: %#v %#v", out1, out2)
}
notify.verify(t)
}
func TestStateStore_Update_UpsertEvals_Eval(t *testing.T) {
state := testStateStore(t)
eval := mock.Eval()