nomad: job watches return correct response, add tests

This commit is contained in:
Ryan Uber 2015-10-28 12:43:00 -07:00
parent faab2495ee
commit 1fdbd54611
2 changed files with 34 additions and 18 deletions

View File

@ -232,14 +232,16 @@ func (j *Job) List(args *structs.JobListRequest,
return err
}
var jobs []*structs.JobListStub
for {
raw := iter.Next()
if raw == nil {
break
}
job := raw.(*structs.Job)
reply.Jobs = append(reply.Jobs, job.Stub())
jobs = append(jobs, job.Stub())
}
reply.Jobs = jobs
// Use the last index that affected the jobs table
index, err := snap.Index("jobs")

View File

@ -401,26 +401,21 @@ func TestJobEndpoint_ListJobs(t *testing.T) {
func TestJobEndpoint_ListJobs_blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the job
job := mock.Job()
go func() {
// Wait a bit
time.Sleep(100 * time.Millisecond)
// Send the register request
state := s1.fsm.State()
err := state.UpsertJob(2, job)
if err != nil {
// Upsert job triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertJob(2, job); err != nil {
t.Fatalf("err: %v", err)
}
}()
})
// Lookup the jobs. Should block until the index is reached.
get := &structs.JobListRequest{
req := &structs.JobListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 1,
@ -428,23 +423,42 @@ func TestJobEndpoint_ListJobs_blocking(t *testing.T) {
}
start := time.Now()
var resp structs.JobListResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp); err != nil {
if err := msgpackrpc.CallWithCodec(codec, "Job.List", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
// Check that we blocked
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 2 {
t.Fatalf("Bad index: %d %d", resp.Index, 2)
}
if len(resp.Jobs) != 1 {
if len(resp.Jobs) != 1 || resp.Jobs[0].ID != job.ID {
t.Fatalf("bad: %#v", resp.Jobs)
}
if resp.Jobs[0].ID != job.ID {
t.Fatalf("bad: %#v", resp.Jobs[0])
// Job deletion triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.DeleteJob(3, job.ID); err != nil {
t.Fatalf("err: %v", err)
}
})
req.MinQueryIndex = 2
start = time.Now()
var resp2 structs.JobListResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.List", req, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp2.Index != 3 {
t.Fatalf("Bad index: %d %d", resp2.Index, 3)
}
if len(resp2.Jobs) != 0 {
t.Fatalf("bad: %#v", resp2.Jobs)
}
}