Implement blocking queries for /v1/job/evaluations
This commit is contained in:
parent
c0eafb24ce
commit
df4398beac
|
@ -570,26 +570,36 @@ func (j *Job) Evaluations(args *structs.JobSpecificRequest,
|
||||||
}
|
}
|
||||||
defer metrics.MeasureSince([]string{"nomad", "job", "evaluations"}, time.Now())
|
defer metrics.MeasureSince([]string{"nomad", "job", "evaluations"}, time.Now())
|
||||||
|
|
||||||
// Capture the evaluations
|
// Setup the blocking query
|
||||||
snap, err := j.srv.fsm.State().Snapshot()
|
opts := blockingOptions{
|
||||||
if err != nil {
|
queryOpts: &args.QueryOptions,
|
||||||
return err
|
queryMeta: &reply.QueryMeta,
|
||||||
}
|
watch: watch.NewItems(watch.Item{EvalJob: args.JobID}),
|
||||||
reply.Evaluations, err = snap.EvalsByJob(args.JobID)
|
run: func() error {
|
||||||
if err != nil {
|
// Capture the evals
|
||||||
return err
|
snap, err := j.srv.fsm.State().Snapshot()
|
||||||
}
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Use the last index that affected the evals table
|
reply.Evaluations, err = snap.EvalsByJob(args.JobID)
|
||||||
index, err := snap.Index("evals")
|
if err != nil {
|
||||||
if err != nil {
|
return err
|
||||||
return err
|
}
|
||||||
}
|
|
||||||
reply.Index = index
|
|
||||||
|
|
||||||
// Set the query response
|
// Use the last index that affected the evals table
|
||||||
j.srv.setQueryMeta(&reply.QueryMeta)
|
index, err := snap.Index("evals")
|
||||||
return nil
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
reply.Index = index
|
||||||
|
|
||||||
|
// Set the query response
|
||||||
|
j.srv.setQueryMeta(&reply.QueryMeta)
|
||||||
|
return nil
|
||||||
|
}}
|
||||||
|
|
||||||
|
return j.srv.blockingRPC(&opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Plan is used to cause a dry-run evaluation of the Job and return the results
|
// Plan is used to cause a dry-run evaluation of the Job and return the results
|
||||||
|
|
|
@ -1421,6 +1421,59 @@ func TestJobEndpoint_Evaluations(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestJobEndpoint_Evaluations_Blocking(t *testing.T) {
|
||||||
|
s1 := testServer(t, nil)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
testutil.WaitForLeader(t, s1.RPC)
|
||||||
|
|
||||||
|
// Create the register request
|
||||||
|
eval1 := mock.Eval()
|
||||||
|
eval2 := mock.Eval()
|
||||||
|
eval2.JobID = "job1"
|
||||||
|
state := s1.fsm.State()
|
||||||
|
|
||||||
|
// First upsert an unrelated eval
|
||||||
|
time.AfterFunc(100*time.Millisecond, func() {
|
||||||
|
err := state.UpsertEvals(100, []*structs.Evaluation{eval1})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Upsert an eval for the job we are interested in later
|
||||||
|
time.AfterFunc(200*time.Millisecond, func() {
|
||||||
|
err := state.UpsertEvals(200, []*structs.Evaluation{eval2})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Lookup the jobs
|
||||||
|
get := &structs.JobSpecificRequest{
|
||||||
|
JobID: "job1",
|
||||||
|
QueryOptions: structs.QueryOptions{
|
||||||
|
Region: "global",
|
||||||
|
MinQueryIndex: 50,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var resp structs.JobEvaluationsResponse
|
||||||
|
start := time.Now()
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Job.Evaluations", get, &resp); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
|
||||||
|
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
|
||||||
|
}
|
||||||
|
if resp.Index != 200 {
|
||||||
|
t.Fatalf("Bad index: %d %d", resp.Index, 200)
|
||||||
|
}
|
||||||
|
if len(resp.Evaluations) != 1 || resp.Evaluations[0].JobID != "job1" {
|
||||||
|
t.Fatalf("bad: %#v", resp.Evaluations)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestJobEndpoint_Plan_WithDiff(t *testing.T) {
|
func TestJobEndpoint_Plan_WithDiff(t *testing.T) {
|
||||||
s1 := testServer(t, func(c *Config) {
|
s1 := testServer(t, func(c *Config) {
|
||||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||||
|
|
|
@ -638,6 +638,7 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
|
||||||
jobs := make(map[string]string, len(evals))
|
jobs := make(map[string]string, len(evals))
|
||||||
for _, eval := range evals {
|
for _, eval := range evals {
|
||||||
watcher.Add(watch.Item{Eval: eval.ID})
|
watcher.Add(watch.Item{Eval: eval.ID})
|
||||||
|
watcher.Add(watch.Item{EvalJob: eval.JobID})
|
||||||
if err := s.nestedUpsertEval(txn, index, eval); err != nil {
|
if err := s.nestedUpsertEval(txn, index, eval); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -734,8 +735,10 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
|
||||||
if err := txn.Delete("evals", existing); err != nil {
|
if err := txn.Delete("evals", existing); err != nil {
|
||||||
return fmt.Errorf("eval delete failed: %v", err)
|
return fmt.Errorf("eval delete failed: %v", err)
|
||||||
}
|
}
|
||||||
|
jobID := existing.(*structs.Evaluation).JobID
|
||||||
watcher.Add(watch.Item{Eval: eval})
|
watcher.Add(watch.Item{Eval: eval})
|
||||||
jobs[existing.(*structs.Evaluation).JobID] = ""
|
watcher.Add(watch.Item{EvalJob: jobID})
|
||||||
|
jobs[jobID] = ""
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, alloc := range allocs {
|
for _, alloc := range allocs {
|
||||||
|
@ -1729,6 +1732,7 @@ func (r *StateRestore) JobRestore(job *structs.Job) error {
|
||||||
func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error {
|
func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error {
|
||||||
r.items.Add(watch.Item{Table: "evals"})
|
r.items.Add(watch.Item{Table: "evals"})
|
||||||
r.items.Add(watch.Item{Eval: eval.ID})
|
r.items.Add(watch.Item{Eval: eval.ID})
|
||||||
|
r.items.Add(watch.Item{EvalJob: eval.JobID})
|
||||||
if err := r.txn.Insert("evals", eval); err != nil {
|
if err := r.txn.Insert("evals", eval); err != nil {
|
||||||
return fmt.Errorf("eval insert failed: %v", err)
|
return fmt.Errorf("eval insert failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1227,7 +1227,8 @@ func TestStateStore_UpsertEvals_Eval(t *testing.T) {
|
||||||
notify := setupNotifyTest(
|
notify := setupNotifyTest(
|
||||||
state,
|
state,
|
||||||
watch.Item{Table: "evals"},
|
watch.Item{Table: "evals"},
|
||||||
watch.Item{Eval: eval.ID})
|
watch.Item{Eval: eval.ID},
|
||||||
|
watch.Item{EvalJob: eval.JobID})
|
||||||
|
|
||||||
err := state.UpsertEvals(1000, []*structs.Evaluation{eval})
|
err := state.UpsertEvals(1000, []*structs.Evaluation{eval})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1266,10 +1267,12 @@ func TestStateStore_Update_UpsertEvals_Eval(t *testing.T) {
|
||||||
notify := setupNotifyTest(
|
notify := setupNotifyTest(
|
||||||
state,
|
state,
|
||||||
watch.Item{Table: "evals"},
|
watch.Item{Table: "evals"},
|
||||||
watch.Item{Eval: eval.ID})
|
watch.Item{Eval: eval.ID},
|
||||||
|
watch.Item{EvalJob: eval.JobID})
|
||||||
|
|
||||||
eval2 := mock.Eval()
|
eval2 := mock.Eval()
|
||||||
eval2.ID = eval.ID
|
eval2.ID = eval.ID
|
||||||
|
eval2.JobID = eval.JobID
|
||||||
err = state.UpsertEvals(1001, []*structs.Evaluation{eval2})
|
err = state.UpsertEvals(1001, []*structs.Evaluation{eval2})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
|
@ -1315,6 +1318,8 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) {
|
||||||
watch.Item{Table: "allocs"},
|
watch.Item{Table: "allocs"},
|
||||||
watch.Item{Eval: eval1.ID},
|
watch.Item{Eval: eval1.ID},
|
||||||
watch.Item{Eval: eval2.ID},
|
watch.Item{Eval: eval2.ID},
|
||||||
|
watch.Item{EvalJob: eval1.JobID},
|
||||||
|
watch.Item{EvalJob: eval2.JobID},
|
||||||
watch.Item{Alloc: alloc1.ID},
|
watch.Item{Alloc: alloc1.ID},
|
||||||
watch.Item{Alloc: alloc2.ID},
|
watch.Item{Alloc: alloc2.ID},
|
||||||
watch.Item{AllocEval: alloc1.EvalID},
|
watch.Item{AllocEval: alloc1.EvalID},
|
||||||
|
|
|
@ -14,6 +14,7 @@ type Item struct {
|
||||||
AllocJob string
|
AllocJob string
|
||||||
AllocNode string
|
AllocNode string
|
||||||
Eval string
|
Eval string
|
||||||
|
EvalJob string
|
||||||
Job string
|
Job string
|
||||||
JobSummary string
|
JobSummary string
|
||||||
Node string
|
Node string
|
||||||
|
|
Loading…
Reference in a new issue