nomad: expose UpdateEval as a planner

This commit is contained in:
Armon Dadgar 2015-08-15 14:25:00 -07:00
parent dd82bdb059
commit cae67b7f60
4 changed files with 92 additions and 0 deletions

View File

@ -280,6 +280,41 @@ SUBMIT:
return result, state, nil
}
// UpdateEval is used to submit an updated evaluation. This allows
// the worker to act as the planner for the scheduler.
func (w *Worker) UpdateEval(eval *structs.Evaluation) error {
// Check for a shutdown before plan submission
if w.srv.IsShutdown() {
return fmt.Errorf("shutdown while planning")
}
defer metrics.MeasureSince([]string{"nomad", "worker", "update_eval"}, time.Now())
// Setup the request
req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
EvalToken: w.evalToken,
WriteRequest: structs.WriteRequest{
Region: w.srv.config.Region,
},
}
var resp structs.GenericResponse
SUBMIT:
// Make the RPC call
if err := w.srv.RPC("Eval.Update", &req, &resp); err != nil {
w.logger.Printf("[ERR] worker: failed to update evaluation %#v: %v",
eval, err)
if w.shouldResubmit(err) && !w.backoffErr() {
goto SUBMIT
}
return err
} else {
w.logger.Printf("[DEBUG] worker: updated evaluation %#v", eval)
w.backoffReset()
}
return nil
}
// shouldResubmit checks if a given error should be swallowed and the plan
// resubmitted after a backoff. Usually these are transient errors that
// the cluster should heal from quickly.

View File

@ -321,3 +321,50 @@ func TestWorker_SubmitPlan_MissingNodeRefresh(t *testing.T) {
t.Fatalf("expected state update")
}
}
func TestWorker_UpdateEval(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0
c.EnabledSchedulers = []string{structs.JobTypeService}
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Register node
node := mock.Node()
testRegisterNode(t, s1, node)
// Create the register request
eval1 := mock.Eval()
testutil.WaitForResult(func() (bool, error) {
err := s1.evalBroker.Enqueue(eval1)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if evalOut != eval1 {
t.Fatalf("Bad eval")
}
eval2 := evalOut.Copy()
eval2.Status = structs.EvalStatusComplete
// Attempt to update eval
w := &Worker{srv: s1, logger: s1.logger, evalToken: token}
err = w.UpdateEval(eval2)
if err != nil {
t.Fatalf("err: %v", err)
}
out, err := s1.fsm.State().GetEvalByID(eval2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out.Status != structs.EvalStatusComplete {
t.Fatalf("bad: %v", out)
}
}

View File

@ -75,4 +75,8 @@ type Planner interface {
// This will return a PlanResult or an error. It is possible
// that this will result in a state refresh as well.
SubmitPlan(*structs.Plan) (*structs.PlanResult, State, error)
// UpdateEval is used to update an evaluation. This should update
// a copy of the input evaluation since that should be immutable.
UpdateEval(*structs.Evaluation) error
}

View File

@ -20,6 +20,7 @@ type Harness struct {
planLock sync.Mutex
Plans []*structs.Plan
Evals []*structs.Evaluation
nextIndex uint64
nextIndexLock sync.Mutex
@ -78,6 +79,11 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er
return result, nil, err
}
func (h *Harness) UpdateEval(eval *structs.Evaluation) error {
h.Evals = append(h.Evals, eval)
return nil
}
// NextIndex returns the next index
func (h *Harness) NextIndex() uint64 {
h.nextIndexLock.Lock()