client: don't use `Status` RPC for Consul discovery (#16490)

In #16217 we switched clients using Consul discovery to the `Status.Members`
endpoint for getting the list of servers so that we're using the correct
address. This endpoint has an authorization gate, so this fails if the anonymous
policy doesn't have `node:read`. We also can't check the `AuthToken` for the
request for the client secret, because the client hasn't yet registered so the
server doesn't have anything to compare against.

Instead of hitting the `Status.Peers` or `Status.Members` RPC endpoint, use the
Consul response directly. Update the `registerNode` method to handle the list of
servers we get back in the response; if we get a "no servers" or "no path to
region" response we'll kick off discovery again and retry immediately rather
than waiting 15s.
This commit is contained in:
Tim Gross 2023-03-16 15:38:33 -04:00 committed by GitHub
parent 5b1970468e
commit ec47b245d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 46 deletions

3
.changelog/16490.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
client: Fixed a bug where clients using Consul discovery to join the cluster would get permission denied errors
```

View File

@ -1943,7 +1943,7 @@ func (c *Client) retryRegisterNode() {
}
retryIntv := registerRetryIntv
if err == noServersErr {
if err == noServersErr || structs.IsErrNoRegionPath(err) {
c.logger.Debug("registration waiting on servers")
c.triggerDiscovery()
retryIntv = noServerRetryIntv
@ -1970,6 +1970,11 @@ func (c *Client) registerNode() error {
return err
}
err := c.handleNodeUpdateResponse(resp)
if err != nil {
return err
}
// Update the node status to ready after we register.
c.UpdateConfig(func(c *config.Config) {
c.Node.Status = structs.NodeStatusReady
@ -1984,6 +1989,7 @@ func (c *Client) registerNode() error {
defer c.heartbeatLock.Unlock()
c.heartbeatStop.setLastOk(time.Now())
c.heartbeatTTL = resp.HeartbeatTTL
return nil
}
@ -2035,6 +2041,22 @@ func (c *Client) updateNodeStatus() error {
}
})
err := c.handleNodeUpdateResponse(resp)
if err != nil {
return fmt.Errorf("heartbeat response returned no valid servers")
}
// If there's no Leader in the response we may be talking to a partitioned
// server. Redo discovery to ensure our server list is up to date.
if resp.LeaderRPCAddr == "" {
c.triggerDiscovery()
}
c.EnterpriseClient.SetFeatures(resp.Features)
return nil
}
func (c *Client) handleNodeUpdateResponse(resp structs.NodeUpdateResponse) error {
// Update the number of nodes in the cluster so we can adjust our server
// rebalance rate.
c.servers.SetNumNodes(resp.NumNodes)
@ -2051,20 +2073,9 @@ func (c *Client) updateNodeStatus() error {
nomadServers = append(nomadServers, e)
}
if len(nomadServers) == 0 {
return fmt.Errorf("heartbeat response returned no valid servers")
return noServersErr
}
c.servers.SetServers(nomadServers)
// Begin polling Consul if there is no Nomad leader. We could be
// heartbeating to a Nomad server that is in the minority of a
// partition of the Nomad server quorum, but this Nomad Agent still
// has connectivity to the existing majority of Nomad Servers, but
// only if it queries Consul.
if resp.LeaderRPCAddr == "" {
c.triggerDiscovery()
}
c.EnterpriseClient.SetFeatures(resp.Features)
return nil
}
@ -2906,14 +2917,6 @@ func (c *Client) consulDiscoveryImpl() error {
dcs = dcs[0:helper.Min(len(dcs), datacenterQueryLimit)]
}
// Query for servers in this client's region only
region := c.Region()
rpcargs := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Region: region,
},
}
serviceName := c.GetConfig().ConsulConfig.ServerServiceName
var mErr multierror.Error
var nomadServers servers.Servers
@ -2944,32 +2947,14 @@ DISCOLOOP:
continue
}
// Query the members from the region that Consul gave us, and
// extract the client-advertise RPC address from each member
var membersResp structs.ServerMembersResponse
if err := c.connPool.RPC(region, addr, "Status.Members", rpcargs, &membersResp); err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
for _, member := range membersResp.Members {
if addrTag, ok := member.Tags["rpc_addr"]; ok {
if portTag, ok := member.Tags["port"]; ok {
addr, err := net.ResolveTCPAddr("tcp",
fmt.Sprintf("%s:%s", addrTag, portTag))
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
srv := &servers.Server{Addr: addr}
nomadServers = append(nomadServers, srv)
}
}
}
if len(nomadServers) > 0 {
break DISCOLOOP
}
srv := &servers.Server{Addr: addr}
nomadServers = append(nomadServers, srv)
}
if len(nomadServers) > 0 {
break DISCOLOOP
}
}
if len(nomadServers) == 0 {
if len(mErr.Errors) > 0 {