client/metadata: fix crasher caused by AllowStale = false (#16549)
Fixes #16517. Given a 3-server cluster with at least 1 client connected to Follower 1: if a NodeMeta.{Apply,Read} RPC targeting that client is received by Follower 1 with `AllowStale = false`, the follower will forward the request to the leader. The leader, not being connected to the target client, will forward the RPC back to Follower 1. Follower 1, seeing `AllowStale = false`, will forward the request to the leader. The leader, not being connected to... well, hopefully you get the picture: an infinite loop occurs.
parent d1b35c6bd0
commit f8884d8b52
@@ -0,0 +1,3 @@
+```release-note:bug
+core: Fixed a bug where Dynamic Node Metadata requests could crash servers
+```
@@ -148,9 +148,6 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error {
 // Note: for cluster topologies where API consumers don't have network access to
 // Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid
 // long pauses on this API call.
-//
-// BREAKING: This method will have the following signature in 1.6.0
-// func (a *Allocations) Restart(allocID string, taskName string, allTasks bool, w *WriteOptions) (*WriteMeta, error) {
 func (a *Allocations) Restart(alloc *Allocation, taskName string, q *QueryOptions) error {
 	req := AllocationRestartRequest{
 		TaskName: taskName,
@@ -223,9 +220,6 @@ type AllocStopResponse struct {
 // Note: for cluster topologies where API consumers don't have network access to
 // Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid
 // long pauses on this API call.
-//
-// BREAKING: This method will have the following signature in 1.6.0
-// func (a *Allocations) Signal(allocID string, task string, signal string, w *WriteOptions) (*WriteMeta, error) {
 func (a *Allocations) Signal(alloc *Allocation, q *QueryOptions, task, signal string) error {
 	req := AllocSignalRequest{
 		Signal: signal,
api/api.go
@@ -937,9 +937,8 @@ func (c *Client) query(endpoint string, out any, q *QueryOptions) (*QueryMeta, e
 	return qm, nil
 }
 
-// putQuery is used to do a PUT request when doing a read against an endpoint
-// and deserialize the response into an interface using standard Nomad
-// conventions.
+// putQuery is used to do a PUT request when doing a "write" to a Client RPC.
+// Client RPCs must use QueryOptions to allow setting AllowStale=true.
 func (c *Client) putQuery(endpoint string, in, out any, q *QueryOptions) (*QueryMeta, error) {
 	r, err := c.newRequest("PUT", endpoint)
 	if err != nil {
@@ -969,6 +968,31 @@ func (c *Client) put(endpoint string, in, out any, q *WriteOptions) (*WriteMeta,
 	return c.write(http.MethodPut, endpoint, in, out, q)
 }
 
+// postQuery is used to do a POST request when doing a "write" to a Client RPC.
+// Client RPCs must use QueryOptions to allow setting AllowStale=true.
+func (c *Client) postQuery(endpoint string, in, out any, q *QueryOptions) (*QueryMeta, error) {
+	r, err := c.newRequest("POST", endpoint)
+	if err != nil {
+		return nil, err
+	}
+	r.setQueryOptions(q)
+	r.obj = in
+	rtt, resp, err := requireOK(c.doRequest(r))
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	qm := &QueryMeta{}
+	parseQueryMeta(resp, qm)
+	qm.RequestTime = rtt
+
+	if err := decodeBody(resp, out); err != nil {
+		return nil, err
+	}
+	return qm, nil
+}
+
 // post is used to do a POST request against an endpoint and
 // serialize/deserialized using the standard Nomad conventions.
 func (c *Client) post(endpoint string, in, out any, q *WriteOptions) (*WriteMeta, error) {
@@ -30,9 +30,9 @@ func (n *Nodes) Meta() *NodeMeta {
 
 // Apply dynamic Node metadata updates to a Node. If NodeID is unset then Node
 // receiving the request is modified.
-func (n *NodeMeta) Apply(meta *NodeMetaApplyRequest, qo *WriteOptions) (*NodeMetaResponse, error) {
+func (n *NodeMeta) Apply(meta *NodeMetaApplyRequest, qo *QueryOptions) (*NodeMetaResponse, error) {
 	var out NodeMetaResponse
-	_, err := n.client.post("/v1/client/metadata", meta, &out, qo)
+	_, err := n.client.postQuery("/v1/client/metadata", meta, &out, qo)
 	if err != nil {
 		return nil, err
 	}
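For api consumers, the visible change is the Apply signature. A hypothetical caller might look like this; the request fields are assumed to mirror the structs version shown later in this diff, and `strPtr` is a local helper, not part of the api package:

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func strPtr(s string) *string { return &s }

func main() {
	c, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	// Apply now takes *api.QueryOptions instead of *api.WriteOptions; nil is
	// fine for defaults, or AllowStale can be set explicitly.
	resp, err := c.Nodes().Meta().Apply(&api.NodeMetaApplyRequest{
		Meta: map[string]*string{"example": strPtr("value")},
	}, &api.QueryOptions{AllowStale: true})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(resp.Meta)
}
```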
@@ -59,7 +59,7 @@ func (s *HTTPServer) nodeMetaApply(resp http.ResponseWriter, req *http.Request)
 		return nil, CodedError(http.StatusBadRequest, err.Error())
 	}
 
-	s.parseWriteRequest(req, &args.WriteRequest)
+	s.parse(resp, req, &args.QueryOptions.Region, &args.QueryOptions)
 	parseNode(req, &args.NodeID)
 
 	// Determine the handler to use
@@ -42,6 +42,14 @@ Prefer adding a new message to changing any existing RPC messages.
   upgraded, so use this to guard sending the new RPC, else send the old RPC
   * Version must match the actual release version!
 
+* [ ] If implementing a Client RPC...
+  * Use `QueryOptions` instead of `WriteRequest` in the Request struct as
+    `WriteRequest` is only for *Raft* writes.
+  * Set `QueryOptions.AllowStale = true` in the *Server* RPC forwarder to avoid
+    an infinite loop between leaders and followers when a Client RPC is
+    forwarded through a follower. See
+    https://github.com/hashicorp/nomad/issues/16517
+
 ## Docs
 
 * [ ] Changelog
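A compact sketch of the pattern the new checklist items describe, using toy stand-ins rather than Nomad's real types:

```go
package main

import "fmt"

// QueryOptions is a toy stand-in for structs.QueryOptions.
type QueryOptions struct{ AllowStale bool }

// ExampleClientRequest embeds QueryOptions rather than WriteRequest, since a
// Client RPC is not a Raft write.
type ExampleClientRequest struct {
	QueryOptions
	NodeID string
}

// handle models the server-side prologue: force AllowStale=true before any
// forwarding decision so a follower holding the node connection serves the
// request instead of bouncing it to the leader forever.
func handle(args *ExampleClientRequest, isLeader, hasNodeConn bool) string {
	args.AllowStale = true // the one-line guard this commit adds
	switch {
	case hasNodeConn:
		return "served via local node connection"
	case !args.AllowStale && !isLeader:
		return "forwarded to leader" // unreachable once AllowStale is forced
	default:
		return "forwarded toward the server holding the node connection"
	}
}

func main() {
	args := &ExampleClientRequest{NodeID: "node-1"} // caller left AllowStale false
	fmt.Println(handle(args, false, true))
}
```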
go.mod
@@ -91,7 +91,7 @@ require (
 	github.com/mitchellh/go-glint v0.0.0-20210722152315-6515ceb4a127
 	github.com/mitchellh/go-homedir v1.1.0
 	github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b
-	github.com/mitchellh/go-testing-interface v1.14.1
+	github.com/mitchellh/go-testing-interface v1.14.2-0.20210821155943-2d9075ca8770
 	github.com/mitchellh/hashstructure v1.1.0
 	github.com/mitchellh/mapstructure v1.5.0
 	github.com/mitchellh/reflectwalk v1.0.2
go.sum
@@ -1091,8 +1091,9 @@ github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b h1:9+ke9YJ9KGWw5AN
 github.com/mitchellh/go-ps v0.0.0-20190716172923-621e5597135b/go.mod h1:r1VsdOzOPt1ZSrGZWFoNhsAedKnEd6r9Np1+5blZCWk=
 github.com/mitchellh/go-testing-interface v0.0.0-20171004221916-a61a99592b77/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
 github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
-github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU=
 github.com/mitchellh/go-testing-interface v1.14.1/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8=
+github.com/mitchellh/go-testing-interface v1.14.2-0.20210821155943-2d9075ca8770 h1:drhDO54gdT/a15GBcMRmunZiNcLgPiFIJa23KzmcvcU=
+github.com/mitchellh/go-testing-interface v1.14.2-0.20210821155943-2d9075ca8770/go.mod h1:SO/iHr6q2EzbqRApt+8/E9wqebTwQn5y+UlB04bxzo0=
 github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
 github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
 github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQflz0v0=
@@ -25,6 +25,10 @@ func newNodeMetaEndpoint(srv *Server) *NodeMeta {
 func (n *NodeMeta) Apply(args *structs.NodeMetaApplyRequest, reply *structs.NodeMetaResponse) error {
 	const method = "NodeMeta.Apply"
 
+	// Prevent infinite loop between leader and
+	// follower-with-the-target-node-connection.
+	args.QueryOptions.AllowStale = true
+
 	authErr := n.srv.Authenticate(nil, args)
 	if done, err := n.srv.forward(method, args, args, reply); done {
 		return err
@@ -48,6 +52,10 @@ func (n *NodeMeta) Apply(args *structs.NodeMetaApplyRequest, reply *structs.Node
 func (n *NodeMeta) Read(args *structs.NodeSpecificRequest, reply *structs.NodeMetaResponse) error {
 	const method = "NodeMeta.Read"
 
+	// Prevent infinite loop between leader and
+	// follower-with-the-target-node-connection.
+	args.QueryOptions.AllowStale = true
+
 	authErr := n.srv.Authenticate(nil, args)
 	if done, err := n.srv.forward(method, args, args, reply); done {
 		return err
@@ -0,0 +1,129 @@
+package nomad
+
+import (
+	"testing"
+
+	"github.com/hashicorp/nomad/ci"
+	"github.com/hashicorp/nomad/client"
+	"github.com/hashicorp/nomad/client/config"
+	"github.com/hashicorp/nomad/helper/pointer"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/testutil"
+	"github.com/shoenig/test/must"
+)
+
+// TestNodeMeta_Forward asserts that Client RPCs do not result in infinite
+// loops. For example in a cluster with 1 Leader, 2 Followers, and a Node
+// connected to Follower 1:
+//
+// If a NodeMeta.Apply RPC with AllowStale=false is received by Follower 1, it
+// will honor AllowStale=false and forward the request to the Leader.
+//
+// The Leader will accept the RPC, notice that Follower 1 has a connection to
+// the Node, and the Leader will send the request back to Follower 1.
+//
+// Follower 1, ever respectful of AllowStale=false, will forward it back to the
+// Leader.
+//
+// The Leader, being unable to forward to the Node, will send it back to
+// Follower 1.
+//
+// This argument will continue until one of the Servers runs out of memory or
+// patience and stomps away in anger (crashes). Like any good argument the
+// ending is never pretty as the Servers will suffer CPU starvation and
+// potentially Raft flapping before anyone actually OOMs.
+//
+// See https://github.com/hashicorp/nomad/issues/16517 for details.
+//
+// If test fails it will do so spectacularly by consuming all available CPU and
+// potentially all available memory. Running it in a VM or container is
+// suggested.
+func TestNodeMeta_Forward(t *testing.T) {
+	ci.Parallel(t)
+
+	servers := []*Server{}
+	for i := 0; i < 3; i++ {
+		s, cleanup := TestServer(t, func(c *Config) {
+			c.BootstrapExpect = 3
+			c.NumSchedulers = 0
+		})
+		t.Cleanup(cleanup)
+		servers = append(servers, s)
+	}
+
+	TestJoin(t, servers...)
+	leader := testutil.WaitForLeaders(t, servers[0].RPC, servers[1].RPC, servers[2].RPC)
+
+	followers := []string{}
+	for _, s := range servers {
+		if addr := s.config.RPCAddr.String(); addr != leader {
+			followers = append(followers, addr)
+		}
+	}
+	t.Logf("leader=%s followers=%q", leader, followers)
+
+	clients := []*client.Client{}
+	for i := 0; i < 4; i++ {
+		c, cleanup := client.TestClient(t, func(c *config.Config) {
+			// Clients will rebalance across all servers, but try to get them to use
+			// followers to ensure we don't hit the loop in #16517
+			c.Servers = followers
+		})
+		defer cleanup()
+		clients = append(clients, c)
+	}
+	for _, c := range clients {
+		testutil.WaitForClient(t, servers[0].RPC, c.NodeID(), c.Region())
+	}
+
+	agentRPCs := []func(string, any, any) error{}
+	nodeIDs := make([]string, 0, len(clients))
+
+	// Build list of agents and node IDs
+	for _, s := range servers {
+		agentRPCs = append(agentRPCs, s.RPC)
+	}
+
+	for _, c := range clients {
+		agentRPCs = append(agentRPCs, c.RPC)
+		nodeIDs = append(nodeIDs, c.NodeID())
+	}
+
+	region := clients[0].Region()
+
+	// Apply metadata to every client through every agent to ensure forwarding
+	// always works regardless of path taken.
+	for _, rpc := range agentRPCs {
+		for _, nodeID := range nodeIDs {
+			args := &structs.NodeMetaApplyRequest{
+				// Intentionally don't set QueryOptions.AllowStale to exercise #16517
+				QueryOptions: structs.QueryOptions{
+					Region: region,
+				},
+				NodeID: nodeID,
+				Meta:   map[string]*string{"testing": pointer.Of("123")},
+			}
+			reply := &structs.NodeMetaResponse{}
+			must.NoError(t, rpc("NodeMeta.Apply", args, reply))
+			must.MapNotEmpty(t, reply.Meta)
+		}
+	}
+
+	for _, rpc := range agentRPCs {
+		for _, nodeID := range nodeIDs {
+			args := &structs.NodeSpecificRequest{
+				// Intentionally don't set QueryOptions.AllowStale to exercise #16517
+				QueryOptions: structs.QueryOptions{
+					Region: region,
+				},
+				NodeID: nodeID,
+			}
+			reply := &structs.NodeMetaResponse{}
+			must.NoError(t, rpc("NodeMeta.Read", args, reply))
+			must.MapNotEmpty(t, reply.Meta)
+			must.Eq(t, reply.Meta["testing"], "123")
+			must.MapNotEmpty(t, reply.Dynamic)
+			must.Eq(t, *reply.Dynamic["testing"], "123")
+		}
+	}
+}
@@ -357,7 +357,7 @@ func (di *DriverInfo) HealthCheckEquals(other *DriverInfo) bool {
 
 // NodeMetaApplyRequest is used to update Node metadata on Client agents.
 type NodeMetaApplyRequest struct {
-	WriteRequest
+	QueryOptions // Client RPCs must use QueryOptions to set AllowStale=true
 
 	// NodeID is the node being targeted by this request (or the node
 	// receiving this request if NodeID is empty).
@ -6,7 +6,6 @@ import (
|
|||
"math/rand"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/ci"
|
||||
|
@ -15,6 +14,7 @@ import (
|
|||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/version"
|
||||
testing "github.com/mitchellh/go-testing-interface"
|
||||
"github.com/shoenig/test/must"
|
||||
)
|
||||
|
||||
|
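The switch from the stdlib `testing` package to `github.com/mitchellh/go-testing-interface` (and the go.mod bump above) appears to be what lets these helpers be driven both by a real `*testing.T` and by other implementations of the interface. A minimal sketch, with illustrative names:

```go
package demo

import (
	stdtesting "testing"

	testing "github.com/mitchellh/go-testing-interface"
)

// StartFixture accepts the interface form of T, so the stdlib *testing.T and
// testing.RuntimeT (usable outside `go test`) can both drive it.
func StartFixture(t testing.T) {
	t.Helper()
	// ... start a fixture, reporting failures via t.Fatalf ...
}

func TestUsesFixture(t *stdtesting.T) {
	StartFixture(t) // *testing.T satisfies the go-testing-interface T
}
```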
@@ -22,7 +22,7 @@ var (
 	nodeNumber int32 = 0
 )
 
-func TestACLServer(t *testing.T, cb func(*Config)) (*Server, *structs.ACLToken, func()) {
+func TestACLServer(t testing.T, cb func(*Config)) (*Server, *structs.ACLToken, func()) {
 	server, cleanup := TestServer(t, func(c *Config) {
 		c.ACLEnabled = true
 		if cb != nil {
@@ -37,13 +37,13 @@ func TestACLServer(t *testing.T, cb func(*Config)) (*Server, *structs.ACLToken,
 	return server, token, cleanup
 }
 
-func TestServer(t *testing.T, cb func(*Config)) (*Server, func()) {
+func TestServer(t testing.T, cb func(*Config)) (*Server, func()) {
 	s, c, err := TestServerErr(t, cb)
 	must.NoError(t, err, must.Sprint("failed to start test server"))
 	return s, c
 }
 
-func TestServerErr(t *testing.T, cb func(*Config)) (*Server, func(), error) {
+func TestServerErr(t testing.T, cb func(*Config)) (*Server, func(), error) {
 	// Setup the default settings
 	config := DefaultConfig()
 
@@ -150,19 +150,15 @@ func TestServerErr(t *testing.T, cb func(*Config)) (*Server, func(), error) {
 	return nil, nil, errors.New("unable to acquire ports for test server")
 }
 
-func TestJoin(t *testing.T, servers ...*Server) {
+func TestJoin(t testing.T, servers ...*Server) {
 	for i := 0; i < len(servers)-1; i++ {
 		addr := fmt.Sprintf("127.0.0.1:%d",
 			servers[i].config.SerfConfig.MemberlistConfig.BindPort)
 
 		for j := i + 1; j < len(servers); j++ {
 			num, err := servers[j].Join([]string{addr})
-			if err != nil {
-				t.Fatalf("err: %v", err)
-			}
-			if num != 1 {
-				t.Fatalf("bad: %d", num)
-			}
+			must.NoError(t, err)
+			must.Eq(t, 1, num)
 		}
 	}
 }
@@ -143,15 +143,16 @@ func WaitForLeader(t testing.TB, rpc rpcFn) {
 	})
 }
 
-// WaitForLeaders blocks until each serverRPC knows the leader.
-func WaitForLeaders(t testing.TB, serverRPCs ...rpcFn) {
+// WaitForLeaders blocks until each rpcs knows the leader.
+func WaitForLeaders(t testing.TB, rpcs ...rpcFn) string {
 	t.Helper()
 
-	for i := 0; i < len(serverRPCs); i++ {
+	var leader string
+	for i := 0; i < len(rpcs); i++ {
 		ok := func() (bool, error) {
+			leader = ""
 			args := &structs.GenericRequest{}
-			var leader string
-			err := serverRPCs[i]("Status.Leader", args, &leader)
+			err := rpcs[i]("Status.Leader", args, &leader)
 			return leader != "", err
 		}
 		must.Wait(t, wait.InitialSuccess(
@@ -160,6 +161,8 @@ func WaitForLeaders(t testing.TB, serverRPCs ...rpcFn) {
 			wait.Gap(1*time.Second),
 		))
 	}
+
+	return leader
 }
 
 // WaitForClient blocks until the client can be found