Merge pull request #3206 from hashicorp/b-eval-index
Worker waits til max ModifyIndex across EvalsByJob
commit d156cb48b3
CHANGELOG.md
@@ -6,6 +6,8 @@ IMPROVEMENTS:

 BUG FIXES:
  * core: Fix restoration of stopped periodic jobs [GH-3201]
+ * core: Fix a race condition in which scheduling results from one invocation of
+   the scheduler wouldn't be considered by the next for the same job [GH-3206]
  * api: Sort /v1/agent/servers output so that output of Consul checks does not
    change [GH-3214]
  * api: Fix search handling of jobs with more than four hyphens and case where

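The changelog entry above describes the race this PR closes: a scheduler worker used to wait only for the modify index of the evaluation it dequeued, so state written by an earlier scheduling pass for the same job (for example a blocked evaluation created at a higher Raft index) could be invisible to the next pass. The server now returns a WaitIndex equal to the highest ModifyIndex across all of the job's evaluations, and the worker waits for that index before snapshotting state. The sketch below is illustrative only; the helper names and types are placeholders, not Nomad's API.

// Illustrative sketch of the post-fix worker flow. The names dequeue,
// waitForStateIndex, and runScheduler are placeholders, not Nomad's API.
package main

import "fmt"

type dequeueResult struct {
	evalID    string
	waitIndex uint64 // server-computed max ModifyIndex across the job's evals
}

func processOne(dequeue func() dequeueResult, waitForStateIndex func(uint64) error, runScheduler func(string) error) error {
	res := dequeue()
	// Wait until local state reflects every evaluation for the job, not just
	// the one we dequeued, before invoking the scheduler.
	if err := waitForStateIndex(res.waitIndex); err != nil {
		return fmt.Errorf("state did not reach index %d: %w", res.waitIndex, err)
	}
	return runScheduler(res.evalID)
}

func main() {
	err := processOne(
		func() dequeueResult { return dequeueResult{evalID: "eval-a", waitIndex: 1001} },
		func(idx uint64) error { return nil }, // pretend local state already caught up
		func(id string) error { fmt.Println("scheduling", id); return nil },
	)
	fmt.Println(err)
}
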
nomad/eval_endpoint.go
@@ -6,6 +6,7 @@ import (

 	"github.com/armon/go-metrics"
 	"github.com/hashicorp/go-memdb"
+	multierror "github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/scheduler"

@@ -92,8 +93,24 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,

 	// Provide the output if any
 	if eval != nil {
+		// Get the index that the worker should wait until before scheduling.
+		waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID)
+		if err != nil {
+			var mErr multierror.Error
+			multierror.Append(&mErr, err)
+
+			// We have dequeued the evaluation but won't be returning it to the
+			// worker so Nack the eval.
+			if err := e.srv.evalBroker.Nack(eval.ID, token); err != nil {
+				multierror.Append(&mErr, err)
+			}
+
+			return &mErr
+		}
+
 		reply.Eval = eval
 		reply.Token = token
+		reply.WaitIndex = waitIndex
 	}

 	// Set the query response

@@ -101,6 +118,31 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
 	return nil
 }

+// getWaitIndex returns the wait index that should be used by the worker before
+// invoking the scheduler. The index should be the highest modify index of any
+// evaluation for the job. This prevents scheduling races for the same job when
+// there are blocked evaluations.
+func (e *Eval) getWaitIndex(namespace, job string) (uint64, error) {
+	snap, err := e.srv.State().Snapshot()
+	if err != nil {
+		return 0, err
+	}
+
+	evals, err := snap.EvalsByJob(nil, namespace, job)
+	if err != nil {
+		return 0, err
+	}
+
+	var max uint64
+	for _, eval := range evals {
+		if max < eval.ModifyIndex {
+			max = eval.ModifyIndex
+		}
+	}
+
+	return max, nil
+}
+
 // Ack is used to acknowledge completion of a dequeued evaluation
 func (e *Eval) Ack(args *structs.EvalAckRequest,
 	reply *structs.GenericResponse) error {

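getWaitIndex above simply scans every evaluation recorded for the job and keeps the largest ModifyIndex, so blocked evaluations created by an earlier scheduling pass push the wait index forward. A self-contained sketch of that rule, using a stripped-down stand-in for structs.Evaluation:

// Minimal, self-contained sketch of the "highest ModifyIndex wins" rule used
// by getWaitIndex; Evaluation here is a stand-in for structs.Evaluation.
package main

import "fmt"

type Evaluation struct {
	ID          string
	ModifyIndex uint64
}

func maxModifyIndex(evals []*Evaluation) uint64 {
	var max uint64
	for _, eval := range evals {
		if eval.ModifyIndex > max {
			max = eval.ModifyIndex
		}
	}
	return max
}

func main() {
	evals := []*Evaluation{
		{ID: "eval-a", ModifyIndex: 1000}, // already dequeued for scheduling
		{ID: "eval-b", ModifyIndex: 1001}, // blocked eval created later
	}
	// The worker must wait for index 1001 before scheduling this job again.
	fmt.Println(maxModifyIndex(evals)) // 1001
}
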
nomad/eval_endpoint_test.go
@@ -170,6 +170,56 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {
 	if token != resp.Token {
 		t.Fatalf("bad token: %#v %#v", token, resp.Token)
 	}
+
+	if resp.WaitIndex != eval1.ModifyIndex {
+		t.Fatalf("bad wait index; got %d; want %d", resp.WaitIndex, eval1.ModifyIndex)
+	}
 }

+func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) {
+	t.Parallel()
+	s1 := testServer(t, func(c *Config) {
+		c.NumSchedulers = 0 // Prevent automatic dequeue
+	})
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Create the register request
+	eval1 := mock.Eval()
+	eval2 := mock.Eval()
+	eval2.JobID = eval1.JobID
+	s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1})
+	s1.evalBroker.Enqueue(eval1)
+	s1.fsm.State().UpsertEvals(1001, []*structs.Evaluation{eval2})
+
+	// Dequeue the eval
+	get := &structs.EvalDequeueRequest{
+		Schedulers:       defaultSched,
+		SchedulerVersion: scheduler.SchedulerVersion,
+		WriteRequest:     structs.WriteRequest{Region: "global"},
+	}
+	var resp structs.EvalDequeueResponse
+	if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	if !reflect.DeepEqual(eval1, resp.Eval) {
+		t.Fatalf("bad: %v %v", eval1, resp.Eval)
+	}
+
+	// Ensure outstanding
+	token, ok := s1.evalBroker.Outstanding(eval1.ID)
+	if !ok {
+		t.Fatalf("should be outstanding")
+	}
+	if token != resp.Token {
+		t.Fatalf("bad token: %#v %#v", token, resp.Token)
+	}
+
+	if resp.WaitIndex != 1001 {
+		t.Fatalf("bad wait index; got %d; want %d", resp.WaitIndex, 1001)
+	}
+}
+
 func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) {

nomad/structs/structs.go
@@ -932,9 +932,29 @@ type SingleEvalResponse struct {
 type EvalDequeueResponse struct {
 	Eval  *Evaluation
 	Token string
+
+	// WaitIndex is the Raft index the worker should wait until invoking the
+	// scheduler.
+	WaitIndex uint64
+
 	QueryMeta
 }

+// GetWaitIndex is used to retrieve the Raft index in which state should be at
+// or beyond before invoking the scheduler.
+func (e *EvalDequeueResponse) GetWaitIndex() uint64 {
+	// Prefer the wait index sent. This will be populated on all responses from
+	// 0.7.0 and above
+	if e.WaitIndex != 0 {
+		return e.WaitIndex
+	} else if e.Eval != nil {
+		return e.Eval.ModifyIndex
+	}
+
+	// This should never happen
+	return 1
+}
+
 // PlanResponse is used to return from a PlanRequest
 type PlanResponse struct {
 	Result *PlanResult

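GetWaitIndex keeps the worker compatible with servers that predate this change: 0.7.0+ servers populate WaitIndex, while older servers leave it zero, in which case the worker falls back to the dequeued evaluation's own ModifyIndex (the previous behaviour). A sketch of that fallback, with local stand-ins for the structs package types:

// Sketch of the GetWaitIndex fallback; Evaluation and EvalDequeueResponse are
// trimmed stand-ins whose field names mirror the diff above.
package main

import "fmt"

type Evaluation struct{ ModifyIndex uint64 }

type EvalDequeueResponse struct {
	Eval      *Evaluation
	WaitIndex uint64
}

func (e *EvalDequeueResponse) GetWaitIndex() uint64 {
	if e.WaitIndex != 0 {
		return e.WaitIndex // 0.7.0+ servers populate this
	} else if e.Eval != nil {
		return e.Eval.ModifyIndex // fallback for older servers
	}
	return 1 // should never happen
}

func main() {
	newServer := &EvalDequeueResponse{Eval: &Evaluation{ModifyIndex: 1000}, WaitIndex: 1001}
	oldServer := &EvalDequeueResponse{Eval: &Evaluation{ModifyIndex: 1000}}
	fmt.Println(newServer.GetWaitIndex(), oldServer.GetWaitIndex()) // 1001 1000
}
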
nomad/worker.go
@@ -106,7 +106,7 @@ func (w *Worker) checkPaused() {
 func (w *Worker) run() {
 	for {
 		// Dequeue a pending evaluation
-		eval, token, shutdown := w.dequeueEvaluation(dequeueTimeout)
+		eval, token, waitIndex, shutdown := w.dequeueEvaluation(dequeueTimeout)
 		if shutdown {
 			return
 		}
@@ -118,7 +118,7 @@ func (w *Worker) run() {
 		}

 		// Wait for the raft log to catchup to the evaluation
-		if err := w.waitForIndex(eval.ModifyIndex, raftSyncLimit); err != nil {
+		if err := w.waitForIndex(waitIndex, raftSyncLimit); err != nil {
 			w.sendAck(eval.ID, token, false)
 			continue
 		}
@@ -136,7 +136,8 @@ func (w *Worker) run() {

 // dequeueEvaluation is used to fetch the next ready evaluation.
 // This blocks until an evaluation is available or a timeout is reached.
-func (w *Worker) dequeueEvaluation(timeout time.Duration) (*structs.Evaluation, string, bool) {
+func (w *Worker) dequeueEvaluation(timeout time.Duration) (
+	eval *structs.Evaluation, token string, waitIndex uint64, shutdown bool) {
 	// Setup the request
 	req := structs.EvalDequeueRequest{
 		Schedulers: w.srv.config.EnabledSchedulers,
@@ -170,7 +171,7 @@ REQ:
 		}

 		if w.backoffErr(base, limit) {
-			return nil, "", true
+			return nil, "", 0, true
 		}
 		goto REQ
 	}
@@ -179,12 +180,12 @@ REQ:
 	// Check if we got a response
 	if resp.Eval != nil {
 		w.logger.Printf("[DEBUG] worker: dequeued evaluation %s", resp.Eval.ID)
-		return resp.Eval, resp.Token, false
+		return resp.Eval, resp.Token, resp.GetWaitIndex(), false
 	}

 	// Check for potential shutdown
 	if w.srv.IsShutdown() {
-		return nil, "", true
+		return nil, "", 0, true
 	}
 	goto REQ
 }

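With the new return value, the worker passes the server-provided wait index to waitForIndex instead of the evaluation's own ModifyIndex. waitForIndex itself is not part of this diff; the snippet below only illustrates the general "block until the local state index catches up" idea and is not Nomad's implementation.

// Illustration only: one way to block until a state store reports an index at
// or beyond waitIndex. Nomad's actual Worker.waitForIndex is not shown in this
// diff and may differ.
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

func waitForIndex(latestIndex func() uint64, waitIndex uint64, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if latestIndex() >= waitIndex {
			return nil
		}
		time.Sleep(10 * time.Millisecond) // poll; a real implementation could block on notifications
	}
	return errors.New("timed out waiting for index")
}

func main() {
	var idx atomic.Uint64
	idx.Store(999)
	go func() { time.Sleep(50 * time.Millisecond); idx.Store(1001) }()
	fmt.Println(waitForIndex(idx.Load, 1001, time.Second)) // <nil>
}
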
nomad/worker_test.go
@@ -60,13 +60,16 @@ func TestWorker_dequeueEvaluation(t *testing.T) {
 	w := &Worker{srv: s1, logger: s1.logger}

 	// Attempt dequeue
-	eval, token, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
+	eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
 	if shutdown {
 		t.Fatalf("should not shutdown")
 	}
 	if token == "" {
 		t.Fatalf("should get token")
 	}
+	if waitIndex != eval1.ModifyIndex {
+		t.Fatalf("bad wait index; got %d; want %d", waitIndex, eval1.ModifyIndex)
+	}

 	// Ensure we get a sane eval
 	if !reflect.DeepEqual(eval, eval1) {

@@ -74,6 +77,76 @@ func TestWorker_dequeueEvaluation(t *testing.T) {
 	}
 }

+// Test that the worker picks up the correct wait index when there are multiple
+// evals for the same job.
+func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) {
+	t.Parallel()
+	s1 := testServer(t, func(c *Config) {
+		c.NumSchedulers = 0
+		c.EnabledSchedulers = []string{structs.JobTypeService}
+	})
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Create the evaluation
+	eval1 := mock.Eval()
+	eval2 := mock.Eval()
+	eval2.JobID = eval1.JobID
+
+	// Insert the evals into the state store
+	if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1, eval2}); err != nil {
+		t.Fatal(err)
+	}
+
+	s1.evalBroker.Enqueue(eval1)
+	s1.evalBroker.Enqueue(eval2)
+
+	// Create a worker
+	w := &Worker{srv: s1, logger: s1.logger}
+
+	// Attempt dequeue
+	eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
+	if shutdown {
+		t.Fatalf("should not shutdown")
+	}
+	if token == "" {
+		t.Fatalf("should get token")
+	}
+	if waitIndex != eval1.ModifyIndex {
+		t.Fatalf("bad wait index; got %d; want %d", waitIndex, eval1.ModifyIndex)
+	}
+
+	// Ensure we get a sane eval
+	if !reflect.DeepEqual(eval, eval1) {
+		t.Fatalf("bad: %#v %#v", eval, eval1)
+	}
+
+	// Update the modify index of the first eval
+	if err := s1.fsm.State().UpsertEvals(2000, []*structs.Evaluation{eval1}); err != nil {
+		t.Fatal(err)
+	}
+
+	// Send the Ack
+	w.sendAck(eval1.ID, token, true)
+
+	// Attempt second dequeue
+	eval, token, waitIndex, shutdown = w.dequeueEvaluation(10 * time.Millisecond)
+	if shutdown {
+		t.Fatalf("should not shutdown")
+	}
+	if token == "" {
+		t.Fatalf("should get token")
+	}
+	if waitIndex != 2000 {
+		t.Fatalf("bad wait index; got %d; want 2000", waitIndex)
+	}
+
+	// Ensure we get a sane eval
+	if !reflect.DeepEqual(eval, eval2) {
+		t.Fatalf("bad: %#v %#v", eval, eval2)
+	}
+}
+
 func TestWorker_dequeueEvaluation_paused(t *testing.T) {
 	t.Parallel()
 	s1 := testServer(t, func(c *Config) {

@@ -101,7 +174,7 @@ func TestWorker_dequeueEvaluation_paused(t *testing.T) {

 	// Attempt dequeue
 	start := time.Now()
-	eval, token, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
+	eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
 	if diff := time.Since(start); diff < 100*time.Millisecond {
 		t.Fatalf("should have paused: %v", diff)
 	}
@@ -111,6 +184,9 @@ func TestWorker_dequeueEvaluation_paused(t *testing.T) {
 	if token == "" {
 		t.Fatalf("should get token")
 	}
+	if waitIndex != eval1.ModifyIndex {
+		t.Fatalf("bad wait index; got %d; want %d", waitIndex, eval1.ModifyIndex)
+	}

 	// Ensure we get a sane eval
 	if !reflect.DeepEqual(eval, eval1) {

@@ -136,7 +212,7 @@ func TestWorker_dequeueEvaluation_shutdown(t *testing.T) {
 	}()

 	// Attempt dequeue
-	eval, _, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
+	eval, _, _, shutdown := w.dequeueEvaluation(10 * time.Millisecond)
 	if !shutdown {
 		t.Fatalf("should shutdown")
 	}

@@ -164,7 +240,7 @@ func TestWorker_sendAck(t *testing.T) {
 	w := &Worker{srv: s1, logger: s1.logger}

 	// Attempt dequeue
-	eval, token, _ := w.dequeueEvaluation(10 * time.Millisecond)
+	eval, token, _, _ := w.dequeueEvaluation(10 * time.Millisecond)

 	// Check the depth is 0, 1 unacked
 	stats := s1.evalBroker.Stats()
@@ -182,7 +258,7 @@ func TestWorker_sendAck(t *testing.T) {
 	}

 	// Attempt dequeue
-	eval, token, _ = w.dequeueEvaluation(10 * time.Millisecond)
+	eval, token, _, _ = w.dequeueEvaluation(10 * time.Millisecond)

 	// Send the Ack
 	w.sendAck(eval.ID, token, true)