Added test for blocking query of job summary endpoint
This commit is contained in:
parent
4a17d8e6d6
commit
de2c79f421
|
@ -762,6 +762,97 @@ func TestJobEndpoint_GetJobSummary(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestJobEndpoint_GetJobSummary_Blocking(t *testing.T) {
|
||||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
state := s1.fsm.State()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create a job and insert it
|
||||
job1 := mock.Job()
|
||||
time.AfterFunc(200*time.Millisecond, func() {
|
||||
if err := state.UpsertJob(100, job1); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
// Ensure the job summary request gets fired
|
||||
req := &structs.JobSummaryRequest{
|
||||
JobID: job1.ID,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: "global",
|
||||
MinQueryIndex: 50,
|
||||
},
|
||||
}
|
||||
var resp structs.JobSummaryResponse
|
||||
start := time.Now()
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Job.GetSummary", req, &resp); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
|
||||
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
|
||||
}
|
||||
|
||||
// Upsert an allocation for the job which should trigger the watch.
|
||||
time.AfterFunc(200*time.Millisecond, func() {
|
||||
alloc := mock.Alloc()
|
||||
alloc.JobID = job1.ID
|
||||
alloc.Job = job1
|
||||
if err := state.UpsertAllocs(200, []*structs.Allocation{alloc}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
})
|
||||
req = &structs.JobSummaryRequest{
|
||||
JobID: job1.ID,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: "global",
|
||||
MinQueryIndex: 199,
|
||||
},
|
||||
}
|
||||
start = time.Now()
|
||||
var resp1 structs.JobSummaryResponse
|
||||
start = time.Now()
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Job.GetSummary", req, &resp1); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
|
||||
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
|
||||
}
|
||||
if resp1.Index != 200 {
|
||||
t.Fatalf("Bad index: %d %d", resp.Index, 200)
|
||||
}
|
||||
if resp1.JobSummary == nil {
|
||||
t.Fatalf("bad: %#v", resp)
|
||||
}
|
||||
|
||||
// Job delete fires watches
|
||||
time.AfterFunc(100*time.Millisecond, func() {
|
||||
if err := state.DeleteJob(300, job1.ID); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
req.QueryOptions.MinQueryIndex = 250
|
||||
start = time.Now()
|
||||
|
||||
var resp2 structs.SingleJobResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Job.GetSummary", req, &resp2); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
|
||||
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
|
||||
}
|
||||
if resp2.Index != 300 {
|
||||
t.Fatalf("Bad index: %d %d", resp2.Index, 300)
|
||||
}
|
||||
if resp2.Job != nil {
|
||||
t.Fatalf("bad: %#v", resp2.Job)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJobEndpoint_GetJob_Blocking(t *testing.T) {
|
||||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
|
|
Loading…
Reference in a new issue