open-nomad/command/agent/command.go

512 lines
14 KiB
Go

package agent
import (
"flag"
"fmt"
"io"
"os"
"os/signal"
"path/filepath"
"sort"
"strconv"
"strings"
"syscall"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-checkpoint"
"github.com/hashicorp/go-syslog"
"github.com/hashicorp/logutils"
"github.com/hashicorp/nomad/helper/flag-slice"
"github.com/hashicorp/nomad/helper/gated-writer"
scada "github.com/hashicorp/scada-client"
"github.com/mitchellh/cli"
)
// gracefulTimeout bounds how long handleSignals waits for a graceful leave to
// complete; once it elapses the command stops waiting and exits with a
// failure status.
const gracefulTimeout = 5 * time.Second
// Command is a Command implementation that runs a Nomad agent.
// Run blocks until an exit-causing OS signal arrives or a message is
// received on ShutdownCh, which is handled the same way as an interrupt.
//
// NOTE(review): this comment previously stated that a second message on
// ShutdownCh forces an exit. The visible handleSignals only watches OS
// signals and the graceful timeout once a leave is in progress — confirm
// whether ShutdownCh is meant to be honoured there as well.
type Command struct {
// Revision, Version and VersionPrerelease are copied into the agent
// configuration by readConfig.
Revision string
Version string
VersionPrerelease string
// Ui receives all user-facing output; Run wraps it in a prefixed Ui.
Ui cli.Ui
// ShutdownCh delivers external shutdown requests to handleSignals.
ShutdownCh <-chan struct{}
// args holds the raw command-line arguments given to Run; readConfig
// parses them on startup and again on every reload.
args []string
// agent is the running agent created by setupAgent.
agent *Agent
// httpServer is the HTTP server created by setupAgent.
httpServer *HTTPServer
// logFilter is the level filter built by setupLoggers; handleReload
// adjusts its minimum level.
logFilter *logutils.LevelFilter
// logOutput is the combined log destination built by setupLoggers.
logOutput io.Writer
// scadaProvider and scadaHttp are set by setupSCADA when an Atlas
// infrastructure is configured, and are nil otherwise.
scadaProvider *scada.Provider
scadaHttp *HTTPServer
}
// readConfig parses the command-line flags stored in c.args and assembles the
// agent configuration from them.
//
// The base configuration comes from DevConfig when -dev is given and from
// DefaultConfig otherwise. Every -config path is then loaded in the order
// given and merged over what has been accumulated so far. A -log-level given
// on the command line takes precedence over both the defaults and the loaded
// configuration. Finally the version information of the command is stamped
// onto the result.
//
// It returns nil when flag parsing fails or when a configuration path cannot
// be loaded; load failures are reported on the Ui before returning.
func (c *Command) readConfig() *Config {
	var dev bool
	var configPath []string
	var logLevel string

	flags := flag.NewFlagSet("agent", flag.ContinueOnError)
	flags.Usage = func() { c.Ui.Error(c.Help()) }
	flags.BoolVar(&dev, "dev", false, "")
	// The flag deliberately has no default value: an empty string means
	// "not given", so the level from the defaults or the loaded
	// configuration is kept unless the user overrides it explicitly.
	flags.StringVar(&logLevel, "log-level", "", "")
	flags.Var((*sliceflag.StringFlag)(&configPath), "config", "config")
	if err := flags.Parse(c.args); err != nil {
		return nil
	}

	// Load the configuration
	var config *Config
	if dev {
		config = DevConfig()
	} else {
		config = DefaultConfig()
	}
	for _, path := range configPath {
		current, err := LoadConfig(path)
		if err != nil {
			c.Ui.Error(fmt.Sprintf(
				"Error loading configuration from %s: %s", path, err))
			return nil
		}
		if config == nil {
			config = current
		} else {
			config = config.Merge(current)
		}
	}

	// Apply the command-line log level last so that it wins over anything
	// coming from the defaults or the loaded configuration.
	if logLevel != "" {
		config.LogLevel = logLevel
	}

	// Ensure the sub-structs at least exist
	if config.Client == nil {
		config.Client = &ClientConfig{}
	}
	if config.Server == nil {
		config.Server = &ServerConfig{}
	}

	// Set the version info
	config.Revision = c.Revision
	config.Version = c.Version
	config.VersionPrerelease = c.VersionPrerelease
	return config
}
// setupLoggers builds the logging pipeline for the agent and returns the
// gated writer, the in-memory log writer and the combined log output, in that
// order. The level filter is stored on c.logFilter and the combined output on
// c.logOutput. All three results are nil when the configured log level is
// invalid or syslog cannot be initialised; the reason is reported on the Ui.
func (c *Command) setupLoggers(config *Config) (*gatedwriter.Writer, *logWriter, io.Writer) {
	// The gate holds log lines back until the caller flushes it, so that the
	// startup banner can be printed before any log output.
	gate := &gatedwriter.Writer{Writer: &cli.UiWriter{Ui: c.Ui}}

	// The level filter sits in front of the gate and drops anything below
	// the configured minimum level.
	filter := LevelFilter()
	filter.MinLevel = logutils.LogLevel(strings.ToUpper(config.LogLevel))
	filter.Writer = gate
	c.logFilter = filter
	if !ValidateLevelFilter(filter.MinLevel, filter) {
		c.Ui.Error(fmt.Sprintf(
			"Invalid log level: %s. Valid log levels are: %v",
			filter.MinLevel, filter.Levels))
		return nil, nil, nil
	}

	// Syslog is optional and, when enabled, becomes an additional sink.
	var extra []io.Writer
	if config.EnableSyslog {
		sys, err := gsyslog.NewLogger(gsyslog.LOG_NOTICE, config.SyslogFacility, "nomad")
		if err != nil {
			c.Ui.Error(fmt.Sprintf("Syslog setup failed: %v", err))
			return nil, nil, nil
		}
		extra = append(extra, &SyslogWrapper{sys, filter})
	}

	// Fan every log line out to the filter, the log writer and any extra
	// sinks collected above.
	lw := NewLogWriter(512)
	sinks := append([]io.Writer{filter, lw}, extra...)
	combined := io.MultiWriter(sinks...)
	c.logOutput = combined
	return gate, lw, combined
}
// setupAgent creates the agent and the interfaces around it: the SCADA
// integration, the HTTP server and, unless disabled, the update checker.
//
// On success c.agent and c.httpServer are populated and nil is returned. On
// failure the error is reported on the Ui and returned, and everything that
// was started up to that point is shut down again.
func (c *Command) setupAgent(config *Config, logOutput io.Writer) error {
	c.Ui.Output("Starting Nomad agent...")
	agent, err := NewAgent(config, logOutput)
	if err != nil {
		c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err))
		return err
	}
	c.agent = agent

	// Enable the SCADA integration
	if err := c.setupSCADA(config); err != nil {
		agent.Shutdown()
		c.Ui.Error(fmt.Sprintf("Error starting SCADA: %s", err))
		return err
	}

	// Setup the HTTP server
	httpServer, err := NewHTTPServer(agent, config, logOutput)
	if err != nil {
		// The SCADA listener and provider may already be running at this
		// point; release them too so a failed start leaves nothing behind.
		if c.scadaHttp != nil {
			c.scadaHttp.Shutdown()
			c.scadaHttp = nil
		}
		if c.scadaProvider != nil {
			c.scadaProvider.Shutdown()
			c.scadaProvider = nil
		}
		agent.Shutdown()
		c.Ui.Error(fmt.Sprintf("Error starting http server: %s", err))
		return err
	}
	c.httpServer = httpServer

	// Setup update checking
	if !config.DisableUpdateCheck {
		version := config.Version
		if config.VersionPrerelease != "" {
			version += "-" + config.VersionPrerelease
		}
		updateParams := &checkpoint.CheckParams{
			Product: "nomad",
			Version: version,
		}
		if !config.DisableAnonymousSignature {
			updateParams.SignatureFile = filepath.Join(config.DataDir, "checkpoint-signature")
		}

		// Schedule a periodic check with expected interval of 24 hours
		checkpoint.CheckInterval(updateParams, 24*time.Hour, c.checkpointResults)

		// Do an immediate check within the next 30 seconds
		go func() {
			time.Sleep(randomStagger(30 * time.Second))
			c.checkpointResults(checkpoint.Check(updateParams))
		}()
	}
	return nil
}
// checkpointResults handles a result from the update checker: it reports a
// failed check, announces a newer version when the running one is outdated,
// and prints every bulletin attached to the response.
func (c *Command) checkpointResults(results *checkpoint.CheckResponse, err error) {
	if err != nil {
		c.Ui.Error(fmt.Sprintf("Failed to check for updates: %v", err))
		return
	}

	if results.Outdated {
		c.Ui.Error(fmt.Sprintf("Newer Nomad version available: %s", results.CurrentVersion))
	}

	for _, alert := range results.Alerts {
		bulletin := fmt.Sprintf("Bulletin [%s]: %s (%s)", alert.Level, alert.Message, alert.URL)
		// Only informational bulletins go to the info stream; every other
		// level is surfaced as an error.
		if alert.Level == "info" {
			c.Ui.Info(bulletin)
		} else {
			c.Ui.Error(bulletin)
		}
	}
}
// Run starts the agent with the given command-line arguments and blocks until
// it is told to exit. It returns the process exit status: 1 when the
// configuration, logging, telemetry or agent setup fails, and otherwise
// whatever handleSignals returns.
func (c *Command) Run(args []string) int {
	// Prefix everything written to the Ui from here on.
	c.Ui = &cli.PrefixedUi{
		OutputPrefix: "==> ",
		InfoPrefix:   " ",
		ErrorPrefix:  "==> ",
		Ui:           c.Ui,
	}

	// Build the configuration from the arguments.
	c.args = args
	config := c.readConfig()
	if config == nil {
		return 1
	}

	// Logging comes first so the remaining setup steps can write to it.
	logGate, _, logOutput := c.setupLoggers(config)
	if logGate == nil {
		return 1
	}

	if err := c.setupTelementry(config); err != nil {
		c.Ui.Error(fmt.Sprintf("Error initializing telemetry: %s", err))
		return 1
	}

	if err := c.setupAgent(config, logOutput); err != nil {
		return 1
	}
	defer c.agent.Shutdown()

	// Deferred calls run last-in first-out, so the listeners below are torn
	// down before the agent itself.
	defer func() {
		if c.httpServer != nil {
			c.httpServer.Shutdown()
		}
		if c.scadaHttp != nil {
			c.scadaHttp.Shutdown()
		}
		if c.scadaProvider != nil {
			c.scadaProvider.Shutdown()
		}
	}()

	// Collect the summary printed at startup.
	summary := map[string]string{
		"client":    strconv.FormatBool(config.Client.Enabled),
		"log level": config.LogLevel,
		"server":    strconv.FormatBool(config.Server.Enabled),
	}

	// Map iteration order is random; sort the labels for stable output.
	labels := make([]string, 0, len(summary))
	for label := range summary {
		labels = append(labels, label)
	}
	sort.Strings(labels)

	// Print the summary with the labels right-aligned.
	const padding = 18
	c.Ui.Output("Nomad agent configuration:\n")
	for _, label := range labels {
		indent := strings.Repeat(" ", padding-len(label))
		c.Ui.Info(indent + strings.Title(label) + ": " + summary[label])
	}
	c.Ui.Output("")

	c.Ui.Output("Nomad agent started! Log data will stream in below:\n")

	// Release the log lines held back during startup.
	logGate.Flush()

	// Block until a signal tells us to exit.
	return c.handleSignals(config)
}
// handleSignals blocks until an exit-causing signal arrives and returns the
// process exit status.
//
// SIGHUP triggers a configuration reload and the wait continues. A message on
// c.ShutdownCh is treated like an interrupt. For an interrupt or SIGTERM the
// agent leaves gracefully only when the matching option (LeaveOnInt or
// LeaveOnTerm) is set in the current configuration; otherwise the function
// returns 1 immediately.
//
// During a graceful leave the function returns 0 once the leave completes,
// and 1 if another signal arrives or gracefulTimeout elapses first.
func (c *Command) handleSignals(config *Config) int {
	signalCh := make(chan os.Signal, 4)
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)

	// Wait for a signal, reloading on every SIGHUP until something else
	// arrives.
	var sig os.Signal
	for {
		select {
		case s := <-signalCh:
			sig = s
		case <-c.ShutdownCh:
			sig = os.Interrupt
		}
		c.Ui.Output(fmt.Sprintf("Caught signal: %v", sig))

		if sig != syscall.SIGHUP {
			break
		}
		if conf := c.handleReload(config); conf != nil {
			config = conf
		}
	}

	// Check if we should do a graceful leave. Both options are opt-in: the
	// signal causes a leave only when its option is enabled.
	graceful := (sig == os.Interrupt && config.LeaveOnInt) ||
		(sig == syscall.SIGTERM && config.LeaveOnTerm)

	// Bail fast if not doing a graceful leave
	if !graceful {
		return 1
	}

	// Attempt a graceful leave
	gracefulCh := make(chan struct{})
	c.Ui.Output("Gracefully shutting down agent...")
	go func() {
		if err := c.agent.Leave(); err != nil {
			// gracefulCh stays open on failure, so the caller falls
			// through to the timeout and exits with a failure status.
			c.Ui.Error(fmt.Sprintf("Error: %s", err))
			return
		}
		close(gracefulCh)
	}()

	// Wait for leave or another signal
	select {
	case <-signalCh:
		return 1
	case <-time.After(gracefulTimeout):
		return 1
	case <-gracefulCh:
		return 0
	}
}
// handleReload re-reads the configuration, e.g. in response to SIGHUP, and
// applies the new log level to the active log filter. It returns the
// configuration to use from now on: the previous one when reading fails,
// otherwise the freshly read one. An invalid log level in the new
// configuration is reported and replaced by the current level.
func (c *Command) handleReload(config *Config) *Config {
	c.Ui.Output("Reloading configuration...")

	reloaded := c.readConfig()
	if reloaded == nil {
		c.Ui.Error("Failed to reload configs")
		return config
	}

	level := logutils.LogLevel(strings.ToUpper(reloaded.LogLevel))
	if !ValidateLevelFilter(level, c.logFilter) {
		c.Ui.Error(fmt.Sprintf(
			"Invalid log level: %s. Valid log levels are: %v",
			level, c.logFilter.Levels))
		// Carry the level that is still in effect over to the new config.
		reloaded.LogLevel = config.LogLevel
		return reloaded
	}

	c.logFilter.SetMinLevel(level)
	return reloaded
}
// setupTelementry is used to set up the telemetry sub-systems. It installs
// the global metrics sink: an in-memory sink on its own, or a fan-out over
// the configured statsite/statsd sinks plus the in-memory sink. It returns an
// error when a configured sink cannot be created.
func (c *Command) setupTelementry(config *Config) error {
	// Aggregate on 10 second intervals for 1 minute. Expose the metrics over
	// stderr when there is a SIGUSR1 received.
	inm := metrics.NewInmemSink(10*time.Second, time.Minute)
	metrics.DefaultInmemSignal(inm)

	// Fall back to an empty telemetry section when none is configured.
	telConfig := config.Telemetry
	if telConfig == nil {
		telConfig = &Telemetry{}
	}

	metricsConf := metrics.DefaultConfig("nomad")
	metricsConf.EnableHostname = !telConfig.DisableHostname

	// Collect the optional external sinks.
	var fanout metrics.FanoutSink
	if addr := telConfig.StatsiteAddr; addr != "" {
		statsite, err := metrics.NewStatsiteSink(addr)
		if err != nil {
			return err
		}
		fanout = append(fanout, statsite)
	}
	if addr := telConfig.StatsdAddr; addr != "" {
		statsd, err := metrics.NewStatsdSink(addr)
		if err != nil {
			return err
		}
		fanout = append(fanout, statsd)
	}

	// Without external sinks only the in-memory sink is installed, and the
	// hostname prefix is switched off.
	if len(fanout) == 0 {
		metricsConf.EnableHostname = false
		metrics.NewGlobal(metricsConf, inm)
		return nil
	}

	fanout = append(fanout, inm)
	metrics.NewGlobal(metricsConf, fanout)
	return nil
}
// setupSCADA is used to start a new SCADA provider and listener,
// replacing any existing listeners.
//
// Existing listeners are always shut down and their references cleared, so
// that c.scadaProvider and c.scadaHttp never point at something that has
// already been shut down. When no Atlas infrastructure is configured nothing
// new is started and nil is returned. An error is returned when the new
// provider cannot be created.
func (c *Command) setupSCADA(config *Config) error {
	// Shut down existing SCADA listeners and drop the stale references so a
	// later shutdown does not close them a second time.
	if c.scadaProvider != nil {
		c.scadaProvider.Shutdown()
		c.scadaProvider = nil
	}
	if c.scadaHttp != nil {
		c.scadaHttp.Shutdown()
		c.scadaHttp = nil
	}

	// No-op if we don't have an infrastructure
	if config.Atlas == nil || config.Atlas.Infrastructure == "" {
		return nil
	}

	// Create the new provider and listener
	c.Ui.Output("Connecting to Atlas: " + config.Atlas.Infrastructure)
	provider, list, err := NewProvider(config, c.logOutput)
	if err != nil {
		return err
	}
	c.scadaProvider = provider
	c.scadaHttp = newScadaHttp(c.agent, list)
	return nil
}
// Synopsis returns the one-line description of the agent command.
func (c *Command) Synopsis() string {
	const synopsis = "Runs a Nomad agent"
	return synopsis
}
// Help returns the usage text of the agent command. The options listed here
// are the flags that readConfig registers; keep the two in sync.
func (c *Command) Help() string {
	helpText := `
Usage: nomad agent [options]

  Starts the Nomad agent and runs until an interrupt is received.
  The agent may be a client and/or server.

Options:

  -config=path      Path to configuration to load. Can be specified
                    multiple times; the configurations are loaded in the
                    order given and each one is merged over the previous.
  -dev              Starts the agent from the development configuration
                    instead of the default configuration.
  -log-level=info   Log level of the agent.
`
	return strings.TrimSpace(helpText)
}