nomad: support blocking queries on eval-specific allocations

This commit is contained in:
Ryan Uber 2015-10-29 16:20:57 -07:00
parent 1469d0e7c4
commit 55cb559eae
4 changed files with 88 additions and 25 deletions

View File

@ -277,32 +277,40 @@ func (e *Eval) Allocations(args *structs.EvalSpecificRequest,
}
defer metrics.MeasureSince([]string{"nomad", "eval", "allocations"}, time.Now())
// Capture the allocations
snap, err := e.srv.fsm.State().Snapshot()
if err != nil {
return err
}
allocs, err := snap.AllocsByEval(args.EvalID)
if err != nil {
return err
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{AllocEval: args.EvalID}),
run: func() error {
// Capture the allocations
snap, err := e.srv.fsm.State().Snapshot()
if err != nil {
return err
}
allocs, err := snap.AllocsByEval(args.EvalID)
if err != nil {
return err
}
// Convert to a stub
if len(allocs) > 0 {
reply.Allocations = make([]*structs.AllocListStub, 0, len(allocs))
for _, alloc := range allocs {
reply.Allocations = append(reply.Allocations, alloc.Stub())
}
}
// Convert to a stub
if len(allocs) > 0 {
reply.Allocations = make([]*structs.AllocListStub, 0, len(allocs))
for _, alloc := range allocs {
reply.Allocations = append(reply.Allocations, alloc.Stub())
}
}
// Use the last index that affected the allocs table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
// Use the last index that affected the allocs table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
// Set the query response
e.srv.setQueryMeta(&reply.QueryMeta)
return nil
// Set the query response
e.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return e.srv.blockingRPC(&opts)
}

View File

@ -484,3 +484,55 @@ func TestEvalEndpoint_Allocations(t *testing.T) {
t.Fatalf("bad: %#v", resp.Allocations)
}
}
func TestEvalEndpoint_Allocations_blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the allocs
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
// Upsert an unrelated alloc first
time.AfterFunc(100*time.Millisecond, func() {
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
})
// Upsert an alloc which will trigger the watch later
time.AfterFunc(200*time.Millisecond, func() {
err := state.UpsertAllocs(2000, []*structs.Allocation{alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
})
// Lookup the eval
get := &structs.EvalSpecificRequest{
EvalID: alloc2.EvalID,
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 1,
},
}
var resp structs.EvalAllocationsResponse
start := time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Eval.Allocations", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 2000 {
t.Fatalf("Bad index: %d %d", resp.Index, 2000)
}
if len(resp.Allocations) != 1 || resp.Allocations[0].ID != alloc2.ID {
t.Fatalf("bad: %#v", resp.Allocations)
}
}

View File

@ -532,6 +532,7 @@ func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocati
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "allocs"})
watcher.Add(watch.Item{Alloc: alloc.ID})
watcher.Add(watch.Item{AllocEval: alloc.EvalID})
watcher.Add(watch.Item{AllocJob: alloc.JobID})
watcher.Add(watch.Item{AllocNode: alloc.NodeID})
@ -604,6 +605,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
}
watcher.Add(watch.Item{Alloc: alloc.ID})
watcher.Add(watch.Item{AllocEval: alloc.EvalID})
watcher.Add(watch.Item{AllocJob: alloc.JobID})
watcher.Add(watch.Item{AllocNode: alloc.NodeID})
}

View File

@ -8,6 +8,7 @@ package watch
// input for subscribe/unsubscribe and notification firing.
type Item struct {
Alloc string
AllocEval string
AllocJob string
AllocNode string
Eval string