nomad: reap failed evaluations

This commit is contained in:
Armon Dadgar 2015-08-16 11:10:18 -07:00
parent 79a1471b85
commit c1aa76cc3b
2 changed files with 78 additions and 0 deletions

View File

@ -114,6 +114,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
// Scheduler periodic jobs
go s.schedulePeriodic(stopCh)
// Reap any failed evaluations
go s.reapFailedEvaluations(stopCh)
return nil
}
@ -173,6 +176,44 @@ func (s *Server) coreJobEval(job string) *structs.Evaluation {
}
}
// reapFailedEvaluations is used to reap evaluations that
// have reached their delivery limit and should be failed
func (s *Server) reapFailedEvaluations(stopCh chan struct{}) {
for {
select {
case <-stopCh:
return
default:
// Scan for a failed evaluation
eval, token, err := s.evalBroker.Dequeue([]string{failedQueue}, time.Second)
if err != nil {
return
}
if eval == nil {
continue
}
// Update the status to failed
newEval := eval.Copy()
newEval.Status = structs.EvalStatusFailed
newEval.StatusDescription = fmt.Sprintf("evaluation reached delivery limit (%d)", s.config.EvalDeliveryLimit)
s.logger.Printf("[WARN] nomad: eval %#v reached delivery limit, marking as failed", newEval)
// Update via Raft
req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{newEval},
}
if _, _, err := s.raftApply(structs.EvalUpdateRequestType, &req); err != nil {
s.logger.Printf("[ERR] nomad: failed to update failed eval %#v: %v", newEval, err)
continue
}
// Ack completion
s.evalBroker.Ack(eval.ID, token)
}
}
}
// revokeLeadership is invoked once we step down as leader.
// This is used to cleanup any state that may be specific to a leader.
func (s *Server) revokeLeadership() error {

View File

@ -304,3 +304,40 @@ func TestLeader_PeriodicDispatch(t *testing.T) {
t.Fatalf("should pending job")
})
}
func TestLeader_ReapFailedEval(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0
c.EvalDeliveryLimit = 1
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Wait for a periodic dispatch
eval := mock.Eval()
testutil.WaitForResult(func() (bool, error) {
err := s1.evalBroker.Enqueue(eval)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Dequeue and Nack
out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
s1.evalBroker.Nack(out.ID, token)
// Wait updated evaluation
state := s1.fsm.State()
testutil.WaitForResult(func() (bool, error) {
out, err := state.GetEvalByID(eval.ID)
if err != nil {
return false, err
}
return out != nil && out.Status == structs.EvalStatusFailed, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}