open-nomad/client/servers/manager.go

299 lines
8.1 KiB
Go
Raw Normal View History

2018-01-09 23:26:53 +00:00
// Package servers provides an interface for choosing Servers to communicate
// with from a Nomad Client perspective. The package does not provide any API
// guarantees and should be called only by `hashicorp/nomad`.
package servers
import (
"log"
"math/rand"
"net"
"strings"
"sync"
"time"
"github.com/hashicorp/consul/lib"
)
const (
	// clientRPCMinReuseDuration is the shortest period during which RPC
	// queries keep being sent over an established connection to a single
	// server before the client may move to another one.
	clientRPCMinReuseDuration = 5 * time.Minute

	// newRebalanceConnsPerSecPerServer caps how many new connections per
	// second each server is expected to receive because of rebalancing.
	// It bounds the load that continual rebalancing puts on a cluster that is
	// already in equilibrium; a smaller value means a slower recovery after a
	// partition. The cap starts to matter at roughly 48K clients talking to 5
	// servers, or with fewer servers while a partition is in effect.
	//
	// Worked examples: a 100K-node Nomad cluster with 5 servers needs about
	// 5 minutes for all servers to rebalance their connections. If 99,995
	// agents sit in the minority and talk to just one server, a full
	// rebalance takes about 26 minutes. A 10K-node cluster in that same
	// scenario needs about 2.6 minutes.
	newRebalanceConnsPerSecPerServer = 64
)
2018-01-25 05:14:03 +00:00
// Pinger checks whether a server is reachable. The Manager treats a nil
// error from Ping as "this server is healthy" and any non-nil error as a
// failed server.
type Pinger interface {
	Ping(target net.Addr) error
}
2018-01-09 23:26:53 +00:00
// Server identifies one Nomad server the client may send RPCs to, together
// with metadata used when choosing between servers.
type Server struct {
	// Addr is the resolved network address of the server.
	Addr net.Addr

	// addr memoizes Addr.String(). String fills it in lazily while holding
	// the embedded mutex.
	addr string

	sync.Mutex

	// DC is the datacenter the server belongs to.
	DC string
}

// String returns the server's address in text form. The first call formats
// Addr and caches the result; subsequent calls return the cached string.
func (s *Server) String() string {
	s.Lock()
	defer s.Unlock()

	if s.addr != "" {
		return s.addr
	}
	s.addr = s.Addr.String()
	return s.addr
}
// Servers is an ordered list of servers. Order is significant: the Manager
// hands out the first entry as the server to contact.
type Servers []*Server

// String returns the address of every server in list order, joined by commas.
func (s Servers) String() string {
	parts := make([]string, len(s))
	for i := range s {
		parts[i] = s[i].String()
	}
	return strings.Join(parts, ",")
}
2018-01-25 05:14:03 +00:00
// cycle cycles a list of servers in-place
func (s Servers) cycle() {
numServers := len(s)
2018-01-25 02:00:21 +00:00
if numServers < 2 {
2018-01-25 05:14:03 +00:00
return // No action required
2018-01-25 02:00:21 +00:00
}
2018-01-25 05:14:03 +00:00
start := s[0]
for i := 1; i < numServers; i++ {
s[i-1] = s[i]
}
s[numServers-1] = start
2018-01-25 02:00:21 +00:00
}
// removeServerByKey performs an inline removal of the first matching server
2018-01-25 05:14:03 +00:00
func (s Servers) removeServerByKey(targetKey string) {
for i, srv := range s {
if targetKey == srv.String() {
copy(s[i:], s[i+1:])
s[len(s)-1] = nil
s = s[:len(s)-1]
2018-01-25 02:00:21 +00:00
return
}
}
}
2018-01-25 05:14:03 +00:00
// shuffle shuffles the server list in place
func (s Servers) shuffle() {
for i := len(s) - 1; i > 0; i-- {
2018-01-25 02:00:21 +00:00
j := rand.Int31n(int32(i + 1))
2018-01-25 05:14:03 +00:00
s[i], s[j] = s[j], s[i]
2018-01-25 02:00:21 +00:00
}
}
2018-01-09 23:26:53 +00:00
type Manager struct {
2018-01-25 05:14:03 +00:00
// servers is the list of all known Nomad servers.
servers Servers
2018-01-09 23:26:53 +00:00
// rebalanceTimer controls the duration of the rebalance interval
rebalanceTimer *time.Timer
// shutdownCh is a copy of the channel in Nomad.Client
shutdownCh chan struct{}
logger *log.Logger
// numNodes is used to estimate the approximate number of nodes in
// a cluster and limit the rate at which it rebalances server
// connections. This should be read and set using atomic.
numNodes int32
// connPoolPinger is used to test the health of a server in the connection
// pool. Pinger is an interface that wraps client.ConnPool.
connPoolPinger Pinger
2018-01-25 05:14:03 +00:00
sync.Mutex
2018-01-09 23:26:53 +00:00
}
// New is the only way to safely create a new Manager struct.
func New(logger *log.Logger, shutdownCh chan struct{}, connPoolPinger Pinger) (m *Manager) {
2018-01-25 05:14:03 +00:00
return &Manager{
logger: logger,
connPoolPinger: connPoolPinger,
rebalanceTimer: time.NewTimer(clientRPCMinReuseDuration),
shutdownCh: shutdownCh,
}
2018-01-09 23:26:53 +00:00
}
// Start is used to start and manage the task of automatically shuffling and
2018-01-25 05:14:03 +00:00
// rebalancing the list of Nomad servers in order to distribute load across
// all known and available Nomad servers.
2018-01-09 23:26:53 +00:00
func (m *Manager) Start() {
for {
select {
case <-m.rebalanceTimer.C:
m.RebalanceServers()
m.refreshServerRebalanceTimer()
case <-m.shutdownCh:
2018-01-25 02:00:21 +00:00
m.logger.Printf("[DEBUG] manager: shutting down")
2018-01-09 23:26:53 +00:00
return
}
}
}
2018-01-25 05:14:03 +00:00
func (m *Manager) SetServers(servers Servers) {
m.Lock()
defer m.Unlock()
m.servers = servers
2018-01-09 23:26:53 +00:00
}
2018-01-25 05:14:03 +00:00
// FindServer returns a server to send an RPC too. If there are no servers, nil
// is returned.
2018-01-09 23:26:53 +00:00
func (m *Manager) FindServer() *Server {
2018-01-25 05:14:03 +00:00
m.Lock()
defer m.Unlock()
if len(m.servers) == 0 {
2018-01-09 23:26:53 +00:00
m.logger.Printf("[WARN] manager: No servers available")
return nil
}
// Return whatever is at the front of the list because it is
// assumed to be the oldest in the server list (unless -
// hypothetically - the server list was rotated right after a
// server was added).
2018-01-25 05:14:03 +00:00
return m.servers[0]
2018-01-09 23:26:53 +00:00
}
// NumNodes returns the number of approximate nodes in the cluster.
func (m *Manager) NumNodes() int32 {
2018-01-25 05:14:03 +00:00
m.Lock()
defer m.Unlock()
return m.numNodes
2018-01-09 23:26:53 +00:00
}
// SetNumNodes stores the number of approximate nodes in the cluster.
func (m *Manager) SetNumNodes(n int32) {
2018-01-25 05:14:03 +00:00
m.Lock()
defer m.Unlock()
m.numNodes = n
2018-01-09 23:26:53 +00:00
}
// NotifyFailedServer marks the passed in server as "failed" by rotating it
// to the end of the server list.
func (m *Manager) NotifyFailedServer(s *Server) {
2018-01-25 05:14:03 +00:00
m.Lock()
defer m.Unlock()
2018-01-09 23:26:53 +00:00
// If the server being failed is not the first server on the list,
// this is a noop. If, however, the server is failed and first on
2018-01-25 05:14:03 +00:00
// the list, move the server to the end of the list.
if len(m.servers) > 1 && m.servers[0] == s {
m.servers.cycle()
2018-01-09 23:26:53 +00:00
}
}
2018-01-25 02:00:21 +00:00
// NumServers returns the total number of known servers whether healthy or not.
2018-01-09 23:26:53 +00:00
func (m *Manager) NumServers() int {
2018-01-25 05:14:03 +00:00
m.Lock()
defer m.Unlock()
return len(m.servers)
2018-01-09 23:26:53 +00:00
}
// GetServers returns a copy of the current list of servers.
2018-01-25 05:14:03 +00:00
func (m *Manager) GetServers() Servers {
m.Lock()
defer m.Unlock()
copy := make([]*Server, 0, len(m.servers))
for _, s := range m.servers {
2018-01-09 23:26:53 +00:00
ns := new(Server)
*ns = *s
copy = append(copy, ns)
}
return copy
}
2018-01-25 05:14:03 +00:00
// RebalanceServers shuffles the order in which Servers will be contacted. The
// function will shuffle the set of potential servers to contact and then attempt
// to contact each server. If a server successfully responds it is used, otherwise
// it is rotated such that it will be the last attempted server.
2018-01-09 23:26:53 +00:00
func (m *Manager) RebalanceServers() {
2018-01-25 05:14:03 +00:00
m.Lock()
defer m.Unlock()
2018-01-09 23:26:53 +00:00
// Shuffle servers so we have a chance of picking a new one.
2018-01-25 05:14:03 +00:00
m.servers.shuffle()
2018-01-09 23:26:53 +00:00
// Iterate through the shuffled server list to find an assumed
// healthy server. NOTE: Do not iterate on the list directly because
// this loop mutates the server list in-place.
var foundHealthyServer bool
2018-01-25 05:14:03 +00:00
for i := 0; i < len(m.servers); i++ {
2018-01-09 23:26:53 +00:00
// Always test the first server. Failed servers are cycled
// while Serf detects the node has failed.
2018-01-25 05:14:03 +00:00
srv := m.servers[0]
2018-01-09 23:26:53 +00:00
2018-01-25 02:00:21 +00:00
err := m.connPoolPinger.Ping(srv.Addr)
if err == nil {
2018-01-09 23:26:53 +00:00
foundHealthyServer = true
break
}
m.logger.Printf(`[DEBUG] manager: pinging server "%s" failed: %s`, srv, err)
2018-01-25 05:14:03 +00:00
m.servers.cycle()
2018-01-09 23:26:53 +00:00
}
2018-01-25 02:00:21 +00:00
if !foundHealthyServer {
2018-01-09 23:26:53 +00:00
m.logger.Printf("[DEBUG] manager: No healthy servers during rebalance, aborting")
return
}
return
}
// refreshServerRebalanceTimer is only called once m.rebalanceTimer expires.
func (m *Manager) refreshServerRebalanceTimer() time.Duration {
2018-01-25 05:14:03 +00:00
m.Lock()
defer m.Unlock()
numServers := len(m.servers)
2018-01-10 19:01:46 +00:00
2018-01-09 23:26:53 +00:00
// Limit this connection's life based on the size (and health) of the
// cluster. Never rebalance a connection more frequently than
// connReuseLowWatermarkDuration, and make sure we never exceed
// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
clusterWideRebalanceConnsPerSec := float64(numServers * newRebalanceConnsPerSecPerServer)
2018-01-25 05:14:03 +00:00
connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, clientRPCMinReuseDuration, int(m.numNodes))
2018-01-09 23:26:53 +00:00
connRebalanceTimeout += lib.RandomStagger(connRebalanceTimeout)
m.rebalanceTimer.Reset(connRebalanceTimeout)
return connRebalanceTimeout
}
// ResetRebalanceTimer resets the rebalance timer. This method exists for
// testing and should not be used directly.
func (m *Manager) ResetRebalanceTimer() {
2018-01-25 05:14:03 +00:00
m.Lock()
defer m.Unlock()
2018-01-09 23:26:53 +00:00
m.rebalanceTimer.Reset(clientRPCMinReuseDuration)
}