Properly guard consulPullHeartbeatDeadline behind heartbeatLock

This commit is contained in:
Sean Chittenden 2016-06-10 00:31:55 -04:00
parent 8dc16ad5e3
commit f139d0c68b
No known key found for this signature in database
GPG Key ID: 4EBC9DC16C2E5E16
1 changed file with 16 additions and 13 deletions

View File

@@ -101,13 +101,6 @@ type Client struct {
logger *log.Logger
// consulPullHeartbeatDeadline is the deadline at which this Nomad
// Agent will begin polling Consul for a list of Nomad Servers. When
// Nomad Clients are heartbeating successfully with Nomad Servers,
// Nomad Clients do not poll Consul to populate their backup server
// list.
consulPullHeartbeatDeadline time.Time
rpcProxy *rpcproxy.RpcProxy
connPool *nomad.ConnPool
@@ -119,9 +112,15 @@ type Client struct {
// election.
lastHeartbeatFromQuorum int32
lastHeartbeat time.Time
heartbeatTTL time.Duration
heartbeatLock sync.Mutex
// consulPullHeartbeatDeadline is the deadline at which this Nomad
// Agent will begin polling Consul for a list of Nomad Servers. When
// Nomad Clients are heartbeating successfully with Nomad Servers,
// Nomad Clients do not poll Consul to populate their backup server
// list.
consulPullHeartbeatDeadline time.Time
lastHeartbeat time.Time
heartbeatTTL time.Duration
heartbeatLock sync.Mutex
// allocs is the current set of allocations
allocs map[string]*AllocRunner
@@ -1270,16 +1269,17 @@ func (c *Client) setupConsulSyncer() error {
// to its cluster and automatically recover from a detached state.
bootstrapFn := func() error {
now := time.Now()
c.configLock.RLock()
c.heartbeatLock.Lock()
// If the last heartbeat didn't contain a leader, give the
// Nomad server this Agent is talking to one more attempt at
// providing a heartbeat that does contain a leader.
if atomic.LoadInt32(&c.lastHeartbeatFromQuorum) == 1 && now.Before(c.consulPullHeartbeatDeadline) {
c.configLock.RUnlock()
c.heartbeatLock.Unlock()
// c.logger.Printf("[TRACE] client.consul: heartbeat received, sleeping until %v", c.consulPullHeartbeatDeadline)
return nil
}
c.configLock.RUnlock()
c.heartbeatLock.Unlock()
c.logger.Printf("[TRACE] client.consul: lost heartbeat with Nomad quorum, falling back to Consul for server list")
nomadServerServiceName := c.config.ConsulConfig.ServerServiceName
@@ -1304,12 +1304,15 @@ func (c *Client) setupConsulSyncer() error {
serverAddrs = append(serverAddrs, net.JoinHostPort(addr, port))
}
c.heartbeatLock.Lock()
if atomic.LoadInt32(&c.lastHeartbeatFromQuorum) == 1 && now.Before(c.consulPullHeartbeatDeadline) {
c.heartbeatLock.Unlock()
// Common, healthy path
if err := c.rpcProxy.SetBackupServers(serverAddrs); err != nil {
return fmt.Errorf("client.consul: unable to set backup servers: %v", err)
}
} else {
c.heartbeatLock.Unlock()
// If this Client is talking with a Server that
// doesn't have a leader, and we have exceeded the
// consulPullHeartbeatDeadline, change the call from