From cae67b7f60652bf76cfb86806ded40b6b02f5889 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sat, 15 Aug 2015 14:25:00 -0700 Subject: [PATCH] nomad: expose UpdateEval as a planner --- nomad/worker.go | 35 +++++++++++++++++++++++++++ nomad/worker_test.go | 47 +++++++++++++++++++++++++++++++++++++ scheduler/scheduler.go | 4 ++++ scheduler/scheduler_test.go | 6 +++++ 4 files changed, 92 insertions(+) diff --git a/nomad/worker.go b/nomad/worker.go index c3df8a9ce..e7a817666 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -280,6 +280,41 @@ SUBMIT: return result, state, nil } +// UpdateEval is used to submit an updated evaluation. This allows +// the worker to act as the planner for the scheduler. +func (w *Worker) UpdateEval(eval *structs.Evaluation) error { + // Check for a shutdown before plan submission + if w.srv.IsShutdown() { + return fmt.Errorf("shutdown while planning") + } + defer metrics.MeasureSince([]string{"nomad", "worker", "update_eval"}, time.Now()) + + // Setup the request + req := structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + EvalToken: w.evalToken, + WriteRequest: structs.WriteRequest{ + Region: w.srv.config.Region, + }, + } + var resp structs.GenericResponse + +SUBMIT: + // Make the RPC call + if err := w.srv.RPC("Eval.Update", &req, &resp); err != nil { + w.logger.Printf("[ERR] worker: failed to update evaluation %#v: %v", + eval, err) + if w.shouldResubmit(err) && !w.backoffErr() { + goto SUBMIT + } + return err + } else { + w.logger.Printf("[DEBUG] worker: updated evaluation %#v", eval) + w.backoffReset() + } + return nil +} + // shouldResubmit checks if a given error should be swallowed and the plan // resubmitted after a backoff. Usually these are transient errors that // the cluster should heal from quickly. diff --git a/nomad/worker_test.go b/nomad/worker_test.go index a522a1e23..a42254120 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -321,3 +321,50 @@ func TestWorker_SubmitPlan_MissingNodeRefresh(t *testing.T) { t.Fatalf("expected state update") } } + +func TestWorker_UpdateEval(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 + c.EnabledSchedulers = []string{structs.JobTypeService} + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Register node + node := mock.Node() + testRegisterNode(t, s1, node) + + // Create the register request + eval1 := mock.Eval() + testutil.WaitForResult(func() (bool, error) { + err := s1.evalBroker.Enqueue(eval1) + return err == nil, err + }, func(err error) { + t.Fatalf("err: %v", err) + }) + evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second) + if err != nil { + t.Fatalf("err: %v", err) + } + if evalOut != eval1 { + t.Fatalf("Bad eval") + } + + eval2 := evalOut.Copy() + eval2.Status = structs.EvalStatusComplete + + // Attempt to update eval + w := &Worker{srv: s1, logger: s1.logger, evalToken: token} + err = w.UpdateEval(eval2) + if err != nil { + t.Fatalf("err: %v", err) + } + + out, err := s1.fsm.State().GetEvalByID(eval2.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out.Status != structs.EvalStatusComplete { + t.Fatalf("bad: %v", out) + } +} diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index d831e071c..bbb3ae508 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -75,4 +75,8 @@ type Planner interface { // This will return a PlanResult or an error. It is possible // that this will result in a state refresh as well. SubmitPlan(*structs.Plan) (*structs.PlanResult, State, error) + + // UpdateEval is used to update an evaluation. This should update + // a copy of the input evaluation since that should be immutable. + UpdateEval(*structs.Evaluation) error } diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index e931a742b..6f605e450 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -20,6 +20,7 @@ type Harness struct { planLock sync.Mutex Plans []*structs.Plan + Evals []*structs.Evaluation nextIndex uint64 nextIndexLock sync.Mutex @@ -78,6 +79,11 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er return result, nil, err } +func (h *Harness) UpdateEval(eval *structs.Evaluation) error { + h.Evals = append(h.Evals, eval) + return nil +} + // NextIndex returns the next index func (h *Harness) NextIndex() uint64 { h.nextIndexLock.Lock()