From 1fdbd54611ca1f2a7687918412a18f8f7c525e98 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Wed, 28 Oct 2015 12:43:00 -0700 Subject: [PATCH] nomad: job watches return correct response, add tests --- nomad/job_endpoint.go | 4 +++- nomad/job_endpoint_test.go | 48 ++++++++++++++++++++++++-------------- 2 files changed, 34 insertions(+), 18 deletions(-) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 8960a2e9d..ca6d59e1a 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -232,14 +232,16 @@ func (j *Job) List(args *structs.JobListRequest, return err } + var jobs []*structs.JobListStub for { raw := iter.Next() if raw == nil { break } job := raw.(*structs.Job) - reply.Jobs = append(reply.Jobs, job.Stub()) + jobs = append(jobs, job.Stub()) } + reply.Jobs = jobs // Use the last index that affected the jobs table index, err := snap.Index("jobs") diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index e922f31c3..8a9e5a1ee 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -401,26 +401,21 @@ func TestJobEndpoint_ListJobs(t *testing.T) { func TestJobEndpoint_ListJobs_blocking(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown() + state := s1.fsm.State() codec := rpcClient(t, s1) testutil.WaitForLeader(t, s1.RPC) // Create the job job := mock.Job() - go func() { - // Wait a bit - time.Sleep(100 * time.Millisecond) - - // Send the register request - state := s1.fsm.State() - err := state.UpsertJob(2, job) - if err != nil { + // Upsert job triggers watches + time.AfterFunc(100*time.Millisecond, func() { + if err := state.UpsertJob(2, job); err != nil { t.Fatalf("err: %v", err) } - }() + }) - // Lookup the jobs. Should block until the index is reached. - get := &structs.JobListRequest{ + req := &structs.JobListRequest{ QueryOptions: structs.QueryOptions{ Region: "global", MinQueryIndex: 1, @@ -428,23 +423,42 @@ func TestJobEndpoint_ListJobs_blocking(t *testing.T) { } start := time.Now() var resp structs.JobListResponse - if err := msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Job.List", req, &resp); err != nil { t.Fatalf("err: %v", err) } - // Check that we blocked if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp) } - if resp.Index != 2 { t.Fatalf("Bad index: %d %d", resp.Index, 2) } - if len(resp.Jobs) != 1 { + if len(resp.Jobs) != 1 || resp.Jobs[0].ID != job.ID { t.Fatalf("bad: %#v", resp.Jobs) } - if resp.Jobs[0].ID != job.ID { - t.Fatalf("bad: %#v", resp.Jobs[0]) + + // Job deletion triggers watches + time.AfterFunc(100*time.Millisecond, func() { + if err := state.DeleteJob(3, job.ID); err != nil { + t.Fatalf("err: %v", err) + } + }) + + req.MinQueryIndex = 2 + start = time.Now() + var resp2 structs.JobListResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.List", req, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + + if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + if resp2.Index != 3 { + t.Fatalf("Bad index: %d %d", resp2.Index, 3) + } + if len(resp2.Jobs) != 0 { + t.Fatalf("bad: %#v", resp2.Jobs) } }