open-nomad/command/agent/agent.go

622 lines
18 KiB
Go
Raw Normal View History

2015-08-16 20:54:49 +00:00
package agent
2015-08-16 23:40:04 +00:00
import (
2015-08-24 00:40:27 +00:00
"fmt"
2015-08-16 23:40:04 +00:00
"io"
"log"
2015-08-31 01:10:23 +00:00
"net"
2015-08-16 23:40:04 +00:00
"os"
2015-08-31 01:10:23 +00:00
"path/filepath"
"runtime"
"strconv"
2015-08-16 23:40:04 +00:00
"sync"
"time"
2015-08-16 21:34:38 +00:00
2015-08-23 23:53:15 +00:00
"github.com/hashicorp/nomad/client"
clientconfig "github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/consul"
2015-08-16 23:40:04 +00:00
"github.com/hashicorp/nomad/nomad"
2015-08-31 01:10:23 +00:00
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
2015-08-16 23:40:04 +00:00
)
// Agent is a long running daemon that is used to run both
// clients and servers. Servers are responsible for managing
// state and making scheduling decisions. Clients can be
// scheduled to, and are responsible for interfacing with
// servers to run allocations.
2015-08-16 20:54:49 +00:00
type Agent struct {
2015-08-16 23:40:04 +00:00
config *Config
logger *log.Logger
logOutput io.Writer
// consulConfig is a limited subset of the information necessary to
// establish a connection with this Nomad Agent's Consul Agent.
consulConfig *config.ConsulConfig
// consulSyncer registers the Nomad agent with the Consul Agent
consulSyncer *consul.Syncer
client *client.Client
clientHttpAddr string
clientRpcAddr string
server *nomad.Server
serverHttpAddr string
serverRpcAddr string
2015-08-16 23:40:04 +00:00
shutdown bool
shutdownCh chan struct{}
2015-08-16 23:40:04 +00:00
shutdownLock sync.Mutex
2015-08-16 20:54:49 +00:00
}
2015-08-16 23:40:04 +00:00
// NewAgent is used to create a new agent with the given configuration
2015-08-16 21:34:38 +00:00
func NewAgent(config *Config, logOutput io.Writer) (*Agent, error) {
2015-08-16 23:40:04 +00:00
// Ensure we have a log sink
if logOutput == nil {
logOutput = os.Stderr
}
shutdownCh := make(chan struct{})
2015-08-16 23:40:04 +00:00
a := &Agent{
config: config,
logger: log.New(logOutput, "", log.LstdFlags),
logOutput: logOutput,
shutdownCh: shutdownCh,
2015-08-16 23:40:04 +00:00
}
2015-08-24 00:40:27 +00:00
if err := a.setupConsulSyncer(shutdownCh); err != nil {
return nil, err
}
2015-08-24 00:40:27 +00:00
if err := a.setupServer(); err != nil {
return nil, err
}
if err := a.setupClient(); err != nil {
return nil, err
}
if a.client == nil && a.server == nil {
return nil, fmt.Errorf("must have at least client or server mode enabled")
}
if a.config.Consul.AutoRegister {
if err := a.syncAgentServicesWithConsul(); err != nil {
2016-05-15 00:08:19 +00:00
a.logger.Printf("[ERR] agent: unable to sync agent services with consul: %v", err)
}
}
go a.consulSyncer.Run()
2015-08-16 21:34:38 +00:00
return a, nil
}
// serverConfig is used to generate a new server configuration struct
// for initializing a nomad server.
func (a *Agent) serverConfig() (*nomad.Config, error) {
2015-09-06 01:41:00 +00:00
conf := a.config.NomadConfig
if conf == nil {
conf = nomad.DefaultConfig()
}
2015-08-24 00:40:27 +00:00
conf.LogOutput = a.logOutput
conf.DevMode = a.config.DevMode
2015-08-31 01:10:23 +00:00
conf.Build = fmt.Sprintf("%s%s", a.config.Version, a.config.VersionPrerelease)
if a.config.Region != "" {
2015-08-31 01:14:40 +00:00
conf.Region = a.config.Region
}
if a.config.Datacenter != "" {
conf.Datacenter = a.config.Datacenter
}
if a.config.NodeName != "" {
conf.NodeName = a.config.NodeName
}
2015-08-31 01:10:23 +00:00
if a.config.Server.BootstrapExpect > 0 {
if a.config.Server.BootstrapExpect == 1 {
conf.Bootstrap = true
} else {
conf.BootstrapExpect = a.config.Server.BootstrapExpect
}
2015-08-31 01:10:23 +00:00
}
if a.config.DataDir != "" {
conf.DataDir = filepath.Join(a.config.DataDir, "server")
}
if a.config.Server.DataDir != "" {
conf.DataDir = a.config.Server.DataDir
}
if a.config.Server.ProtocolVersion != 0 {
conf.ProtocolVersion = uint8(a.config.Server.ProtocolVersion)
}
if a.config.Server.NumSchedulers != 0 {
conf.NumSchedulers = a.config.Server.NumSchedulers
}
if len(a.config.Server.EnabledSchedulers) != 0 {
conf.EnabledSchedulers = a.config.Server.EnabledSchedulers
}
// Set up the advertise addrs
if addr := a.config.AdvertiseAddrs.Serf; addr != "" {
serfAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, fmt.Errorf("error resolving serf advertise address: %s", err)
}
conf.SerfConfig.MemberlistConfig.AdvertiseAddr = serfAddr.IP.String()
conf.SerfConfig.MemberlistConfig.AdvertisePort = serfAddr.Port
}
if addr := a.config.AdvertiseAddrs.RPC; addr != "" {
rpcAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, fmt.Errorf("error resolving rpc advertise address: %s", err)
2015-08-31 01:10:23 +00:00
}
conf.RPCAdvertise = rpcAddr
2015-08-31 01:10:23 +00:00
}
// Set up the bind addresses
if addr := a.config.BindAddr; addr != "" {
conf.RPCAddr.IP = net.ParseIP(addr)
conf.SerfConfig.MemberlistConfig.BindAddr = addr
2015-08-31 01:10:23 +00:00
}
if addr := a.config.Addresses.RPC; addr != "" {
conf.RPCAddr.IP = net.ParseIP(addr)
}
2016-04-07 20:25:38 +00:00
if addr := a.config.Addresses.Serf; addr != "" {
conf.SerfConfig.MemberlistConfig.BindAddr = addr
}
// Set up the ports
if port := a.config.Ports.RPC; port != 0 {
conf.RPCAddr.Port = port
}
if port := a.config.Ports.Serf; port != 0 {
conf.SerfConfig.MemberlistConfig.BindPort = port
}
// Resolve the Server's HTTP Address
if a.config.AdvertiseAddrs.HTTP != "" {
a.serverHttpAddr = a.config.AdvertiseAddrs.HTTP
} else if a.config.Addresses.HTTP != "" {
a.serverHttpAddr = fmt.Sprintf("%v:%v", a.config.Addresses.HTTP, a.config.Ports.HTTP)
} else if a.config.BindAddr != "" {
a.serverHttpAddr = fmt.Sprintf("%v:%v", a.config.BindAddr, a.config.Ports.HTTP)
} else {
a.serverHttpAddr = fmt.Sprintf("%v:%v", "127.0.0.1", a.config.Ports.HTTP)
}
addr, err := net.ResolveTCPAddr("tcp", a.serverHttpAddr)
if err != nil {
return nil, fmt.Errorf("error resolving HTTP addr %q: %v:", a.serverHttpAddr, err)
}
a.serverHttpAddr = fmt.Sprintf("%s:%d", addr.IP.String(), addr.Port)
// Resolve the Server's RPC Address
if a.config.AdvertiseAddrs.RPC != "" {
a.serverRpcAddr = a.config.AdvertiseAddrs.RPC
} else if a.config.Addresses.RPC != "" {
a.serverRpcAddr = fmt.Sprintf("%v:%v", a.config.Addresses.RPC, a.config.Ports.RPC)
} else if a.config.BindAddr != "" {
a.serverRpcAddr = fmt.Sprintf("%v:%v", a.config.BindAddr, a.config.Ports.RPC)
} else {
a.serverRpcAddr = fmt.Sprintf("%v:%v", "127.0.0.1", a.config.Ports.RPC)
}
addr, err = net.ResolveTCPAddr("tcp", a.serverRpcAddr)
if err != nil {
return nil, fmt.Errorf("error resolving RPC addr %q: %v:", a.serverRpcAddr, err)
}
a.serverRpcAddr = fmt.Sprintf("%s:%d", addr.IP.String(), addr.Port)
2015-08-24 00:40:27 +00:00
if gcThreshold := a.config.Server.NodeGCThreshold; gcThreshold != "" {
dur, err := time.ParseDuration(gcThreshold)
if err != nil {
return nil, err
}
conf.NodeGCThreshold = dur
}
2016-03-04 23:44:12 +00:00
if heartbeatGrace := a.config.Server.HeartbeatGrace; heartbeatGrace != "" {
dur, err := time.ParseDuration(heartbeatGrace)
if err != nil {
return nil, err
}
conf.HeartbeatGrace = dur
}
return conf, nil
}
// clientConfig is used to generate a new client configuration struct
// for initializing a Nomad client.
func (a *Agent) clientConfig() (*clientconfig.Config, error) {
2015-08-24 00:40:27 +00:00
// Setup the configuration
2015-09-06 01:41:00 +00:00
conf := a.config.ClientConfig
if conf == nil {
conf = client.DefaultConfig()
}
2015-08-24 00:40:27 +00:00
if a.server != nil {
conf.RPCHandler = a.server
}
conf.LogOutput = a.logOutput
conf.DevMode = a.config.DevMode
2015-08-31 01:14:40 +00:00
if a.config.Region != "" {
conf.Region = a.config.Region
}
if a.config.DataDir != "" {
conf.StateDir = filepath.Join(a.config.DataDir, "client")
conf.AllocDir = filepath.Join(a.config.DataDir, "alloc")
}
if a.config.Client.StateDir != "" {
conf.StateDir = a.config.Client.StateDir
}
if a.config.Client.AllocDir != "" {
conf.AllocDir = a.config.Client.AllocDir
}
2015-08-31 01:10:23 +00:00
conf.Servers = a.config.Client.Servers
if a.config.Client.NetworkInterface != "" {
conf.NetworkInterface = a.config.Client.NetworkInterface
}
conf.Options = a.config.Client.Options
2015-10-03 00:32:11 +00:00
if a.config.Client.NetworkSpeed != 0 {
conf.NetworkSpeed = a.config.Client.NetworkSpeed
}
if a.config.Client.MaxKillTimeout != "" {
dur, err := time.ParseDuration(a.config.Client.MaxKillTimeout)
if err != nil {
return nil, fmt.Errorf("Error parsing retry interval: %s", err)
}
conf.MaxKillTimeout = dur
}
conf.ClientMaxPort = uint(a.config.Client.ClientMaxPort)
conf.ClientMinPort = uint(a.config.Client.ClientMinPort)
2015-08-31 01:10:23 +00:00
// Setup the node
conf.Node = new(structs.Node)
conf.Node.Datacenter = a.config.Datacenter
conf.Node.Name = a.config.NodeName
conf.Node.Meta = a.config.Client.Meta
conf.Node.NodeClass = a.config.Client.NodeClass
// Resolve the Client's HTTP address
if a.config.AdvertiseAddrs.HTTP != "" {
a.clientHttpAddr = a.config.AdvertiseAddrs.HTTP
} else if a.config.Addresses.HTTP != "" {
a.clientHttpAddr = fmt.Sprintf("%v:%v", a.config.Addresses.HTTP, a.config.Ports.HTTP)
} else if a.config.BindAddr != "" {
a.clientHttpAddr = fmt.Sprintf("%v:%v", a.config.BindAddr, a.config.Ports.HTTP)
} else {
a.clientHttpAddr = fmt.Sprintf("%v:%v", "127.0.0.1", a.config.Ports.HTTP)
}
addr, err := net.ResolveTCPAddr("tcp", a.clientHttpAddr)
if err != nil {
return nil, fmt.Errorf("error resolving HTTP addr %q: %v:", a.clientHttpAddr, err)
}
httpAddr := fmt.Sprintf("%s:%d", addr.IP.String(), addr.Port)
conf.Node.HTTPAddr = httpAddr
a.clientHttpAddr = httpAddr
2016-03-14 02:05:41 +00:00
// Resolve the Client's RPC address
if a.config.AdvertiseAddrs.RPC != "" {
a.clientRpcAddr = a.config.AdvertiseAddrs.RPC
} else if a.config.Addresses.RPC != "" {
a.clientRpcAddr = fmt.Sprintf("%v:%v", a.config.Addresses.RPC, a.config.Ports.RPC)
} else if a.config.BindAddr != "" {
a.clientRpcAddr = fmt.Sprintf("%v:%v", a.config.BindAddr, a.config.Ports.RPC)
} else {
a.clientRpcAddr = fmt.Sprintf("%v:%v", "127.0.0.1", a.config.Ports.RPC)
}
addr, err = net.ResolveTCPAddr("tcp", a.clientRpcAddr)
if err != nil {
return nil, fmt.Errorf("error resolving RPC addr %q: %v:", a.clientRpcAddr, err)
}
a.clientRpcAddr = fmt.Sprintf("%s:%d", addr.IP.String(), addr.Port)
2016-03-14 02:05:41 +00:00
// Reserve resources on the node.
r := conf.Node.Reserved
if r == nil {
r = new(structs.Resources)
conf.Node.Reserved = r
}
r.CPU = a.config.Client.Reserved.CPU
r.MemoryMB = a.config.Client.Reserved.MemoryMB
r.DiskMB = a.config.Client.Reserved.DiskMB
r.IOPS = a.config.Client.Reserved.IOPS
conf.GloballyReservedPorts = a.config.Client.Reserved.ParsedReservedPorts
2016-03-23 00:12:30 +00:00
conf.Version = fmt.Sprintf("%s%s", a.config.Version, a.config.VersionPrerelease)
conf.Revision = a.config.Revision
conf.ConsulConfig = a.consulConfig
conf.StatsDataPoints = a.config.Client.StatsConfig.DataPoints
conf.StatsCollectionInterval = a.config.Client.StatsConfig.collectionInterval
return conf, nil
}
// setupServer is used to setup the server if enabled
func (a *Agent) setupServer() error {
if !a.config.Server.Enabled {
return nil
}
// Setup the configuration
conf, err := a.serverConfig()
if err != nil {
return fmt.Errorf("server config setup failed: %s", err)
}
// Create the server
server, err := nomad.NewServer(conf)
if err != nil {
return fmt.Errorf("server setup failed: %v", err)
}
a.server = server
return nil
}
// setupClient is used to setup the client if enabled
func (a *Agent) setupClient() error {
if !a.config.Client.Enabled {
return nil
}
// Setup the configuration
conf, err := a.clientConfig()
if err != nil {
return fmt.Errorf("client setup failed: %v", err)
}
2015-08-31 01:10:23 +00:00
// Reserve some ports for the plugins if we are on Windows
if runtime.GOOS == "windows" {
if err := a.reservePortsForClient(conf); err != nil {
return err
}
2016-02-05 23:17:15 +00:00
}
2015-08-31 01:10:23 +00:00
// Create the client
client, err := client.NewClient(conf, a.consulSyncer)
2015-08-24 00:40:27 +00:00
if err != nil {
return fmt.Errorf("client setup failed: %v", err)
}
a.client = client
return nil
}
2016-02-12 22:25:32 +00:00
// reservePortsForClient reservers a range of ports for the client to use when
// it creates various plugins for log collection, executors, drivers, etc
func (a *Agent) reservePortsForClient(conf *clientconfig.Config) error {
2016-02-12 22:25:32 +00:00
// finding the device name for loopback
deviceName, addr, mask, err := a.findLoopbackDevice()
if err != nil {
return fmt.Errorf("error finding the device name for loopback: %v", err)
}
2016-02-12 22:25:32 +00:00
// seeing if the user has already reserved some resources on this device
var nr *structs.NetworkResource
if conf.Node.Reserved == nil {
conf.Node.Reserved = &structs.Resources{}
}
for _, n := range conf.Node.Reserved.Networks {
if n.Device == deviceName {
nr = n
}
}
2016-02-12 22:25:32 +00:00
// If the user hasn't already created the device, we create it
if nr == nil {
nr = &structs.NetworkResource{
Device: deviceName,
IP: addr,
CIDR: mask,
ReservedPorts: make([]structs.Port, 0),
}
}
2016-02-12 22:25:32 +00:00
// appending the port ranges we want to use for the client to the list of
// reserved ports for this device
for i := conf.ClientMinPort; i <= conf.ClientMaxPort; i++ {
nr.ReservedPorts = append(nr.ReservedPorts, structs.Port{Label: fmt.Sprintf("plugin-%d", i), Value: int(i)})
}
conf.Node.Reserved.Networks = append(conf.Node.Reserved.Networks, nr)
return nil
}
2016-02-12 22:25:32 +00:00
// findLoopbackDevice iterates through all the interfaces on a machine and
// returns the ip addr, mask of the loopback device
func (a *Agent) findLoopbackDevice() (string, string, string, error) {
2016-02-05 23:17:15 +00:00
var ifcs []net.Interface
var err error
ifcs, err = net.Interfaces()
if err != nil {
return "", "", "", err
2016-02-05 23:17:15 +00:00
}
for _, ifc := range ifcs {
addrs, err := ifc.Addrs()
if err != nil {
2016-02-12 22:25:32 +00:00
return "", "", "", err
2016-02-05 23:17:15 +00:00
}
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
if ip.IsLoopback() {
2016-02-12 22:25:32 +00:00
if ip.To4() == nil {
continue
}
return ifc.Name, ip.String(), addr.String(), nil
2016-02-05 23:17:15 +00:00
}
}
}
2016-02-12 22:25:32 +00:00
return "", "", "", fmt.Errorf("no loopback devices with IPV4 addr found")
2016-02-05 23:17:15 +00:00
}
2015-08-16 23:40:04 +00:00
// Leave is used gracefully exit. Clients will inform servers
// of their departure so that allocations can be rescheduled.
2015-08-16 20:54:49 +00:00
func (a *Agent) Leave() error {
2015-08-23 23:53:15 +00:00
if a.client != nil {
if err := a.client.Leave(); err != nil {
a.logger.Printf("[ERR] agent: client leave failed: %v", err)
}
}
if a.server != nil {
if err := a.server.Leave(); err != nil {
a.logger.Printf("[ERR] agent: server leave failed: %v", err)
}
}
2015-08-16 20:54:49 +00:00
return nil
}
2015-08-16 23:40:04 +00:00
// Shutdown is used to terminate the agent.
2015-08-16 20:54:49 +00:00
func (a *Agent) Shutdown() error {
2015-08-16 23:40:04 +00:00
a.shutdownLock.Lock()
defer a.shutdownLock.Unlock()
if a.shutdown {
return nil
}
a.logger.Println("[INFO] agent: requesting shutdown")
2015-08-23 23:53:15 +00:00
if a.client != nil {
if err := a.client.Shutdown(); err != nil {
a.logger.Printf("[ERR] agent: client shutdown failed: %v", err)
}
}
2015-08-16 23:40:04 +00:00
if a.server != nil {
2015-08-23 23:53:15 +00:00
if err := a.server.Shutdown(); err != nil {
a.logger.Printf("[ERR] agent: server shutdown failed: %v", err)
}
2015-08-16 23:40:04 +00:00
}
if a.consulSyncer != nil {
if err := a.consulSyncer.Shutdown(); err != nil {
a.logger.Printf("[ERR] agent: shutting down consul service failed: %v", err)
}
}
2015-08-16 23:40:04 +00:00
a.logger.Println("[INFO] agent: shutdown complete")
a.shutdown = true
close(a.shutdownCh)
2015-08-23 23:53:15 +00:00
return nil
2015-08-16 23:40:04 +00:00
}
// RPC is used to make an RPC call to the Nomad servers
func (a *Agent) RPC(method string, args interface{}, reply interface{}) error {
if a.server != nil {
return a.server.RPC(method, args, reply)
}
return a.client.RPC(method, args, reply)
2015-08-16 20:54:49 +00:00
}
2015-08-24 00:40:27 +00:00
// Client returns the configured client or nil
func (a *Agent) Client() *client.Client {
return a.client
}
// Server returns the configured server or nil
func (a *Agent) Server() *nomad.Server {
return a.server
}
2015-08-31 01:20:00 +00:00
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (a *Agent) Stats() map[string]map[string]string {
stats := make(map[string]map[string]string)
if a.server != nil {
subStat := a.server.Stats()
for k, v := range subStat {
stats[k] = v
}
}
if a.client != nil {
subStat := a.client.Stats()
for k, v := range subStat {
stats[k] = v
}
}
return stats
}
// setupConsulSyncer creates the Consul task used by this Nomad Agent when
// running in either Client and Server mode.
func (a *Agent) setupConsulSyncer(shutdownCh types.ShutdownChannel) (err error) {
cfg := &config.ConsulConfig{
Addr: a.config.Consul.Addr,
Token: a.config.Consul.Token,
Auth: a.config.Consul.Auth,
EnableSSL: a.config.Consul.EnableSSL,
VerifySSL: a.config.Consul.VerifySSL,
CAFile: a.config.Consul.CAFile,
CertFile: a.config.Consul.CertFile,
KeyFile: a.config.Consul.KeyFile,
ServerServiceName: a.config.Consul.ServerServiceName,
ClientServiceName: a.config.Consul.ClientServiceName,
}
a.consulConfig = cfg
a.consulSyncer, err = consul.NewSyncer(cfg, a.logger)
return nil
}
// syncAgentServicesWithConsul syncs this Nomad Agent's services with Consul
// when running in either Client or Server mode.
func (a *Agent) syncAgentServicesWithConsul() error {
var services []*structs.Service
if a.client != nil {
if a.config.Consul.ClientServiceName != "" {
clientRpcService := &structs.Service{
Name: a.config.Consul.ClientServiceName,
PortLabel: a.clientRpcAddr,
Tags: []string{consul.ServiceTagRpc},
}
services = append(services, clientRpcService)
clientHttpService := &structs.Service{
Name: a.config.Consul.ClientServiceName,
PortLabel: a.clientHttpAddr,
Tags: []string{consul.ServiceTagHttp},
}
services = append(services, clientHttpService)
a.consulSyncer.SetServiceIdentifier("agent-client")
}
}
if a.server != nil {
if a.config.Consul.ServerServiceName != "" {
serverRpcService := &structs.Service{
Name: a.config.Consul.ServerServiceName,
Tags: []string{consul.ServiceTagRpc},
PortLabel: a.serverRpcAddr,
}
services = append(services, serverRpcService)
serverHttpService := &structs.Service{
Name: a.config.Consul.ServerServiceName,
Tags: []string{consul.ServiceTagHttp},
PortLabel: a.serverHttpAddr,
}
services = append(services, serverHttpService)
a.consulSyncer.SetServiceIdentifier("agent-server")
}
}
a.consulSyncer.SetAddrFinder(func(portLabel string) (string, int) {
2016-05-14 07:36:26 +00:00
host, port, err := net.SplitHostPort(portLabel)
if err != nil {
return "", 0
}
2016-05-15 00:08:19 +00:00
// if the addr for the service is ":port", then we default to
// registering the service with ip as the loopback addr
if host == "" {
host = "127.0.0.1"
}
p, err := strconv.Atoi(port)
if err != nil {
return "", 0
}
return host, p
})
return a.consulSyncer.SyncServices(services)
}