Blocked evals don't store TG alloc metrics

Alex Dadgar 2016-05-27 11:26:14 -07:00
parent 41de4b3521
commit fb8d79a908
4 changed files with 42 additions and 18 deletions
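
Before this change, the generic scheduler accumulated per-task-group allocation failure metrics directly on the evaluation (s.eval.FailedTGAllocs), so a blocked evaluation ended up carrying those metrics around. The scheduler now keeps the map in a scheduler-local field, failedTGAllocs, and setStatus takes the map as an explicit argument and copies it onto the evaluation it submits. A minimal stand-alone sketch of the pattern follows; Eval, AllocMetric, and this setStatus are simplified stand-ins for the Nomad types in the diff below, not the actual source:

package main

import "fmt"

// Simplified stand-ins for structs.Evaluation and structs.AllocMetric.
type AllocMetric struct{ CoalescedFailures int }

type Eval struct {
    Status         string
    FailedTGAllocs map[string]*AllocMetric
}

// Sketch of the new setStatus shape: the metrics map is an explicit
// argument and is copied onto the evaluation being submitted.
func setStatus(eval *Eval, tgMetrics map[string]*AllocMetric, status string) *Eval {
    newEval := *eval // shallow copy, standing in for eval.Copy()
    newEval.Status = status
    newEval.FailedTGAllocs = tgMetrics
    return &newEval
}

func main() {
    // The scheduler owns the map; the eval only receives it at submit time.
    failedTGAllocs := map[string]*AllocMetric{"web": {}}
    updated := setStatus(&Eval{}, failedTGAllocs, "complete")
    fmt.Println(len(updated.FailedTGAllocs)) // 1
}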

scheduler/generic_sched.go

@@ -62,7 +62,8 @@ type GenericScheduler struct {
     limitReached bool
     nextEval     *structs.Evaluation
 
-    blocked      *structs.Evaluation
+    blocked        *structs.Evaluation
+    failedTGAllocs map[string]*structs.AllocMetric
 }
@@ -100,7 +101,8 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
     default:
         desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
             eval.TriggeredBy)
-        return setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, structs.EvalStatusFailed, desc)
+        return setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked,
+            s.failedTGAllocs, structs.EvalStatusFailed, desc)
     }
@@ -117,7 +119,8 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
             if err := s.createBlockedEval(true); err != nil {
                 mErr.Errors = append(mErr.Errors, err)
             }
-            if err := setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, statusErr.EvalStatus, err.Error()); err != nil {
+            if err := setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked,
+                s.failedTGAllocs, statusErr.EvalStatus, err.Error()); err != nil {
                 mErr.Errors = append(mErr.Errors, err)
             }
             return mErr.ErrorOrNil()
@@ -127,12 +130,13 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
     // If the current evaluation is a blocked evaluation and we didn't place
     // everything, do not update the status to complete.
-    if s.eval.Status == structs.EvalStatusBlocked && len(s.eval.FailedTGAllocs) != 0 {
+    if s.eval.Status == structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 {
         return s.planner.ReblockEval(s.eval)
     }
 
     // Update the status to complete
-    return setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, structs.EvalStatusComplete, "")
+    return setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked,
+        s.failedTGAllocs, structs.EvalStatusComplete, "")
 }
@@ -170,7 +174,7 @@ func (s *GenericScheduler) process() (bool, error) {
     s.plan = s.eval.MakePlan(s.job)
 
     // Reset the failed allocations
-    s.eval.FailedTGAllocs = nil
+    s.failedTGAllocs = nil
 
     // Create an evaluation context
     s.ctx = NewEvalContext(s.state, s.plan, s.logger)
@@ -190,7 +194,7 @@ func (s *GenericScheduler) process() (bool, error) {
     // If there are failed allocations, we need to create a blocked evaluation
     // to place the failed allocations when resources become available. If the
     // current evaluation is already a blocked eval, we reuse it.
-    if s.eval.Status != structs.EvalStatusBlocked && len(s.eval.FailedTGAllocs) != 0 && s.blocked == nil {
+    if s.eval.Status != structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 && s.blocked == nil {
         if err := s.createBlockedEval(false); err != nil {
             s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err)
             return false, err
@@ -382,7 +386,7 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
     for _, missing := range place {
         // Check if this task group has already failed
-        if metric, ok := s.eval.FailedTGAllocs[missing.TaskGroup.Name]; ok {
+        if metric, ok := s.failedTGAllocs[missing.TaskGroup.Name]; ok {
             metric.CoalescedFailures += 1
             continue
         }
@@ -416,11 +420,11 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
             s.plan.AppendAlloc(alloc)
         } else {
             // Lazy initialize the failed map
-            if s.eval.FailedTGAllocs == nil {
-                s.eval.FailedTGAllocs = make(map[string]*structs.AllocMetric)
+            if s.failedTGAllocs == nil {
+                s.failedTGAllocs = make(map[string]*structs.AllocMetric)
             }
-            s.eval.FailedTGAllocs[missing.TaskGroup.Name] = s.ctx.Metrics()
+            s.failedTGAllocs[missing.TaskGroup.Name] = s.ctx.Metrics()
         }
     }
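
The computePlacements changes above keep the coalesced-failure bookkeeping, just against the scheduler-local map: the first failure for a task group stores the full metrics, and subsequent failures only bump CoalescedFailures. A stand-alone illustration of that counting, again with a simplified stand-in for structs.AllocMetric:

package main

import "fmt"

type AllocMetric struct{ CoalescedFailures int }

func main() {
    failedTGAllocs := make(map[string]*AllocMetric)

    record := func(tg string) {
        // Check if this task group has already failed.
        if metric, ok := failedTGAllocs[tg]; ok {
            metric.CoalescedFailures += 1
            return
        }
        // First failure: store the metrics for this task group.
        failedTGAllocs[tg] = &AllocMetric{}
    }

    record("cache")
    record("cache")
    record("cache")
    fmt.Println(failedTGAllocs["cache"].CoalescedFailures) // 2
}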

scheduler/system_sched.go

@@ -60,20 +60,20 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
     default:
         desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
            eval.TriggeredBy)
-        return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, structs.EvalStatusFailed, desc)
+        return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, nil, structs.EvalStatusFailed, desc)
     }
 
     // Retry up to the maxSystemScheduleAttempts and reset if progress is made.
     progress := func() bool { return progressMade(s.planResult) }
     if err := retryMax(maxSystemScheduleAttempts, s.process, progress); err != nil {
         if statusErr, ok := err.(*SetStatusError); ok {
-            return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, statusErr.EvalStatus, err.Error())
+            return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, nil, statusErr.EvalStatus, err.Error())
         }
         return err
     }
 
     // Update the status to complete
-    return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, structs.EvalStatusComplete, "")
+    return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, nil, structs.EvalStatusComplete, "")
 }
 
 // process is wrapped in retryMax to iteratively run the handler until we have no

scheduler/util.go

@@ -366,11 +366,15 @@ func networkPortMap(n *structs.NetworkResource) map[string]int {
 }
 
 // setStatus is used to update the status of the evaluation
-func setStatus(logger *log.Logger, planner Planner, eval, nextEval, spawnedBlocked *structs.Evaluation, status, desc string) error {
+func setStatus(logger *log.Logger, planner Planner,
+    eval, nextEval, spawnedBlocked *structs.Evaluation,
+    tgMetrics map[string]*structs.AllocMetric, status, desc string) error {
+
     logger.Printf("[DEBUG] sched: %#v: setting status to %s", eval, status)
     newEval := eval.Copy()
     newEval.Status = status
     newEval.StatusDescription = desc
+    newEval.FailedTGAllocs = tgMetrics
     if nextEval != nil {
         newEval.NextEval = nextEval.ID
     }
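
Note the two call shapes this signature creates: the generic scheduler threads s.failedTGAllocs through every setStatus call, while the system scheduler passes nil since it does not track per-task-group metrics in this commit. Because newEval.FailedTGAllocs is assigned unconditionally, passing nil also clears any metrics on the copied evaluation.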

scheduler/util_test.go

@@ -488,7 +488,7 @@ func TestSetStatus(t *testing.T) {
     eval := mock.Eval()
     status := "a"
     desc := "b"
-    if err := setStatus(logger, h, eval, nil, nil, status, desc); err != nil {
+    if err := setStatus(logger, h, eval, nil, nil, nil, status, desc); err != nil {
         t.Fatalf("setStatus() failed: %v", err)
     }
@@ -504,7 +504,7 @@ func TestSetStatus(t *testing.T) {
     // Test next evals
     h = NewHarness(t)
     next := mock.Eval()
-    if err := setStatus(logger, h, eval, next, nil, status, desc); err != nil {
+    if err := setStatus(logger, h, eval, next, nil, nil, status, desc); err != nil {
         t.Fatalf("setStatus() failed: %v", err)
     }
@@ -520,7 +520,7 @@ func TestSetStatus(t *testing.T) {
     // Test blocked evals
     h = NewHarness(t)
     blocked := mock.Eval()
-    if err := setStatus(logger, h, eval, nil, blocked, status, desc); err != nil {
+    if err := setStatus(logger, h, eval, nil, blocked, nil, status, desc); err != nil {
         t.Fatalf("setStatus() failed: %v", err)
     }
@@ -532,6 +532,22 @@ func TestSetStatus(t *testing.T) {
     if newEval.BlockedEval != blocked.ID {
         t.Fatalf("setStatus() didn't set BlockedEval correctly: %v", newEval)
     }
+
+    // Test metrics
+    h = NewHarness(t)
+    metrics := map[string]*structs.AllocMetric{"foo": nil}
+    if err := setStatus(logger, h, eval, nil, nil, metrics, status, desc); err != nil {
+        t.Fatalf("setStatus() failed: %v", err)
+    }
+
+    if len(h.Evals) != 1 {
+        t.Fatalf("setStatus() didn't update plan: %v", h.Evals)
+    }
+
+    newEval = h.Evals[0]
+    if !reflect.DeepEqual(newEval.FailedTGAllocs, metrics) {
+        t.Fatalf("setStatus() didn't set failed task group metrics correctly: %v", newEval)
+    }
 }
 
 func TestInplaceUpdate_ChangedTaskGroup(t *testing.T) {