diff --git a/.changelog/16549.txt b/.changelog/16549.txt new file mode 100644 index 000000000..e5851ab93 --- /dev/null +++ b/.changelog/16549.txt @@ -0,0 +1,3 @@ +```release-note:bug +core: Fixed a bug where Dynamic Node Metadata requests could crash servers +``` diff --git a/api/allocations.go b/api/allocations.go index 9bd2d7aa6..2d859ffcc 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -148,9 +148,6 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error { // Note: for cluster topologies where API consumers don't have network access to // Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid // long pauses on this API call. -// -// BREAKING: This method will have the following signature in 1.6.0 -// func (a *Allocations) Restart(allocID string, taskName string, allTasks bool, w *WriteOptions) (*WriteMeta, error) { func (a *Allocations) Restart(alloc *Allocation, taskName string, q *QueryOptions) error { req := AllocationRestartRequest{ TaskName: taskName, @@ -223,9 +220,6 @@ type AllocStopResponse struct { // Note: for cluster topologies where API consumers don't have network access to // Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid // long pauses on this API call. -// -// BREAKING: This method will have the following signature in 1.6.0 -// func (a *Allocations) Signal(allocID string, task string, signal string, w *WriteOptions) (*WriteMeta, error) { func (a *Allocations) Signal(alloc *Allocation, q *QueryOptions, task, signal string) error { req := AllocSignalRequest{ Signal: signal, diff --git a/api/api.go b/api/api.go index af960745d..107457ce1 100644 --- a/api/api.go +++ b/api/api.go @@ -937,9 +937,8 @@ func (c *Client) query(endpoint string, out any, q *QueryOptions) (*QueryMeta, e return qm, nil } -// putQuery is used to do a PUT request when doing a read against an endpoint -// and deserialize the response into an interface using standard Nomad -// conventions. +// putQuery is used to do a PUT request when doing a "write" to a Client RPC. +// Client RPCs must use QueryOptions to allow setting AllowStale=true. func (c *Client) putQuery(endpoint string, in, out any, q *QueryOptions) (*QueryMeta, error) { r, err := c.newRequest("PUT", endpoint) if err != nil { @@ -969,6 +968,31 @@ func (c *Client) put(endpoint string, in, out any, q *WriteOptions) (*WriteMeta, return c.write(http.MethodPut, endpoint, in, out, q) } +// postQuery is used to do a POST request when doing a "write" to a Client RPC. +// Client RPCs must use QueryOptions to allow setting AllowStale=true. +func (c *Client) postQuery(endpoint string, in, out any, q *QueryOptions) (*QueryMeta, error) { + r, err := c.newRequest("POST", endpoint) + if err != nil { + return nil, err + } + r.setQueryOptions(q) + r.obj = in + rtt, resp, err := requireOK(c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + qm := &QueryMeta{} + parseQueryMeta(resp, qm) + qm.RequestTime = rtt + + if err := decodeBody(resp, out); err != nil { + return nil, err + } + return qm, nil +} + // post is used to do a POST request against an endpoint and // serialize/deserialized using the standard Nomad conventions. func (c *Client) post(endpoint string, in, out any, q *WriteOptions) (*WriteMeta, error) { diff --git a/api/node_meta.go b/api/node_meta.go index be851206b..4d5c4ffa5 100644 --- a/api/node_meta.go +++ b/api/node_meta.go @@ -30,9 +30,9 @@ func (n *Nodes) Meta() *NodeMeta { // Apply dynamic Node metadata updates to a Node. If NodeID is unset then Node // receiving the request is modified. -func (n *NodeMeta) Apply(meta *NodeMetaApplyRequest, qo *WriteOptions) (*NodeMetaResponse, error) { +func (n *NodeMeta) Apply(meta *NodeMetaApplyRequest, qo *QueryOptions) (*NodeMetaResponse, error) { var out NodeMetaResponse - _, err := n.client.post("/v1/client/metadata", meta, &out, qo) + _, err := n.client.postQuery("/v1/client/metadata", meta, &out, qo) if err != nil { return nil, err } diff --git a/command/agent/meta_endpoint.go b/command/agent/meta_endpoint.go index f07fe014a..282505549 100644 --- a/command/agent/meta_endpoint.go +++ b/command/agent/meta_endpoint.go @@ -59,7 +59,7 @@ func (s *HTTPServer) nodeMetaApply(resp http.ResponseWriter, req *http.Request) return nil, CodedError(http.StatusBadRequest, err.Error()) } - s.parseWriteRequest(req, &args.WriteRequest) + s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions) parseNode(req, &args.NodeID) // Determine the handler to use diff --git a/contributing/checklist-rpc-endpoint.md b/contributing/checklist-rpc-endpoint.md index f8df1c0c8..a961d8ca0 100644 --- a/contributing/checklist-rpc-endpoint.md +++ b/contributing/checklist-rpc-endpoint.md @@ -42,6 +42,14 @@ Prefer adding a new message to changing any existing RPC messages. upgraded, so use this to guard sending the new RPC, else send the old RPC * Version must match the actual release version! +* [ ] If implementing a Client RPC... + * Use `QueryOptions` instead of `WriteRequest` in the Request struct as + `WriteRequest` is only for *Raft* writes. + * Set `QueryOptions.AllowStale = true` in the *Server* RPC forwarder to avoid + an infinite loop between leaders and followers when a Client RPC is + forwarded through a follower. See + https://github.com/hashicorp/nomad/issues/16517 + ## Docs * [ ] Changelog diff --git a/go.mod b/go.mod index 4fa07c88f..2c86f1e65 100644 --- a/go.mod +++ b/go.mod @@ -91,7 +91,7 @@ require ( github.com/mitchellh/go-glint v0.0.0-20210722152315-6515ceb4a127 github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b - github.com/mitchellh/go-testing-interface v1.14.1 + github.com/mitchellh/go-testing-interface v1.14.2-0.20210821155943-2d9075ca8770 github.com/mitchellh/hashstructure v1.1.0 github.com/mitchellh/mapstructure v1.5.0 github.com/mitchellh/reflectwalk v1.0.2 diff --git a/go.sum b/go.sum index b40d0e62e..1c5f0118e 100644 --- a/go.sum +++ b/go.sum @@ -1091,8 +1091,9 @@ github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b h1:9+ke9YJ9KGWw5AN github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b/go.mod h1:r1VsdOzOPt1ZSrGZWFoNhsAedKnEd6r9Np1+5blZCWk= github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= -github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU= github.com/mitchellh/go-testing-interface v1.14.1/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8= +github.com/mitchellh/go-testing-interface v1.14.2-0.20210821155943-2d9075ca8770 h1:drhDO54gdT/a15GBcMRmunZiNcLgPiFIJa23KzmcvcU= +github.com/mitchellh/go-testing-interface v1.14.2-0.20210821155943-2d9075ca8770/go.mod h1:SO/iHr6q2EzbqRApt+8/E9wqebTwQn5y+UlB04bxzo0= github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0= diff --git a/nomad/client_meta_endpoint.go b/nomad/client_meta_endpoint.go index 80c7d237b..1795a6aba 100644 --- a/nomad/client_meta_endpoint.go +++ b/nomad/client_meta_endpoint.go @@ -25,6 +25,10 @@ func newNodeMetaEndpoint(srv *Server) *NodeMeta { func (n *NodeMeta) Apply(args *structs.NodeMetaApplyRequest, reply *structs.NodeMetaResponse) error { const method = "NodeMeta.Apply" + // Prevent infinite loop between leader and + // follower-with-the-target-node-connection. + args.QueryOptions.AllowStale = true + authErr := n.srv.Authenticate(nil, args) if done, err := n.srv.forward(method, args, args, reply); done { return err @@ -48,6 +52,10 @@ func (n *NodeMeta) Apply(args *structs.NodeMetaApplyRequest, reply *structs.Node func (n *NodeMeta) Read(args *structs.NodeSpecificRequest, reply *structs.NodeMetaResponse) error { const method = "NodeMeta.Read" + // Prevent infinite loop between leader and + // follower-with-the-target-node-connection. + args.QueryOptions.AllowStale = true + authErr := n.srv.Authenticate(nil, args) if done, err := n.srv.forward(method, args, args, reply); done { return err diff --git a/nomad/client_meta_endpoint_test.go b/nomad/client_meta_endpoint_test.go new file mode 100644 index 000000000..64e56c269 --- /dev/null +++ b/nomad/client_meta_endpoint_test.go @@ -0,0 +1,129 @@ +package nomad + +import ( + "testing" + + "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/client" + "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/helper/pointer" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/shoenig/test/must" +) + +// TestNodeMeta_Forward asserts that Client RPCs do not result in infinite +// loops. For example in a cluster with 1 Leader, 2 Followers, and a Node +// connected to Follower 1: +// +// If a NodeMeta.Apply RPC with AllowStale=false is received by Follower 1, it +// will honor AllowStale=false and forward the request to the Leader. +// +// The Leader will accept the RPC, notice that Follower 1 has a connection to +// the Node, and the Leader will send the request back to Follower 1. +// +// Follower 1, ever respectful of AllowStale=false, will forward it back to the +// Leader. +// +// The Leader, being unable to forward to the Node, will send it back to +// Follower 1. +// +// This argument will continue until one of the Servers runs out of memory or +// patience and stomps away in anger (crashes). Like any good argument the +// ending is never pretty as the Servers will suffer CPU starvation and +// potentially Raft flapping before anyone actually OOMs. +// +// See https://github.com/hashicorp/nomad/issues/16517 for details. +// +// If test fails it will do so spectacularly by consuming all available CPU and +// potentially all available memory. Running it in a VM or container is +// suggested. +func TestNodeMeta_Forward(t *testing.T) { + ci.Parallel(t) + + servers := []*Server{} + for i := 0; i < 3; i++ { + s, cleanup := TestServer(t, func(c *Config) { + c.BootstrapExpect = 3 + c.NumSchedulers = 0 + }) + t.Cleanup(cleanup) + servers = append(servers, s) + } + + TestJoin(t, servers...) + leader := testutil.WaitForLeaders(t, servers[0].RPC, servers[1].RPC, servers[2].RPC) + + followers := []string{} + for _, s := range servers { + if addr := s.config.RPCAddr.String(); addr != leader { + followers = append(followers, addr) + } + } + t.Logf("leader=%s followers=%q", leader, followers) + + clients := []*client.Client{} + for i := 0; i < 4; i++ { + c, cleanup := client.TestClient(t, func(c *config.Config) { + // Clients will rebalance across all servers, but try to get them to use + // followers to ensure we don't hit the loop in #16517 + c.Servers = followers + }) + defer cleanup() + clients = append(clients, c) + } + for _, c := range clients { + testutil.WaitForClient(t, servers[0].RPC, c.NodeID(), c.Region()) + } + + agentRPCs := []func(string, any, any) error{} + nodeIDs := make([]string, 0, len(clients)) + + // Build list of agents and node IDs + for _, s := range servers { + agentRPCs = append(agentRPCs, s.RPC) + } + + for _, c := range clients { + agentRPCs = append(agentRPCs, c.RPC) + nodeIDs = append(nodeIDs, c.NodeID()) + } + + region := clients[0].Region() + + // Apply metadata to every client through every agent to ensure forwarding + // always works regardless of path taken. + for _, rpc := range agentRPCs { + for _, nodeID := range nodeIDs { + args := &structs.NodeMetaApplyRequest{ + // Intentionally don't set QueryOptions.AllowStale to exercise #16517 + QueryOptions: structs.QueryOptions{ + Region: region, + }, + NodeID: nodeID, + Meta: map[string]*string{"testing": pointer.Of("123")}, + } + reply := &structs.NodeMetaResponse{} + must.NoError(t, rpc("NodeMeta.Apply", args, reply)) + must.MapNotEmpty(t, reply.Meta) + } + } + + for _, rpc := range agentRPCs { + for _, nodeID := range nodeIDs { + args := &structs.NodeSpecificRequest{ + // Intentionally don't set QueryOptions.AllowStale to exercise #16517 + QueryOptions: structs.QueryOptions{ + Region: region, + }, + NodeID: nodeID, + } + reply := &structs.NodeMetaResponse{} + must.NoError(t, rpc("NodeMeta.Read", args, reply)) + must.MapNotEmpty(t, reply.Meta) + must.Eq(t, reply.Meta["testing"], "123") + must.MapNotEmpty(t, reply.Dynamic) + must.Eq(t, *reply.Dynamic["testing"], "123") + } + } +} diff --git a/nomad/structs/node.go b/nomad/structs/node.go index f930b3385..f025335ec 100644 --- a/nomad/structs/node.go +++ b/nomad/structs/node.go @@ -357,7 +357,7 @@ func (di *DriverInfo) HealthCheckEquals(other *DriverInfo) bool { // NodeMetaApplyRequest is used to update Node metadata on Client agents. type NodeMetaApplyRequest struct { - WriteRequest + QueryOptions // Client RPCs must use QueryOptions to set AllowStale=true // NodeID is the node being targeted by this request (or the node // receiving this request if NodeID is empty). diff --git a/nomad/testing.go b/nomad/testing.go index 6e7d31bb8..d2ec5d616 100644 --- a/nomad/testing.go +++ b/nomad/testing.go @@ -6,7 +6,6 @@ import ( "math/rand" "net" "sync/atomic" - "testing" "time" "github.com/hashicorp/nomad/ci" @@ -15,6 +14,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/version" + testing "github.com/mitchellh/go-testing-interface" "github.com/shoenig/test/must" ) @@ -22,7 +22,7 @@ var ( nodeNumber int32 = 0 ) -func TestACLServer(t *testing.T, cb func(*Config)) (*Server, *structs.ACLToken, func()) { +func TestACLServer(t testing.T, cb func(*Config)) (*Server, *structs.ACLToken, func()) { server, cleanup := TestServer(t, func(c *Config) { c.ACLEnabled = true if cb != nil { @@ -37,13 +37,13 @@ func TestACLServer(t *testing.T, cb func(*Config)) (*Server, *structs.ACLToken, return server, token, cleanup } -func TestServer(t *testing.T, cb func(*Config)) (*Server, func()) { +func TestServer(t testing.T, cb func(*Config)) (*Server, func()) { s, c, err := TestServerErr(t, cb) must.NoError(t, err, must.Sprint("failed to start test server")) return s, c } -func TestServerErr(t *testing.T, cb func(*Config)) (*Server, func(), error) { +func TestServerErr(t testing.T, cb func(*Config)) (*Server, func(), error) { // Setup the default settings config := DefaultConfig() @@ -150,19 +150,15 @@ func TestServerErr(t *testing.T, cb func(*Config)) (*Server, func(), error) { return nil, nil, errors.New("unable to acquire ports for test server") } -func TestJoin(t *testing.T, servers ...*Server) { +func TestJoin(t testing.T, servers ...*Server) { for i := 0; i < len(servers)-1; i++ { addr := fmt.Sprintf("127.0.0.1:%d", servers[i].config.SerfConfig.MemberlistConfig.BindPort) for j := i + 1; j < len(servers); j++ { num, err := servers[j].Join([]string{addr}) - if err != nil { - t.Fatalf("err: %v", err) - } - if num != 1 { - t.Fatalf("bad: %d", num) - } + must.NoError(t, err) + must.Eq(t, 1, num) } } } diff --git a/testutil/wait.go b/testutil/wait.go index 3cb8ee3a0..6f239b908 100644 --- a/testutil/wait.go +++ b/testutil/wait.go @@ -143,15 +143,16 @@ func WaitForLeader(t testing.TB, rpc rpcFn) { }) } -// WaitForLeaders blocks until each serverRPC knows the leader. -func WaitForLeaders(t testing.TB, serverRPCs ...rpcFn) { +// WaitForLeaders blocks until each rpcs knows the leader. +func WaitForLeaders(t testing.TB, rpcs ...rpcFn) string { t.Helper() - for i := 0; i < len(serverRPCs); i++ { + var leader string + for i := 0; i < len(rpcs); i++ { ok := func() (bool, error) { + leader = "" args := &structs.GenericRequest{} - var leader string - err := serverRPCs[i]("Status.Leader", args, &leader) + err := rpcs[i]("Status.Leader", args, &leader) return leader != "", err } must.Wait(t, wait.InitialSuccess( @@ -160,6 +161,8 @@ func WaitForLeaders(t testing.TB, serverRPCs ...rpcFn) { wait.Gap(1*time.Second), )) } + + return leader } // WaitForClient blocks until the client can be found