diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 715b6a58d..23c50de57 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -410,7 +410,6 @@ func (n *Node) List(args *structs.NodeListRequest, queryMeta: &reply.QueryMeta, watchTables: []string{"nodes"}, run: func() error { - // Capture all the nodes snap, err := n.srv.fsm.State().Snapshot() if err != nil { @@ -421,14 +420,16 @@ func (n *Node) List(args *structs.NodeListRequest, return err } + var nodes []*structs.NodeListStub for { raw := iter.Next() if raw == nil { break } node := raw.(*structs.Node) - reply.Nodes = append(reply.Nodes, node.Stub()) + nodes = append(nodes, node.Stub()) } + reply.Nodes = nodes // Use the last index that affected the jobs table index, err := snap.Index("nodes") diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index c1a312d48..91ae5d4fc 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -756,26 +756,21 @@ func TestClientEndpoint_ListNodes(t *testing.T) { func TestClientEndpoint_ListNodes_blocking(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown() + state := s1.fsm.State() codec := rpcClient(t, s1) testutil.WaitForLeader(t, s1.RPC) // Create the node node := mock.Node() - go func() { - // Wait a bit - time.Sleep(100 * time.Millisecond) - - // Send the register request - state := s1.fsm.State() - err := state.UpsertNode(2, node) - if err != nil { + // Node upsert triggers watches + time.AfterFunc(100*time.Millisecond, func() { + if err := state.UpsertNode(2, node); err != nil { t.Fatalf("err: %v", err) } - }() + }) - // List the nodes. Should block until the index is reached. - get := &structs.NodeListRequest{ + req := &structs.NodeListRequest{ QueryOptions: structs.QueryOptions{ Region: "global", MinQueryIndex: 1, @@ -783,22 +778,89 @@ func TestClientEndpoint_ListNodes_blocking(t *testing.T) { } start := time.Now() var resp structs.NodeListResponse - if err := msgpackrpc.CallWithCodec(codec, "Node.List", get, &resp); err != nil { + if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp); err != nil { t.Fatalf("err: %v", err) } - // Check that we blocked if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp) } - if resp.Index != 2 { t.Fatalf("Bad index: %d %d", resp.Index, 2) } - if len(resp.Nodes) != 1 { + if len(resp.Nodes) != 1 || resp.Nodes[0].ID != node.ID { t.Fatalf("bad: %#v", resp.Nodes) } - if resp.Nodes[0].ID != node.ID { - t.Fatalf("bad: %#v", resp.Nodes[0]) + + // Node drain updates trigger watches. + time.AfterFunc(100*time.Millisecond, func() { + if err := state.UpdateNodeDrain(3, node.ID, true); err != nil { + t.Fatalf("err: %v", err) + } + }) + + req.MinQueryIndex = 2 + var resp2 structs.NodeListResponse + start = time.Now() + if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + + if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + if resp2.Index != 3 { + t.Fatalf("Bad index: %d %d", resp2.Index, 3) + } + if len(resp2.Nodes) != 1 || !resp2.Nodes[0].Drain { + t.Fatalf("bad: %#v", resp2.Nodes) + } + + // Node status update triggers watches + time.AfterFunc(100*time.Millisecond, func() { + if err := state.UpdateNodeStatus(4, node.ID, structs.NodeStatusDown); err != nil { + t.Fatalf("err: %v", err) + } + }) + + req.MinQueryIndex = 3 + var resp3 structs.NodeListResponse + start = time.Now() + if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp3); err != nil { + t.Fatalf("err: %v", err) + } + + if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + if resp3.Index != 4 { + t.Fatalf("Bad index: %d %d", resp3.Index, 4) + } + if len(resp3.Nodes) != 1 || resp3.Nodes[0].Status != structs.NodeStatusDown { + t.Fatalf("bad: %#v", resp3.Nodes) + } + + // Node delete triggers watches. + time.AfterFunc(100*time.Millisecond, func() { + if err := state.DeleteNode(5, node.ID); err != nil { + t.Fatalf("err: %v", err) + } + }) + + req.MinQueryIndex = 4 + var resp4 structs.NodeListResponse + start = time.Now() + if err := msgpackrpc.CallWithCodec(codec, "Node.List", req, &resp4); err != nil { + t.Fatalf("err: %v", err) + } + + if elapsed := time.Now().Sub(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + if resp4.Index != 5 { + t.Fatalf("Bad index: %d %d", resp4.Index, 5) + } + if len(resp4.Nodes) != 0 { + t.Fatalf("bad: %#v", resp4.Nodes) } }