diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 83245661f..b994216d2 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -762,6 +762,97 @@ func TestJobEndpoint_GetJobSummary(t *testing.T) { } } +func TestJobEndpoint_GetJobSummary_Blocking(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create a job and insert it + job1 := mock.Job() + time.AfterFunc(200*time.Millisecond, func() { + if err := state.UpsertJob(100, job1); err != nil { + t.Fatalf("err: %v", err) + } + }) + + // Ensure the job summary request gets fired + req := &structs.JobSummaryRequest{ + JobID: job1.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 50, + }, + } + var resp structs.JobSummaryResponse + start := time.Now() + if err := msgpackrpc.CallWithCodec(codec, "Job.GetSummary", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + + // Upsert an allocation for the job which should trigger the watch. + time.AfterFunc(200*time.Millisecond, func() { + alloc := mock.Alloc() + alloc.JobID = job1.ID + alloc.Job = job1 + if err := state.UpsertAllocs(200, []*structs.Allocation{alloc}); err != nil { + t.Fatalf("err: %v", err) + } + }) + req = &structs.JobSummaryRequest{ + JobID: job1.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 199, + }, + } + start = time.Now() + var resp1 structs.JobSummaryResponse + start = time.Now() + if err := msgpackrpc.CallWithCodec(codec, "Job.GetSummary", req, &resp1); err != nil { + t.Fatalf("err: %v", err) + } + + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + if resp1.Index != 200 { + t.Fatalf("Bad index: %d %d", resp.Index, 200) + } + if resp1.JobSummary == nil { + t.Fatalf("bad: %#v", resp) + } + + // Job delete fires watches + time.AfterFunc(100*time.Millisecond, func() { + if err := state.DeleteJob(300, job1.ID); err != nil { + t.Fatalf("err: %v", err) + } + }) + + req.QueryOptions.MinQueryIndex = 250 + start = time.Now() + + var resp2 structs.SingleJobResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.GetSummary", req, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) + } + if resp2.Index != 300 { + t.Fatalf("Bad index: %d %d", resp2.Index, 300) + } + if resp2.Job != nil { + t.Fatalf("bad: %#v", resp2.Job) + } +} + func TestJobEndpoint_GetJob_Blocking(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown()