nomad: support blocking queries on eval list

This commit is contained in:
Ryan Uber 2015-10-28 18:34:56 -07:00
parent 07b6597353
commit 4e70d52e29
3 changed files with 108 additions and 27 deletions

View File

@ -219,35 +219,45 @@ func (e *Eval) List(args *structs.EvalListRequest,
} }
defer metrics.MeasureSince([]string{"nomad", "eval", "list"}, time.Now()) defer metrics.MeasureSince([]string{"nomad", "eval", "list"}, time.Now())
// Scan all the evaluations // Setup the blocking query
snap, err := e.srv.fsm.State().Snapshot() opts := blockingOptions{
if err != nil { queryOpts: &args.QueryOptions,
return err queryMeta: &reply.QueryMeta,
} watchTables: []string{"evals"},
iter, err := snap.Evals() run: func() error {
if err != nil { // Scan all the evaluations
return err snap, err := e.srv.fsm.State().Snapshot()
} if err != nil {
return err
}
iter, err := snap.Evals()
if err != nil {
return err
}
for { var evals []*structs.Evaluation
raw := iter.Next() for {
if raw == nil { raw := iter.Next()
break if raw == nil {
} break
eval := raw.(*structs.Evaluation) }
reply.Evaluations = append(reply.Evaluations, eval) eval := raw.(*structs.Evaluation)
} evals = append(evals, eval)
}
reply.Evaluations = evals
// Use the last index that affected the jobs table // Use the last index that affected the jobs table
index, err := snap.Index("evals") index, err := snap.Index("evals")
if err != nil { if err != nil {
return err return err
} }
reply.Index = index reply.Index = index
// Set the query response // Set the query response
e.srv.setQueryMeta(&reply.QueryMeta) e.srv.setQueryMeta(&reply.QueryMeta)
return nil return nil
}}
return e.srv.blockingRPC(&opts)
} }
// Allocations is used to list the allocations for an evaluation // Allocations is used to list the allocations for an evaluation

View File

@ -334,6 +334,70 @@ func TestEvalEndpoint_List(t *testing.T) {
} }
} }
func TestEvalEndpoint_List_blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the ieval
eval := mock.Eval()
// Upsert eval triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertEvals(2, []*structs.Evaluation{eval}); err != nil {
t.Fatalf("err: %v", err)
}
})
req := &structs.EvalListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 1,
},
}
start := time.Now()
var resp structs.EvalListResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.List", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 2 {
t.Fatalf("Bad index: %d %d", resp.Index, 2)
}
if len(resp.Evaluations) != 1 || resp.Evaluations[0].ID != eval.ID {
t.Fatalf("bad: %#v", resp.Evaluations)
}
// Eval deletion triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.DeleteEval(3, []string{eval.ID}, nil); err != nil {
t.Fatalf("err: %v", err)
}
})
req.MinQueryIndex = 2
start = time.Now()
var resp2 structs.EvalListResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.List", req, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 3 {
t.Fatalf("Bad index: %d %d", resp2.Index, 3)
}
if len(resp2.Evaluations) != 0 {
t.Fatalf("bad: %#v", resp2.Evaluations)
}
}
func TestEvalEndpoint_Allocations(t *testing.T) { func TestEvalEndpoint_Allocations(t *testing.T) {
s1 := testServer(t, nil) s1 := testServer(t, nil)
defer s1.Shutdown() defer s1.Shutdown()

View File

@ -407,6 +407,8 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro
} }
} }
tables := map[string]struct{}{"evals": struct{}{}}
txn.Defer(func() { s.watch.notifyTables(tables) })
txn.Commit() txn.Commit()
return nil return nil
} }
@ -478,7 +480,12 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil { if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
return fmt.Errorf("index update failed: %v", err) return fmt.Errorf("index update failed: %v", err)
} }
txn.Defer(func() { s.watch.notifyAllocs(nodes) })
tables := map[string]struct{}{"evals": struct{}{}}
txn.Defer(func() {
s.watch.notifyAllocs(nodes)
s.watch.notifyTables(tables)
})
txn.Commit() txn.Commit()
return nil return nil
} }