Merge pull request #4106 from hashicorp/b-servers

Improved Client handling of failed RPCs
Alex Dadgar 2018-04-05 13:48:50 -07:00 committed by GitHub
commit 929b6823a3
6 changed files with 234 additions and 81 deletions

View file

@ -40,30 +40,31 @@ IMPROVEMENTS:
* cli: Clearer task event descriptions in `nomad alloc-status` when there are server side failures authenticating to Vault [[GH-3968](https://github.com/hashicorp/nomad/issues/3968)]
* client: Allow '.' in environment variable names [[GH-3760](https://github.com/hashicorp/nomad/issues/3760)]
* client: Refactor client fingerprint methods to a request/response format
* client: Improved handling of failed RPCs and heartbeat retry logic [[GH-4106](https://github.com/hashicorp/nomad/issues/4106)]
[[GH-3781](https://github.com/hashicorp/nomad/issues/3781)]
* discovery: Allow `check_restart` to be specified in the `service` stanza.
* discovery: Allow `check_restart` to be specified in the `service` stanza
[[GH-3718](https://github.com/hashicorp/nomad/issues/3718)]
* discovery: Allow configuring names of Nomad client and server health checks.
* discovery: Allow configuring names of Nomad client and server health checks
[[GH-4003](https://github.com/hashicorp/nomad/issues/4003)]
* discovery: Only log if Consul does not support TLSSkipVerify instead of
dropping checks which relied on it. Consul has had this feature since 0.7.2. [[GH-3983](https://github.com/hashicorp/nomad/issues/3983)]
dropping checks which relied on it. Consul has had this feature since 0.7.2 [[GH-3983](https://github.com/hashicorp/nomad/issues/3983)]
* driver/docker: Support hard CPU limits [[GH-3825](https://github.com/hashicorp/nomad/issues/3825)]
* driver/docker: Support advertising IPv6 addresses [[GH-3790](https://github.com/hashicorp/nomad/issues/3790)]
* driver/docker: Support overriding image entrypoint [[GH-3788](https://github.com/hashicorp/nomad/issues/3788)]
* driver/docker: Support adding or dropping capabilities [[GH-3754](https://github.com/hashicorp/nomad/issues/3754)]
* driver/docker: Support mounting root filesystem as read-only [[GH-3802](https://github.com/hashicorp/nomad/issues/3802)]
* driver/docker: Retry on Portworx "volume is attached on another node" errors.
* driver/docker: Retry on Portworx "volume is attached on another node" errors
[[GH-3993](https://github.com/hashicorp/nomad/issues/3993)]
* driver/lxc: Add volumes config to LXC driver [[GH-3687](https://github.com/hashicorp/nomad/issues/3687)]
* driver/rkt: Allow overriding group [[GH-3990](https://github.com/hashicorp/nomad/issues/3990)]
* telemetry: Support DataDog tags [[GH-3839](https://github.com/hashicorp/nomad/issues/3839)]
* ui: Specialized job detail pages for each job type (system, service, batch, periodic, parameterized, periodic instance, parameterized instance). [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
* ui: Allocation stats requests are made through the server instead of directly through clients. [[GH-3908](https://github.com/hashicorp/nomad/issues/3908)]
* ui: Allocation log requests fallback to using the server when the client can't be reached. [[GH-3908](https://github.com/hashicorp/nomad/issues/3908)]
* ui: All views poll for changes using long-polling via blocking queries. [[GH-3936](https://github.com/hashicorp/nomad/issues/3936)]
* ui: Dispatch payload on the parameterized instance job detail page. [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
* ui: Periodic force launch button on the periodic job detail page. [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
* ui: Allocation breadcrumbs now extend job breadcrumbs. [[GH-3829](https://github.com/hashicorp/nomad/issues/3974)]
* ui: Specialized job detail pages for each job type (system, service, batch, periodic, parameterized, periodic instance, parameterized instance) [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
* ui: Allocation stats requests are made through the server instead of directly through clients [[GH-3908](https://github.com/hashicorp/nomad/issues/3908)]
* ui: Allocation log requests fallback to using the server when the client can't be reached [[GH-3908](https://github.com/hashicorp/nomad/issues/3908)]
* ui: All views poll for changes using long-polling via blocking queries [[GH-3936](https://github.com/hashicorp/nomad/issues/3936)]
* ui: Dispatch payload on the parameterized instance job detail page [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
* ui: Periodic force launch button on the periodic job detail page [[GH-3829](https://github.com/hashicorp/nomad/issues/3829)]
* ui: Allocation breadcrumbs now extend job breadcrumbs [[GH-3829](https://github.com/hashicorp/nomad/issues/3974)]
* vault: Allow Nomad to create orphaned tokens for allocations [[GH-3992](https://github.com/hashicorp/nomad/issues/3992)]
BUG FIXES:

View file

@ -137,9 +137,11 @@ type Client struct {
// server for the node event
triggerEmitNodeEvent chan *structs.NodeEvent
// discovered will be ticked whenever Consul discovery completes
// successfully
serversDiscoveredCh chan struct{}
// rpcRetryCh is closed when an event occurs, such as server discovery or a
// successful RPC, that indicates a retry should happen. Access should only
// occur via the getter method.
rpcRetryCh chan struct{}
rpcRetryLock sync.Mutex
// allocs maps alloc IDs to their AllocRunner. This map includes all
// AllocRunners - running and GC'd - until the server GCs them.
@ -217,7 +219,6 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
shutdownCh: make(chan struct{}),
triggerDiscoveryCh: make(chan struct{}),
triggerNodeUpdate: make(chan struct{}, 8),
serversDiscoveredCh: make(chan struct{}),
triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8),
}
@ -1154,7 +1155,7 @@ func (c *Client) registerAndHeartbeat() {
for {
select {
case <-c.serversDiscoveredCh:
case <-c.rpcRetryWatcher():
case <-heartbeat:
case <-c.shutdownCh:
return
@ -1169,11 +1170,11 @@ func (c *Client) registerAndHeartbeat() {
c.retryRegisterNode()
heartbeat = time.After(lib.RandomStagger(initialHeartbeatStagger))
} else {
intv := c.retryIntv(registerRetryIntv)
intv := c.getHeartbeatRetryIntv(err)
c.logger.Printf("[ERR] client: heartbeating failed. Retrying in %v: %v", intv, err)
heartbeat = time.After(intv)
// if heartbeating fails, trigger Consul discovery
// If heartbeating fails, trigger Consul discovery
c.triggerDiscovery()
}
} else {
@ -1184,6 +1185,56 @@ func (c *Client) registerAndHeartbeat() {
}
}
// getHeartbeatRetryIntv is used to retrieve the time to wait before attempting
// another heartbeat.
func (c *Client) getHeartbeatRetryIntv(err error) time.Duration {
if c.config.DevMode {
return devModeRetryIntv
}
// Collect the useful heartbeat info
c.heartbeatLock.Lock()
haveHeartbeated := c.haveHeartbeated
last := c.lastHeartbeat
ttl := c.heartbeatTTL
c.heartbeatLock.Unlock()
// If we haven't even successfully heartbeated once or there is no leader,
// treat it as a registration. In the case of a leadership loss, our
// heartbeat timer will have been reset to a much larger threshold, so do
// not put unnecessary pressure on the new leader.
if !haveHeartbeated || err == structs.ErrNoLeader {
return c.retryIntv(registerRetryIntv)
}
// Determine how much time we have left to heartbeat
left := last.Add(ttl).Sub(time.Now())
// Logic for retrying is:
// * Do not retry faster than once a second
// * Do not retry less often than once every 30 seconds
// * If we have missed the heartbeat by more than 30 seconds, start to use
// the absolute time since we do not want to retry indefinitely
switch {
case left < -30*time.Second:
// Make left the absolute value so we delay and jitter properly.
left *= -1
case left < 0:
return time.Second + lib.RandomStagger(time.Second)
default:
}
stagger := lib.RandomStagger(left)
switch {
case stagger < time.Second:
return time.Second + lib.RandomStagger(time.Second)
case stagger > 30*time.Second:
return 25*time.Second + lib.RandomStagger(5*time.Second)
default:
return stagger
}
}
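
For illustration, here is a minimal, self-contained sketch of the clamping behaviour implemented above. The `randomStagger` helper is a hypothetical stand-in for `lib.RandomStagger`, and `heartbeatRetry` is not Nomad API; it only mirrors the switch logic so the thresholds are easy to experiment with:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomStagger returns a uniformly random duration in [0, intv).
// It is a stand-in for lib.RandomStagger used by the client.
func randomStagger(intv time.Duration) time.Duration {
	if intv <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(intv)))
}

// heartbeatRetry mirrors the clamping above: never retry faster than about
// once a second, never slower than about every 30 seconds, and switch to
// the absolute overdue time once the heartbeat deadline is long past.
func heartbeatRetry(left time.Duration) time.Duration {
	switch {
	case left < -30*time.Second:
		left *= -1 // long overdue: stagger over the absolute overdue time
	case left < 0:
		return time.Second + randomStagger(time.Second)
	}

	stagger := randomStagger(left)
	switch {
	case stagger < time.Second:
		return time.Second + randomStagger(time.Second)
	case stagger > 30*time.Second:
		return 25*time.Second + randomStagger(5*time.Second)
	default:
		return stagger
	}
}

func main() {
	for _, left := range []time.Duration{20 * time.Second, 500 * time.Millisecond, -5 * time.Second, -2 * time.Minute} {
		fmt.Printf("left=%v -> retry in %v\n", left, heartbeatRetry(left))
	}
}
```

With `left = 20s` the retry lands somewhere inside the remaining TTL window; once the heartbeat is more than 30 seconds overdue, the absolute overdue time is staggered instead, capped at roughly 25-30 seconds.
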
// periodicSnapshot is a long lived goroutine used to periodically snapshot the
// state of the client
func (c *Client) periodicSnapshot() {
@ -1307,7 +1358,7 @@ func (c *Client) retryRegisterNode() {
c.logger.Printf("[ERR] client: registration failure: %v", err)
}
select {
case <-c.serversDiscoveredCh:
case <-c.rpcRetryWatcher():
case <-time.After(c.retryIntv(registerRetryIntv)):
case <-c.shutdownCh:
return
@ -1567,7 +1618,7 @@ OUTER:
}
retry := c.retryIntv(getAllocRetryIntv)
select {
case <-c.serversDiscoveredCh:
case <-c.rpcRetryWatcher():
continue
case <-time.After(retry):
continue
@ -1622,7 +1673,7 @@ OUTER:
c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err)
retry := c.retryIntv(getAllocRetryIntv)
select {
case <-c.serversDiscoveredCh:
case <-c.rpcRetryWatcher():
continue
case <-time.After(retry):
continue
@ -2085,18 +2136,16 @@ DISCOLOOP:
}
c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", nomadServers)
c.servers.SetServers(nomadServers)
// Notify waiting rpc calls. If a goroutine just failed an RPC call and
// isn't receiving on this chan yet they'll still retry eventually.
// This is a shortcircuit for the longer retry intervals.
for {
select {
case c.serversDiscoveredCh <- struct{}{}:
default:
return nil
}
// Fire the retry trigger if we have updated the set of servers.
if c.servers.SetServers(nomadServers) {
// Notify waiting RPC calls. If a goroutine just failed an RPC call and
// isn't receiving on this chan yet, it will still retry eventually. This
// is a short-circuit for the longer retry intervals.
c.fireRpcRetryWatcher()
}
return nil
}
// emitStats collects host resource usage stats periodically

View file

@ -89,6 +89,34 @@ func TestClient_RPC(t *testing.T) {
})
}
func TestClient_RPC_FireRetryWatchers(t *testing.T) {
t.Parallel()
s1, addr := testServer(t, nil)
defer s1.Shutdown()
c1 := TestClient(t, func(c *config.Config) {
c.Servers = []string{addr}
})
defer c1.Shutdown()
watcher := c1.rpcRetryWatcher()
// RPC should succeed
testutil.WaitForResult(func() (bool, error) {
var out struct{}
err := c1.RPC("Status.Ping", struct{}{}, &out)
return err == nil, err
}, func(err error) {
t.Fatalf("err: %v", err)
})
select {
case <-watcher:
default:
t.Fatal("watcher should be fired")
}
}
func TestClient_RPC_Passthrough(t *testing.T) {
t.Parallel()
s1, _ := testServer(t, nil)

View file

@ -67,6 +67,7 @@ TRY:
// Make the request.
rpcErr := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply)
if rpcErr == nil {
c.fireRpcRetryWatcher()
return nil
}
@ -382,3 +383,27 @@ func (c *Client) Ping(srv net.Addr) error {
err := c.connPool.RPC(c.Region(), srv, c.RPCMajorVersion(), "Status.Ping", struct{}{}, &reply)
return err
}
// rpcRetryWatcher returns a channel that will be closed when an event occurs
// that makes us expect the next RPC to be successful.
func (c *Client) rpcRetryWatcher() <-chan struct{} {
c.rpcRetryLock.Lock()
defer c.rpcRetryLock.Unlock()
if c.rpcRetryCh == nil {
c.rpcRetryCh = make(chan struct{})
}
return c.rpcRetryCh
}
// fireRpcRetryWatcher causes any RPC retry loops to retry their RPCs because
// we believe they will be successful.
func (c *Client) fireRpcRetryWatcher() {
c.rpcRetryLock.Lock()
defer c.rpcRetryLock.Unlock()
if c.rpcRetryCh != nil {
close(c.rpcRetryCh)
c.rpcRetryCh = nil
}
}
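
Taken together, `rpcRetryWatcher` and `fireRpcRetryWatcher` implement a reusable broadcast primitive: a lazily created channel that is closed to wake every current waiter and then discarded. A small, self-contained sketch of the same pattern (the `retryNotifier` type and its method names are illustrative, not Nomad's API):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// retryNotifier reproduces the rpcRetryCh pattern: watchers receive a channel
// that is closed on the next fire, after which a fresh channel is created
// lazily for the next round of waiters.
type retryNotifier struct {
	mu sync.Mutex
	ch chan struct{}
}

// watch returns a channel that is closed the next time fire is called.
func (n *retryNotifier) watch() <-chan struct{} {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.ch == nil {
		n.ch = make(chan struct{})
	}
	return n.ch
}

// fire wakes all current watchers by closing the channel, then resets it.
func (n *retryNotifier) fire() {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.ch != nil {
		close(n.ch)
		n.ch = nil
	}
}

func main() {
	var n retryNotifier
	done := make(chan struct{})

	go func() {
		defer close(done)
		select {
		case <-n.watch():
			fmt.Println("woken early: retrying RPC now")
		case <-time.After(5 * time.Second):
			fmt.Println("timer elapsed: retrying RPC")
		}
	}()

	time.Sleep(100 * time.Millisecond)
	n.fire() // e.g. server discovery succeeded or another RPC went through
	<-done
}
```

Because the channel is replaced after each close, a goroutine that calls `watch()` after a firing simply waits for the next event instead of returning immediately, which is why the client's retry loops re-acquire the watcher on each iteration of their select loops.
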

View file

@ -7,6 +7,7 @@ import (
"log"
"math/rand"
"net"
"sort"
"strings"
"sync"
"time"
@ -74,6 +75,16 @@ func (s *Server) String() string {
return s.addr
}
func (s *Server) Equal(o *Server) bool {
if s == nil && o == nil {
return true
} else if s == nil && o != nil || s != nil && o == nil {
return false
}
return s.Addr.String() == o.Addr.String() && s.DC == o.DC
}
type Servers []*Server
func (s Servers) String() string {
@ -106,6 +117,32 @@ func (s Servers) shuffle() {
}
}
func (s Servers) Sort() {
sort.Slice(s, func(i, j int) bool {
a, b := s[i], s[j]
if addr1, addr2 := a.Addr.String(), b.Addr.String(); addr1 == addr2 {
return a.DC < b.DC
} else {
return addr1 < addr2
}
})
}
// Equal returns whether the two server lists are equal, including the ordering.
func (s Servers) Equal(o Servers) bool {
if len(s) != len(o) {
return false
}
for i, v := range s {
if !v.Equal(o[i]) {
return false
}
}
return true
}
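
Since `Equal` compares element-wise, ordering matters; that is why `SetServers` below sorts both lists before comparing them. A short usage sketch, assuming the exported `Servers`, `Sort`, and `Equal` shown in this diff are the public surface; the `staticAddr` type is only an illustrative `net.Addr` stub:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/client/servers"
)

// staticAddr is a trivial net.Addr stub so we can build Server values
// without real network addresses.
type staticAddr string

func (a staticAddr) Network() string { return "tcp" }
func (a staticAddr) String() string  { return string(a) }

func main() {
	a := servers.Servers{
		{Addr: staticAddr("10.0.0.2:4647")},
		{Addr: staticAddr("10.0.0.1:4647")},
	}
	b := servers.Servers{
		{Addr: staticAddr("10.0.0.1:4647")},
		{Addr: staticAddr("10.0.0.2:4647")},
	}

	fmt.Println(a.Equal(b)) // false: same members, different order

	a.Sort()
	b.Sort()
	fmt.Println(a.Equal(b)) // true once both sides are sorted
}
```

SetServers relies on exactly this sequence: sort both lists, compare, and only then shuffle the incoming list so RPC load still spreads across servers.
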
type Manager struct {
// servers is the list of all known Nomad servers.
servers Servers
@ -157,10 +194,24 @@ func (m *Manager) Start() {
}
}
func (m *Manager) SetServers(servers Servers) {
// SetServers sets the servers and returns whether the new server list is
// different from the existing server set.
func (m *Manager) SetServers(servers Servers) bool {
m.Lock()
defer m.Unlock()
// Sort both the existing and incoming servers
servers.Sort()
m.servers.Sort()
// Determine if they are equal
equal := servers.Equal(m.servers)
// Randomize the incoming servers
servers.shuffle()
m.servers = servers
return !equal
}
// FindServer returns a server to send an RPC to. If there are no servers, nil
@ -204,7 +255,7 @@ func (m *Manager) NotifyFailedServer(s *Server) {
// If the server being failed is not the first server on the list,
// this is a noop. If, however, the server is failed and first on
// the list, move the server to the end of the list.
if len(m.servers) > 1 && m.servers[0] == s {
if len(m.servers) > 1 && m.servers[0].Equal(s) {
m.servers.cycle()
}
}

View file

@ -10,6 +10,8 @@ import (
"testing"
"github.com/hashicorp/nomad/client/servers"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/stretchr/testify/require"
)
type fauxAddr struct {
@ -32,22 +34,23 @@ func (cp *fauxConnPool) Ping(net.Addr) error {
return fmt.Errorf("bad server")
}
func testManager() (m *servers.Manager) {
logger := log.New(os.Stderr, "", log.LstdFlags)
func testManager(t *testing.T) (m *servers.Manager) {
logger := testlog.Logger(t)
shutdownCh := make(chan struct{})
m = servers.New(logger, shutdownCh, &fauxConnPool{})
return m
}
func testManagerFailProb(failPct float64) (m *servers.Manager) {
logger := log.New(os.Stderr, "", log.LstdFlags)
func testManagerFailProb(t *testing.T, failPct float64) (m *servers.Manager) {
logger := testlog.Logger(t)
shutdownCh := make(chan struct{})
m = servers.New(logger, shutdownCh, &fauxConnPool{failPct: failPct})
return m
}
func TestServers_SetServers(t *testing.T) {
m := testManager()
require := require.New(t)
m := testManager(t)
var num int
num = m.NumServers()
if num != 0 {
@ -56,24 +59,19 @@ func TestServers_SetServers(t *testing.T) {
s1 := &servers.Server{Addr: &fauxAddr{"server1"}}
s2 := &servers.Server{Addr: &fauxAddr{"server2"}}
m.SetServers([]*servers.Server{s1, s2})
num = m.NumServers()
if num != 2 {
t.Fatalf("Expected two servers")
}
require.True(m.SetServers([]*servers.Server{s1, s2}))
require.False(m.SetServers([]*servers.Server{s1, s2}))
require.False(m.SetServers([]*servers.Server{s2, s1}))
require.Equal(2, m.NumServers())
require.Len(m.GetServers(), 2)
all := m.GetServers()
if l := len(all); l != 2 {
t.Fatalf("expected 2 servers got %d", l)
}
if all[0] == s1 || all[0] == s2 {
t.Fatalf("expected a copy, got actual server")
}
require.True(m.SetServers([]*servers.Server{s1}))
require.Equal(1, m.NumServers())
require.Len(m.GetServers(), 1)
}
func TestServers_FindServer(t *testing.T) {
m := testManager()
m := testManager(t)
if m.FindServer() != nil {
t.Fatalf("Expected nil return")
@ -105,20 +103,14 @@ func TestServers_FindServer(t *testing.T) {
t.Fatalf("Expected two servers")
}
s1 = m.FindServer()
if s1 == nil || s1.String() != "s1" {
t.Fatalf("Expected s1 server (still)")
for _, srv := range srvs {
m.NotifyFailedServer(srv)
}
m.NotifyFailedServer(s1)
s2 := m.FindServer()
if s2 == nil || s2.String() != "s2" {
t.Fatalf("Expected s2 server")
}
m.NotifyFailedServer(s2)
s1 = m.FindServer()
if s1 == nil || s1.String() != "s1" {
t.Fatalf("Expected s1 server")
if s1.Equal(s2) {
t.Fatalf("Expected different server")
}
}
@ -132,7 +124,7 @@ func TestServers_New(t *testing.T) {
}
func TestServers_NotifyFailedServer(t *testing.T) {
m := testManager()
m := testManager(t)
if m.NumServers() != 0 {
t.Fatalf("Expected zero servers to start")
@ -159,32 +151,39 @@ func TestServers_NotifyFailedServer(t *testing.T) {
t.Fatalf("Expected two servers")
}
s1 = m.FindServer()
if s1 == nil || s1.String() != "s1" {
t.Fatalf("Expected s1 server")
// Grab a server
first := m.FindServer()
// Find the other server
second := s1
if first.Equal(s1) {
second = s2
}
m.NotifyFailedServer(s2)
s1 = m.FindServer()
if s1 == nil || s1.String() != "s1" {
t.Fatalf("Expected s1 server (still)")
// Fail the other server
m.NotifyFailedServer(second)
next := m.FindServer()
if !next.Equal(first) {
t.Fatalf("Expected first server (still)")
}
m.NotifyFailedServer(s1)
s2 = m.FindServer()
if s2 == nil || s2.String() != "s2" {
t.Fatalf("Expected s2 server")
// Fail the first
m.NotifyFailedServer(first)
next = m.FindServer()
if !next.Equal(second) {
t.Fatalf("Expected second server")
}
m.NotifyFailedServer(s2)
s1 = m.FindServer()
if s1 == nil || s1.String() != "s1" {
t.Fatalf("Expected s1 server")
// Fail the second
m.NotifyFailedServer(second)
next = m.FindServer()
if !next.Equal(first) {
t.Fatalf("Expected first server")
}
}
func TestServers_NumServers(t *testing.T) {
m := testManager()
m := testManager(t)
var num int
num = m.NumServers()
if num != 0 {
@ -201,7 +200,7 @@ func TestServers_NumServers(t *testing.T) {
func TestServers_RebalanceServers(t *testing.T) {
const failPct = 0.5
m := testManagerFailProb(failPct)
m := testManagerFailProb(t, failPct)
const maxServers = 100
const numShuffleTests = 100
const uniquePassRate = 0.5