nomad: support full table watches for allocations

This commit is contained in:
Ryan Uber 2015-10-28 19:25:39 -07:00
parent 6b76a3fc62
commit 692e4e371a
3 changed files with 115 additions and 28 deletions

View File

@ -19,35 +19,45 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "list"}, time.Now())
// Capture all the allocations
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
}
iter, err := snap.Allocs()
if err != nil {
return err
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watchTables: []string{"allocs"},
run: func() error {
// Capture all the allocations
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
}
iter, err := snap.Allocs()
if err != nil {
return err
}
for {
raw := iter.Next()
if raw == nil {
break
}
alloc := raw.(*structs.Allocation)
reply.Allocations = append(reply.Allocations, alloc.Stub())
}
var allocs []*structs.AllocListStub
for {
raw := iter.Next()
if raw == nil {
break
}
alloc := raw.(*structs.Allocation)
allocs = append(allocs, alloc.Stub())
}
reply.Allocations = allocs
// Use the last index that affected the jobs table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
// Use the last index that affected the jobs table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
// Set the query response
a.srv.setQueryMeta(&reply.QueryMeta)
return nil
// Set the query response
a.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return a.srv.blockingRPC(&opts)
}
// GetAlloc is used to lookup a particular allocation

View File

@ -3,6 +3,7 @@ package nomad
import (
"reflect"
"testing"
"time"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/nomad/mock"
@ -44,6 +45,74 @@ func TestAllocEndpoint_List(t *testing.T) {
}
}
func TestAllocEndpoint_List_blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the alloc
alloc := mock.Alloc()
// Upsert alloc triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertAllocs(2, []*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}
})
req := &structs.AllocListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 1,
},
}
start := time.Now()
var resp structs.AllocListResponse
if err := msgpackrpc.CallWithCodec(codec, "Alloc.List", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 2 {
t.Fatalf("Bad index: %d %d", resp.Index, 2)
}
if len(resp.Allocations) != 1 || resp.Allocations[0].ID != alloc.ID {
t.Fatalf("bad: %#v", resp.Allocations)
}
// Client updates trigger watches
alloc2 := mock.Alloc()
alloc2.ID = alloc.ID
alloc2.ClientStatus = structs.AllocClientStatusRunning
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpdateAllocFromClient(3, alloc2); err != nil {
t.Fatalf("err: %v", err)
}
})
req.MinQueryIndex = 2
start = time.Now()
var resp2 structs.AllocListResponse
if err := msgpackrpc.CallWithCodec(codec, "Alloc.List", req, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 3 {
t.Fatalf("Bad index: %d %d", resp2.Index, 3)
}
if len(resp2.Allocations) != 1 || resp.Allocations[0].ID != alloc.ID ||
resp2.Allocations[0].ClientStatus != structs.AllocClientStatusRunning {
t.Fatalf("bad: %#v", resp2.Allocations)
}
}
func TestAllocEndpoint_GetAlloc(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()

View File

@ -580,8 +580,12 @@ func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocati
return fmt.Errorf("index update failed: %v", err)
}
tables := map[string]struct{}{"allocs": struct{}{}}
nodes := map[string]struct{}{alloc.NodeID: struct{}{}}
txn.Defer(func() { s.watch.notifyAllocs(nodes) })
txn.Defer(func() {
s.watch.notifyAllocs(nodes)
s.watch.notifyTables(tables)
})
txn.Commit()
return nil
}
@ -621,7 +625,11 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
return fmt.Errorf("index update failed: %v", err)
}
txn.Defer(func() { s.watch.notifyAllocs(nodes) })
tables := map[string]struct{}{"allocs": struct{}{}}
txn.Defer(func() {
s.watch.notifyAllocs(nodes)
s.watch.notifyTables(tables)
})
txn.Commit()
return nil
}