2016-06-02 05:51:04 +00:00
// Package rpcproxy provides a proxy interface to Nomad Servers. The
2016-06-10 05:30:05 +00:00
// RPCProxy periodically shuffles which server a Nomad Client communicates
2016-05-23 18:09:31 +00:00
// with in order to redistribute load across Nomad Servers. Nomad Servers
// that fail an RPC request are automatically cycled to the end of the list
// until the server list is reshuffled.
//
2016-06-02 05:51:04 +00:00
// The rpcproxy package does not provide any external API guarantees and
2016-05-23 18:09:31 +00:00
// should be called only by `hashicorp/nomad`.
2016-05-26 23:15:40 +00:00
package rpcproxy
2016-05-23 18:09:31 +00:00
import (
"fmt"
"log"
"math/rand"
2016-06-01 07:34:46 +00:00
"strings"
2016-05-23 18:09:31 +00:00
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
	// clientRPCMinReuseDuration is the minimum amount of time RPC queries
	// are sent over an established connection to a single server.
	clientRPCMinReuseDuration = 600 * time.Second

	// clientRPCJitterFraction bounds the random jitter added on top of
	// clientRPCMinReuseDuration: up to
	// clientRPCMinReuseDuration/clientRPCJitterFraction is added before a
	// connection is expired and a new connection is established in order to
	// rebalance load across Nomad servers. The cluster-wide number of
	// connections per second from rebalancing is applied after this jitter
	// so the CPU impact is always finite; see the comment on
	// newRebalanceConnsPerSecPerServer.
	//
	// For example, in a 10K Nomad cluster with 5x servers, this default
	// averages out to ~13 new connections from rebalancing per server per
	// second.
	clientRPCJitterFraction = 2

	// newRebalanceConnsPerSecPerServer limits the number of new connections
	// a server receives per second for connection rebalancing. It caps the
	// load caused by continual rebalancing when a cluster is in
	// equilibrium; a lower value costs a longer recovery time after a
	// partition. It begins to take effect when more than ~48K clients query
	// 5x servers, or at lower server counts during a partition.
	//
	// For example, in a 100K Nomad cluster with 5x servers, it will take
	// ~5min for all servers to rebalance their connections. If 99,995
	// agents are in the minority talking to only one server, it will take
	// ~26min for all servers to rebalance. A 10K cluster in the same
	// scenario will take ~2.6min to rebalance.
	newRebalanceConnsPerSecPerServer = 64

	// rpcAPIMismatchLogRate is the minimum interval between repeated log
	// entries about the same server when client and server API versions
	// are mismatched.
	rpcAPIMismatchLogRate = 3 * time.Hour
)
// NomadConfigInfo exposes the parts of this Nomad Agent's configuration that
// the RPC proxy reads. It is an interface rather than a concrete config type
// so that this package does not need to import the agent, which would create
// a cyclic import dependency.
type NomadConfigInfo interface {
	// Datacenter is compared against each heartbeat server's datacenter;
	// servers from other datacenters become backup servers.
	Datacenter() string

	// RPCMajorVersion and RPCMinorVersion are the client's RPC API
	// version. Servers advertising a newer version are ignored.
	RPCMajorVersion() int
	RPCMinorVersion() int

	// Region is passed to the Pinger when health-checking servers.
	Region() string
}
// Pinger is an interface wrapping client.ConnPool to prevent a
// cyclic import dependency
type Pinger interface {
2016-05-28 01:14:34 +00:00
PingNomadServer ( region string , apiMajorVersion int , s * ServerEndpoint ) ( bool , error )
2016-05-23 18:09:31 +00:00
}
// serverList is an ordered collection of Nomad Servers; L[0] is the active
// server.
//
// NOTE(sean@): serverList values are stored in, and loaded from, an
// atomic.Value by value, so every Load/Store copies this struct. Keep it
// lightweight.
type serverList struct {
	L []*ServerEndpoint // ordered servers; the front entry is active
}
2016-06-10 05:30:05 +00:00
// RPCProxy is the manager type responsible for returning and managing Nomad
// addresses.
type RPCProxy struct {
2016-05-23 18:09:31 +00:00
// activatedList manages the list of Nomad Servers that are eligible
2016-06-02 05:51:04 +00:00
// to be queried by the Client agent.
2016-06-10 05:57:02 +00:00
activatedList atomic . Value
activatedListLock sync . Mutex
2016-05-23 18:09:31 +00:00
// primaryServers is a list of servers found in the last heartbeat.
// primaryServers are periodically reshuffled. Covered by
// serverListLock.
primaryServers serverList
// backupServers is a list of fallback servers. These servers are
2016-06-10 05:30:05 +00:00
// appended to the RPCProxy's serverList, but are never shuffled with
2016-05-23 18:09:31 +00:00
// the list of servers discovered via the Nomad heartbeat. Covered
// by serverListLock.
backupServers serverList
2016-06-01 07:34:46 +00:00
// serverListLock covers both backupServers and primaryServers. If
// it is necessary to hold serverListLock and listLock, obtain an
// exclusive lock on serverListLock before listLock.
2016-05-23 18:09:31 +00:00
serverListLock sync . RWMutex
leaderAddr string
numNodes int
// rebalanceTimer controls the duration of the rebalance interval
rebalanceTimer * time . Timer
// shutdownCh is a copy of the channel in nomad.Client
shutdownCh chan struct { }
logger * log . Logger
configInfo NomadConfigInfo
// rpcAPIMismatchThrottle regulates the rate at which warning
// messages are emitted in the event of an API mismatch between the
// clients and servers.
rpcAPIMismatchThrottle map [ string ] time . Time
// connPoolPinger is used to test the health of a server in the
// connection pool. Pinger is an interface that wraps
// client.ConnPool.
connPoolPinger Pinger
}
2016-06-10 05:30:05 +00:00
// NewRPCProxy is the only way to safely create a new RPCProxy.
func NewRPCProxy ( logger * log . Logger , shutdownCh chan struct { } , configInfo NomadConfigInfo , connPoolPinger Pinger ) * RPCProxy {
p := & RPCProxy {
logger : logger ,
configInfo : configInfo , // can't pass *nomad.Client: import cycle
connPoolPinger : connPoolPinger , // can't pass *nomad.ConnPool: import cycle
rebalanceTimer : time . NewTimer ( clientRPCMinReuseDuration ) ,
shutdownCh : shutdownCh ,
}
2016-06-10 04:22:04 +00:00
l := serverList { }
l . L = make ( [ ] * ServerEndpoint , 0 )
p . saveServerList ( l )
return p
}
2016-06-10 05:30:05 +00:00
// activateEndpoint adds an endpoint to the RPCProxy's active serverList.
2016-05-23 18:09:31 +00:00
// Returns true if the server was added, returns false if the server already
2016-06-10 05:30:05 +00:00
// existed in the RPCProxy's serverList.
func ( p * RPCProxy ) activateEndpoint ( s * ServerEndpoint ) bool {
2016-05-23 18:09:31 +00:00
l := p . getServerList ( )
// Check if this server is known
found := false
for idx , existing := range l . L {
if existing . Name == s . Name {
newServers := make ( [ ] * ServerEndpoint , len ( l . L ) )
copy ( newServers , l . L )
// Overwrite the existing server details in order to
// possibly update metadata (e.g. server version)
newServers [ idx ] = s
l . L = newServers
found = true
break
}
}
// Add to the list if not known
if ! found {
newServers := make ( [ ] * ServerEndpoint , len ( l . L ) , len ( l . L ) + 1 )
copy ( newServers , l . L )
newServers = append ( newServers , s )
l . L = newServers
}
p . saveServerList ( l )
return ! found
}
// SetBackupServers sets a list of Nomad Servers to be used in the event that
// the Nomad Agent lost contact with the list of Nomad Servers provided via
// the Nomad Agent's heartbeat. If available, the backup servers are
// populated via Consul.
2016-06-10 05:30:05 +00:00
func ( p * RPCProxy ) SetBackupServers ( addrs [ ] string ) error {
2016-05-23 18:09:31 +00:00
l := make ( [ ] * ServerEndpoint , 0 , len ( addrs ) )
for _ , s := range addrs {
2016-06-11 02:14:03 +00:00
s , err := NewServerEndpoint ( s )
2016-05-23 18:09:31 +00:00
if err != nil {
2016-06-11 22:17:20 +00:00
p . logger . Printf ( "[WARN] client.rpcproxy: unable to create backup server %+q: %v" , s , err )
return fmt . Errorf ( "unable to create new backup server from %+q: %v" , s , err )
2016-05-23 18:09:31 +00:00
}
2016-05-26 16:17:32 +00:00
l = append ( l , s )
2016-05-23 18:09:31 +00:00
}
p . serverListLock . Lock ( )
p . backupServers . L = l
p . serverListLock . Unlock ( )
2016-06-10 05:57:02 +00:00
p . activatedListLock . Lock ( )
defer p . activatedListLock . Unlock ( )
2016-05-23 18:09:31 +00:00
for _ , s := range l {
p . activateEndpoint ( s )
}
return nil
}
// AddPrimaryServer takes the RPC address of a Nomad server, creates a new
// endpoint, and adds it to both the primaryServers list and the active
// serverList used in the RPC Proxy. If the endpoint is not known by the
2016-06-10 05:30:05 +00:00
// RPCProxy, appends the endpoint to the list. The new endpoint will begin
2016-05-23 18:09:31 +00:00
// seeing use after the rebalance timer fires (or enough servers fail
// organically). Any values in the primary server list are overridden by the
// next successful heartbeat.
2016-06-10 05:30:05 +00:00
func ( p * RPCProxy ) AddPrimaryServer ( rpcAddr string ) * ServerEndpoint {
2016-06-11 02:14:03 +00:00
s , err := NewServerEndpoint ( rpcAddr )
2016-05-23 18:09:31 +00:00
if err != nil {
2016-06-11 22:17:20 +00:00
p . logger . Printf ( "[WARN] client.rpcproxy: unable to create new primary server from endpoint %+q: %v" , rpcAddr , err )
2016-05-23 18:09:31 +00:00
return nil
}
2016-06-10 19:45:54 +00:00
k := s . Key ( )
2016-05-23 18:09:31 +00:00
p . serverListLock . Lock ( )
2016-06-10 19:45:54 +00:00
if serverExists := p . primaryServers . serverExistByKey ( k ) ; serverExists {
p . serverListLock . Unlock ( )
2016-06-10 20:46:49 +00:00
return s
2016-06-10 19:45:54 +00:00
}
2016-05-23 18:09:31 +00:00
p . primaryServers . L = append ( p . primaryServers . L , s )
p . serverListLock . Unlock ( )
2016-06-10 05:57:02 +00:00
p . activatedListLock . Lock ( )
2016-05-23 18:09:31 +00:00
p . activateEndpoint ( s )
2016-06-10 05:57:02 +00:00
p . activatedListLock . Unlock ( )
2016-05-23 18:09:31 +00:00
return s
}
// cycleServer returns a newly allocated slice containing l's servers rotated
// left by one: the first server is dequeued and enqueued at the end. l.L
// itself is not modified; callers assign the result (l.L = l.cycleServer()).
// Lists with fewer than two servers are returned as a copy in their existing
// order, so that assignment never empties the list.
//
// When l is the activated list, the caller is expected to hold
// activatedListLock. cycleServer does not test or ping the next server
// inline: it may be called when the environment has just become unhealthy,
// and blocking on a server test is less desirable than returning the next
// server in the firing line. If the next server fails, it will presumably
// fail fast and cycleServer will be called again.
func (l *serverList) cycleServer() (servers []*ServerEndpoint) {
	numServers := len(l.L)
	servers = make([]*ServerEndpoint, 0, numServers)
	if numServers < 2 {
		// Nothing to rotate.
		return append(servers, l.L...)
	}
	servers = append(servers, l.L[1:]...)
	return append(servers, l.L[0])
}
2016-06-10 19:45:54 +00:00
// serverExistByKey reports whether any server in l has a key equal to
// targetKey. The scan stops at the first match. Assumes the caller is
// holding at least a read lock on whatever protects l.
func (l *serverList) serverExistByKey(targetKey *EndpointKey) bool {
	for _, server := range l.L {
		if targetKey.Equal(server.Key()) {
			return true
		}
	}
	return false
}
2016-05-23 18:09:31 +00:00
// removeServerByKey removes the first server whose key equals targetKey. If
// no server matches, l is left unchanged.
//
// The result is stored in l.L as a newly allocated slice, and the previous
// backing array is never written. This matters because serverList values
// loaded from activatedList (getServerList) are struct copies that share
// their backing array with lock-free readers such as FindServer. Shifting
// elements in place would race with those readers.
func (l *serverList) removeServerByKey(targetKey *EndpointKey) {
	for i, s := range l.L {
		if !targetKey.Equal(s.Key()) {
			continue
		}
		remaining := make([]*ServerEndpoint, 0, len(l.L)-1)
		remaining = append(remaining, l.L[:i]...)
		remaining = append(remaining, l.L[i+1:]...)
		l.L = remaining
		return
	}
}
// shuffleServers randomly permutes l.L in place with a Fisher-Yates shuffle,
// walking from the last index down to 1 and drawing indices from math/rand.
func (l *serverList) shuffleServers() {
	servers := l.L
	for hi := len(servers) - 1; hi >= 1; hi-- {
		pick := int(rand.Int31n(int32(hi + 1)))
		servers[hi], servers[pick] = servers[pick], servers[hi]
	}
}
2016-06-01 07:41:08 +00:00
// String returns a string representation of serverList
func ( l * serverList ) String ( ) string {
if len ( l . L ) == 0 {
2016-06-10 05:25:47 +00:00
return fmt . Sprintf ( "empty server list" )
2016-06-01 07:41:08 +00:00
}
serverStrs := make ( [ ] string , 0 , len ( l . L ) )
for _ , server := range l . L {
serverStrs = append ( serverStrs , server . String ( ) )
}
return fmt . Sprintf ( "[%s]" , strings . Join ( serverStrs , ", " ) )
}
2016-05-23 18:09:31 +00:00
// FindServer takes out an internal "read lock" and searches through the list
// of servers to find a "healthy" server. If the server is actually
// unhealthy, we rely on heartbeats to detect this and remove the node from
// the server list. If the server at the front of the list has failed or
// fails during an RPC call, it is rotated to the end of the list. If there
// are no servers available, return nil.
2016-06-10 05:30:05 +00:00
func ( p * RPCProxy ) FindServer ( ) * ServerEndpoint {
2016-05-23 18:09:31 +00:00
l := p . getServerList ( )
numServers := len ( l . L )
if numServers == 0 {
2016-06-10 05:23:54 +00:00
p . logger . Printf ( "[WARN] client.rpcproxy: No servers available" )
2016-05-23 18:09:31 +00:00
return nil
}
2016-06-10 05:30:05 +00:00
// Return whatever is at the front of the list because it is
// assumed to be the oldest in the server list (unless -
// hypothetically - the server list was rotated right after a
// server was added).
return l . L [ 0 ]
2016-05-23 18:09:31 +00:00
}
// getServerList is a convenience method which hides the locking semantics
// of atomic.Value from the caller.
2016-06-10 05:30:05 +00:00
func ( p * RPCProxy ) getServerList ( ) serverList {
2016-05-23 18:09:31 +00:00
return p . activatedList . Load ( ) . ( serverList )
}
// saveServerList is a convenience method which hides the locking semantics
// of atomic.Value from the caller.
2016-06-10 05:30:05 +00:00
func ( p * RPCProxy ) saveServerList ( l serverList ) {
2016-05-23 18:09:31 +00:00
p . activatedList . Store ( l )
}
2016-06-10 05:30:05 +00:00
// LeaderAddr returns the current leader address. If an empty string, then
// the Nomad Server for this Nomad Agent is in the minority or the Nomad
// Servers are in the middle of an election.
func ( p * RPCProxy ) LeaderAddr ( ) string {
2016-06-10 05:57:02 +00:00
p . activatedListLock . Lock ( )
defer p . activatedListLock . Unlock ( )
2016-05-23 18:09:31 +00:00
return p . leaderAddr
}
// NotifyFailedServer marks the passed in server as "failed" by rotating it
// to the end of the server list.
2016-06-10 05:30:05 +00:00
func ( p * RPCProxy ) NotifyFailedServer ( s * ServerEndpoint ) {
2016-05-23 18:09:31 +00:00
l := p . getServerList ( )
// If the server being failed is not the first server on the list,
// this is a noop. If, however, the server is failed and first on
// the list, acquire the lock, retest, and take the penalty of moving
// the server to the end of the list.
// Only rotate the server list when there is more than one server
2016-06-10 05:54:56 +00:00
if len ( l . L ) > 1 && l . L [ 0 ] == s {
2016-05-23 18:09:31 +00:00
// Grab a lock, retest, and take the hit of cycling the first
// server to the end.
2016-06-10 05:57:02 +00:00
p . activatedListLock . Lock ( )
defer p . activatedListLock . Unlock ( )
2016-05-23 18:09:31 +00:00
l = p . getServerList ( )
if len ( l . L ) > 1 && l . L [ 0 ] == s {
l . L = l . cycleServer ( )
p . saveServerList ( l )
}
}
}
2016-06-10 05:30:05 +00:00
// NumNodes returns the estimated number of nodes according to the last Nomad
// Heartbeat.
func ( p * RPCProxy ) NumNodes ( ) int {
2016-05-23 18:09:31 +00:00
return p . numNodes
}
// NumServers takes out an internal "read lock" and returns the number of
// servers. numServers includes both healthy and unhealthy servers.
2016-06-10 05:30:05 +00:00
func ( p * RPCProxy ) NumServers ( ) int {
2016-05-23 18:09:31 +00:00
l := p . getServerList ( )
return len ( l . L )
}
// RebalanceServers shuffles the list of servers on this agent. The server
// at the front of the list is selected for the next RPC. RPC calls that
// fail for a particular server are rotated to the end of the list. This
// method reshuffles the list periodically in order to redistribute work
// across all known Nomad servers (i.e. guarantee that the order of servers
// in the server list is not positively correlated with the age of a server
// in the Nomad cluster). Periodically shuffling the server list prevents
// long-lived clients from fixating on long-lived servers.
//
// Unhealthy servers are removed from the server list during the next client
// heartbeat. Before the newly shuffled server list is saved, the new remote
// endpoint is tested to ensure its responsive.
2016-06-10 05:30:05 +00:00
func ( p * RPCProxy ) RebalanceServers ( ) {
2016-05-23 18:09:31 +00:00
var serverListLocked bool
p . serverListLock . Lock ( )
serverListLocked = true
defer func ( ) {
if serverListLocked {
p . serverListLock . Unlock ( )
}
} ( )
// Early abort if there is nothing to shuffle
if ( len ( p . primaryServers . L ) + len ( p . backupServers . L ) ) < 2 {
return
}
// Shuffle server lists independently
p . primaryServers . shuffleServers ( )
p . backupServers . shuffleServers ( )
// Create a new merged serverList
type targetServer struct {
server * ServerEndpoint
2016-05-27 18:47:21 +00:00
// 'p' == Primary Server
// 's' == Secondary/Backup Server
2016-05-23 18:09:31 +00:00
// 'b' == Both
state byte
}
mergedList := make ( map [ EndpointKey ] * targetServer , len ( p . primaryServers . L ) + len ( p . backupServers . L ) )
for _ , s := range p . primaryServers . L {
2016-05-27 18:47:21 +00:00
mergedList [ * s . Key ( ) ] = & targetServer { server : s , state : 'p' }
2016-05-23 18:09:31 +00:00
}
for _ , s := range p . backupServers . L {
k := s . Key ( )
_ , found := mergedList [ * k ]
if found {
mergedList [ * k ] . state = 'b'
} else {
2016-05-27 18:47:21 +00:00
mergedList [ * k ] = & targetServer { server : s , state : 's' }
2016-05-23 18:09:31 +00:00
}
}
l := & serverList { L : make ( [ ] * ServerEndpoint , 0 , len ( mergedList ) ) }
for _ , s := range p . primaryServers . L {
l . L = append ( l . L , s )
}
for _ , v := range mergedList {
2016-05-27 18:47:21 +00:00
if v . state != 's' {
2016-05-23 18:09:31 +00:00
continue
}
l . L = append ( l . L , v . server )
}
// Release the lock before we begin transition to operations on the
// network timescale and attempt to ping servers. A copy of the
// servers has been made at this point.
p . serverListLock . Unlock ( )
serverListLocked = false
// Iterate through the shuffled server list to find an assumed
// healthy server. NOTE: Do not iterate on the list directly because
// this loop mutates the server list in-place.
var foundHealthyServer bool
for i := 0 ; i < len ( l . L ) ; i ++ {
// Always test the first server. Failed servers are cycled
// and eventually removed from the list when Nomad heartbeats
// detect the failed node.
selectedServer := l . L [ 0 ]
2016-06-11 03:26:15 +00:00
ok , err := p . connPoolPinger . PingNomadServer ( p . configInfo . Region ( ) , p . configInfo . RPCMajorVersion ( ) , selectedServer )
2016-05-23 18:09:31 +00:00
if ok {
foundHealthyServer = true
break
}
2016-06-10 05:23:54 +00:00
p . logger . Printf ( ` [DEBUG] client.rpcproxy: pinging server "%s" failed: %s ` , selectedServer . String ( ) , err )
2016-05-23 18:09:31 +00:00
l . cycleServer ( )
}
// If no healthy servers were found, sleep and wait for the admin to
// join this node to a server and begin receiving heartbeats with an
// updated list of Nomad servers. Or Consul will begin advertising a
2016-05-27 22:57:04 +00:00
// new server in the nomad service (Nomad server service).
2016-05-23 18:09:31 +00:00
if ! foundHealthyServer {
2016-06-10 05:23:54 +00:00
p . logger . Printf ( "[DEBUG] client.rpcproxy: No healthy servers during rebalance, aborting" )
2016-05-23 18:09:31 +00:00
return
}
// Verify that all servers are present. Reconcile will save the
// final serverList.
if p . reconcileServerList ( l ) {
2016-06-10 20:46:49 +00:00
p . logger . Printf ( "[TRACE] client.rpcproxy: Rebalanced %d servers, next active server is %s" , len ( l . L ) , l . L [ 0 ] . String ( ) )
2016-05-23 18:09:31 +00:00
} else {
// reconcileServerList failed because Nomad removed the
// server that was at the front of the list that had
// successfully been Ping'ed. Between the Ping and
// reconcile, a Nomad heartbeat removed the node.
//
// Instead of doing any heroics, "freeze in place" and
// continue to use the existing connection until the next
// rebalance occurs.
}
return
}
2016-06-02 05:51:04 +00:00
// reconcileServerList returns true when the first server in serverList
// (l) exists in the receiver's serverList (p). If true, the merged
// serverList (l) is stored as the receiver's serverList (p). Returns
// false if the first server in p does not exist in the passed in list (l)
// (i.e. was removed by Nomad during a PingNomadServer() call. Newly added
// servers are appended to the list and other missing servers are removed
// from the list.
2016-06-10 05:30:05 +00:00
func ( p * RPCProxy ) reconcileServerList ( l * serverList ) bool {
2016-06-10 05:57:02 +00:00
p . activatedListLock . Lock ( )
defer p . activatedListLock . Unlock ( )
2016-05-23 18:09:31 +00:00
// newServerList is a serverList that has been kept up-to-date with
// join and leave events.
newServerList := p . getServerList ( )
// If a Nomad heartbeat removed all nodes, or there is no selected
// server (zero nodes in serverList), abort early.
if len ( newServerList . L ) == 0 || len ( l . L ) == 0 {
return false
}
type targetServer struct {
server * ServerEndpoint
// 'b' == both
// 'o' == original
// 'n' == new
state byte
}
mergedList := make ( map [ EndpointKey ] * targetServer , len ( l . L ) )
for _ , s := range l . L {
mergedList [ * s . Key ( ) ] = & targetServer { server : s , state : 'o' }
}
for _ , s := range newServerList . L {
k := s . Key ( )
_ , found := mergedList [ * k ]
if found {
mergedList [ * k ] . state = 'b'
} else {
mergedList [ * k ] = & targetServer { server : s , state : 'n' }
}
}
// Ensure the selected server has not been removed by a heartbeat
selectedServerKey := l . L [ 0 ] . Key ( )
if v , found := mergedList [ * selectedServerKey ] ; found && v . state == 'o' {
return false
}
// Append any new servers and remove any old servers
for k , v := range mergedList {
switch v . state {
case 'b' :
// Do nothing, server exists in both
case 'o' :
// Server has been removed
l . removeServerByKey ( & k )
case 'n' :
// Server added
l . L = append ( l . L , v . server )
default :
panic ( "unknown merge list state" )
}
}
p . saveServerList ( * l )
return true
}
// RemoveServer takes out an internal write lock and removes a server from
2016-06-01 07:34:46 +00:00
// the activated server list.
2016-06-10 05:30:05 +00:00
func ( p * RPCProxy ) RemoveServer ( s * ServerEndpoint ) {
2016-06-01 07:34:46 +00:00
// Lock hierarchy protocol dictates serverListLock is acquired first.
p . serverListLock . Lock ( )
defer p . serverListLock . Unlock ( )
2016-06-10 05:57:02 +00:00
p . activatedListLock . Lock ( )
defer p . activatedListLock . Unlock ( )
2016-05-23 18:09:31 +00:00
l := p . getServerList ( )
2016-06-01 07:34:46 +00:00
k := s . Key ( )
l . removeServerByKey ( k )
p . saveServerList ( l )
2016-05-23 18:09:31 +00:00
2016-06-01 07:34:46 +00:00
p . primaryServers . removeServerByKey ( k )
p . backupServers . removeServerByKey ( k )
2016-05-23 18:09:31 +00:00
}
2016-06-02 05:51:04 +00:00
// refreshServerRebalanceTimer is only called once p.rebalanceTimer expires.
2016-06-10 05:30:05 +00:00
func ( p * RPCProxy ) refreshServerRebalanceTimer ( ) time . Duration {
2016-05-23 18:09:31 +00:00
l := p . getServerList ( )
numServers := len ( l . L )
// Limit this connection's life based on the size (and health) of the
// cluster. Never rebalance a connection more frequently than
// connReuseLowWatermarkDuration, and make sure we never exceed
// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
clusterWideRebalanceConnsPerSec := float64 ( numServers * newRebalanceConnsPerSecPerServer )
connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib . RandomStagger ( clientRPCMinReuseDuration / clientRPCJitterFraction )
numLANMembers := p . numNodes
connRebalanceTimeout := lib . RateScaledInterval ( clusterWideRebalanceConnsPerSec , connReuseLowWatermarkDuration , numLANMembers )
p . rebalanceTimer . Reset ( connRebalanceTimeout )
return connRebalanceTimeout
}
// ResetRebalanceTimer resets the rebalance timer. This method exists for
// testing and should not be used directly.
2016-06-10 05:30:05 +00:00
func ( p * RPCProxy ) ResetRebalanceTimer ( ) {
2016-06-10 05:57:02 +00:00
p . activatedListLock . Lock ( )
defer p . activatedListLock . Unlock ( )
2016-05-23 18:09:31 +00:00
p . rebalanceTimer . Reset ( clientRPCMinReuseDuration )
}
// ServerRPCAddrs returns one RPC Address per server
2016-06-10 05:30:05 +00:00
func ( p * RPCProxy ) ServerRPCAddrs ( ) [ ] string {
2016-05-23 18:09:31 +00:00
l := p . getServerList ( )
serverAddrs := make ( [ ] string , 0 , len ( l . L ) )
for _ , s := range l . L {
serverAddrs = append ( serverAddrs , s . Addr . String ( ) )
}
return serverAddrs
}
// Run is used to start and manage the task of automatically shuffling and
// rebalancing the list of Nomad servers. This maintenance only happens
// periodically based on the expiration of the timer. Failed servers are
// automatically cycled to the end of the list. New servers are appended to
// the list. The order of the server list must be shuffled periodically to
// distribute load across all known and available Nomad servers.
2016-06-10 05:30:05 +00:00
func ( p * RPCProxy ) Run ( ) {
2016-05-23 18:09:31 +00:00
for {
select {
case <- p . rebalanceTimer . C :
p . RebalanceServers ( )
p . refreshServerRebalanceTimer ( )
case <- p . shutdownCh :
2016-06-10 05:23:54 +00:00
p . logger . Printf ( "[INFO] client.rpcproxy: shutting down" )
2016-05-23 18:09:31 +00:00
return
}
}
}
2016-06-02 07:12:30 +00:00
// RefreshServerLists is called when the Client receives an update from a
// Nomad Server. The response from Nomad Client Heartbeats contain a list of
// Nomad Servers that the Nomad Client should use for RPC requests.
// RefreshServerLists does not rebalance its serverLists (that is handled
// elsewhere via a periodic timer). New Nomad Servers learned via the
2016-06-10 05:30:05 +00:00
// heartbeat are appended to the RPCProxy's activated serverList. Servers
2016-06-02 07:12:30 +00:00
// that are no longer present in the Heartbeat are removed immediately from
// all server lists. Nomad Servers speaking a newer major or minor API
// version are filtered from the serverList.
2016-06-10 05:30:05 +00:00
func ( p * RPCProxy ) RefreshServerLists ( servers [ ] * structs . NodeServerInfo , numNodes int32 , leaderRPCAddr string ) error {
2016-05-23 18:09:31 +00:00
// Merge all servers found in the response. Servers in the response
// with newer API versions are filtered from the list. If the list
2016-06-10 05:30:05 +00:00
// is missing an address found in the RPCProxy's server list, remove
// it from the RPCProxy.
2016-05-23 18:09:31 +00:00
p . serverListLock . Lock ( )
defer p . serverListLock . Unlock ( )
2016-05-27 10:49:51 +00:00
// Clear the backup server list when a heartbeat contains at least
// one server.
2016-06-02 07:12:30 +00:00
if len ( servers ) > 0 && len ( p . backupServers . L ) > 0 {
p . backupServers . L = make ( [ ] * ServerEndpoint , 0 , len ( servers ) )
2016-05-27 10:49:51 +00:00
}
2016-05-23 18:09:31 +00:00
// 1) Create a map to reconcile the difference between
2016-06-02 07:12:30 +00:00
// p.primaryServers and servers.
2016-05-23 18:09:31 +00:00
type targetServer struct {
server * ServerEndpoint
// 'b' == both
// 'o' == original
// 'n' == new
state byte
}
2016-06-02 07:12:30 +00:00
mergedPrimaryMap := make ( map [ EndpointKey ] * targetServer , len ( p . primaryServers . L ) + len ( servers ) )
2016-05-23 18:09:31 +00:00
numOldServers := 0
for _ , s := range p . primaryServers . L {
2016-05-26 20:28:45 +00:00
mergedPrimaryMap [ * s . Key ( ) ] = & targetServer { server : s , state : 'o' }
2016-05-23 18:09:31 +00:00
numOldServers ++
}
numBothServers := 0
var newServers bool
2016-06-02 07:12:30 +00:00
for _ , s := range servers {
2016-05-23 18:09:31 +00:00
// Filter out servers using a newer API version. Prevent
// spamming the logs every heartbeat.
//
// TODO(sean@): Move the logging throttle logic into a
2016-06-10 05:30:05 +00:00
// dedicated logging package so RPCProxy does not have to
2016-05-23 18:09:31 +00:00
// perform this accounting.
2016-06-11 03:26:15 +00:00
if int32 ( p . configInfo . RPCMajorVersion ( ) ) < s . RPCMajorVersion ||
( int32 ( p . configInfo . RPCMajorVersion ( ) ) == s . RPCMajorVersion &&
int32 ( p . configInfo . RPCMinorVersion ( ) ) < s . RPCMinorVersion ) {
2016-05-23 18:09:31 +00:00
now := time . Now ( )
2016-06-11 03:26:15 +00:00
t , ok := p . rpcAPIMismatchThrottle [ s . RPCAdvertiseAddr ]
2016-05-23 18:09:31 +00:00
if ok && t . After ( now ) {
continue
}
2016-06-11 22:17:20 +00:00
p . logger . Printf ( "[WARN] client.rpcproxy: API mismatch between client version (v%d.%d) and server version (v%d.%d), ignoring server %+q" , p . configInfo . RPCMajorVersion ( ) , p . configInfo . RPCMinorVersion ( ) , s . RPCMajorVersion , s . RPCMinorVersion , s . RPCAdvertiseAddr )
2016-06-11 03:26:15 +00:00
p . rpcAPIMismatchThrottle [ s . RPCAdvertiseAddr ] = now . Add ( rpcAPIMismatchLogRate )
2016-05-23 18:09:31 +00:00
continue
}
2016-06-11 03:26:15 +00:00
server , err := NewServerEndpoint ( s . RPCAdvertiseAddr )
2016-05-23 18:09:31 +00:00
if err != nil {
2016-06-11 22:17:20 +00:00
p . logger . Printf ( "[WARN] client.rpcproxy: Unable to create a server from %+q: %v" , s . RPCAdvertiseAddr , err )
2016-05-27 10:45:09 +00:00
continue
}
// Nomad servers in different datacenters are automatically
// added to the backup server list.
if s . Datacenter != p . configInfo . Datacenter ( ) {
p . backupServers . L = append ( p . backupServers . L , server )
2016-05-23 18:09:31 +00:00
continue
}
k := server . Key ( )
2016-05-26 20:28:45 +00:00
_ , found := mergedPrimaryMap [ * k ]
2016-05-23 18:09:31 +00:00
if found {
2016-05-26 20:28:45 +00:00
mergedPrimaryMap [ * k ] . state = 'b'
2016-05-23 18:09:31 +00:00
numBothServers ++
} else {
2016-05-26 20:28:45 +00:00
mergedPrimaryMap [ * k ] = & targetServer { server : server , state : 'n' }
2016-05-23 18:09:31 +00:00
newServers = true
}
}
2016-05-28 01:17:56 +00:00
// Short-circuit acquiring listLock if nothing changed
2016-05-23 18:09:31 +00:00
if ! newServers && numOldServers == numBothServers {
return nil
}
2016-06-10 05:57:02 +00:00
p . activatedListLock . Lock ( )
defer p . activatedListLock . Unlock ( )
2016-05-23 18:09:31 +00:00
newServerCfg := p . getServerList ( )
2016-05-26 20:28:45 +00:00
for k , v := range mergedPrimaryMap {
2016-05-23 18:09:31 +00:00
switch v . state {
case 'b' :
// Do nothing, server exists in both
case 'o' :
// Server has been removed
// TODO(sean@): Teach Nomad servers how to remove
// themselves from their heartbeat in order to
// gracefully drain their clients over the next
// cluster's max rebalanceTimer duration. Without
// this enhancement, if a server being shutdown and
// it is the first in serverList, the client will
// fail its next RPC connection.
p . primaryServers . removeServerByKey ( & k )
newServerCfg . removeServerByKey ( & k )
case 'n' :
// Server added. Append it to both lists
// immediately. The server should only go into
// active use in the event of a failure or after a
// rebalance occurs.
p . primaryServers . L = append ( p . primaryServers . L , v . server )
newServerCfg . L = append ( newServerCfg . L , v . server )
default :
panic ( "unknown merge list state" )
}
}
2016-06-02 07:12:30 +00:00
p . numNodes = int ( numNodes )
2016-06-10 05:30:05 +00:00
p . leaderAddr = leaderRPCAddr
2016-05-23 18:09:31 +00:00
p . saveServerList ( newServerCfg )
return nil
}