nomad: test all node watch triggers
This commit is contained in:
parent
2c4735fbcf
commit
faab2495ee
|
@ -410,7 +410,6 @@ func (n *Node) List(args *structs.NodeListRequest,
|
|||
queryMeta: &reply.QueryMeta,
|
||||
watchTables: []string{"nodes"},
|
||||
run: func() error {
|
||||
|
||||
// Capture all the nodes
|
||||
snap, err := n.srv.fsm.State().Snapshot()
|
||||
if err != nil {
|
||||
|
@ -421,14 +420,16 @@ func (n *Node) List(args *structs.NodeListRequest,
|
|||
return err
|
||||
}
|
||||
|
||||
var nodes []*structs.NodeListStub
|
||||
for {
|
||||
raw := iter.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
node := raw.(*structs.Node)
|
||||
reply.Nodes = append(reply.Nodes, node.Stub())
|
||||
nodes = append(nodes, node.Stub())
|
||||
}
|
||||
reply.Nodes = nodes
|
||||
|
||||
// Use the last index that affected the jobs table
|
||||
index, err := snap.Index("nodes")
|
||||
|
|
|
@ -756,26 +756,21 @@ func TestClientEndpoint_ListNodes(t *testing.T) {
|
|||
func TestClientEndpoint_ListNodes_blocking(t *testing.T) {
|
||||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
state := s1.fsm.State()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create the node
|
||||
node := mock.Node()
|
||||
|
||||
go func() {
|
||||
// Wait a bit
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Send the register request
|
||||
state := s1.fsm.State()
|
||||
err := state.UpsertNode(2, node)
|
||||
if err != nil {
|
||||
// Node upsert triggers watches
|
||||
time.AfterFunc(100*time.Millisecond, func() {
|
||||
if err := state.UpsertNode(2, node); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}()
|
||||
})
|
||||
|
||||
// List the nodes. Should block until the index is reached.
|
||||
get := &structs.NodeListRequest{
|
||||
req := &structs.NodeListRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: "global",
|
||||
MinQueryIndex: 1,
|
||||
|
@ -783,22 +778,89 @@ func TestClientEndpoint_ListNodes_blocking(t *testing.T) {
|
|||
}
|
||||
start := time.Now()
|
||||
var resp structs.NodeListResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Node.List", get, &resp); err != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Check that we blocked
|
||||
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
|
||||
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
|
||||
}
|
||||
|
||||
if resp.Index != 2 {
|
||||
t.Fatalf("Bad index: %d %d", resp.Index, 2)
|
||||
}
|
||||
if len(resp.Nodes) != 1 {
|
||||
if len(resp.Nodes) != 1 || resp.Nodes[0].ID != node.ID {
|
||||
t.Fatalf("bad: %#v", resp.Nodes)
|
||||
}
|
||||
if resp.Nodes[0].ID != node.ID {
|
||||
t.Fatalf("bad: %#v", resp.Nodes[0])
|
||||
|
||||
// Node drain updates trigger watches.
|
||||
time.AfterFunc(100*time.Millisecond, func() {
|
||||
if err := state.UpdateNodeDrain(3, node.ID, true); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
req.MinQueryIndex = 2
|
||||
var resp2 structs.NodeListResponse
|
||||
start = time.Now()
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp2); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
|
||||
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
|
||||
}
|
||||
if resp2.Index != 3 {
|
||||
t.Fatalf("Bad index: %d %d", resp2.Index, 3)
|
||||
}
|
||||
if len(resp2.Nodes) != 1 || !resp2.Nodes[0].Drain {
|
||||
t.Fatalf("bad: %#v", resp2.Nodes)
|
||||
}
|
||||
|
||||
// Node status update triggers watches
|
||||
time.AfterFunc(100*time.Millisecond, func() {
|
||||
if err := state.UpdateNodeStatus(4, node.ID, structs.NodeStatusDown); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
req.MinQueryIndex = 3
|
||||
var resp3 structs.NodeListResponse
|
||||
start = time.Now()
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp3); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
|
||||
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
|
||||
}
|
||||
if resp3.Index != 4 {
|
||||
t.Fatalf("Bad index: %d %d", resp3.Index, 4)
|
||||
}
|
||||
if len(resp3.Nodes) != 1 || resp3.Nodes[0].Status != structs.NodeStatusDown {
|
||||
t.Fatalf("bad: %#v", resp3.Nodes)
|
||||
}
|
||||
|
||||
// Node delete triggers watches.
|
||||
time.AfterFunc(100*time.Millisecond, func() {
|
||||
if err := state.DeleteNode(5, node.ID); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
req.MinQueryIndex = 4
|
||||
var resp4 structs.NodeListResponse
|
||||
start = time.Now()
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp4); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond {
|
||||
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
|
||||
}
|
||||
if resp4.Index != 5 {
|
||||
t.Fatalf("Bad index: %d %d", resp4.Index, 5)
|
||||
}
|
||||
if len(resp4.Nodes) != 0 {
|
||||
t.Fatalf("bad: %#v", resp4.Nodes)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue