open-nomad/command/agent/agent.go

827 lines
24 KiB
Go
Raw Normal View History

2015-08-16 20:54:49 +00:00
package agent
2015-08-16 23:40:04 +00:00
import (
2015-08-24 00:40:27 +00:00
"fmt"
2015-08-16 23:40:04 +00:00
"io"
"log"
2015-08-31 01:10:23 +00:00
"net"
"os"
2015-08-31 01:10:23 +00:00
"path/filepath"
"runtime"
"strings"
2015-08-16 23:40:04 +00:00
"sync"
"sync/atomic"
"time"
2015-08-16 21:34:38 +00:00
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/api"
version "github.com/hashicorp/go-version"
2015-08-23 23:53:15 +00:00
"github.com/hashicorp/nomad/client"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/command/agent/consul"
2015-08-16 23:40:04 +00:00
"github.com/hashicorp/nomad/nomad"
2015-08-31 01:10:23 +00:00
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
2015-08-16 23:40:04 +00:00
)
const (
2017-09-26 22:26:33 +00:00
agentHttpCheckInterval = 10 * time.Second
agentHttpCheckTimeout = 5 * time.Second
serverRpcCheckInterval = 10 * time.Second
serverRpcCheckTimeout = 3 * time.Second
serverSerfCheckInterval = 10 * time.Second
serverSerfCheckTimeout = 3 * time.Second
// roles used in identifying Consul entries for Nomad agents
consulRoleServer = "server"
consulRoleClient = "client"
)
2015-08-16 23:40:04 +00:00
// Agent is a long running daemon that is used to run both
// clients and servers. Servers are responsible for managing
// state and making scheduling decisions. Clients can be
// scheduled to, and are responsible for interfacing with
// servers to run allocations.
2015-08-16 20:54:49 +00:00
type Agent struct {
config *Config
configLock sync.Mutex
2015-08-16 23:40:04 +00:00
logger *log.Logger
logOutput io.Writer
// consulService is Nomad's custom Consul client for managing services
// and checks.
consulService *consul.ServiceClient
// consulCatalog is the subset of Consul's Catalog API Nomad uses.
consulCatalog consul.CatalogAPI
// consulSupportsTLSSkipVerify flags whether or not Nomad can register
// checks with TLSSkipVerify
consulSupportsTLSSkipVerify bool
client *client.Client
server *nomad.Server
2015-08-16 23:40:04 +00:00
shutdown bool
shutdownCh chan struct{}
2015-08-16 23:40:04 +00:00
shutdownLock sync.Mutex
InmemSink *metrics.InmemSink
2015-08-16 20:54:49 +00:00
}
2015-08-16 23:40:04 +00:00
// NewAgent is used to create a new agent with the given configuration
func NewAgent(config *Config, logOutput io.Writer, inmem *metrics.InmemSink) (*Agent, error) {
2015-08-16 23:40:04 +00:00
a := &Agent{
config: config,
2016-06-22 22:20:56 +00:00
logger: log.New(logOutput, "", log.LstdFlags|log.Lmicroseconds),
2015-08-16 23:40:04 +00:00
logOutput: logOutput,
shutdownCh: make(chan struct{}),
InmemSink: inmem,
2015-08-16 23:40:04 +00:00
}
2015-08-24 00:40:27 +00:00
if err := a.setupConsul(config.Consul); err != nil {
return nil, fmt.Errorf("Failed to initialize Consul client: %v", err)
}
2015-08-24 00:40:27 +00:00
if err := a.setupServer(); err != nil {
return nil, err
}
if err := a.setupClient(); err != nil {
return nil, err
}
if a.client == nil && a.server == nil {
return nil, fmt.Errorf("must have at least client or server mode enabled")
}
2015-08-16 21:34:38 +00:00
return a, nil
}
// convertServerConfig takes an agent config and log output and returns a Nomad
// Config.
func convertServerConfig(agentConfig *Config, logOutput io.Writer) (*nomad.Config, error) {
conf := agentConfig.NomadConfig
2015-09-06 01:41:00 +00:00
if conf == nil {
conf = nomad.DefaultConfig()
}
conf.LogOutput = logOutput
conf.DevMode = agentConfig.DevMode
conf.Build = agentConfig.Version.VersionNumber()
if agentConfig.Region != "" {
conf.Region = agentConfig.Region
2015-08-31 01:14:40 +00:00
}
// Set the Authoritative Region if set, otherwise default to
// the same as the local region.
if agentConfig.Server.AuthoritativeRegion != "" {
conf.AuthoritativeRegion = agentConfig.Server.AuthoritativeRegion
} else if agentConfig.Region != "" {
conf.AuthoritativeRegion = agentConfig.Region
}
if agentConfig.Datacenter != "" {
conf.Datacenter = agentConfig.Datacenter
2015-08-31 01:14:40 +00:00
}
if agentConfig.NodeName != "" {
conf.NodeName = agentConfig.NodeName
2015-08-31 01:14:40 +00:00
}
if agentConfig.Server.BootstrapExpect > 0 {
if agentConfig.Server.BootstrapExpect == 1 {
conf.Bootstrap = true
} else {
atomic.StoreInt32(&conf.BootstrapExpect, int32(agentConfig.Server.BootstrapExpect))
}
2015-08-31 01:10:23 +00:00
}
if agentConfig.DataDir != "" {
conf.DataDir = filepath.Join(agentConfig.DataDir, "server")
2015-08-31 01:10:23 +00:00
}
if agentConfig.Server.DataDir != "" {
conf.DataDir = agentConfig.Server.DataDir
2015-08-31 01:10:23 +00:00
}
if agentConfig.Server.ProtocolVersion != 0 {
conf.ProtocolVersion = uint8(agentConfig.Server.ProtocolVersion)
2015-08-31 01:10:23 +00:00
}
if agentConfig.Server.NumSchedulers != 0 {
conf.NumSchedulers = agentConfig.Server.NumSchedulers
2015-08-31 01:10:23 +00:00
}
if len(agentConfig.Server.EnabledSchedulers) != 0 {
conf.EnabledSchedulers = agentConfig.Server.EnabledSchedulers
2015-08-31 01:10:23 +00:00
}
if agentConfig.ACL.Enabled {
conf.ACLEnabled = true
}
if agentConfig.ACL.ReplicationToken != "" {
conf.ReplicationToken = agentConfig.ACL.ReplicationToken
}
2017-09-19 14:47:10 +00:00
if agentConfig.Sentinel != nil {
conf.SentinelConfig = agentConfig.Sentinel
}
// Set up the bind addresses
rpcAddr, err := net.ResolveTCPAddr("tcp", agentConfig.normalizedAddrs.RPC)
if err != nil {
return nil, fmt.Errorf("Failed to parse RPC address %q: %v", agentConfig.normalizedAddrs.RPC, err)
}
serfAddr, err := net.ResolveTCPAddr("tcp", agentConfig.normalizedAddrs.Serf)
if err != nil {
return nil, fmt.Errorf("Failed to parse Serf address %q: %v", agentConfig.normalizedAddrs.Serf, err)
}
conf.RPCAddr.Port = rpcAddr.Port
conf.RPCAddr.IP = rpcAddr.IP
conf.SerfConfig.MemberlistConfig.BindPort = serfAddr.Port
conf.SerfConfig.MemberlistConfig.BindAddr = serfAddr.IP.String()
// Set up the advertise addresses
rpcAddr, err = net.ResolveTCPAddr("tcp", agentConfig.AdvertiseAddrs.RPC)
if err != nil {
return nil, fmt.Errorf("Failed to parse RPC advertise address %q: %v", agentConfig.AdvertiseAddrs.RPC, err)
}
serfAddr, err = net.ResolveTCPAddr("tcp", agentConfig.AdvertiseAddrs.Serf)
if err != nil {
return nil, fmt.Errorf("Failed to parse Serf advertise address %q: %v", agentConfig.AdvertiseAddrs.Serf, err)
}
conf.RPCAdvertise = rpcAddr
conf.SerfConfig.MemberlistConfig.AdvertiseAddr = serfAddr.IP.String()
conf.SerfConfig.MemberlistConfig.AdvertisePort = serfAddr.Port
// Set up gc threshold and heartbeat grace period
if gcThreshold := agentConfig.Server.NodeGCThreshold; gcThreshold != "" {
dur, err := time.ParseDuration(gcThreshold)
if err != nil {
return nil, err
}
conf.NodeGCThreshold = dur
}
if gcThreshold := agentConfig.Server.JobGCThreshold; gcThreshold != "" {
dur, err := time.ParseDuration(gcThreshold)
if err != nil {
return nil, err
}
conf.JobGCThreshold = dur
}
if gcThreshold := agentConfig.Server.EvalGCThreshold; gcThreshold != "" {
dur, err := time.ParseDuration(gcThreshold)
if err != nil {
return nil, err
}
conf.EvalGCThreshold = dur
}
2017-06-29 18:29:44 +00:00
if gcThreshold := agentConfig.Server.DeploymentGCThreshold; gcThreshold != "" {
dur, err := time.ParseDuration(gcThreshold)
if err != nil {
return nil, err
}
conf.DeploymentGCThreshold = dur
}
if heartbeatGrace := agentConfig.Server.HeartbeatGrace; heartbeatGrace != 0 {
conf.HeartbeatGrace = heartbeatGrace
}
if min := agentConfig.Server.MinHeartbeatTTL; min != 0 {
conf.MinHeartbeatTTL = min
}
if maxHPS := agentConfig.Server.MaxHeartbeatsPerSecond; maxHPS != 0 {
conf.MaxHeartbeatsPerSecond = maxHPS
2016-03-04 23:44:12 +00:00
}
if *agentConfig.Consul.AutoAdvertise && agentConfig.Consul.ServerServiceName == "" {
return nil, fmt.Errorf("server_service_name must be set when auto_advertise is enabled")
}
2016-08-09 21:35:40 +00:00
// Add the Consul and Vault configs
conf.ConsulConfig = agentConfig.Consul
conf.VaultConfig = agentConfig.Vault
2016-10-25 22:57:38 +00:00
// Set the TLS config
conf.TLSConfig = agentConfig.TLSConfig
2017-10-30 19:19:11 +00:00
// Setup telemetry related config
conf.StatsCollectionInterval = agentConfig.Telemetry.collectionInterval
2017-10-30 23:10:58 +00:00
conf.DisableTaggedMetrics = agentConfig.Telemetry.DisableTaggedMetrics
conf.BackwardsCompatibleMetrics = agentConfig.Telemetry.BackwardsCompatibleMetrics
2017-10-30 19:19:11 +00:00
return conf, nil
}
// serverConfig is used to generate a new server configuration struct
// for initializing a nomad server.
func (a *Agent) serverConfig() (*nomad.Config, error) {
return convertServerConfig(a.config, a.logOutput)
}
// clientConfig is used to generate a new client configuration struct
// for initializing a Nomad client.
func (a *Agent) clientConfig() (*clientconfig.Config, error) {
2015-08-24 00:40:27 +00:00
// Setup the configuration
2015-09-06 01:41:00 +00:00
conf := a.config.ClientConfig
if conf == nil {
conf = clientconfig.DefaultConfig()
2015-09-06 01:41:00 +00:00
}
2015-08-24 00:40:27 +00:00
if a.server != nil {
conf.RPCHandler = a.server
}
conf.LogOutput = a.logOutput
2017-01-09 19:21:51 +00:00
conf.LogLevel = a.config.LogLevel
2015-08-24 00:40:27 +00:00
conf.DevMode = a.config.DevMode
2015-08-31 01:14:40 +00:00
if a.config.Region != "" {
conf.Region = a.config.Region
}
if a.config.DataDir != "" {
conf.StateDir = filepath.Join(a.config.DataDir, "client")
conf.AllocDir = filepath.Join(a.config.DataDir, "alloc")
}
if a.config.Client.StateDir != "" {
conf.StateDir = a.config.Client.StateDir
}
if a.config.Client.AllocDir != "" {
conf.AllocDir = a.config.Client.AllocDir
}
2015-08-31 01:10:23 +00:00
conf.Servers = a.config.Client.Servers
if a.config.Client.NetworkInterface != "" {
conf.NetworkInterface = a.config.Client.NetworkInterface
}
conf.ChrootEnv = a.config.Client.ChrootEnv
conf.Options = a.config.Client.Options
// Logging deprecation messages about consul related configuration in client
// options
var invalidConsulKeys []string
for key := range conf.Options {
if strings.HasPrefix(key, "consul") {
invalidConsulKeys = append(invalidConsulKeys, fmt.Sprintf("options.%s", key))
}
}
if len(invalidConsulKeys) > 0 {
a.logger.Printf("[WARN] agent: Invalid keys: %v", strings.Join(invalidConsulKeys, ","))
a.logger.Printf(`Nomad client ignores consul related configuration in client options.
Please refer to the guide https://www.nomadproject.io/docs/agent/configuration/consul.html
to configure Nomad to work with Consul.`)
}
2015-10-03 00:32:11 +00:00
if a.config.Client.NetworkSpeed != 0 {
conf.NetworkSpeed = a.config.Client.NetworkSpeed
}
if a.config.Client.CpuCompute != 0 {
conf.CpuCompute = a.config.Client.CpuCompute
}
if a.config.Client.MaxKillTimeout != "" {
dur, err := time.ParseDuration(a.config.Client.MaxKillTimeout)
if err != nil {
2016-11-15 23:28:21 +00:00
return nil, fmt.Errorf("Error parsing max kill timeout: %s", err)
}
conf.MaxKillTimeout = dur
}
conf.ClientMaxPort = uint(a.config.Client.ClientMaxPort)
conf.ClientMinPort = uint(a.config.Client.ClientMinPort)
2015-08-31 01:10:23 +00:00
// Setup the node
conf.Node = new(structs.Node)
conf.Node.Datacenter = a.config.Datacenter
conf.Node.Name = a.config.NodeName
conf.Node.Meta = a.config.Client.Meta
conf.Node.NodeClass = a.config.Client.NodeClass
// Set up the HTTP advertise address
conf.Node.HTTPAddr = a.config.AdvertiseAddrs.HTTP
2016-03-14 02:05:41 +00:00
// Reserve resources on the node.
r := conf.Node.Reserved
if r == nil {
r = new(structs.Resources)
conf.Node.Reserved = r
}
r.CPU = a.config.Client.Reserved.CPU
r.MemoryMB = a.config.Client.Reserved.MemoryMB
r.DiskMB = a.config.Client.Reserved.DiskMB
r.IOPS = a.config.Client.Reserved.IOPS
conf.GloballyReservedPorts = a.config.Client.Reserved.ParsedReservedPorts
conf.Version = a.config.Version
2017-01-18 23:55:14 +00:00
if *a.config.Consul.AutoAdvertise && a.config.Consul.ClientServiceName == "" {
return nil, fmt.Errorf("client_service_name must be set when auto_advertise is enabled")
}
conf.ConsulConfig = a.config.Consul
2016-08-09 22:00:50 +00:00
conf.VaultConfig = a.config.Vault
// Set up Telemetry configuration
conf.StatsCollectionInterval = a.config.Telemetry.collectionInterval
conf.PublishNodeMetrics = a.config.Telemetry.PublishNodeMetrics
conf.PublishAllocationMetrics = a.config.Telemetry.PublishAllocationMetrics
conf.DisableTaggedMetrics = a.config.Telemetry.DisableTaggedMetrics
conf.BackwardsCompatibleMetrics = a.config.Telemetry.BackwardsCompatibleMetrics
// Set the TLS related configs
2016-10-25 22:57:38 +00:00
conf.TLSConfig = a.config.TLSConfig
conf.Node.TLSEnabled = conf.TLSConfig.EnableHTTP
2017-01-31 23:32:20 +00:00
// Set the GC related configs
conf.GCInterval = a.config.Client.GCInterval
conf.GCParallelDestroys = a.config.Client.GCParallelDestroys
2017-01-31 23:32:20 +00:00
conf.GCDiskUsageThreshold = a.config.Client.GCDiskUsageThreshold
conf.GCInodeUsageThreshold = a.config.Client.GCInodeUsageThreshold
conf.GCMaxAllocs = a.config.Client.GCMaxAllocs
if a.config.Client.NoHostUUID != nil {
conf.NoHostUUID = *a.config.Client.NoHostUUID
} else {
// Default no_host_uuid to true
conf.NoHostUUID = true
}
2017-01-31 23:32:20 +00:00
2017-08-20 00:19:38 +00:00
// Setup the ACLs
conf.ACLEnabled = a.config.ACL.Enabled
conf.ACLTokenTTL = a.config.ACL.TokenTTL
conf.ACLPolicyTTL = a.config.ACL.PolicyTTL
return conf, nil
}
// setupServer is used to setup the server if enabled
func (a *Agent) setupServer() error {
if !a.config.Server.Enabled {
return nil
}
// Setup the configuration
conf, err := a.serverConfig()
if err != nil {
return fmt.Errorf("server config setup failed: %s", err)
}
// Sets up the keyring for gossip encryption
if err := a.setupKeyrings(conf); err != nil {
return fmt.Errorf("failed to configure keyring: %v", err)
}
// Create the server
server, err := nomad.NewServer(conf, a.consulCatalog, a.logger)
if err != nil {
return fmt.Errorf("server setup failed: %v", err)
}
a.server = server
// Consul check addresses default to bind but can be toggled to use advertise
rpcCheckAddr := a.config.normalizedAddrs.RPC
serfCheckAddr := a.config.normalizedAddrs.Serf
2017-01-18 23:55:14 +00:00
if *a.config.Consul.ChecksUseAdvertise {
rpcCheckAddr = a.config.AdvertiseAddrs.RPC
serfCheckAddr = a.config.AdvertiseAddrs.Serf
}
// Create the Nomad Server services for Consul
2017-01-18 23:55:14 +00:00
if *a.config.Consul.AutoAdvertise {
2016-06-13 23:29:07 +00:00
httpServ := &structs.Service{
Name: a.config.Consul.ServerServiceName,
PortLabel: a.config.AdvertiseAddrs.HTTP,
2016-06-13 23:29:07 +00:00
Tags: []string{consul.ServiceTagHTTP},
}
const isServer = true
if check := a.agentHTTPCheck(isServer); check != nil {
httpServ.Checks = []*structs.ServiceCheck{check}
}
2016-06-13 23:29:07 +00:00
rpcServ := &structs.Service{
Name: a.config.Consul.ServerServiceName,
PortLabel: a.config.AdvertiseAddrs.RPC,
2016-06-13 23:29:07 +00:00
Tags: []string{consul.ServiceTagRPC},
Checks: []*structs.ServiceCheck{
2017-09-26 22:26:33 +00:00
{
Name: "Nomad Server RPC Check",
Type: "tcp",
Interval: serverRpcCheckInterval,
Timeout: serverRpcCheckTimeout,
PortLabel: rpcCheckAddr,
},
},
2016-06-13 23:29:07 +00:00
}
serfServ := &structs.Service{
Name: a.config.Consul.ServerServiceName,
PortLabel: a.config.AdvertiseAddrs.Serf,
2016-06-13 23:29:07 +00:00
Tags: []string{consul.ServiceTagSerf},
Checks: []*structs.ServiceCheck{
2017-09-26 22:26:33 +00:00
{
Name: "Nomad Server Serf Check",
Type: "tcp",
Interval: serverSerfCheckInterval,
Timeout: serverSerfCheckTimeout,
PortLabel: serfCheckAddr,
},
},
2016-06-13 23:29:07 +00:00
}
2016-11-04 00:33:58 +00:00
// Add the http port check if TLS isn't enabled
consulServices := []*structs.Service{
rpcServ,
serfServ,
httpServ,
}
if err := a.consulService.RegisterAgent(consulRoleServer, consulServices); err != nil {
return err
2016-11-04 00:33:58 +00:00
}
}
return nil
}
// setupKeyrings is used to initialize and load keyrings during agent startup
func (a *Agent) setupKeyrings(config *nomad.Config) error {
file := filepath.Join(a.config.DataDir, serfKeyring)
if a.config.Server.EncryptKey == "" {
goto LOAD
}
if _, err := os.Stat(file); err != nil {
if err := initKeyring(file, a.config.Server.EncryptKey); err != nil {
return err
}
}
LOAD:
if _, err := os.Stat(file); err == nil {
config.SerfConfig.KeyringFile = file
}
if err := loadKeyringFile(config.SerfConfig); err != nil {
return err
}
// Success!
return nil
}
// setupClient is used to setup the client if enabled
func (a *Agent) setupClient() error {
if !a.config.Client.Enabled {
return nil
}
// Setup the configuration
conf, err := a.clientConfig()
if err != nil {
return fmt.Errorf("client setup failed: %v", err)
}
2015-08-31 01:10:23 +00:00
// Reserve some ports for the plugins if we are on Windows
if runtime.GOOS == "windows" {
if err := a.reservePortsForClient(conf); err != nil {
return err
}
2016-02-05 23:17:15 +00:00
}
client, err := client.NewClient(conf, a.consulCatalog, a.consulService, a.logger)
2015-08-24 00:40:27 +00:00
if err != nil {
return fmt.Errorf("client setup failed: %v", err)
}
a.client = client
2016-06-14 00:32:18 +00:00
// Create the Nomad Client services for Consul
2017-01-18 23:55:14 +00:00
if *a.config.Consul.AutoAdvertise {
2016-06-13 23:29:07 +00:00
httpServ := &structs.Service{
Name: a.config.Consul.ClientServiceName,
PortLabel: a.config.AdvertiseAddrs.HTTP,
2016-06-13 23:29:07 +00:00
Tags: []string{consul.ServiceTagHTTP},
}
const isServer = false
if check := a.agentHTTPCheck(isServer); check != nil {
httpServ.Checks = []*structs.ServiceCheck{check}
2016-11-04 00:33:58 +00:00
}
if err := a.consulService.RegisterAgent(consulRoleClient, []*structs.Service{httpServ}); err != nil {
return err
}
}
2015-08-24 00:40:27 +00:00
return nil
}
// agentHTTPCheck returns a health check for the agent's HTTP API if possible.
// If no HTTP health check can be supported nil is returned.
func (a *Agent) agentHTTPCheck(server bool) *structs.ServiceCheck {
// Resolve the http check address
httpCheckAddr := a.config.normalizedAddrs.HTTP
if *a.config.Consul.ChecksUseAdvertise {
httpCheckAddr = a.config.AdvertiseAddrs.HTTP
}
check := structs.ServiceCheck{
Name: "Nomad Client HTTP Check",
Type: "http",
2017-10-13 22:37:44 +00:00
Path: "/v1/agent/health?type=client",
Protocol: "http",
2017-09-26 22:26:33 +00:00
Interval: agentHttpCheckInterval,
Timeout: agentHttpCheckTimeout,
PortLabel: httpCheckAddr,
}
// Switch to endpoint that doesn't require a leader for servers
if server {
check.Name = "Nomad Server HTTP Check"
2017-10-13 22:37:44 +00:00
check.Path = "/v1/agent/health?type=server"
}
if !a.config.TLSConfig.EnableHTTP {
// No HTTPS, return a plain http check
return &check
}
if !a.consulSupportsTLSSkipVerify {
a.logger.Printf("[WARN] agent: not registering Nomad HTTPS Health Check because it requires Consul>=0.7.2")
return nil
}
if a.config.TLSConfig.VerifyHTTPSClient {
a.logger.Printf("[WARN] agent: not registering Nomad HTTPS Health Check because verify_https_client enabled")
return nil
}
// HTTPS enabled; skip verification
check.Protocol = "https"
check.TLSSkipVerify = true
return &check
}
2016-06-01 10:08:39 +00:00
// reservePortsForClient reserves a range of ports for the client to use when
2016-02-12 22:25:32 +00:00
// it creates various plugins for log collection, executors, drivers, etc
func (a *Agent) reservePortsForClient(conf *clientconfig.Config) error {
2016-02-12 22:25:32 +00:00
// finding the device name for loopback
deviceName, addr, mask, err := a.findLoopbackDevice()
if err != nil {
return fmt.Errorf("error finding the device name for loopback: %v", err)
}
2016-02-12 22:25:32 +00:00
// seeing if the user has already reserved some resources on this device
var nr *structs.NetworkResource
if conf.Node.Reserved == nil {
conf.Node.Reserved = &structs.Resources{}
}
for _, n := range conf.Node.Reserved.Networks {
if n.Device == deviceName {
nr = n
}
}
2016-02-12 22:25:32 +00:00
// If the user hasn't already created the device, we create it
if nr == nil {
nr = &structs.NetworkResource{
Device: deviceName,
IP: addr,
CIDR: mask,
ReservedPorts: make([]structs.Port, 0),
}
}
2016-02-12 22:25:32 +00:00
// appending the port ranges we want to use for the client to the list of
// reserved ports for this device
for i := conf.ClientMinPort; i <= conf.ClientMaxPort; i++ {
nr.ReservedPorts = append(nr.ReservedPorts, structs.Port{Label: fmt.Sprintf("plugin-%d", i), Value: int(i)})
}
conf.Node.Reserved.Networks = append(conf.Node.Reserved.Networks, nr)
return nil
}
2016-02-12 22:25:32 +00:00
// findLoopbackDevice iterates through all the interfaces on a machine and
// returns the ip addr, mask of the loopback device
func (a *Agent) findLoopbackDevice() (string, string, string, error) {
2016-02-05 23:17:15 +00:00
var ifcs []net.Interface
var err error
ifcs, err = net.Interfaces()
if err != nil {
return "", "", "", err
2016-02-05 23:17:15 +00:00
}
for _, ifc := range ifcs {
addrs, err := ifc.Addrs()
if err != nil {
2016-02-12 22:25:32 +00:00
return "", "", "", err
2016-02-05 23:17:15 +00:00
}
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
if ip.IsLoopback() {
2016-02-12 22:25:32 +00:00
if ip.To4() == nil {
continue
}
return ifc.Name, ip.String(), addr.String(), nil
2016-02-05 23:17:15 +00:00
}
}
}
2016-02-12 22:25:32 +00:00
return "", "", "", fmt.Errorf("no loopback devices with IPV4 addr found")
2016-02-05 23:17:15 +00:00
}
2015-08-16 23:40:04 +00:00
// Leave is used gracefully exit. Clients will inform servers
// of their departure so that allocations can be rescheduled.
2015-08-16 20:54:49 +00:00
func (a *Agent) Leave() error {
2015-08-23 23:53:15 +00:00
if a.client != nil {
if err := a.client.Leave(); err != nil {
a.logger.Printf("[ERR] agent: client leave failed: %v", err)
}
}
if a.server != nil {
if err := a.server.Leave(); err != nil {
a.logger.Printf("[ERR] agent: server leave failed: %v", err)
}
}
2015-08-16 20:54:49 +00:00
return nil
}
2015-08-16 23:40:04 +00:00
// Shutdown is used to terminate the agent.
2015-08-16 20:54:49 +00:00
func (a *Agent) Shutdown() error {
2015-08-16 23:40:04 +00:00
a.shutdownLock.Lock()
defer a.shutdownLock.Unlock()
if a.shutdown {
return nil
}
a.logger.Println("[INFO] agent: requesting shutdown")
2015-08-23 23:53:15 +00:00
if a.client != nil {
if err := a.client.Shutdown(); err != nil {
a.logger.Printf("[ERR] agent: client shutdown failed: %v", err)
}
}
2015-08-16 23:40:04 +00:00
if a.server != nil {
2015-08-23 23:53:15 +00:00
if err := a.server.Shutdown(); err != nil {
a.logger.Printf("[ERR] agent: server shutdown failed: %v", err)
}
2015-08-16 23:40:04 +00:00
}
if err := a.consulService.Shutdown(); err != nil {
a.logger.Printf("[ERR] agent: shutting down Consul client failed: %v", err)
}
2015-08-16 23:40:04 +00:00
a.logger.Println("[INFO] agent: shutdown complete")
a.shutdown = true
close(a.shutdownCh)
2015-08-23 23:53:15 +00:00
return nil
2015-08-16 23:40:04 +00:00
}
// RPC is used to make an RPC call to the Nomad servers
func (a *Agent) RPC(method string, args interface{}, reply interface{}) error {
if a.server != nil {
return a.server.RPC(method, args, reply)
}
return a.client.RPC(method, args, reply)
2015-08-16 20:54:49 +00:00
}
2015-08-24 00:40:27 +00:00
// Client returns the configured client or nil
func (a *Agent) Client() *client.Client {
return a.client
}
// Server returns the configured server or nil
func (a *Agent) Server() *nomad.Server {
return a.server
}
2015-08-31 01:20:00 +00:00
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (a *Agent) Stats() map[string]map[string]string {
stats := make(map[string]map[string]string)
if a.server != nil {
subStat := a.server.Stats()
for k, v := range subStat {
stats[k] = v
}
}
if a.client != nil {
subStat := a.client.Stats()
for k, v := range subStat {
stats[k] = v
}
}
return stats
}
// Reload handles configuration changes for the agent. Provides a method that
// is easier to unit test, as this action is invoked via SIGHUP.
func (a *Agent) Reload(newConfig *Config) error {
a.configLock.Lock()
defer a.configLock.Unlock()
if newConfig.TLSConfig != nil {
// TODO(chelseakomlo) In a later PR, we will introduce the ability to reload
// TLS configuration if the agent is not running with TLS enabled.
if a.config.TLSConfig != nil {
// Reload the certificates on the keyloader and on success store the
// updated TLS config. It is important to reuse the same keyloader
// as this allows us to dynamically reload configurations not only
// on the Agent but on the Server and Client too (they are
// referencing the same keyloader).
keyloader := a.config.TLSConfig.GetKeyLoader()
_, err := keyloader.LoadKeyPair(newConfig.TLSConfig.CertFile, newConfig.TLSConfig.KeyFile)
if err != nil {
return err
}
a.config.TLSConfig = newConfig.TLSConfig
a.config.TLSConfig.KeyLoader = keyloader
}
}
return nil
}
// GetConfigCopy creates a replica of the agent's config, excluding locks
func (a *Agent) GetConfig() *Config {
return a.config
}
// setupConsul creates the Consul client and starts its main Run loop.
func (a *Agent) setupConsul(consulConfig *config.ConsulConfig) error {
apiConf, err := consulConfig.ApiConfig()
if err != nil {
return err
}
client, err := api.NewClient(apiConf)
if err != nil {
return err
}
// Determine version for TLSSkipVerify
if self, err := client.Agent().Self(); err == nil {
a.consulSupportsTLSSkipVerify = consulSupportsTLSSkipVerify(self)
}
// Create Consul Catalog client for service discovery.
a.consulCatalog = client.Catalog()
// Create Consul Service client for service advertisement and checks.
a.consulService = consul.NewServiceClient(client.Agent(), a.consulSupportsTLSSkipVerify, a.logger)
// Run the Consul service client's sync'ing main loop
go a.consulService.Run()
return nil
}
var consulTLSSkipVerifyMinVersion = version.Must(version.NewVersion("0.7.2"))
// consulSupportsTLSSkipVerify returns true if Consul supports TLSSkipVerify.
func consulSupportsTLSSkipVerify(self map[string]map[string]interface{}) bool {
member, ok := self["Member"]
if !ok {
return false
}
tagsI, ok := member["Tags"]
if !ok {
return false
}
tags, ok := tagsI.(map[string]interface{})
if !ok {
return false
}
buildI, ok := tags["build"]
if !ok {
return false
}
build, ok := buildI.(string)
if !ok {
return false
}
parts := strings.SplitN(build, ":", 2)
if len(parts) != 2 {
return false
}
v, err := version.NewVersion(parts[0])
if err != nil {
return false
}
if v.LessThan(consulTLSSkipVerifyMinVersion) {
return false
}
return true
}