diff --git a/CHANGELOG.md b/CHANGELOG.md
index 577815fff..653e80ea8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -40,30 +40,31 @@ IMPROVEMENTS:
  * cli: Clearer task event descriptions in `nomad alloc-status` when there are server side failures authenticating to Vault [[GH-3968](https://github.com/hashicorp/nomad/issues/3968)]
  * client: Allow '.' in environment variable names [[GH-3760](https://github.com/hashicorp/nomad/issues/3760)]
  * client: Refactor client fingerprint methods to a request/response format
+ * client: Improved handling of failed RPCs and heartbeat retry logic [[GH-4106](https://github.com/hashicorp/nomad/issues/4106)] [[GH-3781](https://github.com/hashicorp/nomad/issues/3781)]
- * discovery: Allow `check_restart` to be specified in the `service` stanza.
+ * discovery: Allow `check_restart` to be specified in the `service` stanza [[GH-3718](https://github.com/hashicorp/nomad/issues/3718)]
- * discovery: Allow configuring names of Nomad client and server health checks.
+ * discovery: Allow configuring names of Nomad client and server health checks [[GH-4003](https://github.com/hashicorp/nomad/issues/4003)]
  * discovery: Only log if Consul does not support TLSSkipVerify instead of
-   dropping checks which relied on it. Consul has had this feature since 0.7.2. [[GH-3983](https://github.com/hashicorp/nomad/issues/3983)]
+   dropping checks which relied on it. Consul has had this feature since 0.7.2 [[GH-3983](https://github.com/hashicorp/nomad/issues/3983)]
  * driver/docker: Support hard CPU limits [[GH-3825](https://github.com/hashicorp/nomad/issues/3825)]
  * driver/docker: Support advertising IPv6 addresses [[GH-3790](https://github.com/hashicorp/nomad/issues/3790)]
  * driver/docker; Support overriding image entrypoint [[GH-3788](https://github.com/hashicorp/nomad/issues/3788)]
  * driver/docker: Support adding or dropping capabilities [[GH-3754](https://github.com/hashicorp/nomad/issues/3754)]
  * driver/docker: Support mounting root filesystem as read-only [[GH-3802](https://github.com/hashicorp/nomad/issues/3802)]
- * driver/docker: Retry on Portworx "volume is attached on another node" errors.
+ * driver/docker: Retry on Portworx "volume is attached on another node" errors [[GH-3993](https://github.com/hashicorp/nomad/issues/3993)]
  * driver/lxc: Add volumes config to LXC driver [[GH-3687](https://github.com/hashicorp/nomad/issues/3687)]
  * driver/rkt: Allow overriding group [[GH-3990](https://github.com/hashicorp/nomad/issues/3990)]
  * telemetry: Support DataDog tags [[GH-3839](https://github.com/hashicorp/nomad/issues/3839)]
- * ui: Specialized job detail pages for each job type (system, service, batch, periodic, parameterized, periodic instance, parameterized instance). [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
- * ui: Allocation stats requests are made through the server instead of directly through clients. [[GH-3908](https://github.com/hashicorp/nomad/issues/3908)]
- * ui: Allocation log requests fallback to using the server when the client can't be reached. [[GH-3908](https://github.com/hashicorp/nomad/issues/3908)]
- * ui: All views poll for changes using long-polling via blocking queries. [[GH-3936](https://github.com/hashicorp/nomad/issues/3936)]
- * ui: Dispatch payload on the parameterized instance job detail page. [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
- * ui: Periodic force launch button on the periodic job detail page.
-   [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
- * ui: Allocation breadcrumbs now extend job breadcrumbs. [[GH-3829](https://github.com/hashicorp/nomad/issues/3974)]
+ * ui: Specialized job detail pages for each job type (system, service, batch, periodic, parameterized, periodic instance, parameterized instance) [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
+ * ui: Allocation stats requests are made through the server instead of directly through clients [[GH-3908](https://github.com/hashicorp/nomad/issues/3908)]
+ * ui: Allocation log requests fallback to using the server when the client can't be reached [[GH-3908](https://github.com/hashicorp/nomad/issues/3908)]
+ * ui: All views poll for changes using long-polling via blocking queries [[GH-3936](https://github.com/hashicorp/nomad/issues/3936)]
+ * ui: Dispatch payload on the parameterized instance job detail page [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
+ * ui: Periodic force launch button on the periodic job detail page [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
+ * ui: Allocation breadcrumbs now extend job breadcrumbs [[GH-3829](https://github.com/hashicorp/nomad/issues/3974)]
  * vault: Allow Nomad to create orphaned tokens for allocations [[GH-3992](https://github.com/hashicorp/nomad/issues/3992)]
 
 BUG FIXES:
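The client.go changes below replace the Consul-discovery notification channel with a more general RPC retry trigger that the heartbeat, registration, and allocation-sync loops wait on. The following is a minimal sketch of that select-based retry pattern (illustrative only, not Nomad source; the function names and the fixed 5-second backoff are invented):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryLoop retries attempt until it succeeds, waking early if retryCh fires.
func retryLoop(retryCh, shutdownCh <-chan struct{}, attempt func() error) {
	for {
		if err := attempt(); err == nil {
			return
		}
		select {
		case <-retryCh: // e.g. servers were just discovered; retry immediately
		case <-time.After(5 * time.Second): // normal backoff interval
		case <-shutdownCh:
			return
		}
	}
}

func main() {
	retryCh := make(chan struct{})
	shutdownCh := make(chan struct{})
	attempts := 0
	close(retryCh) // simulate the retry trigger firing right away
	retryLoop(retryCh, shutdownCh, func() error {
		attempts++
		if attempts < 2 {
			return errors.New("no known servers")
		}
		return nil
	})
	fmt.Println("succeeded after", attempts, "attempts")
}
```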
diff --git a/client/client.go b/client/client.go
index 5ac0b0ca6..10735efb2 100644
--- a/client/client.go
+++ b/client/client.go
@@ -137,9 +137,11 @@ type Client struct {
 	// server for the node event
 	triggerEmitNodeEvent chan *structs.NodeEvent
 
-	// discovered will be ticked whenever Consul discovery completes
-	// successfully
-	serversDiscoveredCh chan struct{}
+	// rpcRetryCh is closed when an event occurs, such as server discovery or a
+	// successful RPC, that should cause waiting RPCs to retry. Access should
+	// only occur via the getter method rpcRetryWatcher.
+	rpcRetryCh   chan struct{}
+	rpcRetryLock sync.Mutex
 
 	// allocs maps alloc IDs to their AllocRunner. This map includes all
 	// AllocRunners - running and GC'd - until the server GCs them.
@@ -217,7 +219,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 		shutdownCh:           make(chan struct{}),
 		triggerDiscoveryCh:   make(chan struct{}),
 		triggerNodeUpdate:    make(chan struct{}, 8),
-		serversDiscoveredCh:  make(chan struct{}),
 		triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8),
 	}
@@ -1154,7 +1155,7 @@ func (c *Client) registerAndHeartbeat() {
 	for {
 		select {
-		case <-c.serversDiscoveredCh:
+		case <-c.rpcRetryWatcher():
 		case <-heartbeat:
 		case <-c.shutdownCh:
 			return
@@ -1169,11 +1170,11 @@
 				c.retryRegisterNode()
 				heartbeat = time.After(lib.RandomStagger(initialHeartbeatStagger))
 			} else {
-				intv := c.retryIntv(registerRetryIntv)
+				intv := c.getHeartbeatRetryIntv(err)
 				c.logger.Printf("[ERR] client: heartbeating failed. Retrying in %v: %v", intv, err)
 				heartbeat = time.After(intv)
 
-				// if heartbeating fails, trigger Consul discovery
+				// If heartbeating fails, trigger Consul discovery
 				c.triggerDiscovery()
 			}
 		} else {
@@ -1184,6 +1185,56 @@
 	}
 }
 
+// getHeartbeatRetryIntv is used to retrieve the time to wait before attempting
+// another heartbeat.
+func (c *Client) getHeartbeatRetryIntv(err error) time.Duration {
+	if c.config.DevMode {
+		return devModeRetryIntv
+	}
+
+	// Collect the useful heartbeat info
+	c.heartbeatLock.Lock()
+	haveHeartbeated := c.haveHeartbeated
+	last := c.lastHeartbeat
+	ttl := c.heartbeatTTL
+	c.heartbeatLock.Unlock()
+
+	// If we haven't even successfully heartbeated once or there is no leader,
+	// treat it as a registration. In the case that there is a leadership loss,
+	// we will have our heartbeat timer reset to a much larger threshold, so
+	// do not put unnecessary pressure on the new leader.
+	if !haveHeartbeated || err == structs.ErrNoLeader {
+		return c.retryIntv(registerRetryIntv)
+	}
+
+	// Determine how much time we have left to heartbeat
+	left := last.Add(ttl).Sub(time.Now())
+
+	// Logic for retrying is:
+	// * Do not retry faster than once a second
+	// * Do not retry less often than once every 30 seconds
+	// * If we have missed the heartbeat by more than 30 seconds, start to use
+	//   the absolute time since we do not want to retry indefinitely
+	switch {
+	case left < -30*time.Second:
+		// Make left the absolute value so we delay and jitter properly.
+		left *= -1
+	case left < 0:
+		return time.Second + lib.RandomStagger(time.Second)
+	default:
+	}
+
+	stagger := lib.RandomStagger(left)
+	switch {
+	case stagger < time.Second:
+		return time.Second + lib.RandomStagger(time.Second)
+	case stagger > 30*time.Second:
+		return 25*time.Second + lib.RandomStagger(5*time.Second)
+	default:
+		return stagger
+	}
+}
+
 // periodicSnapshot is a long lived goroutine used to periodically snapshot the
 // state of the client
 func (c *Client) periodicSnapshot() {
@@ -1307,7 +1358,7 @@ func (c *Client) retryRegisterNode() {
 			c.logger.Printf("[ERR] client: registration failure: %v", err)
 		}
 		select {
-		case <-c.serversDiscoveredCh:
+		case <-c.rpcRetryWatcher():
 		case <-time.After(c.retryIntv(registerRetryIntv)):
 		case <-c.shutdownCh:
 			return
@@ -1567,7 +1618,7 @@ OUTER:
 			}
 			retry := c.retryIntv(getAllocRetryIntv)
 			select {
-			case <-c.serversDiscoveredCh:
+			case <-c.rpcRetryWatcher():
 				continue
 			case <-time.After(retry):
 				continue
@@ -1622,7 +1673,7 @@ OUTER:
 			c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err)
 			retry := c.retryIntv(getAllocRetryIntv)
 			select {
-			case <-c.serversDiscoveredCh:
+			case <-c.rpcRetryWatcher():
 				continue
 			case <-time.After(retry):
 				continue
@@ -2085,18 +2136,16 @@ DISCOLOOP:
 	}
 	c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", nomadServers)
 
-	c.servers.SetServers(nomadServers)
-	// Notify waiting rpc calls. If a goroutine just failed an RPC call and
-	// isn't receiving on this chan yet they'll still retry eventually.
-	// This is a shortcircuit for the longer retry intervals.
-	for {
-		select {
-		case c.serversDiscoveredCh <- struct{}{}:
-		default:
-			return nil
-		}
+	// Fire the retry trigger if we have updated the set of servers.
+	if c.servers.SetServers(nomadServers) {
+		// Notify waiting RPC calls. If a goroutine just failed an RPC call and
+		// isn't receiving on this chan yet, it will still retry eventually.
+		// This is a shortcircuit for the longer retry intervals.
+		c.fireRpcRetryWatcher()
 	}
+
+	return nil
 }
 
 // emitStats collects host resource usage stats periodically
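For reference, the interval selection in `getHeartbeatRetryIntv` above reads as: never retry faster than about once a second, never slower than about every 30 seconds, and once a heartbeat is more than 30 seconds overdue, jitter off the absolute miss time rather than waiting at the cap indefinitely. Below is a standalone re-implementation for illustration (not Nomad source; `randomStagger` stands in for the `lib.RandomStagger` helper the client uses, and the sample inputs are invented):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomStagger is a stand-in for the jitter helper used by the client.
func randomStagger(intv time.Duration) time.Duration {
	if intv <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(intv)))
}

// heartbeatRetry mirrors the switch logic above for a given amount of time
// remaining until the heartbeat TTL expires (negative means it was missed).
func heartbeatRetry(left time.Duration) time.Duration {
	switch {
	case left < -30*time.Second:
		left *= -1 // heartbeat badly overdue: use the absolute miss time
	case left < 0:
		return time.Second + randomStagger(time.Second)
	default:
	}

	stagger := randomStagger(left)
	switch {
	case stagger < time.Second:
		return time.Second + randomStagger(time.Second)
	case stagger > 30*time.Second:
		return 25*time.Second + randomStagger(5*time.Second)
	default:
		return stagger
	}
}

func main() {
	for _, left := range []time.Duration{20 * time.Second, -5 * time.Second, -2 * time.Minute} {
		fmt.Printf("left=%v -> retry in %v\n", left, heartbeatRetry(left))
	}
}
```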
diff --git a/client/client_test.go b/client/client_test.go
index 5e7967441..240e7585f 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -89,6 +89,34 @@ func TestClient_RPC(t *testing.T) {
 	})
 }
 
+func TestClient_RPC_FireRetryWatchers(t *testing.T) {
+	t.Parallel()
+	s1, addr := testServer(t, nil)
+	defer s1.Shutdown()
+
+	c1 := TestClient(t, func(c *config.Config) {
+		c.Servers = []string{addr}
+	})
+	defer c1.Shutdown()
+
+	watcher := c1.rpcRetryWatcher()
+
+	// RPC should succeed
+	testutil.WaitForResult(func() (bool, error) {
+		var out struct{}
+		err := c1.RPC("Status.Ping", struct{}{}, &out)
+		return err == nil, err
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
+
+	select {
+	case <-watcher:
+	default:
+		t.Fatal("watcher should be fired")
+	}
+}
+
 func TestClient_RPC_Passthrough(t *testing.T) {
 	t.Parallel()
 	s1, _ := testServer(t, nil)
diff --git a/client/rpc.go b/client/rpc.go
index 90a1eec47..151421c66 100644
--- a/client/rpc.go
+++ b/client/rpc.go
@@ -67,6 +67,7 @@ TRY:
 	// Make the request.
 	rpcErr := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply)
 	if rpcErr == nil {
+		c.fireRpcRetryWatcher()
 		return nil
 	}
 
@@ -382,3 +383,27 @@ func (c *Client) Ping(srv net.Addr) error {
 	err := c.connPool.RPC(c.Region(), srv, c.RPCMajorVersion(), "Status.Ping", struct{}{}, &reply)
 	return err
 }
+
+// rpcRetryWatcher returns a channel that will be closed if an event happens
+// such that we expect the next RPC to be successful.
+func (c *Client) rpcRetryWatcher() <-chan struct{} {
+	c.rpcRetryLock.Lock()
+	defer c.rpcRetryLock.Unlock()
+
+	if c.rpcRetryCh == nil {
+		c.rpcRetryCh = make(chan struct{})
+	}
+
+	return c.rpcRetryCh
+}
+
+// fireRpcRetryWatcher causes any RPC retry loops to retry their RPCs because
+// we believe they will be successful.
+func (c *Client) fireRpcRetryWatcher() {
+	c.rpcRetryLock.Lock()
+	defer c.rpcRetryLock.Unlock()
+	if c.rpcRetryCh != nil {
+		close(c.rpcRetryCh)
+		c.rpcRetryCh = nil
+	}
+}
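The two helpers added to rpc.go above form a small fire-once broadcast idiom: watchers share a lazily created channel, and firing closes and clears it so every current waiter wakes exactly once while later watchers get a fresh channel. A generic, self-contained restatement of the idiom follows (the `trigger` type and its names are illustrative, not part of this patch):

```go
package main

import (
	"fmt"
	"sync"
)

type trigger struct {
	mu sync.Mutex
	ch chan struct{}
}

// watch returns a channel that is closed the next time fire is called.
func (t *trigger) watch() <-chan struct{} {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.ch == nil {
		t.ch = make(chan struct{})
	}
	return t.ch
}

// fire wakes all current watchers; later watchers get a fresh channel.
func (t *trigger) fire() {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.ch != nil {
		close(t.ch)
		t.ch = nil
	}
}

func main() {
	var t trigger
	w := t.watch()
	t.fire()
	select {
	case <-w:
		fmt.Println("watcher woken")
	default:
		fmt.Println("watcher not woken")
	}
}
```

Because firing is a no-op when nothing is watching, the RPC hot path can call the fire side unconditionally after every successful request.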
diff --git a/client/servers/manager.go b/client/servers/manager.go
index 6dac0c7e4..31d12b831 100644
--- a/client/servers/manager.go
+++ b/client/servers/manager.go
@@ -7,6 +7,7 @@ import (
 	"log"
 	"math/rand"
 	"net"
+	"sort"
 	"strings"
 	"sync"
 	"time"
@@ -74,6 +75,16 @@ func (s *Server) String() string {
 	return s.addr
 }
 
+func (s *Server) Equal(o *Server) bool {
+	if s == nil && o == nil {
+		return true
+	} else if s == nil && o != nil || s != nil && o == nil {
+		return false
+	}
+
+	return s.Addr.String() == o.Addr.String() && s.DC == o.DC
+}
+
 type Servers []*Server
 
 func (s Servers) String() string {
@@ -106,6 +117,32 @@ func (s Servers) shuffle() {
 	}
 }
 
+func (s Servers) Sort() {
+	sort.Slice(s, func(i, j int) bool {
+		a, b := s[i], s[j]
+		if addr1, addr2 := a.Addr.String(), b.Addr.String(); addr1 == addr2 {
+			return a.DC < b.DC
+		} else {
+			return addr1 < addr2
+		}
+	})
+}
+
+// Equal returns whether the two server lists are equal, including the ordering.
+func (s Servers) Equal(o Servers) bool {
+	if len(s) != len(o) {
+		return false
+	}
+
+	for i, v := range s {
+		if !v.Equal(o[i]) {
+			return false
+		}
+	}
+
+	return true
+}
+
 type Manager struct {
 	// servers is the list of all known Nomad servers.
 	servers Servers
@@ -157,10 +194,24 @@ func (m *Manager) Start() {
 	}
 }
 
-func (m *Manager) SetServers(servers Servers) {
+// SetServers sets the servers and returns whether the new server list is
+// different from the existing server set.
+func (m *Manager) SetServers(servers Servers) bool {
 	m.Lock()
 	defer m.Unlock()
+
+	// Sort both the existing and incoming servers
+	servers.Sort()
+	m.servers.Sort()
+
+	// Determine if they are equal
+	equal := servers.Equal(m.servers)
+
+	// Randomize the incoming servers
+	servers.shuffle()
 	m.servers = servers
+
+	return !equal
 }
 
 // FindServer returns a server to send an RPC too. If there are no servers, nil
@@ -204,7 +255,7 @@ func (m *Manager) NotifyFailedServer(s *Server) {
 	// If the server being failed is not the first server on the list,
 	// this is a noop. If, however, the server is failed and first on
 	// the list, move the server to the end of the list.
-	if len(m.servers) > 1 && m.servers[0] == s {
+	if len(m.servers) > 1 && m.servers[0].Equal(s) {
 		m.servers.cycle()
 	}
 }
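`SetServers` above sorts both lists before comparing them so that a reshuffled discovery result does not count as a change, and therefore does not needlessly fire the RPC retry trigger; only a genuinely different server set returns true. A minimal sketch of that idea (illustrative only, not Nomad source; the addresses are invented and the real comparison also tie-breaks on datacenter):

```go
package main

import (
	"fmt"
	"sort"
)

type server struct{ addr, dc string }

// equalSorted reports whether a and b contain the same servers, ignoring order.
func equalSorted(a, b []server) bool {
	sort.Slice(a, func(i, j int) bool { return a[i].addr < a[j].addr })
	sort.Slice(b, func(i, j int) bool { return b[i].addr < b[j].addr })
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

func main() {
	old := []server{{"10.0.0.1:4647", "dc1"}, {"10.0.0.2:4647", "dc1"}}
	reordered := []server{{"10.0.0.2:4647", "dc1"}, {"10.0.0.1:4647", "dc1"}}
	grown := append(old, server{"10.0.0.3:4647", "dc1"})
	fmt.Println(equalSorted(old, reordered)) // true: not a change
	fmt.Println(equalSorted(old, grown))     // false: a change
}
```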
diff --git a/client/servers/manager_test.go b/client/servers/manager_test.go
index deea7f48f..33d76469b 100644
--- a/client/servers/manager_test.go
+++ b/client/servers/manager_test.go
@@ -10,6 +10,8 @@ import (
 	"testing"
 
 	"github.com/hashicorp/nomad/client/servers"
+	"github.com/hashicorp/nomad/helper/testlog"
+	"github.com/stretchr/testify/require"
 )
 
 type fauxAddr struct {
@@ -32,22 +34,23 @@ func (cp *fauxConnPool) Ping(net.Addr) error {
 	return fmt.Errorf("bad server")
 }
 
-func testManager() (m *servers.Manager) {
-	logger := log.New(os.Stderr, "", log.LstdFlags)
+func testManager(t *testing.T) (m *servers.Manager) {
+	logger := testlog.Logger(t)
 	shutdownCh := make(chan struct{})
 	m = servers.New(logger, shutdownCh, &fauxConnPool{})
 	return m
 }
 
-func testManagerFailProb(failPct float64) (m *servers.Manager) {
-	logger := log.New(os.Stderr, "", log.LstdFlags)
+func testManagerFailProb(t *testing.T, failPct float64) (m *servers.Manager) {
+	logger := testlog.Logger(t)
 	shutdownCh := make(chan struct{})
 	m = servers.New(logger, shutdownCh, &fauxConnPool{failPct: failPct})
 	return m
 }
 
 func TestServers_SetServers(t *testing.T) {
-	m := testManager()
+	require := require.New(t)
+	m := testManager(t)
 	var num int
 	num = m.NumServers()
 	if num != 0 {
@@ -56,24 +59,19 @@
 	s1 := &servers.Server{Addr: &fauxAddr{"server1"}}
 	s2 := &servers.Server{Addr: &fauxAddr{"server2"}}
-	m.SetServers([]*servers.Server{s1, s2})
-	num = m.NumServers()
-	if num != 2 {
-		t.Fatalf("Expected two servers")
-	}
+	require.True(m.SetServers([]*servers.Server{s1, s2}))
+	require.False(m.SetServers([]*servers.Server{s1, s2}))
+	require.False(m.SetServers([]*servers.Server{s2, s1}))
+	require.Equal(2, m.NumServers())
+	require.Len(m.GetServers(), 2)
 
-	all := m.GetServers()
-	if l := len(all); l != 2 {
-		t.Fatalf("expected 2 servers got %d", l)
-	}
-
-	if all[0] == s1 || all[0] == s2 {
-		t.Fatalf("expected a copy, got actual server")
-	}
+	require.True(m.SetServers([]*servers.Server{s1}))
+	require.Equal(1, m.NumServers())
+	require.Len(m.GetServers(), 1)
 }
 
 func TestServers_FindServer(t *testing.T) {
-	m := testManager()
+	m := testManager(t)
 
 	if m.FindServer() != nil {
 		t.Fatalf("Expected nil return")
@@ -105,20 +103,14 @@
 		t.Fatalf("Expected two servers")
 	}
 
 	s1 = m.FindServer()
-	if s1 == nil || s1.String() != "s1" {
-		t.Fatalf("Expected s1 server (still)")
+
+	for _, srv := range srvs {
+		m.NotifyFailedServer(srv)
 	}
 
-	m.NotifyFailedServer(s1)
 	s2 := m.FindServer()
-	if s2 == nil || s2.String() != "s2" {
-		t.Fatalf("Expected s2 server")
-	}
-
-	m.NotifyFailedServer(s2)
-	s1 = m.FindServer()
-	if s1 == nil || s1.String() != "s1" {
-		t.Fatalf("Expected s1 server")
+	if s1.Equal(s2) {
+		t.Fatalf("Expected different server")
 	}
 }
 
@@ -132,7 +124,7 @@ func TestServers_New(t *testing.T) {
 }
 
 func TestServers_NotifyFailedServer(t *testing.T) {
-	m := testManager()
+	m := testManager(t)
 	if m.NumServers() != 0 {
 		t.Fatalf("Expected zero servers to start")
 	}
@@ -159,32 +151,39 @@
 		t.Fatalf("Expected two servers")
 	}
 
-	s1 = m.FindServer()
-	if s1 == nil || s1.String() != "s1" {
-		t.Fatalf("Expected s1 server")
+	// Grab a server
+	first := m.FindServer()
+
+	// Find the other server
+	second := s1
+	if first.Equal(s1) {
+		second = s2
 	}
 
-	m.NotifyFailedServer(s2)
-	s1 = m.FindServer()
-	if s1 == nil || s1.String() != "s1" {
-		t.Fatalf("Expected s1 server (still)")
+	// Fail the other server
+	m.NotifyFailedServer(second)
+	next := m.FindServer()
+	if !next.Equal(first) {
+		t.Fatalf("Expected first server (still)")
 	}
 
-	m.NotifyFailedServer(s1)
-	s2 = m.FindServer()
-	if s2 == nil || s2.String() != "s2" {
-		t.Fatalf("Expected s2 server")
+	// Fail the first
+	m.NotifyFailedServer(first)
+	next = m.FindServer()
+	if !next.Equal(second) {
+		t.Fatalf("Expected second server")
	}
 
-	m.NotifyFailedServer(s2)
-	s1 = m.FindServer()
-	if s1 == nil || s1.String() != "s1" {
-		t.Fatalf("Expected s1 server")
+	// Fail the second
+	m.NotifyFailedServer(second)
+	next = m.FindServer()
+	if !next.Equal(first) {
+		t.Fatalf("Expected first server")
 	}
 }
 
 func TestServers_NumServers(t *testing.T) {
-	m := testManager()
+	m := testManager(t)
 	var num int
 	num = m.NumServers()
 	if num != 0 {
@@ -201,7 +200,7 @@ func TestServers_NumServers(t *testing.T) {
 
 func TestServers_RebalanceServers(t *testing.T) {
 	const failPct = 0.5
-	m := testManagerFailProb(failPct)
+	m := testManagerFailProb(t, failPct)
 	const maxServers = 100
 	const numShuffleTests = 100
 	const uniquePassRate = 0.5