open-nomad/client/client.go

2739 lines
82 KiB
Go
Raw Normal View History

2015-08-20 22:25:09 +00:00
package client
import (
"errors"
2015-08-20 23:07:26 +00:00
"fmt"
"io/ioutil"
2015-08-20 23:07:26 +00:00
"net"
2018-01-11 19:24:57 +00:00
"net/rpc"
2015-08-20 22:25:09 +00:00
"os"
"path/filepath"
2018-01-09 23:26:53 +00:00
"sort"
2015-08-20 23:07:26 +00:00
"strconv"
"strings"
2015-08-20 22:25:09 +00:00
"sync"
2015-08-20 23:07:26 +00:00
"time"
metrics "github.com/armon/go-metrics"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
2018-09-17 21:22:40 +00:00
hclog "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
2018-10-04 23:22:01 +00:00
"github.com/hashicorp/nomad/client/allocrunner"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
arstate "github.com/hashicorp/nomad/client/allocrunner/state"
"github.com/hashicorp/nomad/client/allocwatcher"
"github.com/hashicorp/nomad/client/config"
consulApi "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/devicemanager"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/pluginmanager"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
2018-01-09 23:26:53 +00:00
"github.com/hashicorp/nomad/client/servers"
"github.com/hashicorp/nomad/client/state"
2016-05-09 15:55:19 +00:00
"github.com/hashicorp/nomad/client/stats"
cstructs "github.com/hashicorp/nomad/client/structs"
2016-08-18 03:28:48 +00:00
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
2018-01-12 21:58:44 +00:00
"github.com/hashicorp/nomad/helper/pool"
hstats "github.com/hashicorp/nomad/helper/stats"
2016-10-25 23:05:37 +00:00
"github.com/hashicorp/nomad/helper/tlsutil"
"github.com/hashicorp/nomad/helper/uuid"
2015-08-20 23:41:29 +00:00
"github.com/hashicorp/nomad/nomad/structs"
nconfig "github.com/hashicorp/nomad/nomad/structs/config"
2018-11-16 22:13:01 +00:00
"github.com/hashicorp/nomad/plugins/device"
vaultapi "github.com/hashicorp/vault/api"
"github.com/shirou/gopsutil/host"
2015-08-20 22:25:09 +00:00
)
2015-08-20 23:07:26 +00:00
const (
	// clientRPCCache controls how long we keep an idle connection
	// open to a server
	clientRPCCache = 5 * time.Minute

	// clientMaxStreams controls how many idle streams we keep
	// open to a server
	clientMaxStreams = 2

	// datacenterQueryLimit searches through up to this many adjacent
	// datacenters looking for the Nomad server service.
	datacenterQueryLimit = 9

	// registerRetryIntv is minimum interval on which we retry
	// registration. We pick a value between this and 2x this.
	registerRetryIntv = 15 * time.Second

	// getAllocRetryIntv is minimum interval on which we retry
	// to fetch allocations. We pick a value between this and 2x this.
	getAllocRetryIntv = 30 * time.Second

	// devModeRetryIntv is the retry interval used for development
	devModeRetryIntv = time.Second

	// stateSnapshotIntv is how often the client snapshots state
	stateSnapshotIntv = 60 * time.Second

	// initialHeartbeatStagger is used to stagger the interval between
	// starting and the initial heartbeat. After the initial heartbeat,
	// we switch to using the TTL specified by the servers.
	initialHeartbeatStagger = 10 * time.Second

	// nodeUpdateRetryIntv is how often the client checks for updates to the
	// node attributes or meta map.
	nodeUpdateRetryIntv = 5 * time.Second

	// allocSyncIntv is the batching period of allocation updates before they
	// are synced with the server.
	allocSyncIntv = 200 * time.Millisecond

	// allocSyncRetryIntv is the interval on which we retry updating
	// the status of the allocation
	allocSyncRetryIntv = 5 * time.Second
)
// ClientStatsReporter exposes all the APIs related to resource usage of a Nomad
// Client
2016-05-09 15:55:19 +00:00
type ClientStatsReporter interface {
// GetAllocStats returns the AllocStatsReporter for the passed allocation.
// If it does not exist an error is reported.
2018-10-04 22:45:46 +00:00
GetAllocStats(allocID string) (interfaces.AllocStatsReporter, error)
2016-05-25 20:12:09 +00:00
// LatestHostStats returns the latest resource usage stats for the host
LatestHostStats() *stats.HostStats
2016-05-09 15:55:19 +00:00
}
// AllocRunner is the interface implemented by the core alloc runner.
//TODO Create via factory to allow testing Client with mock AllocRunners.
type AllocRunner interface {
Alloc() *structs.Allocation
client: expose task state to client The interesting decision in this commit was to expose AR's state and not a fully materialized Allocation struct. AR.clientAlloc builds an Alloc that contains the task state, so I considered simply memoizing and exposing that method. However, that would lead to AR having two awkwardly similar methods: - Alloc() - which returns the server-sent alloc - ClientAlloc() - which returns the fully materialized client alloc Since ClientAlloc() could be memoized it would be just as cheap to call as Alloc(), so why not replace Alloc() entirely? Replacing Alloc() entirely would require Update() to immediately materialize the task states on server-sent Allocs as there may have been local task state changes since the server received an Alloc update. This quickly becomes difficult to reason about: should Update hooks use the TaskStates? Are state changes caused by TR Update hooks immediately reflected in the Alloc? Should AR persist its copy of the Alloc? If so, are its TaskStates canonical or the TaskStates on TR? So! Forget that. Let's separate the static Allocation from the dynamic AR & TR state! - AR.Alloc() is for static Allocation access (often for the Job) - AR.AllocState() is for the dynamic AR & TR runtime state (deployment status, task states, etc). If code needs to know the status of a task: AllocState() If code needs to know the names of tasks: Alloc() It should be very easy for a developer to reason about which method they should call and what they can do with the return values.
2018-09-27 00:08:43 +00:00
AllocState() *arstate.State
Destroy()
Shutdown()
GetAllocDir() *allocdir.AllocDir
IsDestroyed() bool
IsMigrating() bool
IsWaiting() bool
Listener() *cstructs.AllocListener
Restore() error
Run()
2018-10-04 22:45:46 +00:00
StatsReporter() interfaces.AllocStatsReporter
Update(*structs.Allocation)
WaitCh() <-chan struct{}
DestroyCh() <-chan struct{}
ShutdownCh() <-chan struct{}
GetTaskEventHandler(taskName string) drivermanager.EventHandler
}
2015-08-20 22:25:09 +00:00
// Client is used to implement the client interaction with Nomad. Clients
// are expected to register as a schedulable node to the servers, and to
// run allocations as determined by the servers.
type Client struct {
2016-02-10 22:09:23 +00:00
config *config.Config
start time.Time
2017-04-29 22:43:23 +00:00
// stateDB is used to efficiently store client state.
stateDB state.StateDB
2017-04-29 22:43:23 +00:00
2016-02-10 22:09:23 +00:00
// configCopy is a copy that should be passed to alloc-runners.
configCopy *config.Config
2016-02-10 21:44:53 +00:00
configLock sync.RWMutex
2015-08-20 23:07:26 +00:00
logger hclog.Logger
rpcLogger hclog.Logger
2015-08-20 22:25:09 +00:00
2018-01-12 21:58:44 +00:00
connPool *pool.ConnPool
2015-08-20 23:07:26 +00:00
2018-02-15 23:22:57 +00:00
// tlsWrap is used to wrap outbound connections using TLS. It should be
// accessed using the lock.
tlsWrap tlsutil.RegionWrapper
tlsWrapLock sync.RWMutex
2018-01-09 23:26:53 +00:00
// servers is the list of nomad servers
servers *servers.Manager
// heartbeat related times for tracking how often to heartbeat
2017-12-07 01:57:50 +00:00
lastHeartbeat time.Time
heartbeatTTL time.Duration
haveHeartbeated bool
heartbeatLock sync.Mutex
// triggerDiscoveryCh triggers Consul discovery; see triggerDiscovery
triggerDiscoveryCh chan struct{}
// triggerNodeUpdate triggers the client to mark the Node as changed and
// update it.
triggerNodeUpdate chan struct{}
// triggerEmitNodeEvent sends an event and triggers the client to update the
// server for the node event
triggerEmitNodeEvent chan *structs.NodeEvent
2018-04-04 01:05:28 +00:00
// rpcRetryCh is closed when there an event such as server discovery or a
2018-04-04 01:30:01 +00:00
// successful RPC occurring happens such that a retry should happen. Access
2018-04-04 01:05:28 +00:00
// should only occur via the getter method
rpcRetryCh chan struct{}
rpcRetryLock sync.Mutex
2015-08-23 01:16:05 +00:00
// allocs maps alloc IDs to their AllocRunner. This map includes all
// AllocRunners - running and GC'd - until the server GCs them.
allocs map[string]AllocRunner
2015-08-23 22:06:47 +00:00
allocLock sync.RWMutex
2015-08-23 21:54:52 +00:00
// invalidAllocs is a map that tracks allocations that failed because
// the client couldn't initialize alloc or task runners for it. This can
// happen due to driver errors
invalidAllocs map[string]struct{}
// allocUpdates stores allocations that need to be synced to the server.
allocUpdates chan *structs.Allocation
// consulService is Nomad's custom Consul client for managing services
// and checks.
2018-06-11 20:33:18 +00:00
consulService consulApi.ConsulServiceAPI
// consulCatalog is the subset of Consul's Catalog API Nomad uses.
consulCatalog consul.CatalogAPI
2016-05-26 22:25:18 +00:00
// HostStatsCollector collects host resource usage stats
2016-05-22 09:04:27 +00:00
hostStatsCollector *stats.HostStatsCollector
2016-05-09 15:55:19 +00:00
// shutdown is true when the Client has been shutdown. Must hold
// shutdownLock to access.
shutdown bool
// shutdownCh is closed to signal the Client is shutting down.
shutdownCh chan struct{}
2015-08-20 22:25:09 +00:00
shutdownLock sync.Mutex
2016-08-18 03:28:48 +00:00
// shutdownGroup are goroutines that exit when shutdownCh is closed.
// Shutdown() blocks on Wait() after closing shutdownCh.
shutdownGroup group
2016-09-14 20:30:01 +00:00
// vaultClient is used to interact with Vault for token and secret renewals
2016-08-18 03:28:48 +00:00
vaultClient vaultclient.VaultClient
// garbageCollector is used to garbage collect terminal allocations present
// in the node automatically
garbageCollector *AllocGarbageCollector
2017-08-20 00:20:05 +00:00
2017-08-23 20:49:08 +00:00
// clientACLResolver holds the ACL resolution state
clientACLResolver
2018-01-11 19:24:57 +00:00
// rpcServer is used to serve RPCs by the local agent.
rpcServer *rpc.Server
endpoints rpcEndpoints
2018-03-11 18:41:13 +00:00
streamingRpcs *structs.StreamingRpcRegistry
2018-01-11 19:24:57 +00:00
// pluginManagers is the set of PluginManagers registered by the client
pluginManagers *pluginmanager.PluginGroup
// devicemanger is responsible for managing device plugins.
devicemanager devicemanager.Manager
// drivermanager is responsible for managing driver plugins
drivermanager drivermanager.Manager
// baseLabels are used when emitting tagged metrics. All client metrics will
// have these tags, and optionally more.
baseLabels []metrics.Label
// batchNodeUpdates is used to batch initial updates to the node
batchNodeUpdates *batchNodeUpdates
// fpInitialized chan is closed when the first batch of fingerprints are
// applied to the node and the server is updated
fpInitialized chan struct{}
2015-08-20 22:25:09 +00:00
}
2016-09-26 22:12:35 +00:00
// noServersErr is the error the RPC method hands back when the client has
// no configured servers. It is used to trigger Consul discovery if enabled.
var noServersErr = errors.New("no servers")
2015-08-20 22:25:09 +00:00
// NewClient is used to create a new client from the given configuration
func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulService consulApi.ConsulServiceAPI, stateDBFunc state.NewStateDBFunc) (*Client, error) {
2016-10-25 23:01:53 +00:00
// Create the tls wrapper
2016-11-01 18:55:29 +00:00
var tlsWrap tlsutil.RegionWrapper
2016-10-25 22:57:38 +00:00
if cfg.TLSConfig.EnableRPC {
tw, err := tlsutil.NewTLSConfiguration(cfg.TLSConfig, true, true)
if err != nil {
return nil, err
}
tlsWrap, err = tw.OutgoingTLSWrapper()
if err != nil {
return nil, err
}
}
2018-09-17 21:22:40 +00:00
// Create the logger
logger := cfg.Logger.ResetNamed("client")
2018-09-17 21:22:40 +00:00
2015-08-20 23:41:29 +00:00
// Create the client
2015-08-20 22:25:09 +00:00
c := &Client{
config: cfg,
consulCatalog: consulCatalog,
consulService: consulService,
start: time.Now(),
2018-09-17 21:22:40 +00:00
connPool: pool.NewPool(logger, clientRPCCache, clientMaxStreams, tlsWrap),
tlsWrap: tlsWrap,
streamingRpcs: structs.NewStreamingRpcRegistry(),
2018-09-17 22:44:37 +00:00
logger: logger,
rpcLogger: logger.Named("rpc"),
allocs: make(map[string]AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
triggerDiscoveryCh: make(chan struct{}),
triggerNodeUpdate: make(chan struct{}, 8),
triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8),
fpInitialized: make(chan struct{}),
invalidAllocs: make(map[string]struct{}),
}
c.batchNodeUpdates = newBatchNodeUpdates(
c.updateNodeFromDriver,
c.updateNodeFromDevices,
)
2018-01-09 23:26:53 +00:00
// Initialize the server manager
c.servers = servers.New(c.logger, c.shutdownCh, c)
2015-09-12 18:47:44 +00:00
// Initialize the client
if err := c.init(stateDBFunc); err != nil {
return nil, fmt.Errorf("failed to initialize client: %v", err)
2015-09-12 18:47:44 +00:00
}
2018-01-11 19:24:57 +00:00
// Setup the clients RPC server
c.setupClientRpc()
2017-08-23 20:49:08 +00:00
// Initialize the ACL state
if err := c.clientACLResolver.init(); err != nil {
return nil, fmt.Errorf("failed to initialize ACL state: %v", err)
}
2015-08-20 23:41:29 +00:00
// Setup the node
if err := c.setupNode(); err != nil {
return nil, fmt.Errorf("node setup failed: %v", err)
}
2018-04-16 20:28:23 +00:00
// Store the config copy before restoring state but after it has been
// initialized.
c.configLock.Lock()
c.configCopy = c.config.Copy()
c.configLock.Unlock()
fingerprintManager := NewFingerprintManager(
c.configCopy.PluginSingletonLoader, c.GetConfig, c.configCopy.Node,
c.shutdownCh, c.updateNodeFromFingerprint, c.logger)
c.pluginManagers = pluginmanager.New(c.logger)
2018-01-24 13:01:37 +00:00
2018-02-05 23:02:52 +00:00
// Fingerprint the node and scan for drivers
if err := fingerprintManager.Run(); err != nil {
2015-08-20 23:41:29 +00:00
return nil, fmt.Errorf("fingerprinting failed: %v", err)
}
2015-08-20 23:53:43 +00:00
// Build the white/blacklists of drivers.
allowlistDrivers := cfg.ReadStringListToMap("driver.whitelist")
blocklistDrivers := cfg.ReadStringListToMap("driver.blacklist")
// Setup the driver manager
driverConfig := &drivermanager.Config{
Logger: c.logger,
Loader: c.configCopy.PluginSingletonLoader,
PluginConfig: c.configCopy.NomadPluginConfig(),
Updater: c.batchNodeUpdates.updateNodeFromDriver,
EventHandlerFactory: c.GetTaskEventHandler,
State: c.stateDB,
AllowedDrivers: allowlistDrivers,
BlockedDrivers: blocklistDrivers,
}
drvManager := drivermanager.New(driverConfig)
c.drivermanager = drvManager
c.pluginManagers.RegisterAndRun(drvManager)
// Setup the device manager
devConfig := &devicemanager.Config{
Logger: c.logger,
Loader: c.configCopy.PluginSingletonLoader,
PluginConfig: c.configCopy.NomadPluginConfig(),
Updater: c.batchNodeUpdates.updateNodeFromDevices,
StatsInterval: c.configCopy.StatsCollectionInterval,
State: c.stateDB,
}
devManager := devicemanager.New(devConfig)
c.devicemanager = devManager
c.pluginManagers.RegisterAndRun(devManager)
// Batching of initial fingerprints is done to reduce the number of node
// updates sent to the server on startup.
go c.batchFirstFingerprints()
// Add the stats collector
statsCollector := stats.NewHostStatsCollector(c.logger, c.config.AllocDir, c.devicemanager.AllStats)
c.hostStatsCollector = statsCollector
// Add the garbage collector
gcConfig := &GCConfig{
MaxAllocs: cfg.GCMaxAllocs,
DiskUsageThreshold: cfg.GCDiskUsageThreshold,
InodeUsageThreshold: cfg.GCInodeUsageThreshold,
Interval: cfg.GCInterval,
ParallelDestroys: cfg.GCParallelDestroys,
ReservedDiskMB: cfg.Node.Reserved.DiskMB,
}
c.garbageCollector = NewAllocGarbageCollector(c.logger, statsCollector, c, gcConfig)
go c.garbageCollector.Run()
// Set the preconfigured list of static servers
c.configLock.RLock()
if len(c.configCopy.Servers) > 0 {
if _, err := c.setServersImpl(c.configCopy.Servers, true); err != nil {
logger.Warn("none of the configured servers are valid", "error", err)
}
}
c.configLock.RUnlock()
// Setup Consul discovery if enabled
2017-01-18 23:55:14 +00:00
if c.configCopy.ConsulConfig.ClientAutoJoin != nil && *c.configCopy.ConsulConfig.ClientAutoJoin {
c.shutdownGroup.Go(c.consulDiscovery)
2018-01-09 23:26:53 +00:00
if c.servers.NumServers() == 0 {
// No configured servers; trigger discovery manually
c.triggerDiscoveryCh <- struct{}{}
}
}
2016-08-18 03:28:48 +00:00
// Setup the vault client for token and secret renewals
if err := c.setupVaultClient(); err != nil {
return nil, fmt.Errorf("failed to setup vault client: %v", err)
}
2016-09-14 20:30:01 +00:00
// Restore the state
if err := c.restoreState(); err != nil {
logger.Error("failed to restore state", "error", err)
logger.Error("Nomad is unable to start due to corrupt state. "+
"The safest way to proceed is to manually stop running task processes "+
"and remove Nomad's state and alloc directories before "+
2017-07-03 19:29:21 +00:00
"restarting. Lost allocations will be rescheduled.",
"state_dir", c.config.StateDir, "alloc_dir", c.config.AllocDir)
logger.Error("Corrupt state is often caused by a bug. Please " +
"report as much information as possible to " +
"https://github.com/hashicorp/nomad/issues")
return nil, fmt.Errorf("failed to restore state")
2016-09-14 20:30:01 +00:00
}
// Register and then start heartbeating to the servers.
c.shutdownGroup.Go(c.registerAndHeartbeat)
// Begin periodic snapshotting of state.
c.shutdownGroup.Go(c.periodicSnapshot)
// Begin syncing allocations to the server
c.shutdownGroup.Go(c.allocSync)
// Start the client! Don't use the shutdownGroup as run handles
// shutdowns manually to prevent updates from being applied during
// shutdown.
2015-08-21 00:49:04 +00:00
go c.run()
2016-05-09 15:55:19 +00:00
// Start collecting stats
c.shutdownGroup.Go(c.emitStats)
2016-05-09 15:55:19 +00:00
c.logger.Info("started client", "node_id", c.NodeID())
2015-08-20 22:25:09 +00:00
return c, nil
2015-08-23 23:53:15 +00:00
}
// Ready exposes the fpInitialized channel as receive-only; it is closed once
// the client is fully initialized.
func (c *Client) Ready() <-chan struct{} {
	var ready <-chan struct{} = c.fpInitialized
	return ready
}
2015-09-12 18:47:44 +00:00
// init is used to initialize the client and perform any setup
// needed before we begin starting its various components.
func (c *Client) init(statedbFunc state.NewStateDBFunc) error {
2015-09-24 21:29:53 +00:00
// Ensure the state dir exists if we have one
if c.config.StateDir != "" {
if err := os.MkdirAll(c.config.StateDir, 0700); err != nil {
return fmt.Errorf("failed creating state dir: %s", err)
}
2015-11-11 00:03:18 +00:00
} else {
2018-03-11 18:34:27 +00:00
// Otherwise make a temp directory to use.
2015-11-11 00:03:18 +00:00
p, err := ioutil.TempDir("", "NomadClient")
if err != nil {
return fmt.Errorf("failed creating temporary directory for the StateDir: %v", err)
}
p, err = filepath.EvalSymlinks(p)
if err != nil {
2016-10-11 23:16:06 +00:00
return fmt.Errorf("failed to find temporary directory for the StateDir: %v", err)
}
2015-11-11 00:03:18 +00:00
c.config.StateDir = p
2015-09-24 21:29:53 +00:00
}
c.logger.Info("using state directory", "state_dir", c.config.StateDir)
2015-09-24 21:29:53 +00:00
// Open the state database
db, err := statedbFunc(c.logger, c.config.StateDir)
2017-04-29 22:43:23 +00:00
if err != nil {
return fmt.Errorf("failed to open state database: %v", err)
2017-04-29 22:43:23 +00:00
}
// Upgrade the state database
if err := db.Upgrade(); err != nil {
// Upgrade only returns an error on critical persistence
// failures in which an operator should intervene before the
// node is accessible. Upgrade drops and logs corrupt state it
// encounters, so failing to start the agent should be extremely
// rare.
return fmt.Errorf("failed to upgrade state database: %v", err)
}
2017-04-29 22:43:23 +00:00
c.stateDB = db
// Ensure the alloc dir exists if we have one
if c.config.AllocDir != "" {
2017-05-25 21:44:13 +00:00
if err := os.MkdirAll(c.config.AllocDir, 0711); err != nil {
return fmt.Errorf("failed creating alloc dir: %s", err)
}
2015-09-26 01:12:11 +00:00
} else {
2018-03-11 18:34:27 +00:00
// Otherwise make a temp directory to use.
2015-09-26 01:12:11 +00:00
p, err := ioutil.TempDir("", "NomadClient")
if err != nil {
return fmt.Errorf("failed creating temporary directory for the AllocDir: %v", err)
}
p, err = filepath.EvalSymlinks(p)
if err != nil {
2016-10-11 23:16:06 +00:00
return fmt.Errorf("failed to find temporary directory for the AllocDir: %v", err)
}
// Change the permissions to have the execute bit
2017-05-25 21:44:13 +00:00
if err := os.Chmod(p, 0711); err != nil {
return fmt.Errorf("failed to change directory permissions for the AllocDir: %v", err)
}
2015-09-26 01:12:11 +00:00
c.config.AllocDir = p
2015-09-12 18:47:44 +00:00
}
c.logger.Info("using alloc directory", "alloc_dir", c.config.AllocDir)
2015-09-12 18:47:44 +00:00
return nil
}
// reloadTLSConnections allows a client to reload its TLS configuration on the
// fly
func (c *Client) reloadTLSConnections(newConfig *nconfig.TLSConfig) error {
var tlsWrap tlsutil.RegionWrapper
if newConfig != nil && newConfig.EnableRPC {
tw, err := tlsutil.NewTLSConfiguration(newConfig, true, true)
if err != nil {
return err
}
twWrap, err := tw.OutgoingTLSWrapper()
if err != nil {
return err
}
tlsWrap = twWrap
}
2018-02-15 23:22:57 +00:00
// Store the new tls wrapper.
c.tlsWrapLock.Lock()
c.tlsWrap = tlsWrap
c.tlsWrapLock.Unlock()
// Keep the client configuration up to date as we use configuration values to
// decide on what type of connections to accept
c.configLock.Lock()
c.config.TLSConfig = newConfig
c.configLock.Unlock()
c.connPool.ReloadTLS(tlsWrap)
return nil
}
// Reload allows a client to reload its configuration on the fly.
//
// Only the TLS configuration is considered: when it differs from the current
// one in a way that requires new RPC connections, the TLS connections are
// reloaded. An error is returned if newConfig is nil, if the TLS
// configurations cannot be compared, or if reloading TLS fails.
func (c *Client) Reload(newConfig *config.Config) error {
	if newConfig == nil {
		return errors.New("cannot reload client with a nil configuration")
	}

	// reloadTLSConnections replaces c.config.TLSConfig while holding
	// configLock, so read the current value under the read lock. The lock is
	// released before reloading because reloadTLSConnections takes the write
	// lock itself.
	c.configLock.RLock()
	currentTLS := c.config.TLSConfig
	c.configLock.RUnlock()

	shouldReloadTLS, err := tlsutil.ShouldReloadRPCConnections(currentTLS, newConfig.TLSConfig)
	if err != nil {
		c.logger.Error("error parsing TLS configuration", "error", err)
		return err
	}

	if shouldReloadTLS {
		return c.reloadTLSConnections(newConfig.TLSConfig)
	}

	return nil
}
2015-08-23 23:53:15 +00:00
// Leave is used to prepare the client to leave the cluster
func (c *Client) Leave() error {
// TODO
return nil
2015-08-20 22:25:09 +00:00
}
2018-02-05 23:02:52 +00:00
// GetConfig returns the config of the client
func (c *Client) GetConfig() *config.Config {
2018-02-05 23:02:52 +00:00
c.configLock.Lock()
defer c.configLock.Unlock()
2018-04-16 22:02:00 +00:00
return c.configCopy
}
// Datacenter returns the datacenter for the given client
func (c *Client) Datacenter() string {
2017-09-14 21:08:17 +00:00
return c.config.Node.Datacenter
}
// Region reports the region taken from this client's configuration.
func (c *Client) Region() string {
	cfg := c.config
	return cfg.Region
}
// NodeID reports the ID of the node held in this client's configuration.
func (c *Client) NodeID() string {
	node := c.config.Node
	return node.ID
}
// secretNodeID reports the secret ID of the node held in this client's
// configuration.
func (c *Client) secretNodeID() string {
	node := c.config.Node
	return node.SecretID
}
// RPCMajorVersion reports structs.ApiMajorVersion, the major API version
// supported by the client.
func (c *Client) RPCMajorVersion() int {
	var major int = structs.ApiMajorVersion
	return major
}
// RPCMinorVersion reports structs.ApiMinorVersion, the minor API version
// supported by the client.
func (c *Client) RPCMinorVersion() int {
	var minor int = structs.ApiMinorVersion
	return minor
}
2015-08-20 22:25:09 +00:00
// Shutdown is used to tear down the client
func (c *Client) Shutdown() error {
c.shutdownLock.Lock()
defer c.shutdownLock.Unlock()
if c.shutdown {
c.logger.Info("already shutdown")
2015-08-20 22:25:09 +00:00
return nil
}
c.logger.Info("shutting down")
2017-04-29 22:43:23 +00:00
2016-08-18 03:28:48 +00:00
// Stop renewing tokens and secrets
if c.vaultClient != nil {
c.vaultClient.Stop()
}
// Stop Garbage collector
c.garbageCollector.Stop()
arGroup := group{}
2015-10-04 20:36:03 +00:00
if c.config.DevMode {
// In DevMode destroy all the running allocations.
for _, ar := range c.getAllocRunners() {
ar.Destroy()
arGroup.AddCh(ar.DestroyCh())
2015-10-04 20:36:03 +00:00
}
} else {
// In normal mode call shutdown
for _, ar := range c.getAllocRunners() {
ar.Shutdown()
arGroup.AddCh(ar.ShutdownCh())
}
2015-10-04 20:36:03 +00:00
}
arGroup.Wait()
2015-10-04 20:36:03 +00:00
2018-12-21 19:23:21 +00:00
// Shutdown the plugin managers
c.pluginManagers.Shutdown()
2015-08-20 22:25:09 +00:00
c.shutdown = true
close(c.shutdownCh)
// Must close connection pool to unblock alloc watcher
2015-08-21 00:49:04 +00:00
c.connPool.Shutdown()
// Wait for goroutines to stop
c.shutdownGroup.Wait()
// One final save state
c.saveState()
return c.stateDB.Close()
2015-08-20 22:25:09 +00:00
}
2015-08-20 23:07:26 +00:00
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (c *Client) Stats() map[string]map[string]string {
c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()
2015-08-20 23:07:26 +00:00
stats := map[string]map[string]string{
2017-09-26 22:26:33 +00:00
"client": {
"node_id": c.NodeID(),
2018-01-09 23:26:53 +00:00
"known_servers": strings.Join(c.GetServers(), ","),
"num_allocations": strconv.Itoa(c.NumAllocs()),
2015-09-22 22:29:30 +00:00
"last_heartbeat": fmt.Sprintf("%v", time.Since(c.lastHeartbeat)),
"heartbeat_ttl": fmt.Sprintf("%v", c.heartbeatTTL),
2015-08-20 23:07:26 +00:00
},
2018-01-12 21:58:44 +00:00
"runtime": hstats.RuntimeStats(),
2015-08-20 23:07:26 +00:00
}
return stats
}
2015-08-20 23:41:29 +00:00
// CollectAllocation asks the garbage collector to collect a single
// allocation on this node. It reports true if the alloc was found and
// garbage collected; otherwise false.
func (c *Client) CollectAllocation(allocID string) bool {
	collected := c.garbageCollector.Collect(allocID)
	return collected
}
// CollectAllAllocs asks the garbage collector to collect every allocation on
// this node that is in the terminal state.
func (c *Client) CollectAllAllocs() {
	gc := c.garbageCollector
	gc.CollectAll()
}
2015-08-20 23:41:29 +00:00
// Node returns the locally registered node
func (c *Client) Node() *structs.Node {
2016-02-19 07:02:28 +00:00
c.configLock.RLock()
defer c.configLock.RUnlock()
return c.configCopy.Node
2015-08-20 23:41:29 +00:00
}
2018-12-12 19:45:45 +00:00
// getAllocRunner looks up the AllocRunner registered for allocID while
// holding the alloc read lock. An unknown-allocation error is returned when
// no runner is registered under that ID.
func (c *Client) getAllocRunner(allocID string) (AllocRunner, error) {
	c.allocLock.RLock()
	runner, found := c.allocs[allocID]
	c.allocLock.RUnlock()

	if found {
		return runner, nil
	}
	return nil, structs.NewErrUnknownAllocation(allocID)
}
// StatsReporter exposes the various APIs related resource usage of a Nomad
// client
2016-05-09 15:55:19 +00:00
func (c *Client) StatsReporter() ClientStatsReporter {
return c
}
2018-10-04 22:45:46 +00:00
func (c *Client) GetAllocStats(allocID string) (interfaces.AllocStatsReporter, error) {
2018-12-12 19:45:45 +00:00
ar, err := c.getAllocRunner(allocID)
if err != nil {
return nil, err
}
return ar.StatsReporter(), nil
2016-05-09 15:55:19 +00:00
}
// HostStats returns all the stats related to a Nomad client
func (c *Client) LatestHostStats() *stats.HostStats {
2016-12-12 06:58:28 +00:00
return c.hostStatsCollector.Stats()
}
// LatestDeviceResourceStats narrows the device stats from the latest host
// stats down to the given allocated devices.
func (c *Client) LatestDeviceResourceStats(devices []*structs.AllocatedDeviceResource) []*device.DeviceGroupStats {
	hostStats := c.LatestHostStats()
	return c.computeAllocatedDeviceGroupStats(devices, hostStats.DeviceStats)
}
// computeAllocatedDeviceGroupStats filters hostDeviceGroupStats down to the
// device groups and instances listed in devices.
//
// Host device groups are matched to allocated devices by their
// (vendor, type, name) tuple. For every matching group a new DeviceGroupStats
// is returned that contains only the instance stats of the allocated device
// IDs; allocated IDs with missing or nil stats are logged and skipped. nil is
// returned when either input is empty.
func (c *Client) computeAllocatedDeviceGroupStats(devices []*structs.AllocatedDeviceResource, hostDeviceGroupStats []*device.DeviceGroupStats) []*device.DeviceGroupStats {
	// basic optimization for the usual case
	if len(devices) == 0 || len(hostDeviceGroupStats) == 0 {
		return nil
	}

	// Build an index of allocated devices
	adIdx := map[structs.DeviceIdTuple][]string{}
	for _, ds := range devices {
		adIdx[*ds.ID()] = ds.DeviceIDs
	}

	// Collect allocated device stats from host stats
	result := make([]*device.DeviceGroupStats, 0, len(adIdx))

	for _, dg := range hostDeviceGroupStats {
		k := structs.DeviceIdTuple{
			Vendor: dg.Vendor,
			Type:   dg.Type,
			Name:   dg.Name,
		}

		allocatedDeviceIDs, ok := adIdx[k]
		if !ok {
			continue
		}

		rdgStats := &device.DeviceGroupStats{
			Vendor:        dg.Vendor,
			Type:          dg.Type,
			Name:          dg.Name,
			InstanceStats: map[string]*device.DeviceStats{},
		}

		for _, adID := range allocatedDeviceIDs {
			deviceStats, ok := dg.InstanceStats[adID]
			if !ok || deviceStats == nil {
				c.logger.Warn("device not found in stats", "device_id", adID, "device_group_id", k)
				continue
			}

			rdgStats.InstanceStats[adID] = deviceStats
		}
		result = append(result, rdgStats)
	}

	return result
}
2017-10-07 00:54:09 +00:00
// ValidateMigrateToken verifies that a token is for a specific client and
2017-10-07 01:54:55 +00:00
// allocation, and has been created by a trusted party that has privileged
2017-10-07 00:54:09 +00:00
// knowledge of the client's secret identifier
func (c *Client) ValidateMigrateToken(allocID, migrateToken string) bool {
if !c.config.ACLEnabled {
return true
}
2018-01-12 21:58:44 +00:00
return structs.CompareMigrateToken(allocID, c.secretNodeID(), migrateToken)
}
// GetAllocFS returns the AllocFS interface for the alloc dir of an allocation
2016-01-14 21:35:42 +00:00
func (c *Client) GetAllocFS(allocID string) (allocdir.AllocDirFS, error) {
2018-12-12 19:45:45 +00:00
ar, err := c.getAllocRunner(allocID)
if err != nil {
return nil, err
2016-01-12 23:25:51 +00:00
}
return ar.GetAllocDir(), nil
2016-01-13 05:28:07 +00:00
}
client: expose task state to client The interesting decision in this commit was to expose AR's state and not a fully materialized Allocation struct. AR.clientAlloc builds an Alloc that contains the task state, so I considered simply memoizing and exposing that method. However, that would lead to AR having two awkwardly similar methods: - Alloc() - which returns the server-sent alloc - ClientAlloc() - which returns the fully materialized client alloc Since ClientAlloc() could be memoized it would be just as cheap to call as Alloc(), so why not replace Alloc() entirely? Replacing Alloc() entirely would require Update() to immediately materialize the task states on server-sent Allocs as there may have been local task state changes since the server received an Alloc update. This quickly becomes difficult to reason about: should Update hooks use the TaskStates? Are state changes caused by TR Update hooks immediately reflected in the Alloc? Should AR persist its copy of the Alloc? If so, are its TaskStates canonical or the TaskStates on TR? So! Forget that. Let's separate the static Allocation from the dynamic AR & TR state! - AR.Alloc() is for static Allocation access (often for the Job) - AR.AllocState() is for the dynamic AR & TR runtime state (deployment status, task states, etc). If code needs to know the status of a task: AllocState() If code needs to know the names of tasks: Alloc() It should be very easy for a developer to reason about which method they should call and what they can do with the return values.
2018-09-27 00:08:43 +00:00
// GetAllocState returns a copy of an allocation's state on this client. It
// returns either an AllocState or an unknown allocation error.
func (c *Client) GetAllocState(allocID string) (*arstate.State, error) {
2018-12-12 19:45:45 +00:00
ar, err := c.getAllocRunner(allocID)
if err != nil {
return nil, err
}
client: expose task state to client The interesting decision in this commit was to expose AR's state and not a fully materialized Allocation struct. AR.clientAlloc builds an Alloc that contains the task state, so I considered simply memoizing and exposing that method. However, that would lead to AR having two awkwardly similar methods: - Alloc() - which returns the server-sent alloc - ClientAlloc() - which returns the fully materialized client alloc Since ClientAlloc() could be memoized it would be just as cheap to call as Alloc(), so why not replace Alloc() entirely? Replacing Alloc() entirely would require Update() to immediately materialize the task states on server-sent Allocs as there may have been local task state changes since the server received an Alloc update. This quickly becomes difficult to reason about: should Update hooks use the TaskStates? Are state changes caused by TR Update hooks immediately reflected in the Alloc? Should AR persist its copy of the Alloc? If so, are its TaskStates canonical or the TaskStates on TR? So! Forget that. Let's separate the static Allocation from the dynamic AR & TR state! - AR.Alloc() is for static Allocation access (often for the Job) - AR.AllocState() is for the dynamic AR & TR runtime state (deployment status, task states, etc). If code needs to know the status of a task: AllocState() If code needs to know the names of tasks: Alloc() It should be very easy for a developer to reason about which method they should call and what they can do with the return values.
2018-09-27 00:08:43 +00:00
return ar.AllocState(), nil
}
// GetServers returns the list of nomad servers this client is aware of.
func (c *Client) GetServers() []string {
2018-01-09 23:26:53 +00:00
endpoints := c.servers.GetServers()
res := make([]string, len(endpoints))
for i := range endpoints {
2018-01-09 23:26:53 +00:00
res[i] = endpoints[i].String()
}
2018-01-09 23:26:53 +00:00
sort.Strings(res)
return res
}
// SetServers sets a new list of nomad servers to connect to by delegating to
// setServersImpl without forcing. It returns the number of servers that were
// set; no error is returned as long as at least one server is usable.
func (c *Client) SetServers(in []string) (int, error) {
	const force = false
	return c.setServersImpl(in, force)
}
// setServersImpl sets a new list of nomad servers to connect to. If force is
2018-03-11 18:55:30 +00:00
// set, we add the server to the internal serverlist even if the server could not
// be pinged. An error is returned if no endpoints were valid when non-forcing.
//
// Force should be used when setting the servers from the initial configuration
// since the server may be starting up in parallel and initial pings may fail.
func (c *Client) setServersImpl(in []string, force bool) (int, error) {
var mu sync.Mutex
var wg sync.WaitGroup
var merr multierror.Error
endpoints := make([]*servers.Server, 0, len(in))
wg.Add(len(in))
for _, s := range in {
go func(srv string) {
defer wg.Done()
addr, err := resolveServer(srv)
if err != nil {
2018-09-06 00:34:17 +00:00
mu.Lock()
c.logger.Debug("ignoring server due to resolution error", "error", err, "server", srv)
merr.Errors = append(merr.Errors, err)
2018-09-06 00:34:17 +00:00
mu.Unlock()
return
}
// Try to ping to check if it is a real server
if err := c.Ping(addr); err != nil {
2018-09-06 00:34:17 +00:00
mu.Lock()
merr.Errors = append(merr.Errors, fmt.Errorf("Server at address %s failed ping: %v", addr, err))
2018-09-06 00:34:17 +00:00
mu.Unlock()
// If we are forcing the setting of the servers, inject it to
// the serverlist even if we can't ping immediately.
if !force {
return
}
}
mu.Lock()
endpoints = append(endpoints, &servers.Server{Addr: addr})
mu.Unlock()
}(s)
}
wg.Wait()
// Only return errors if no servers are valid
if len(endpoints) == 0 {
if len(merr.Errors) > 0 {
return 0, merr.ErrorOrNil()
}
return 0, noServersErr
}
2018-01-09 23:26:53 +00:00
c.servers.SetServers(endpoints)
return len(endpoints), nil
}
2015-08-23 21:12:26 +00:00
// restoreState is used to restore our state from the data dir
// If there are errors restoring a specific allocation it is marked
// as failed whenever possible.
2015-08-23 21:12:26 +00:00
func (c *Client) restoreState() error {
if c.config.DevMode {
return nil
}
//XXX REMOVED! make a note in backward compat / upgrading doc
2017-05-02 20:31:56 +00:00
// COMPAT: Remove in 0.7.0
2018-03-11 19:06:05 +00:00
// 0.6.0 transitioned from individual state files to a single bolt-db.
2017-05-02 20:31:56 +00:00
// The upgrade path is to:
// Check if old state exists
// If so, restore from that and delete old state
// Restore using state database
// Restore allocations
allocs, allocErrs, err := c.stateDB.GetAllAllocations()
if err != nil {
return err
}
for allocID, err := range allocErrs {
c.logger.Error("error restoring alloc", "error", err, "alloc_id", allocID)
//TODO Cleanup
// Try to clean up alloc dir
// Remove boltdb entries?
// Send to server with clientstatus=failed
}
// Load each alloc back
for _, alloc := range allocs {
//XXX On Restore we give up on watching previous allocs because
// we need the local AllocRunners initialized first. We could
// add a second loop to initialize just the alloc watcher.
prevAllocWatcher := allocwatcher.NoopPrevAlloc{}
prevAllocMigrator := allocwatcher.NoopPrevAlloc{}
2016-02-10 21:44:53 +00:00
c.configLock.RLock()
arConf := &allocrunner.Config{
Alloc: alloc,
Logger: c.logger,
ClientConfig: c.configCopy,
StateDB: c.stateDB,
StateUpdater: c,
DeviceStatsReporter: c,
Consul: c.consulService,
Vault: c.vaultClient,
PrevAllocWatcher: prevAllocWatcher,
PrevAllocMigrator: prevAllocMigrator,
DeviceManager: c.devicemanager,
DriverManager: c.drivermanager,
}
2016-02-10 21:44:53 +00:00
c.configLock.RUnlock()
2017-05-02 20:31:56 +00:00
2018-10-04 23:22:01 +00:00
ar, err := allocrunner.NewAllocRunner(arConf)
if err != nil {
c.logger.Error("error running alloc", "error", err, "alloc_id", alloc.ID)
c.handleInvalidAllocs(alloc, err)
continue
}
2017-05-02 20:31:56 +00:00
// Restore state
if err := ar.Restore(); err != nil {
c.logger.Error("error restoring alloc", "error", err, "alloc_id", alloc.ID)
// Override the status of the alloc to failed
ar.SetClientStatus(structs.AllocClientStatusFailed)
// Destroy the alloc runner since this is a failed restore
ar.Destroy()
continue
2017-05-02 20:31:56 +00:00
}
//XXX is this locking necessary?
c.allocLock.Lock()
c.allocs[alloc.ID] = ar
c.allocLock.Unlock()
2017-05-02 20:31:56 +00:00
}
// All allocs restored successfully, run them!
2018-07-19 00:04:36 +00:00
c.allocLock.Lock()
for _, ar := range c.allocs {
go ar.Run()
}
2018-07-19 00:04:36 +00:00
c.allocLock.Unlock()
return nil
2015-08-23 21:12:26 +00:00
}
// handleInvalidAllocs records alloc's ID in the client's invalidAllocs set and
// queues a failed copy of the allocation (built by makeFailedAlloc from alloc
// and err) on the allocUpdates channel so the server can handle it. The send
// is abandoned if the client shuts down first.
//
// NOTE(review): the invalidAllocs map is written here without taking a lock;
// confirm no other goroutine accesses it concurrently.
func (c *Client) handleInvalidAllocs(alloc *structs.Allocation, err error) {
	c.invalidAllocs[alloc.ID] = struct{}{}

	// Mark alloc as failed so server can handle this
	failedAlloc := makeFailedAlloc(alloc, err)

	select {
	case <-c.shutdownCh:
	case c.allocUpdates <- failedAlloc:
	}
}
2017-05-09 17:50:24 +00:00
// saveState is used to snapshot our state into the data dir.
func (c *Client) saveState() error {
2017-05-01 23:16:53 +00:00
var wg sync.WaitGroup
var l sync.Mutex
var mErr multierror.Error
runners := c.getAllocRunners()
wg.Add(len(runners))
2017-05-02 20:31:56 +00:00
for id, ar := range runners {
go func(id string, ar AllocRunner) {
err := c.stateDB.PutAllocation(ar.Alloc())
2017-05-01 23:16:53 +00:00
if err != nil {
c.logger.Error("error saving alloc state", "error", err, "alloc_id", id)
2017-05-01 23:16:53 +00:00
l.Lock()
multierror.Append(&mErr, err)
l.Unlock()
2017-05-01 22:06:18 +00:00
}
2017-05-01 23:16:53 +00:00
wg.Done()
2017-05-02 20:31:56 +00:00
}(id, ar)
}
2017-05-01 22:06:18 +00:00
2017-05-09 17:50:24 +00:00
wg.Wait()
return mErr.ErrorOrNil()
2015-08-23 21:12:26 +00:00
}
2016-02-20 03:51:55 +00:00
// getAllocRunners returns a snapshot of the current set of alloc runners.
func (c *Client) getAllocRunners() map[string]AllocRunner {
2016-02-20 03:51:55 +00:00
c.allocLock.RLock()
defer c.allocLock.RUnlock()
runners := make(map[string]AllocRunner, len(c.allocs))
2016-02-20 03:51:55 +00:00
for id, ar := range c.allocs {
runners[id] = ar
}
return runners
}
// NumAllocs returns the number of un-GC'd allocs this client has. Used to
// fulfill the AllocCounter interface for the GC.
func (c *Client) NumAllocs() int {
n := 0
c.allocLock.RLock()
for _, a := range c.allocs {
if !a.IsDestroyed() {
n++
}
}
2017-05-31 21:05:47 +00:00
c.allocLock.RUnlock()
return n
}
// nodeID restores, or generates if necessary, a unique node ID and SecretID.
// The node ID is, if available, a persistent unique ID. The secret ID is a
// high-entropy random UUID.
func (c *Client) nodeID() (id, secret string, err error) {
var hostID string
hostInfo, err := host.Info()
if !c.config.NoHostUUID && err == nil {
if hashed, ok := helper.HashUUID(hostInfo.HostID); ok {
hostID = hashed
}
}
if hostID == "" {
// Generate a random hostID if no constant ID is available on
// this platform.
hostID = uuid.Generate()
}
2015-09-22 17:31:47 +00:00
// Do not persist in dev mode
if c.config.DevMode {
return hostID, uuid.Generate(), nil
2015-09-22 17:31:47 +00:00
}
// Attempt to read existing ID
idPath := filepath.Join(c.config.StateDir, "client-id")
idBuf, err := ioutil.ReadFile(idPath)
2015-09-22 17:31:47 +00:00
if err != nil && !os.IsNotExist(err) {
return "", "", err
}
// Attempt to read existing secret ID
secretPath := filepath.Join(c.config.StateDir, "secret-id")
secretBuf, err := ioutil.ReadFile(secretPath)
if err != nil && !os.IsNotExist(err) {
return "", "", err
2015-09-22 17:31:47 +00:00
}
// Use existing ID if any
2016-08-19 02:01:24 +00:00
if len(idBuf) != 0 {
id = strings.ToLower(string(idBuf))
2016-08-19 02:01:24 +00:00
} else {
id = hostID
2016-08-19 02:01:24 +00:00
// Persist the ID
if err := ioutil.WriteFile(idPath, []byte(id), 0700); err != nil {
return "", "", err
}
2015-09-22 17:31:47 +00:00
}
2016-08-19 02:01:24 +00:00
if len(secretBuf) != 0 {
secret = string(secretBuf)
} else {
// Generate new ID
secret = uuid.Generate()
2015-09-22 17:31:47 +00:00
2016-08-19 02:01:24 +00:00
// Persist the ID
if err := ioutil.WriteFile(secretPath, []byte(secret), 0700); err != nil {
return "", "", err
}
}
2016-08-19 02:01:24 +00:00
return id, secret, nil
2015-09-22 17:31:47 +00:00
}
2015-08-20 23:41:29 +00:00
// setupNode is used to setup the initial node
func (c *Client) setupNode() error {
node := c.config.Node
if node == nil {
node = &structs.Node{}
c.config.Node = node
}
// Generate an ID and secret for the node
id, secretID, err := c.nodeID()
if err != nil {
return fmt.Errorf("node ID setup failed: %v", err)
}
node.ID = id
node.SecretID = secretID
2015-08-20 23:41:29 +00:00
if node.Attributes == nil {
node.Attributes = make(map[string]string)
}
if node.Links == nil {
node.Links = make(map[string]string)
}
if node.Drivers == nil {
node.Drivers = make(map[string]*structs.DriverInfo)
}
2015-08-20 23:41:29 +00:00
if node.Meta == nil {
node.Meta = make(map[string]string)
}
2018-09-30 00:23:41 +00:00
if node.NodeResources == nil {
node.NodeResources = &structs.NodeResources{}
}
2018-10-04 21:33:09 +00:00
if node.ReservedResources == nil {
node.ReservedResources = &structs.NodeReservedResources{}
}
2015-08-21 00:49:04 +00:00
if node.Resources == nil {
node.Resources = &structs.Resources{}
}
2016-03-14 02:05:41 +00:00
if node.Reserved == nil {
node.Reserved = &structs.Resources{}
}
2015-08-21 00:49:04 +00:00
if node.Datacenter == "" {
node.Datacenter = "dc1"
}
if node.Name == "" {
node.Name, _ = os.Hostname()
}
if node.Name == "" {
node.Name = node.ID
}
node.Status = structs.NodeStatusInit
2015-08-20 23:41:29 +00:00
return nil
}
// updateNodeFromFingerprint updates the node with the result of
// fingerprinting the node from the diff that was created
func (c *Client) updateNodeFromFingerprint(response *fingerprint.FingerprintResponse) *structs.Node {
c.configLock.Lock()
defer c.configLock.Unlock()
nodeHasChanged := false
2018-02-23 20:01:57 +00:00
for name, newVal := range response.Attributes {
oldVal := c.config.Node.Attributes[name]
if oldVal == newVal {
continue
}
nodeHasChanged = true
2018-02-23 20:01:57 +00:00
if newVal == "" {
2018-01-26 11:51:09 +00:00
delete(c.config.Node.Attributes, name)
} else {
2018-02-23 20:01:57 +00:00
c.config.Node.Attributes[name] = newVal
2018-01-26 11:51:09 +00:00
}
}
// update node links and resources from the diff created from
// fingerprinting
2018-02-23 20:01:57 +00:00
for name, newVal := range response.Links {
oldVal := c.config.Node.Links[name]
if oldVal == newVal {
continue
}
nodeHasChanged = true
2018-02-23 20:01:57 +00:00
if newVal == "" {
2018-01-26 11:51:09 +00:00
delete(c.config.Node.Links, name)
} else {
2018-02-23 20:01:57 +00:00
c.config.Node.Links[name] = newVal
2018-01-26 11:51:09 +00:00
}
}
2018-09-30 00:23:41 +00:00
// COMPAT(0.10): Remove in 0.10
if response.Resources != nil && !resourcesAreEqual(c.config.Node.Resources, response.Resources) {
nodeHasChanged = true
c.config.Node.Resources.Merge(response.Resources)
}
2018-09-30 00:23:41 +00:00
if response.NodeResources != nil && !c.config.Node.NodeResources.Equals(response.NodeResources) {
nodeHasChanged = true
c.config.Node.NodeResources.Merge(response.NodeResources)
}
2018-03-09 17:28:01 +00:00
if nodeHasChanged {
2018-04-16 20:28:23 +00:00
c.updateNodeLocked()
2018-03-09 17:28:01 +00:00
}
2018-03-07 18:34:38 +00:00
2018-04-16 20:28:23 +00:00
return c.configCopy.Node
2018-03-09 17:28:01 +00:00
}
// resourcesAreEqual is a temporary function to compare whether resources are
// equal. We can use this until we change fingerprinters to set pointers on a
// return type.
//
// Two resources are equal when their CPU, MemoryMB and DiskMB match and their
// Networks slices have the same length with pairwise-equal entries in the
// same order. Two nil resources are equal; a nil and a non-nil resource are
// not.
func resourcesAreEqual(first, second *structs.Resources) bool {
	if first == nil || second == nil {
		return first == second
	}

	if first.CPU != second.CPU ||
		first.MemoryMB != second.MemoryMB ||
		first.DiskMB != second.DiskMB {
		return false
	}

	// The length check makes indexing second.Networks below safe, so no
	// per-element bounds check is needed.
	if len(first.Networks) != len(second.Networks) {
		return false
	}
	for i, network := range first.Networks {
		if !network.Equals(second.Networks[i]) {
			return false
		}
	}

	return true
}
2015-08-24 00:40:14 +00:00
// retryIntv calculates a retry interval value given the base
func (c *Client) retryIntv(base time.Duration) time.Duration {
if c.config.DevMode {
return devModeRetryIntv
}
return base + lib.RandomStagger(base)
2015-08-24 00:40:14 +00:00
}
// registerAndHeartbeat is a long lived goroutine used to register the client
2018-03-11 18:12:19 +00:00
// and then start heartbeating to the server.
func (c *Client) registerAndHeartbeat() {
// Register the node
c.retryRegisterNode()
// Start watching changes for node changes
go c.watchNodeUpdates()
// Start watching for emitting node events
2018-03-13 13:33:53 +00:00
go c.watchNodeEvents()
// Setup the heartbeat timer, for the initial registration
// we want to do this quickly. We want to do it extra quickly
// in development mode.
var heartbeat <-chan time.Time
if c.config.DevMode {
heartbeat = time.After(0)
} else {
heartbeat = time.After(lib.RandomStagger(initialHeartbeatStagger))
}
2015-08-23 01:16:05 +00:00
for {
select {
2018-04-04 01:05:28 +00:00
case <-c.rpcRetryWatcher():
case <-heartbeat:
case <-c.shutdownCh:
return
}
if err := c.updateNodeStatus(); err != nil {
// The servers have changed such that this node has not been
// registered before
if strings.Contains(err.Error(), "node not found") {
// Re-register the node
c.logger.Info("re-registering node")
c.retryRegisterNode()
heartbeat = time.After(lib.RandomStagger(initialHeartbeatStagger))
} else {
intv := c.getHeartbeatRetryIntv(err)
c.logger.Error("error heartbeating. retrying", "error", err, "period", intv)
heartbeat = time.After(intv)
// If heartbeating fails, trigger Consul discovery
c.triggerDiscovery()
}
} else {
c.heartbeatLock.Lock()
heartbeat = time.After(c.heartbeatTTL)
c.heartbeatLock.Unlock()
}
}
}
// getHeartbeatRetryIntv is used to retrieve the time to wait before attempting
// another heartbeat.
func (c *Client) getHeartbeatRetryIntv(err error) time.Duration {
if c.config.DevMode {
return devModeRetryIntv
}
// Collect the useful heartbeat info
c.heartbeatLock.Lock()
haveHeartbeated := c.haveHeartbeated
last := c.lastHeartbeat
ttl := c.heartbeatTTL
c.heartbeatLock.Unlock()
// If we haven't even successfully heartbeated once or there is no leader
// treat it as a registration. In the case that there is a leadership loss,
// we will have our heartbeat timer reset to a much larger threshold, so
// do not put unnecessary pressure on the new leader.
if !haveHeartbeated || err == structs.ErrNoLeader {
return c.retryIntv(registerRetryIntv)
}
// Determine how much time we have left to heartbeat
left := last.Add(ttl).Sub(time.Now())
// Logic for retrying is:
// * Do not retry faster than once a second
// * Do not retry less that once every 30 seconds
// * If we have missed the heartbeat by more than 30 seconds, start to use
// the absolute time since we do not want to retry indefinitely
switch {
case left < -30*time.Second:
2018-04-05 20:48:33 +00:00
// Make left the absolute value so we delay and jitter properly.
left *= -1
case left < 0:
return time.Second + lib.RandomStagger(time.Second)
default:
}
stagger := lib.RandomStagger(left)
switch {
case stagger < time.Second:
return time.Second + lib.RandomStagger(time.Second)
case stagger > 30*time.Second:
2018-04-05 20:48:33 +00:00
return 25*time.Second + lib.RandomStagger(5*time.Second)
default:
return stagger
}
}
// periodicSnapshot is a long lived goroutine used to periodically snapshot the
// state of the client
func (c *Client) periodicSnapshot() {
2015-08-31 00:19:20 +00:00
// Create a snapshot timer
snapshot := time.After(stateSnapshotIntv)
2015-08-24 00:40:14 +00:00
for {
select {
2015-08-31 00:19:20 +00:00
case <-snapshot:
snapshot = time.After(stateSnapshotIntv)
2017-05-09 17:50:24 +00:00
if err := c.saveState(); err != nil {
c.logger.Error("error saving state", "error", err)
2015-08-31 00:19:20 +00:00
}
case <-c.shutdownCh:
return
}
}
}
// run is a long lived goroutine used to run the client. Shutdown() stops it first
func (c *Client) run() {
// Watch for changes in allocations
2016-02-19 04:43:48 +00:00
allocUpdates := make(chan *allocUpdates, 8)
go c.watchAllocations(allocUpdates)
for {
select {
case update := <-allocUpdates:
// Don't apply updates while shutting down.
c.shutdownLock.Lock()
if c.shutdown {
c.shutdownLock.Unlock()
return
}
// Apply updates inside lock to prevent a concurrent
// shutdown.
c.runAllocs(update)
c.shutdownLock.Unlock()
2015-08-24 00:40:14 +00:00
case <-c.shutdownCh:
return
}
2015-08-21 00:49:04 +00:00
}
}
// submitNodeEvents is used to submit a client-side node event. Examples of
2018-03-09 17:43:20 +00:00
// these kinds of events include when a driver moves from healthy to unhealthy
// (and vice versa)
2018-03-12 01:00:13 +00:00
func (c *Client) submitNodeEvents(events []*structs.NodeEvent) error {
2018-03-13 13:33:53 +00:00
nodeID := c.NodeID()
nodeEvents := map[string][]*structs.NodeEvent{
2018-03-12 01:00:13 +00:00
nodeID: events,
}
2018-03-12 01:00:13 +00:00
req := structs.EmitNodeEventsRequest{
NodeEvents: nodeEvents,
WriteRequest: structs.WriteRequest{Region: c.Region()},
}
2018-03-12 01:00:13 +00:00
var resp structs.EmitNodeEventsResponse
if err := c.RPC("Node.EmitEvents", &req, &resp); err != nil {
2018-03-14 01:04:55 +00:00
return fmt.Errorf("Emitting node events failed: %v", err)
}
return nil
}
2018-03-13 13:33:53 +00:00
// watchNodeEvents is a handler which receives node events and on a interval
// and submits them in batch format to the server
func (c *Client) watchNodeEvents() {
// batchEvents stores events that have yet to be published
var batchEvents []*structs.NodeEvent
2018-03-13 13:33:53 +00:00
// Create and drain the timer
timer := time.NewTimer(0)
timer.Stop()
select {
case <-timer.C:
default:
}
defer timer.Stop()
for {
select {
case event := <-c.triggerEmitNodeEvent:
2018-03-13 13:33:53 +00:00
if l := len(batchEvents); l <= structs.MaxRetainedNodeEvents {
batchEvents = append(batchEvents, event)
} else {
// Drop the oldest event
c.logger.Warn("dropping node event", "node_event", batchEvents[0])
2018-03-13 13:33:53 +00:00
batchEvents = append(batchEvents[1:], event)
}
2018-03-13 13:33:53 +00:00
timer.Reset(c.retryIntv(nodeUpdateRetryIntv))
case <-timer.C:
if err := c.submitNodeEvents(batchEvents); err != nil {
c.logger.Error("error submitting node events", "error", err)
2018-03-13 13:33:53 +00:00
timer.Reset(c.retryIntv(nodeUpdateRetryIntv))
2018-03-14 16:48:59 +00:00
} else {
2018-03-14 20:54:25 +00:00
// Reset the events since we successfully sent them.
2018-03-20 17:25:07 +00:00
batchEvents = []*structs.NodeEvent{}
2018-03-12 01:00:13 +00:00
}
case <-c.shutdownCh:
return
}
}
}
2018-03-12 01:00:13 +00:00
// triggerNodeEvent offers nodeEvent on the triggerEmitNodeEvent channel
// without blocking. If the send cannot proceed immediately the event is not
// delivered.
func (c *Client) triggerNodeEvent(nodeEvent *structs.NodeEvent) {
	select {
	default:
		// The channel could not accept the event right now; give up rather
		// than block the caller.
	case c.triggerEmitNodeEvent <- nodeEvent:
		// The event was handed off for emission.
	}
}
// retryRegisterNode is used to register the node or update the registration and
// retry in case of failure.
func (c *Client) retryRegisterNode() {
for {
err := c.registerNode()
if err == nil {
// Registered!
return
}
2016-09-26 22:12:35 +00:00
if err == noServersErr {
c.logger.Debug("registration waiting on servers")
c.triggerDiscovery()
} else {
c.logger.Error("error registering", "error", err)
}
select {
2018-04-04 01:05:28 +00:00
case <-c.rpcRetryWatcher():
case <-time.After(c.retryIntv(registerRetryIntv)):
case <-c.shutdownCh:
return
}
}
}
2015-08-21 00:49:04 +00:00
// registerNode is used to register the node or update the registration
func (c *Client) registerNode() error {
node := c.Node()
req := structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: c.Region()},
2015-08-21 00:49:04 +00:00
}
var resp structs.NodeUpdateResponse
2016-08-08 23:57:21 +00:00
if err := c.RPC("Node.Register", &req, &resp); err != nil {
2015-08-21 00:49:04 +00:00
return err
}
2016-02-19 07:02:28 +00:00
// Update the node status to ready after we register.
c.configLock.Lock()
node.Status = structs.NodeStatusReady
c.config.Node.Status = structs.NodeStatusReady
2016-02-19 07:02:28 +00:00
c.configLock.Unlock()
c.logger.Info("node registration complete")
2015-08-21 00:49:04 +00:00
if len(resp.EvalIDs) != 0 {
c.logger.Debug("evaluations triggered by node registration", "num_evals", len(resp.EvalIDs))
2015-08-21 00:49:04 +00:00
}
2016-02-10 06:43:16 +00:00
c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()
2015-08-23 01:16:05 +00:00
c.lastHeartbeat = time.Now()
c.heartbeatTTL = resp.HeartbeatTTL
return nil
}
// updateNodeStatus is used to heartbeat and update the status of the node
func (c *Client) updateNodeStatus() error {
2017-12-07 02:02:24 +00:00
start := time.Now()
2015-08-23 01:16:05 +00:00
req := structs.NodeUpdateStatusRequest{
NodeID: c.NodeID(),
2015-08-23 01:16:05 +00:00
Status: structs.NodeStatusReady,
WriteRequest: structs.WriteRequest{Region: c.Region()},
2015-08-23 01:16:05 +00:00
}
var resp structs.NodeUpdateResponse
2016-08-08 23:57:21 +00:00
if err := c.RPC("Node.UpdateStatus", &req, &resp); err != nil {
c.triggerDiscovery()
2016-08-10 22:17:32 +00:00
return fmt.Errorf("failed to update status: %v", err)
2015-08-23 01:16:05 +00:00
}
2017-12-07 02:02:24 +00:00
end := time.Now()
2015-08-23 01:16:05 +00:00
if len(resp.EvalIDs) != 0 {
c.logger.Debug("evaluations triggered by node update", "num_evals", len(resp.EvalIDs))
2015-08-23 01:16:05 +00:00
}
2016-02-10 06:43:16 +00:00
2017-12-07 01:57:50 +00:00
// Update the last heartbeat and the new TTL, capturing the old values
c.heartbeatLock.Lock()
last := c.lastHeartbeat
oldTTL := c.heartbeatTTL
haveHeartbeated := c.haveHeartbeated
2015-08-23 01:16:05 +00:00
c.lastHeartbeat = time.Now()
c.heartbeatTTL = resp.HeartbeatTTL
2017-12-07 01:57:50 +00:00
c.haveHeartbeated = true
c.heartbeatLock.Unlock()
c.logger.Trace("next heartbeat", "period", resp.HeartbeatTTL)
2017-12-07 01:57:50 +00:00
if resp.Index != 0 {
c.logger.Debug("state updated", "node_status", req.Status)
2017-12-07 01:57:50 +00:00
// We have potentially missed our TTL log how delayed we were
if haveHeartbeated {
c.logger.Warn("missed heartbeat",
"req_latency", end.Sub(start), "heartbeat_ttl", oldTTL, "since_last_heartbeat", time.Since(last))
2017-12-07 01:57:50 +00:00
}
}
2018-01-09 23:26:53 +00:00
// Update the number of nodes in the cluster so we can adjust our server
// rebalance rate.
c.servers.SetNumNodes(resp.NumNodes)
// Convert []*NodeServerInfo to []*servers.Server
nomadServers := make([]*servers.Server, 0, len(resp.Servers))
for _, s := range resp.Servers {
addr, err := resolveServer(s.RPCAdvertiseAddr)
if err != nil {
c.logger.Warn("ignoring invalid server", "error", err, "server", s.RPCAdvertiseAddr)
continue
}
2018-01-09 23:26:53 +00:00
e := &servers.Server{DC: s.Datacenter, Addr: addr}
nomadServers = append(nomadServers, e)
}
2018-01-09 23:26:53 +00:00
if len(nomadServers) == 0 {
2018-01-25 02:00:21 +00:00
return fmt.Errorf("heartbeat response returned no valid servers")
}
2018-01-09 23:26:53 +00:00
c.servers.SetServers(nomadServers)
// Begin polling Consul if there is no Nomad leader. We could be
// heartbeating to a Nomad server that is in the minority of a
// partition of the Nomad server quorum, but this Nomad Agent still
// has connectivity to the existing majority of Nomad Servers, but
// only if it queries Consul.
if resp.LeaderRPCAddr == "" {
c.triggerDiscovery()
}
2015-08-21 00:49:04 +00:00
return nil
}
2015-08-23 02:31:22 +00:00
// AllocStateUpdated asynchronously updates the server with the current state
// of an allocations and its tasks.
func (c *Client) AllocStateUpdated(alloc *structs.Allocation) {
2017-05-31 21:05:47 +00:00
if alloc.Terminated() {
// Terminated, mark for GC if we're still tracking this alloc
// runner. If it's not being tracked that means the server has
// already GC'd it (see removeAlloc).
2018-12-12 19:45:45 +00:00
ar, err := c.getAllocRunner(alloc.ID)
2018-12-12 19:45:45 +00:00
if err == nil {
c.garbageCollector.MarkForCollection(alloc.ID, ar)
// Trigger a GC in case we're over thresholds and just
// waiting for eligible allocs.
c.garbageCollector.Trigger()
}
}
// Strip all the information that can be reconstructed at the server. Only
// send the fields that are updatable by the client.
stripped := new(structs.Allocation)
stripped.ID = alloc.ID
stripped.NodeID = c.NodeID()
stripped.TaskStates = alloc.TaskStates
stripped.ClientStatus = alloc.ClientStatus
stripped.ClientDescription = alloc.ClientDescription
2017-07-03 04:49:56 +00:00
stripped.DeploymentStatus = alloc.DeploymentStatus
2016-02-22 05:32:32 +00:00
select {
case c.allocUpdates <- stripped:
case <-c.shutdownCh:
}
}
// allocSync is a long lived function that batches allocation updates to the
// server.
func (c *Client) allocSync() {
2016-02-22 05:32:32 +00:00
staggered := false
syncTicker := time.NewTicker(allocSyncIntv)
updates := make(map[string]*structs.Allocation)
for {
select {
case <-c.shutdownCh:
2016-02-22 05:32:32 +00:00
syncTicker.Stop()
return
case alloc := <-c.allocUpdates:
// Batch the allocation updates until the timer triggers.
updates[alloc.ID] = alloc
2016-02-22 05:32:32 +00:00
case <-syncTicker.C:
// Fast path if there are no updates
if len(updates) == 0 {
continue
}
2016-02-20 05:44:23 +00:00
sync := make([]*structs.Allocation, 0, len(updates))
for _, alloc := range updates {
sync = append(sync, alloc)
}
// Send to server.
args := structs.AllocUpdateRequest{
Alloc: sync,
WriteRequest: structs.WriteRequest{Region: c.Region()},
}
var resp structs.GenericResponse
if err := c.RPC("Node.UpdateAlloc", &args, &resp); err != nil {
c.logger.Error("error updating allocations", "error", err)
2016-02-22 05:32:32 +00:00
syncTicker.Stop()
syncTicker = time.NewTicker(c.retryIntv(allocSyncRetryIntv))
staggered = true
} else {
updates = make(map[string]*structs.Allocation)
2016-02-22 05:32:32 +00:00
if staggered {
syncTicker.Stop()
syncTicker = time.NewTicker(allocSyncIntv)
staggered = false
}
}
}
}
2015-08-29 21:22:24 +00:00
}
// allocUpdates holds the results of receiving updated allocations from the
// servers. watchAllocations builds one per successful poll and sends it on
// its updates channel.
type allocUpdates struct {
// pulled is the set of allocations that were downloaded from the servers,
// keyed by allocation ID.
pulled map[string]*structs.Allocation
// filtered is the set of allocation IDs that were not pulled, either because
// their AllocModifyIndex is not newer than the local alloc runner's or
// because the allocation was previously recorded as invalid.
filtered map[string]struct{}
// migrateTokens are a list of tokens necessary for when clients pull data
// from authorized volumes. It is copied from the MigrateTokens field of the
// server response; presumably keyed by allocation ID — TODO confirm.
migrateTokens map[string]string
}
2015-08-23 02:31:22 +00:00
// watchAllocations is used to scan for updates to allocations. It loops until
// the client shuts down. Each iteration:
//
// 1. calls Node.GetClientAllocs to fetch the map of allocation ID to
// AllocModifyIndex for this node;
// 2. splits those IDs into ones to pull (no local alloc runner, or a newer
// modify index than the local runner's, and not recorded as invalid) and
// ones to filter out;
// 3. calls Alloc.GetAllocs for the IDs to pull and checks that every
// requested allocation was returned;
// 4. sends the result on updates as an allocUpdates value.
//
// RPC failures and incomplete pulls are retried after a retry interval
// without advancing the query index.
func (c *Client) watchAllocations(updates chan *allocUpdates) {
// The request and response for getting the map of allocations that should
// be running on the Node to their AllocModifyIndex which is incremented
// when the allocation is updated by the servers.
2015-08-23 02:31:22 +00:00
req := structs.NodeSpecificRequest{
NodeID: c.NodeID(),
SecretID: c.secretNodeID(),
2015-08-23 02:31:22 +00:00
QueryOptions: structs.QueryOptions{
Region: c.Region(),
2015-08-24 00:40:14 +00:00
AllowStale: true,
2015-08-23 02:31:22 +00:00
},
}
var resp structs.NodeClientAllocsResponse
// The request and response for pulling down the set of allocations that are
// new, or updated server side.
allocsReq := structs.AllocsGetRequest{
QueryOptions: structs.QueryOptions{
Region: c.Region(),
AllowStale: true,
},
}
var allocsResp structs.AllocsGetResponse
2015-08-23 02:31:22 +00:00
2017-01-21 00:30:40 +00:00
OUTER:
2015-08-23 02:31:22 +00:00
for {
// Get the allocation modify index map, blocking for updates. We will
// use this to determine exactly what allocations need to be downloaded
// in full.
// The response is reset first so nothing from the previous iteration
// carries over.
resp = structs.NodeClientAllocsResponse{}
err := c.RPC("Node.GetClientAllocs", &req, &resp)
2015-08-23 02:31:22 +00:00
if err != nil {
// Shutdown often causes EOF errors, so check for shutdown first
select {
case <-c.shutdownCh:
return
default:
}
// COMPAT: Remove in 0.6. This is to allow the case in which the
// servers are not fully upgraded before the clients register. This
// can cause the SecretID to be lost
if strings.Contains(err.Error(), "node secret ID does not match") {
c.logger.Debug("secret mismatch; re-registering node", "error", err)
c.retryRegisterNode()
} else if err != noServersErr {
c.logger.Error("error querying node allocations", "error", err)
}
2015-08-24 00:40:14 +00:00
// Wait before retrying, unless the RPC retry watcher fires or the
// client shuts down first.
retry := c.retryIntv(getAllocRetryIntv)
2015-08-23 02:31:22 +00:00
select {
2018-04-04 01:05:28 +00:00
case <-c.rpcRetryWatcher():
continue
2015-08-23 02:31:22 +00:00
case <-time.After(retry):
continue
case <-c.shutdownCh:
return
}
}
// Check for shutdown
select {
case <-c.shutdownCh:
return
default:
}
// Filter all allocations whose AllocModifyIndex was not incremented.
// These are the allocations who have either not been updated, or whose
// updates are a result of the client sending an update for the alloc.
// This lets us reduce the network traffic to the server as we don't
// need to pull all the allocations.
var pull []string
filtered := make(map[string]struct{})
2017-01-21 00:30:40 +00:00
// pullIndex tracks the highest modify index among the allocs to pull.
var pullIndex uint64
for allocID, modifyIndex := range resp.Allocs {
// Pull the allocation if we don't have an alloc runner for the
// allocation or if the alloc runner requires an updated allocation.
//XXX Part of Client alloc index tracking exp
c.allocLock.RLock()
currentAR, ok := c.allocs[allocID]
c.allocLock.RUnlock()
2017-01-21 00:30:40 +00:00
// Ignore alloc updates for allocs that are invalid because of initialization errors
// NOTE(review): c.invalidAllocs is read here without a lock, and
// handleInvalidAllocs writes it without one; confirm these cannot
// run concurrently.
_, isInvalid := c.invalidAllocs[allocID]
if (!ok || modifyIndex > currentAR.Alloc().AllocModifyIndex) && !isInvalid {
2017-01-21 00:30:40 +00:00
// Only pull allocs that are required. Filtered
// allocs might be at a higher index, so ignore
// it.
if modifyIndex > pullIndex {
pullIndex = modifyIndex
}
pull = append(pull, allocID)
} else {
filtered[allocID] = struct{}{}
}
}
2016-02-20 03:51:55 +00:00
// Pull the allocations that passed filtering.
allocsResp.Allocs = nil
var pulledAllocs map[string]*structs.Allocation
if len(pull) != 0 {
// Pull the allocations that need to be updated.
allocsReq.AllocIDs = pull
2017-01-21 00:30:40 +00:00
// NOTE(review): presumably asks the server to answer only once its
// state has reached pullIndex; this underflows if pullIndex is 0 —
// confirm modify indexes are always greater than zero.
allocsReq.MinQueryIndex = pullIndex - 1
allocsResp = structs.AllocsGetResponse{}
if err := c.RPC("Alloc.GetAllocs", &allocsReq, &allocsResp); err != nil {
c.logger.Error("error querying updated allocations", "error", err)
retry := c.retryIntv(getAllocRetryIntv)
select {
2018-04-04 01:05:28 +00:00
case <-c.rpcRetryWatcher():
continue
case <-time.After(retry):
continue
case <-c.shutdownCh:
return
}
}
// Ensure that we received all the allocations we wanted
pulledAllocs = make(map[string]*structs.Allocation, len(allocsResp.Allocs))
for _, alloc := range allocsResp.Allocs {
pulledAllocs[alloc.ID] = alloc
}
for _, desiredID := range pull {
if _, ok := pulledAllocs[desiredID]; !ok {
// We didn't get everything we wanted. Do not update the
// MinQueryIndex, sleep and then retry.
2017-01-11 21:24:23 +00:00
wait := c.retryIntv(2 * time.Second)
select {
2017-01-11 21:24:23 +00:00
case <-time.After(wait):
// Wait for the server we contact to receive the
// allocations
2017-01-21 00:30:40 +00:00
continue OUTER
case <-c.shutdownCh:
return
}
}
}
// Check for shutdown
select {
case <-c.shutdownCh:
return
default:
}
}
c.logger.Debug("updated allocations", "index", resp.Index,
"total", len(resp.Allocs), "pulled", len(allocsResp.Allocs), "filtered", len(filtered))
// Update the query index. It only ever moves forward.
if resp.Index > req.MinQueryIndex {
req.MinQueryIndex = resp.Index
2015-08-23 02:31:22 +00:00
}
// Push the updates. pulledAllocs is nil when nothing needed pulling.
update := &allocUpdates{
filtered: filtered,
pulled: pulledAllocs,
migrateTokens: resp.MigrateTokens,
}
2015-08-23 02:31:22 +00:00
select {
case updates <- update:
2015-08-23 02:31:22 +00:00
case <-c.shutdownCh:
return
}
}
}
2018-04-16 20:28:23 +00:00
// updateNode updates the Node copy and triggers the client to send the updated
2018-04-17 15:53:08 +00:00
// Node to the server. This should be done while the caller holds the
// configLock lock.
2018-04-16 20:28:23 +00:00
func (c *Client) updateNodeLocked() {
// Update the config copy.
node := c.config.Node.Copy()
c.configCopy.Node = node
select {
case c.triggerNodeUpdate <- struct{}{}:
// Node update goroutine was released to execute
default:
// Node update goroutine was already running
}
}
// watchNodeUpdates blocks until it is edge triggered. Once triggered,
// it will update the client node copy and re-register the node.
func (c *Client) watchNodeUpdates() {
var hasChanged bool
timer := time.NewTimer(c.retryIntv(nodeUpdateRetryIntv))
defer timer.Stop()
for {
select {
case <-timer.C:
c.logger.Debug("state changed, updating node and re-registering")
c.retryRegisterNode()
hasChanged = false
case <-c.triggerNodeUpdate:
2018-03-01 14:48:26 +00:00
if hasChanged {
continue
}
hasChanged = true
2018-03-01 14:48:26 +00:00
timer.Reset(c.retryIntv(nodeUpdateRetryIntv))
case <-c.shutdownCh:
return
}
}
}
2015-08-23 02:31:22 +00:00
// runAllocs is invoked when we get an updated set of allocations
func (c *Client) runAllocs(update *allocUpdates) {
2015-08-23 21:54:52 +00:00
// Get the existing allocs
2015-08-23 22:06:47 +00:00
c.allocLock.RLock()
existing := make(map[string]uint64, len(c.allocs))
for id, ar := range c.allocs {
existing[id] = ar.Alloc().AllocModifyIndex
2015-08-23 21:54:52 +00:00
}
2015-08-23 22:06:47 +00:00
c.allocLock.RUnlock()
2015-08-23 21:54:52 +00:00
// Diff the existing and updated allocations
diff := diffAllocs(existing, update)
c.logger.Debug("allocation updates", "added", len(diff.added), "removed", len(diff.removed),
"updated", len(diff.updated), "ignored", len(diff.ignore))
2015-08-23 21:54:52 +00:00
// Remove the old allocations
for _, remove := range diff.removed {
c.removeAlloc(remove)
2015-08-23 21:54:52 +00:00
}
// Update the existing allocations
for _, update := range diff.updated {
c.logger.Trace("updating alloc", "alloc_id", update.ID, "index", update.AllocModifyIndex)
c.updateAlloc(update)
2015-08-23 21:54:52 +00:00
}
// Make room for new allocations before running
if err := c.garbageCollector.MakeRoomFor(diff.added); err != nil {
c.logger.Error("error making room for new allocations", "error", err)
}
2015-08-23 21:54:52 +00:00
// Start the new allocations
for _, add := range diff.added {
migrateToken := update.migrateTokens[add.ID]
if err := c.addAlloc(add, migrateToken); err != nil {
c.logger.Error("error adding alloc", "error", err, "alloc_id", add.ID)
// We mark the alloc as failed and send an update to the server
// We track the fact that creating an allocrunner failed so that we don't send updates again
if add.ClientStatus != structs.AllocClientStatusFailed {
c.handleInvalidAllocs(add, err)
}
2015-08-23 21:54:52 +00:00
}
}
// Trigger the GC once more now that new allocs are started that could
2018-03-11 19:03:47 +00:00
// have caused thresholds to be exceeded
c.garbageCollector.Trigger()
2015-08-23 21:54:52 +00:00
}
// makeFailedAlloc builds a stripped down version of add whose client status
// is failed, carrying only the fields the server needs to examine deployment
// and task states: ID, NodeID, a description derived from err, an unhealthy
// deployment status and the task states.
//
// When add.TaskStates is non-nil the same map is reused rather than copied,
// so the entries created and the FinishedAt values set here are also visible
// through add.
func makeFailedAlloc(add *structs.Allocation, err error) *structs.Allocation {
	stripped := &structs.Allocation{
		ID:                add.ID,
		NodeID:            add.NodeID,
		ClientStatus:      structs.AllocClientStatusFailed,
		ClientDescription: fmt.Sprintf("Unable to add allocation due to err: %v", err),
		TaskStates:        add.TaskStates,
	}
	if stripped.TaskStates == nil {
		stripped.TaskStates = make(map[string]*structs.TaskState)
	}

	// One timestamp is shared by the deployment status and the task states.
	failTime := time.Now()
	stripped.DeploymentStatus = &structs.AllocDeploymentStatus{
		Healthy:   helper.BoolToPtr(false),
		Timestamp: failTime,
	}

	// Give every task of the group a state with a finish time, keeping any
	// finish time that is already set.
	if taskGroup := add.Job.LookupTaskGroup(add.TaskGroup); taskGroup != nil {
		for _, task := range taskGroup.Tasks {
			state, found := stripped.TaskStates[task.Name]
			if !found {
				state = &structs.TaskState{}
				stripped.TaskStates[task.Name] = state
			}
			if state.FinishedAt.IsZero() {
				state.FinishedAt = failTime
			}
		}
	}

	return stripped
}
// removeAlloc is invoked when we should remove an allocation because it has
// been removed by the server.
func (c *Client) removeAlloc(allocID string) {
2015-08-29 21:33:30 +00:00
c.allocLock.Lock()
defer c.allocLock.Unlock()
2018-12-12 19:45:45 +00:00
ar, ok := c.allocs[allocID]
2015-08-23 22:06:47 +00:00
if !ok {
2018-12-12 19:45:45 +00:00
c.logger.Warn("cannot remove alloc", "alloc_id", allocID, "error", "alloc not found")
return
2015-08-23 22:06:47 +00:00
}
// Stop tracking alloc runner as it's been GC'd by the server
delete(c.allocs, allocID)
2016-02-20 03:51:55 +00:00
// Ensure the GC has a reference and then collect. Collecting through the GC
// applies rate limiting
c.garbageCollector.MarkForCollection(allocID, ar)
// GC immediately since the server has GC'd it
go c.garbageCollector.Collect(allocID)
2015-08-23 21:54:52 +00:00
}
// updateAlloc is invoked when we should update an allocation
func (c *Client) updateAlloc(update *structs.Allocation) {
2018-12-12 19:45:45 +00:00
ar, err := c.getAllocRunner(update.ID)
if err != nil {
c.logger.Warn("cannot update nonexistent alloc", "alloc_id", update.ID)
return
2015-08-23 22:06:47 +00:00
}
2016-02-20 03:51:55 +00:00
// Update local copy of alloc
if err := c.stateDB.PutAllocation(update); err != nil {
c.logger.Error("error persisting updated alloc locally", "error", err, "alloc_id", update.ID)
}
// Update alloc runner
ar.Update(update)
2015-08-23 21:54:52 +00:00
}
// addAlloc is invoked when we should add an allocation
func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error {
2017-01-05 21:06:56 +00:00
c.allocLock.Lock()
defer c.allocLock.Unlock()
// Check if we already have an alloc runner
2016-12-13 20:34:23 +00:00
if _, ok := c.allocs[alloc.ID]; ok {
c.logger.Debug("dropping duplicate add allocation request", "alloc_id", alloc.ID)
2016-12-13 20:34:23 +00:00
return nil
}
2017-01-05 21:06:56 +00:00
// Initialize local copy of alloc before creating the alloc runner so
// we can't end up with an alloc runner that does not have an alloc.
if err := c.stateDB.PutAllocation(alloc); err != nil {
return err
}
// Collect any preempted allocations to pass into the previous alloc watcher
var preemptedAllocs map[string]allocwatcher.AllocRunnerMeta
if len(alloc.PreemptedAllocations) > 0 {
preemptedAllocs = make(map[string]allocwatcher.AllocRunnerMeta)
for _, palloc := range alloc.PreemptedAllocations {
preemptedAllocs[palloc] = c.allocs[palloc]
}
}
// Since only the Client has access to other AllocRunners and the RPC
// client, create the previous allocation watcher here.
watcherConfig := allocwatcher.Config{
Alloc: alloc,
PreviousRunner: c.allocs[alloc.PreviousAllocation],
PreemptedRunners: preemptedAllocs,
RPC: c,
Config: c.configCopy,
MigrateToken: migrateToken,
Logger: c.logger,
}
prevAllocWatcher, prevAllocMigrator := allocwatcher.NewAllocWatcher(watcherConfig)
// Copy the config since the node can be swapped out as it is being updated.
// The long term fix is to pass in the config and node separately and then
// we don't have to do a copy.
c.configLock.RLock()
arConf := &allocrunner.Config{
Alloc: alloc,
Logger: c.logger,
ClientConfig: c.configCopy,
StateDB: c.stateDB,
Consul: c.consulService,
Vault: c.vaultClient,
StateUpdater: c,
DeviceStatsReporter: c,
PrevAllocWatcher: prevAllocWatcher,
PrevAllocMigrator: prevAllocMigrator,
DeviceManager: c.devicemanager,
DriverManager: c.drivermanager,
}
2016-02-10 21:44:53 +00:00
c.configLock.RUnlock()
2017-05-02 20:31:56 +00:00
2018-10-04 23:22:01 +00:00
ar, err := allocrunner.NewAllocRunner(arConf)
if err != nil {
return err
}
// Store the alloc runner.
c.allocs[alloc.ID] = ar
go ar.Run()
2015-08-23 21:54:52 +00:00
return nil
2015-08-23 02:31:22 +00:00
}
2016-08-18 03:28:48 +00:00
// setupVaultClient creates an object to periodically renew tokens and secrets
// with vault.
func (c *Client) setupVaultClient() error {
var err error
c.vaultClient, err = vaultclient.NewVaultClient(c.config.VaultConfig, c.logger, c.deriveToken)
if err != nil {
2016-08-18 03:28:48 +00:00
return err
}
if c.vaultClient == nil {
c.logger.Error("failed to create vault client")
return fmt.Errorf("failed to create vault client")
}
2016-09-14 20:30:01 +00:00
// Start renewing tokens and secrets
c.vaultClient.Start()
2016-08-18 03:28:48 +00:00
return nil
}
2016-08-30 01:30:06 +00:00
// deriveToken takes in an allocation and a set of tasks and derives vault
// tokens for each of the tasks, unwraps all of them using the supplied vault
// client and returns a map of unwrapped tokens, indexed by the task name.
func (c *Client) deriveToken(alloc *structs.Allocation, taskNames []string, vclient *vaultapi.Client) (map[string]string, error) {
vlogger := c.logger.Named("vault")
if alloc == nil {
return nil, fmt.Errorf("nil allocation")
}
if taskNames == nil || len(taskNames) == 0 {
return nil, fmt.Errorf("missing task names")
}
group := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if group == nil {
return nil, fmt.Errorf("group name in allocation is not present in job")
}
verifiedTasks := []string{}
// Check if the given task names actually exist in the allocation
for _, taskName := range taskNames {
2017-09-26 22:26:33 +00:00
found := false
for _, task := range group.Tasks {
if task.Name == taskName {
found = true
}
}
if !found {
vlogger.Error("task not found in the allocation", "task_name", taskName)
2018-03-11 17:37:22 +00:00
return nil, fmt.Errorf("task %q not found in the allocation", taskName)
}
verifiedTasks = append(verifiedTasks, taskName)
}
// DeriveVaultToken of nomad server can take in a set of tasks and
// creates tokens for all the tasks.
req := &structs.DeriveVaultTokenRequest{
NodeID: c.NodeID(),
SecretID: c.secretNodeID(),
AllocID: alloc.ID,
Tasks: verifiedTasks,
QueryOptions: structs.QueryOptions{
Region: c.Region(),
AllowStale: false,
},
}
// Derive the tokens
var resp structs.DeriveVaultTokenResponse
if err := c.RPC("Node.DeriveVaultToken", &req, &resp); err != nil {
vlogger.Error("error making derive token RPC", "error", err)
2016-10-23 01:20:50 +00:00
return nil, fmt.Errorf("DeriveVaultToken RPC failed: %v", err)
}
if resp.Error != nil {
vlogger.Error("error deriving vault tokens", "error", resp.Error)
return nil, structs.NewWrappedServerError(resp.Error)
}
if resp.Tasks == nil {
vlogger.Error("error derivng vault token", "error", "invalid response")
return nil, fmt.Errorf("failed to derive vault tokens: invalid response")
}
unwrappedTokens := make(map[string]string)
// Retrieve the wrapped tokens from the response and unwrap it
for _, taskName := range verifiedTasks {
// Get the wrapped token
wrappedToken, ok := resp.Tasks[taskName]
if !ok {
vlogger.Error("wrapped token missing for task", "task_name", taskName)
return nil, fmt.Errorf("wrapped token missing for task %q", taskName)
}
// Unwrap the vault token
unwrapResp, err := vclient.Logical().Unwrap(wrappedToken)
if err != nil {
2018-04-03 21:29:22 +00:00
if structs.VaultUnrecoverableError.MatchString(err.Error()) {
return nil, err
}
// The error is recoverable
return nil, structs.NewRecoverableError(
fmt.Errorf("failed to unwrap the token for task %q: %v", taskName, err), true)
}
// Validate the response
var validationErr error
if unwrapResp == nil {
validationErr = fmt.Errorf("Vault returned nil secret when unwrapping")
} else if unwrapResp.Auth == nil {
validationErr = fmt.Errorf("Vault returned unwrap secret with nil Auth. Secret warnings: %v", unwrapResp.Warnings)
} else if unwrapResp.Auth.ClientToken == "" {
validationErr = fmt.Errorf("Vault returned unwrap secret with empty Auth.ClientToken. Secret warnings: %v", unwrapResp.Warnings)
}
2018-04-03 21:29:22 +00:00
if validationErr != nil {
vlogger.Warn("error unwrapping token", "error", err)
2018-04-03 21:29:22 +00:00
return nil, structs.NewRecoverableError(validationErr, true)
}
// Append the unwrapped token to the return value
unwrappedTokens[taskName] = unwrapResp.Auth.ClientToken
}
return unwrappedTokens, nil
}
2018-03-11 17:39:04 +00:00
// triggerDiscovery requests a Consul discovery run without blocking. When the
// signal cannot be delivered immediately it is dropped.
func (c *Client) triggerDiscovery() {
	signal := struct{}{}
	select {
	case c.triggerDiscoveryCh <- signal:
		// The signal was handed over to the discovery goroutine.
	default:
		// The signal could not be delivered right now; nothing more to do.
	}
}
// consulDiscovery runs server discovery via Consul each time it is signalled
// and returns once the client shuts down. It is meant to run in its own
// goroutine; other code requests a discovery run through triggerDiscovery().
func (c *Client) consulDiscovery() {
	for {
		select {
		case <-c.shutdownCh:
			return

		case <-c.triggerDiscoveryCh:
			err := c.consulDiscoveryImpl()
			if err != nil {
				c.logger.Error("error discovering nomad servers", "error", err)
			}
		}
	}
}
func (c *Client) consulDiscoveryImpl() error {
consulLogger := c.logger.Named("consul")
// Acquire heartbeat lock to prevent heartbeat from running
// concurrently with discovery. Concurrent execution is safe, however
// discovery is usually triggered when heartbeating has failed so
// there's no point in allowing it.
c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()
dcs, err := c.consulCatalog.Datacenters()
if err != nil {
return fmt.Errorf("client.consul: unable to query Consul datacenters: %v", err)
}
if len(dcs) > 2 {
// Query the local DC first, then shuffle the
// remaining DCs. Future heartbeats will cause Nomad
// Clients to fixate on their local datacenter so
// it's okay to talk with remote DCs. If the no
// Nomad servers are available within
// datacenterQueryLimit, the next heartbeat will pick
// a new set of servers so it's okay.
shuffleStrings(dcs[1:])
dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)]
}
// Query for servers in this client's region only
region := c.Region()
rpcargs := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Region: region,
},
}
serviceName := c.configCopy.ConsulConfig.ServerServiceName
var mErr multierror.Error
2018-01-09 23:26:53 +00:00
var nomadServers servers.Servers
consulLogger.Debug("bootstrap contacting Consul DCs", "consul_dcs", dcs)
DISCOLOOP:
for _, dc := range dcs {
consulOpts := &consulapi.QueryOptions{
AllowStale: true,
Datacenter: dc,
Near: "_agent",
WaitTime: consul.DefaultQueryWaitDuration,
}
consulServices, _, err := c.consulCatalog.Service(serviceName, consul.ServiceTagRPC, consulOpts)
if err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", serviceName, dc, err))
continue
}
for _, s := range consulServices {
port := strconv.Itoa(s.ServicePort)
addrstr := s.ServiceAddress
if addrstr == "" {
addrstr = s.Address
}
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(addrstr, port))
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
var peers []string
if err := c.connPool.RPC(region, addr, c.RPCMajorVersion(), "Status.Peers", rpcargs, &peers); err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
// Successfully received the Server peers list of the correct
// region
for _, p := range peers {
addr, err := net.ResolveTCPAddr("tcp", p)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
}
2018-01-09 23:26:53 +00:00
srv := &servers.Server{Addr: addr}
nomadServers = append(nomadServers, srv)
}
2018-01-09 23:26:53 +00:00
if len(nomadServers) > 0 {
break DISCOLOOP
}
}
}
2018-01-09 23:26:53 +00:00
if len(nomadServers) == 0 {
if len(mErr.Errors) > 0 {
return mErr.ErrorOrNil()
}
return fmt.Errorf("no Nomad Servers advertising service %q in Consul datacenters: %+q", serviceName, dcs)
}
consulLogger.Info("discovered following servers", "servers", nomadServers)
// Fire the retry trigger if we have updated the set of servers.
if c.servers.SetServers(nomadServers) {
// Start rebalancing
c.servers.RebalanceServers()
// Notify waiting rpc calls. If a goroutine just failed an RPC call and
// isn't receiving on this chan yet they'll still retry eventually.
// This is a shortcircuit for the longer retry intervals.
c.fireRpcRetryWatcher()
}
2018-04-04 01:05:28 +00:00
return nil
}
// emitStats collects host resource usage stats periodically
func (c *Client) emitStats() {
2018-02-18 15:19:40 +00:00
// Determining NodeClass to be emitted
var emittedNodeClass string
if emittedNodeClass = c.Node().NodeClass; emittedNodeClass == "" {
emittedNodeClass = "none"
}
2017-09-04 02:56:47 +00:00
// Assign labels directly before emitting stats so the information expected
// is ready
2018-02-18 15:19:40 +00:00
c.baseLabels = []metrics.Label{
{Name: "node_id", Value: c.NodeID()},
{Name: "datacenter", Value: c.Datacenter()},
{Name: "node_class", Value: emittedNodeClass},
}
2017-09-04 02:56:47 +00:00
// Start collecting host stats right away and then keep collecting every
// collection interval
next := time.NewTimer(0)
defer next.Stop()
2016-05-09 15:55:19 +00:00
for {
select {
case <-next.C:
2016-12-12 06:58:28 +00:00
err := c.hostStatsCollector.Collect()
2016-06-03 21:23:18 +00:00
next.Reset(c.config.StatsCollectionInterval)
2016-05-09 15:55:19 +00:00
if err != nil {
c.logger.Warn("error fetching host resource usage stats", "error", err)
2016-05-25 04:44:11 +00:00
continue
2016-05-09 15:55:19 +00:00
}
2016-06-03 21:23:18 +00:00
// Publish Node metrics if operator has opted in
if c.config.PublishNodeMetrics {
c.emitHostStats()
}
c.emitClientMetrics()
2016-05-09 15:55:19 +00:00
case <-c.shutdownCh:
return
}
}
}
2016-05-31 02:02:03 +00:00
2017-09-04 02:56:47 +00:00
// setGaugeForMemoryStats proxies metrics for memory specific statistics
2017-08-31 20:00:09 +00:00
func (c *Client) setGaugeForMemoryStats(nodeID string, hStats *stats.HostStats) {
if !c.config.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"client", "host", "memory", "total"}, float32(hStats.Memory.Total), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "host", "memory", "available"}, float32(hStats.Memory.Available), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "host", "memory", "used"}, float32(hStats.Memory.Used), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "host", "memory", "free"}, float32(hStats.Memory.Free), c.baseLabels)
}
if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "total"}, float32(hStats.Memory.Total))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "available"}, float32(hStats.Memory.Available))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "used"}, float32(hStats.Memory.Used))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "free"}, float32(hStats.Memory.Free))
}
}
2017-09-04 02:56:47 +00:00
// setGaugeForCPUStats proxies metrics for CPU specific statistics
2017-08-31 20:00:09 +00:00
func (c *Client) setGaugeForCPUStats(nodeID string, hStats *stats.HostStats) {
2016-05-31 02:02:03 +00:00
for _, cpu := range hStats.CPU {
if !c.config.DisableTaggedMetrics {
2017-09-26 22:26:33 +00:00
labels := append(c.baseLabels, metrics.Label{
Name: "cpu",
Value: cpu.CPU,
})
metrics.SetGaugeWithLabels([]string{"client", "host", "cpu", "total"}, float32(cpu.Total), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "cpu", "user"}, float32(cpu.User), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "cpu", "idle"}, float32(cpu.Idle), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "cpu", "system"}, float32(cpu.System), labels)
}
if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "total"}, float32(cpu.Total))
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "user"}, float32(cpu.User))
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "idle"}, float32(cpu.Idle))
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "system"}, float32(cpu.System))
}
2016-05-31 02:02:03 +00:00
}
}
2017-09-04 02:56:47 +00:00
// setGaugeForDiskStats proxies metrics for disk specific statistics
2017-08-31 20:00:09 +00:00
func (c *Client) setGaugeForDiskStats(nodeID string, hStats *stats.HostStats) {
2016-05-31 02:02:03 +00:00
for _, disk := range hStats.DiskStats {
if !c.config.DisableTaggedMetrics {
2017-09-26 22:26:33 +00:00
labels := append(c.baseLabels, metrics.Label{
Name: "disk",
Value: disk.Device,
})
metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "size"}, float32(disk.Size), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "used"}, float32(disk.Used), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "available"}, float32(disk.Available), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "used_percent"}, float32(disk.UsedPercent), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "inodes_percent"}, float32(disk.InodesUsedPercent), labels)
}
if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "size"}, float32(disk.Size))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "used"}, float32(disk.Used))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "available"}, float32(disk.Available))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "used_percent"}, float32(disk.UsedPercent))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "inodes_percent"}, float32(disk.InodesUsedPercent))
}
2016-05-31 02:02:03 +00:00
}
}
2017-09-04 02:56:47 +00:00
// setGaugeForAllocationStats proxies metrics for allocation specific statistics
2017-08-31 20:00:09 +00:00
func (c *Client) setGaugeForAllocationStats(nodeID string) {
2017-09-11 15:32:37 +00:00
c.configLock.RLock()
node := c.configCopy.Node
c.configLock.RUnlock()
2018-10-03 16:47:18 +00:00
total := node.NodeResources
res := node.ReservedResources
2018-10-04 21:33:09 +00:00
allocated := c.getAllocatedResources(node)
// Emit allocated
if !c.config.DisableTaggedMetrics {
2018-10-03 16:47:18 +00:00
metrics.SetGaugeWithLabels([]string{"client", "allocated", "memory"}, float32(allocated.Flattened.Memory.MemoryMB), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocated", "disk"}, float32(allocated.Shared.DiskMB), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocated", "cpu"}, float32(allocated.Flattened.Cpu.CpuShares), c.baseLabels)
}
if c.config.BackwardsCompatibleMetrics {
2018-10-03 16:47:18 +00:00
metrics.SetGauge([]string{"client", "allocated", "memory", nodeID}, float32(allocated.Flattened.Memory.MemoryMB))
metrics.SetGauge([]string{"client", "allocated", "disk", nodeID}, float32(allocated.Shared.DiskMB))
metrics.SetGauge([]string{"client", "allocated", "cpu", nodeID}, float32(allocated.Flattened.Cpu.CpuShares))
}
2018-10-03 16:47:18 +00:00
for _, n := range allocated.Flattened.Networks {
if !c.config.DisableTaggedMetrics {
2017-09-26 22:26:33 +00:00
labels := append(c.baseLabels, metrics.Label{
Name: "device",
Value: n.Device,
})
metrics.SetGaugeWithLabels([]string{"client", "allocated", "network"}, float32(n.MBits), labels)
}
if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "allocated", "network", n.Device, nodeID}, float32(n.MBits))
}
}
// Emit unallocated
2018-10-03 16:47:18 +00:00
unallocatedMem := total.Memory.MemoryMB - res.Memory.MemoryMB - allocated.Flattened.Memory.MemoryMB
unallocatedDisk := total.Disk.DiskMB - res.Disk.DiskMB - allocated.Shared.DiskMB
2018-10-04 21:33:09 +00:00
unallocatedCpu := total.Cpu.CpuShares - res.Cpu.CpuShares - allocated.Flattened.Cpu.CpuShares
if !c.config.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"client", "unallocated", "memory"}, float32(unallocatedMem), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "unallocated", "disk"}, float32(unallocatedDisk), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "unallocated", "cpu"}, float32(unallocatedCpu), c.baseLabels)
}
if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "unallocated", "memory", nodeID}, float32(unallocatedMem))
metrics.SetGauge([]string{"client", "unallocated", "disk", nodeID}, float32(unallocatedDisk))
metrics.SetGauge([]string{"client", "unallocated", "cpu", nodeID}, float32(unallocatedCpu))
}
2018-10-03 16:47:18 +00:00
totalComparable := total.Comparable()
for _, n := range totalComparable.Flattened.Networks {
// Determined the used resources
var usedMbits int
totalIdx := allocated.Flattened.Networks.NetIndex(n)
if totalIdx != -1 {
2018-10-03 16:47:18 +00:00
usedMbits = allocated.Flattened.Networks[totalIdx].MBits
}
2018-10-03 16:47:18 +00:00
unallocatedMbits := n.MBits - usedMbits
if !c.config.DisableTaggedMetrics {
2017-09-26 22:26:33 +00:00
labels := append(c.baseLabels, metrics.Label{
Name: "device",
Value: n.Device,
})
metrics.SetGaugeWithLabels([]string{"client", "unallocated", "network"}, float32(unallocatedMbits), labels)
}
if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "unallocated", "network", n.Device, nodeID}, float32(unallocatedMbits))
}
}
}
2018-03-11 18:21:44 +00:00
// No labels are required so we emit with only a key/value syntax
2017-08-31 20:00:09 +00:00
func (c *Client) setGaugeForUptime(hStats *stats.HostStats) {
if !c.config.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"client", "uptime"}, float32(hStats.Uptime), c.baseLabels)
}
if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "uptime"}, float32(hStats.Uptime))
}
}
// emitHostStats pushes host resource usage stats to remote metrics collection sinks
func (c *Client) emitHostStats() {
nodeID := c.NodeID()
2017-08-31 20:00:09 +00:00
hStats := c.hostStatsCollector.Stats()
c.setGaugeForMemoryStats(nodeID, hStats)
c.setGaugeForUptime(hStats)
c.setGaugeForCPUStats(nodeID, hStats)
c.setGaugeForDiskStats(nodeID, hStats)
}
// emitClientMetrics emits the lower volume client metrics: the allocated and
// unallocated resource gauges, followed by the number of allocations that are
// migrating, blocked, pending, running or terminal.
func (c *Client) emitClientMetrics() {
	nodeID := c.NodeID()

	c.setGaugeForAllocationStats(nodeID)

	// Count the allocations per state. A pending allocation counts as
	// blocked when its runner is waiting, as migrating when it is migrating,
	// and as pending otherwise.
	var blocked, migrating, pending, running, terminal int
	for _, runner := range c.getAllocRunners() {
		switch runner.Alloc().ClientStatus {
		case structs.AllocClientStatusPending:
			if runner.IsWaiting() {
				blocked++
			} else if runner.IsMigrating() {
				migrating++
			} else {
				pending++
			}
		case structs.AllocClientStatusRunning:
			running++
		case structs.AllocClientStatusComplete, structs.AllocClientStatusFailed:
			terminal++
		}
	}

	if !c.config.DisableTaggedMetrics {
		tagged := func(state string, count int) {
			metrics.SetGaugeWithLabels([]string{"client", "allocations", state}, float32(count), c.baseLabels)
		}
		tagged("migrating", migrating)
		tagged("blocked", blocked)
		tagged("pending", pending)
		tagged("running", running)
		tagged("terminal", terminal)
	}

	if c.config.BackwardsCompatibleMetrics {
		compat := func(state string, count int) {
			metrics.SetGauge([]string{"client", "allocations", state, nodeID}, float32(count))
		}
		compat("migrating", migrating)
		compat("blocked", blocked)
		compat("pending", pending)
		compat("running", running)
		compat("terminal", terminal)
	}
}
2018-10-04 21:33:09 +00:00
func (c *Client) getAllocatedResources(selfNode *structs.Node) *structs.ComparableResources {
2018-10-03 16:47:18 +00:00
// Unfortunately the allocs only have IP so we need to match them to the
// device
cidrToDevice := make(map[*net.IPNet]string, len(selfNode.Resources.Networks))
for _, n := range selfNode.NodeResources.Networks {
_, ipnet, err := net.ParseCIDR(n.CIDR)
if err != nil {
continue
}
cidrToDevice[ipnet] = n.Device
}
// Sum the allocated resources
allocs := c.allAllocs()
2018-10-04 21:33:09 +00:00
var allocated structs.ComparableResources
2018-10-03 16:47:18 +00:00
allocatedDeviceMbits := make(map[string]int)
for _, alloc := range allocs {
if alloc.TerminalStatus() {
continue
}
// Add the resources
// COMPAT(0.11): Just use the allocated resources
allocated.Add(alloc.ComparableResources())
// Add the used network
if alloc.AllocatedResources != nil {
for _, tr := range alloc.AllocatedResources.Tasks {
for _, allocatedNetwork := range tr.Networks {
for cidr, dev := range cidrToDevice {
ip := net.ParseIP(allocatedNetwork.IP)
if cidr.Contains(ip) {
allocatedDeviceMbits[dev] += allocatedNetwork.MBits
break
}
}
}
}
} else if alloc.Resources != nil {
for _, allocatedNetwork := range alloc.Resources.Networks {
for cidr, dev := range cidrToDevice {
ip := net.ParseIP(allocatedNetwork.IP)
if cidr.Contains(ip) {
allocatedDeviceMbits[dev] += allocatedNetwork.MBits
break
}
}
}
}
}
// Clear the networks
allocated.Flattened.Networks = nil
for dev, speed := range allocatedDeviceMbits {
net := &structs.NetworkResource{
Device: dev,
MBits: speed,
}
allocated.Flattened.Networks = append(allocated.Flattened.Networks, net)
}
return &allocated
}
// allAllocs returns all the allocations managed by the client
func (c *Client) allAllocs() map[string]*structs.Allocation {
ars := c.getAllocRunners()
allocs := make(map[string]*structs.Allocation, len(ars))
for _, ar := range ars {
a := ar.Alloc()
allocs[a.ID] = a
}
return allocs
2016-05-31 02:02:03 +00:00
}
// GetTaskEventHandler returns the event handler that the alloc runner for
// allocID provides for taskName, or nil when the client has no alloc runner
// for allocID.
func (c *Client) GetTaskEventHandler(allocID, taskName string) drivermanager.EventHandler {
	c.allocLock.RLock()
	defer c.allocLock.RUnlock()

	runner, found := c.allocs[allocID]
	if !found {
		return nil
	}
	return runner.GetTaskEventHandler(taskName)
}
// group wraps a func() in a goroutine and provides a way to block until it
// exits. Inspired by https://godoc.org/golang.org/x/sync/errgroup
type group struct {
	wg sync.WaitGroup
}

// Go starts f in a goroutine and must be called before Wait.
func (g *group) Go(f func()) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		f()
	}()
}

// AddCh starts a goroutine that exits once a receive on ch completes, that
// is once ch is closed or delivers a value, so that Wait also blocks on ch.
// Like Go, it must be called before Wait.
func (g *group) AddCh(ch <-chan struct{}) {
	g.Go(func() {
		<-ch
	})
}

// Wait for all goroutines to exit. Must be called after all calls to Go
// complete.
func (g *group) Wait() {
	g.wg.Wait()
}