Populate the RPC Proxy's server list if heartbeat did not include a leader.
It's possible that a Nomad Client is heartbeating with a Nomad Server that has become isolated from the quorum of Nomad Servers. When 3x the heartbeat TTL has been exceeded, append the Consul server list to the primary server list. When the next RPCProxy rebalance occurs, there is a chance one of the servers discovered from Consul will be in the majority. When the Client reattaches to a Nomad Server in the majority, the heartbeat will reset the TTLs *AND* clear the primary server list back to only the values from the heartbeat.
parent 9a223936bb
commit ed29946f5e
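For illustration only (not part of this diff): a minimal, self-contained Go sketch of the decision the commit introduces. The names fallbackTracker, onHeartbeat, and shouldFallBackToConsul are invented for the sketch; the real logic lives in Client.updateNodeStatus() and the Consul syncer's bootstrap function shown in the hunks below, and the real code guards the deadline with heartbeatLock rather than leaving it unsynchronized as this sketch does.

    package main

    import (
        "fmt"
        "sync/atomic"
        "time"
    )

    // heartbeatFallbackFactor mirrors the constant added in updateNodeStatus():
    // fall back to Consul only after 3x the heartbeat TTL has passed without a
    // heartbeat that named a leader.
    const heartbeatFallbackFactor = 3

    type fallbackTracker struct {
        lastHeartbeatFromQuorum int32 // atomic int32 used as a bool: 1 == last heartbeat had a leader
        consulPullDeadline      time.Time
    }

    // onHeartbeat mirrors the updateNodeStatus() change: clear the flag when the
    // heartbeat carries no leader RPC address, otherwise set it and push the
    // Consul-fallback deadline out to 3x the heartbeat TTL.
    func (t *fallbackTracker) onHeartbeat(leaderRPCAddr string, ttl time.Duration) {
        if leaderRPCAddr == "" {
            atomic.CompareAndSwapInt32(&t.lastHeartbeatFromQuorum, 1, 0)
            return
        }
        atomic.CompareAndSwapInt32(&t.lastHeartbeatFromQuorum, 0, 1)
        t.consulPullDeadline = time.Now().Add(heartbeatFallbackFactor * ttl)
    }

    // shouldFallBackToConsul mirrors the bootstrapFn check: skip the Consul query
    // only while the last heartbeat had a leader and the deadline has not passed.
    func (t *fallbackTracker) shouldFallBackToConsul(now time.Time) bool {
        return !(atomic.LoadInt32(&t.lastHeartbeatFromQuorum) == 1 && now.Before(t.consulPullDeadline))
    }

    func main() {
        t := &fallbackTracker{}

        t.onHeartbeat("10.0.0.1:4647", 10*time.Second) // healthy heartbeat with a leader
        fmt.Println(t.shouldFallBackToConsul(time.Now()))                       // false: within 3x TTL
        fmt.Println(t.shouldFallBackToConsul(time.Now().Add(31 * time.Second))) // true: 3x TTL exceeded

        t.onHeartbeat("", 10*time.Second) // heartbeat without a leader clears the flag
        fmt.Println(t.shouldFallBackToConsul(time.Now())) // true: fall back immediately
    }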
@@ -9,6 +9,7 @@ import (
    "path/filepath"
    "strconv"
    "sync"
    "sync/atomic"
    "time"

    "github.com/armon/go-metrics"
@@ -111,6 +112,13 @@ type Client struct {
    connPool *nomad.ConnPool

    // lastHeartbeatFromQuorum is an atomic int32 acting as a bool. When
    // true, the last heartbeat message had a leader. When false (0),
    // the last heartbeat did not include the RPC address of the leader,
    // indicating that the server is in the minority or middle of an
    // election.
    lastHeartbeatFromQuorum int32

    lastHeartbeat time.Time
    heartbeatTTL  time.Duration
    heartbeatLock sync.Mutex
@@ -361,6 +369,8 @@ func (c *Client) Stats() map[string]map[string]string {
    numAllocs := len(c.allocs)
    c.allocLock.RUnlock()

    c.heartbeatLock.Lock()
    defer c.heartbeatLock.Unlock()
    stats := map[string]map[string]string{
        "client": map[string]string{
            "node_id": c.Node().ID,
@@ -924,8 +934,20 @@ func (c *Client) updateNodeStatus() error {
    if err := c.rpcProxy.RefreshServerLists(resp.Servers, resp.NumNodes, resp.LeaderRPCAddr); err != nil {
        return err
    }
    c.consulPullHeartbeatDeadline = time.Now().Add(2 * resp.HeartbeatTTL)

    // Begin polling Consul if there is no Nomad leader. We could be
    // heartbeating to a Nomad server that is in the minority of a
    // partition of the Nomad server quorum, but this Nomad Agent still
    // has connectivity to the existing majority of Nomad Servers, but
    // only if it queries Consul.
    if resp.LeaderRPCAddr == "" {
        atomic.CompareAndSwapInt32(&c.lastHeartbeatFromQuorum, 1, 0)
        return nil
    }

    const heartbeatFallbackFactor = 3
    atomic.CompareAndSwapInt32(&c.lastHeartbeatFromQuorum, 0, 1)
    c.consulPullHeartbeatDeadline = time.Now().Add(heartbeatFallbackFactor * resp.HeartbeatTTL)
    return nil
}
@@ -1249,20 +1271,29 @@ func (c *Client) setupConsulSyncer() error {
    bootstrapFn := func() error {
        now := time.Now()
        c.configLock.RLock()
        if now.Before(c.consulPullHeartbeatDeadline) {

        // If the last heartbeat didn't contain a leader, give the
        // Nomad server this Agent is talking to one more attempt at
        // providing a heartbeat that does contain a leader.
        if atomic.LoadInt32(&c.lastHeartbeatFromQuorum) == 1 && now.Before(c.consulPullHeartbeatDeadline) {
            c.configLock.RUnlock()
            return nil
        }
        c.configLock.RUnlock()
        c.logger.Printf("[TRACE] client.consul: lost heartbeat with Nomad quorum, falling back to Consul for server list")

        nomadServerServiceName := c.config.ConsulConfig.ServerServiceName
        services, _, err := c.consulSyncer.ConsulClient().Catalog().
            Service(nomadServerServiceName, consul.ServiceTagRpc,
                &consulapi.QueryOptions{AllowStale: true})
        if err != nil {
            c.logger.Printf("[WARN] client: unable to query service %q: %v", nomadServerServiceName, err)
            return err
            return fmt.Errorf("client.consul: unable to query service %q: %v", nomadServerServiceName, err)
        }

        if len(services) == 0 {
            return fmt.Errorf("client.consul: no Nomad servers advertising service %q", nomadServerServiceName)
        }

        serverAddrs := make([]string, 0, len(services))
        for _, s := range services {
            port := strconv.FormatInt(int64(s.ServicePort), 10)
@@ -1273,8 +1304,24 @@ func (c *Client) setupConsulSyncer() error {
            serverAddrs = append(serverAddrs, net.JoinHostPort(addr, port))
        }

        if err := c.rpcProxy.SetBackupServers(serverAddrs); err != nil {
            return err
        if atomic.LoadInt32(&c.lastHeartbeatFromQuorum) == 1 && now.Before(c.consulPullHeartbeatDeadline) {
            // Common, healthy path
            if err := c.rpcProxy.SetBackupServers(serverAddrs); err != nil {
                return fmt.Errorf("client.consul: unable to set backup servers: %v", err)
            }
        } else {
            // If this Client is talking with a Server that
            // doesn't have a leader, and we have exceeded the
            // consulPullHeartbeatDeadline, change the call from
            // SetBackupServers() to calling AddPrimaryServer()
            // in order to allow the Clients to randomly begin
            // considering all known Nomad servers and
            // eventually, hopefully, find their way to a Nomad
            // Server that has quorum (assuming Consul has a
            // server list that is in the majority).
            for _, s := range serverAddrs {
                c.rpcProxy.AddPrimaryServer(s)
            }
        }

        return nil
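As a usage note (again, not part of the diff): the Consul catalog lookup that bootstrapFn performs can be reproduced with the public Consul API client roughly as below. The service name "nomad" and tag "rpc" are assumptions standing in for c.config.ConsulConfig.ServerServiceName and consul.ServiceTagRpc, and falling back to the node address when the service address is empty is an assumption about the address-selection lines elided between the two hunks above.

    package main

    import (
        "fmt"
        "log"
        "net"
        "strconv"

        consulapi "github.com/hashicorp/consul/api"
    )

    func main() {
        // Connect to the local Consul agent with default settings.
        client, err := consulapi.NewClient(consulapi.DefaultConfig())
        if err != nil {
            log.Fatal(err)
        }

        // AllowStale lets any Consul server answer the catalog query,
        // matching the QueryOptions used in the diff above.
        services, _, err := client.Catalog().Service("nomad", "rpc",
            &consulapi.QueryOptions{AllowStale: true})
        if err != nil {
            log.Fatalf("unable to query service %q: %v", "nomad", err)
        }
        if len(services) == 0 {
            log.Fatalf("no Nomad servers advertising service %q", "nomad")
        }

        // Build host:port RPC addresses from the catalog entries.
        serverAddrs := make([]string, 0, len(services))
        for _, s := range services {
            addr := s.ServiceAddress
            if addr == "" {
                addr = s.Address // assumption: fall back to the node address
            }
            port := strconv.FormatInt(int64(s.ServicePort), 10)
            serverAddrs = append(serverAddrs, net.JoinHostPort(addr, port))
        }
        fmt.Println(serverAddrs)
    }

The resulting addresses are what the Client then hands to SetBackupServers() in the common case, or to AddPrimaryServer() once the fallback deadline has passed.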
@@ -645,12 +645,6 @@ func (p *RpcProxy) RefreshServerLists(servers []*structs.NodeServerInfo, numNode
    // with newer API versions are filtered from the list. If the list
    // is missing an address found in the RpcProxy's server list, remove
    // it from the RpcProxy.
    //
    // FIXME(sean@): This is not true. We rely on an outside pump to set
    // these values. In order to catch the orphaned clients where all
    // Nomad servers were rolled between the heartbeat interval, the
    // rebalance task queries Consul and adds the servers found in Consul
    // to the server list in order to reattach an orphan to a server.

    p.serverListLock.Lock()
    defer p.serverListLock.Unlock()