512 lines
14 KiB
Go
512 lines
14 KiB
Go
package agent
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/armon/go-metrics"
|
|
"github.com/hashicorp/go-checkpoint"
|
|
"github.com/hashicorp/go-syslog"
|
|
"github.com/hashicorp/logutils"
|
|
"github.com/hashicorp/nomad/helper/flag-slice"
|
|
"github.com/hashicorp/nomad/helper/gated-writer"
|
|
scada "github.com/hashicorp/scada-client"
|
|
"github.com/mitchellh/cli"
|
|
)
|
|
|
|
// gracefulTimeout controls how long we wait before forcefully terminating
|
|
const gracefulTimeout = 5 * time.Second
|
|
|
|
// Command is a Command implementation that runs a Nomad agent.
|
|
// The command will not end unless a shutdown message is sent on the
|
|
// ShutdownCh. If two messages are sent on the ShutdownCh it will forcibly
|
|
// exit.
|
|
type Command struct {
|
|
Revision string
|
|
Version string
|
|
VersionPrerelease string
|
|
Ui cli.Ui
|
|
ShutdownCh <-chan struct{}
|
|
|
|
args []string
|
|
agent *Agent
|
|
httpServer *HTTPServer
|
|
logFilter *logutils.LevelFilter
|
|
logOutput io.Writer
|
|
|
|
scadaProvider *scada.Provider
|
|
scadaHttp *HTTPServer
|
|
}
|
|
|
|
func (c *Command) readConfig() *Config {
|
|
var dev bool
|
|
var configPath []string
|
|
var logLevel string
|
|
flags := flag.NewFlagSet("agent", flag.ContinueOnError)
|
|
flags.BoolVar(&dev, "dev", false, "")
|
|
flags.StringVar(&logLevel, "log-level", "info", "")
|
|
flags.Usage = func() { c.Ui.Error(c.Help()) }
|
|
flags.Var((*sliceflag.StringFlag)(&configPath), "config", "config")
|
|
if err := flags.Parse(c.args); err != nil {
|
|
return nil
|
|
}
|
|
|
|
// Load the configuration
|
|
var config *Config
|
|
if dev {
|
|
config = DevConfig()
|
|
} else {
|
|
config = DefaultConfig()
|
|
}
|
|
for _, path := range configPath {
|
|
current, err := LoadConfig(path)
|
|
if err != nil {
|
|
c.Ui.Error(fmt.Sprintf(
|
|
"Error loading configuration from %s: %s", path, err))
|
|
return nil
|
|
}
|
|
|
|
if config == nil {
|
|
config = current
|
|
} else {
|
|
config = config.Merge(current)
|
|
}
|
|
}
|
|
|
|
// Ensure the sub-structs at least exist
|
|
if config.Client == nil {
|
|
config.Client = &ClientConfig{}
|
|
}
|
|
if config.Server == nil {
|
|
config.Server = &ServerConfig{}
|
|
}
|
|
|
|
// Set the version info
|
|
config.Revision = c.Revision
|
|
config.Version = c.Version
|
|
config.VersionPrerelease = c.VersionPrerelease
|
|
|
|
return config
|
|
}
|
|
|
|
// setupLoggers is used to setup the logGate, logWriter, and our logOutput
|
|
func (c *Command) setupLoggers(config *Config) (*gatedwriter.Writer, *logWriter, io.Writer) {
|
|
// Setup logging. First create the gated log writer, which will
|
|
// store logs until we're ready to show them. Then create the level
|
|
// filter, filtering logs of the specified level.
|
|
logGate := &gatedwriter.Writer{
|
|
Writer: &cli.UiWriter{Ui: c.Ui},
|
|
}
|
|
|
|
c.logFilter = LevelFilter()
|
|
c.logFilter.MinLevel = logutils.LogLevel(strings.ToUpper(config.LogLevel))
|
|
c.logFilter.Writer = logGate
|
|
if !ValidateLevelFilter(c.logFilter.MinLevel, c.logFilter) {
|
|
c.Ui.Error(fmt.Sprintf(
|
|
"Invalid log level: %s. Valid log levels are: %v",
|
|
c.logFilter.MinLevel, c.logFilter.Levels))
|
|
return nil, nil, nil
|
|
}
|
|
|
|
// Check if syslog is enabled
|
|
var syslog io.Writer
|
|
if config.EnableSyslog {
|
|
l, err := gsyslog.NewLogger(gsyslog.LOG_NOTICE, config.SyslogFacility, "nomad")
|
|
if err != nil {
|
|
c.Ui.Error(fmt.Sprintf("Syslog setup failed: %v", err))
|
|
return nil, nil, nil
|
|
}
|
|
syslog = &SyslogWrapper{l, c.logFilter}
|
|
}
|
|
|
|
// Create a log writer, and wrap a logOutput around it
|
|
logWriter := NewLogWriter(512)
|
|
var logOutput io.Writer
|
|
if syslog != nil {
|
|
logOutput = io.MultiWriter(c.logFilter, logWriter, syslog)
|
|
} else {
|
|
logOutput = io.MultiWriter(c.logFilter, logWriter)
|
|
}
|
|
c.logOutput = logOutput
|
|
return logGate, logWriter, logOutput
|
|
}
|
|
|
|
// setupAgent is used to start the agent and various interfaces
|
|
func (c *Command) setupAgent(config *Config, logOutput io.Writer) error {
|
|
c.Ui.Output("Starting Nomad agent...")
|
|
agent, err := NewAgent(config, logOutput)
|
|
if err != nil {
|
|
c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err))
|
|
return err
|
|
}
|
|
c.agent = agent
|
|
|
|
// Enable the SCADA integration
|
|
if err := c.setupSCADA(config); err != nil {
|
|
agent.Shutdown()
|
|
c.Ui.Error(fmt.Sprintf("Error starting SCADA: %s", err))
|
|
return err
|
|
}
|
|
|
|
// Setup the HTTP server
|
|
http, err := NewHTTPServer(agent, config, logOutput)
|
|
if err != nil {
|
|
agent.Shutdown()
|
|
c.Ui.Error(fmt.Sprintf("Error starting http server: %s", err))
|
|
return err
|
|
}
|
|
c.httpServer = http
|
|
|
|
// Setup update checking
|
|
if !config.DisableUpdateCheck {
|
|
version := config.Version
|
|
if config.VersionPrerelease != "" {
|
|
version += fmt.Sprintf("-%s", config.VersionPrerelease)
|
|
}
|
|
updateParams := &checkpoint.CheckParams{
|
|
Product: "nomad",
|
|
Version: version,
|
|
}
|
|
if !config.DisableAnonymousSignature {
|
|
updateParams.SignatureFile = filepath.Join(config.DataDir, "checkpoint-signature")
|
|
}
|
|
|
|
// Schedule a periodic check with expected interval of 24 hours
|
|
checkpoint.CheckInterval(updateParams, 24*time.Hour, c.checkpointResults)
|
|
|
|
// Do an immediate check within the next 30 seconds
|
|
go func() {
|
|
time.Sleep(randomStagger(30 * time.Second))
|
|
c.checkpointResults(checkpoint.Check(updateParams))
|
|
}()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// checkpointResults is used to handler periodic results from our update checker
|
|
func (c *Command) checkpointResults(results *checkpoint.CheckResponse, err error) {
|
|
if err != nil {
|
|
c.Ui.Error(fmt.Sprintf("Failed to check for updates: %v", err))
|
|
return
|
|
}
|
|
if results.Outdated {
|
|
c.Ui.Error(fmt.Sprintf("Newer Nomad version available: %s", results.CurrentVersion))
|
|
}
|
|
for _, alert := range results.Alerts {
|
|
switch alert.Level {
|
|
case "info":
|
|
c.Ui.Info(fmt.Sprintf("Bulletin [%s]: %s (%s)", alert.Level, alert.Message, alert.URL))
|
|
default:
|
|
c.Ui.Error(fmt.Sprintf("Bulletin [%s]: %s (%s)", alert.Level, alert.Message, alert.URL))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Command) Run(args []string) int {
|
|
c.Ui = &cli.PrefixedUi{
|
|
OutputPrefix: "==> ",
|
|
InfoPrefix: " ",
|
|
ErrorPrefix: "==> ",
|
|
Ui: c.Ui,
|
|
}
|
|
|
|
// Parse our configs
|
|
c.args = args
|
|
config := c.readConfig()
|
|
if config == nil {
|
|
return 1
|
|
}
|
|
|
|
// Setup the log outputs
|
|
logGate, _, logOutput := c.setupLoggers(config)
|
|
if logGate == nil {
|
|
return 1
|
|
}
|
|
|
|
// Initialize the telemetry
|
|
if err := c.setupTelementry(config); err != nil {
|
|
c.Ui.Error(fmt.Sprintf("Error initializing telemetry: %s", err))
|
|
return 1
|
|
}
|
|
|
|
// Create the agent
|
|
if err := c.setupAgent(config, logOutput); err != nil {
|
|
return 1
|
|
}
|
|
defer c.agent.Shutdown()
|
|
|
|
// Check and shut down the SCADA listeners at the end
|
|
defer func() {
|
|
if c.httpServer != nil {
|
|
c.httpServer.Shutdown()
|
|
}
|
|
if c.scadaHttp != nil {
|
|
c.scadaHttp.Shutdown()
|
|
}
|
|
if c.scadaProvider != nil {
|
|
c.scadaProvider.Shutdown()
|
|
}
|
|
}()
|
|
|
|
// Compile agent information for output later
|
|
info := make(map[string]string)
|
|
info["client"] = strconv.FormatBool(config.Client.Enabled)
|
|
info["log level"] = config.LogLevel
|
|
info["server"] = strconv.FormatBool(config.Server.Enabled)
|
|
|
|
// Sort the keys for output
|
|
infoKeys := make([]string, 0, len(info))
|
|
for key := range info {
|
|
infoKeys = append(infoKeys, key)
|
|
}
|
|
sort.Strings(infoKeys)
|
|
|
|
// Agent configuration output
|
|
padding := 18
|
|
c.Ui.Output("Nomad agent configuration:\n")
|
|
for _, k := range infoKeys {
|
|
c.Ui.Info(fmt.Sprintf(
|
|
"%s%s: %s",
|
|
strings.Repeat(" ", padding-len(k)),
|
|
strings.Title(k),
|
|
info[k]))
|
|
}
|
|
c.Ui.Output("")
|
|
|
|
// Output the header that the server has started
|
|
c.Ui.Output("Nomad agent started! Log data will stream in below:\n")
|
|
|
|
// Enable log streaming
|
|
logGate.Flush()
|
|
|
|
// Wait for exit
|
|
return c.handleSignals(config)
|
|
}
|
|
|
|
// handleSignals blocks until we get an exit-causing signal
|
|
func (c *Command) handleSignals(config *Config) int {
|
|
signalCh := make(chan os.Signal, 4)
|
|
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
|
|
|
|
// Wait for a signal
|
|
WAIT:
|
|
var sig os.Signal
|
|
select {
|
|
case s := <-signalCh:
|
|
sig = s
|
|
case <-c.ShutdownCh:
|
|
sig = os.Interrupt
|
|
}
|
|
c.Ui.Output(fmt.Sprintf("Caught signal: %v", sig))
|
|
|
|
// Check if this is a SIGHUP
|
|
if sig == syscall.SIGHUP {
|
|
if conf := c.handleReload(config); conf != nil {
|
|
config = conf
|
|
}
|
|
goto WAIT
|
|
}
|
|
|
|
// Check if we should do a graceful leave
|
|
graceful := false
|
|
if sig == os.Interrupt && !config.LeaveOnInt {
|
|
graceful = true
|
|
} else if sig == syscall.SIGTERM && config.LeaveOnTerm {
|
|
graceful = true
|
|
}
|
|
|
|
// Bail fast if not doing a graceful leave
|
|
if !graceful {
|
|
return 1
|
|
}
|
|
|
|
// Attempt a graceful leave
|
|
gracefulCh := make(chan struct{})
|
|
c.Ui.Output("Gracefully shutting down agent...")
|
|
go func() {
|
|
if err := c.agent.Leave(); err != nil {
|
|
c.Ui.Error(fmt.Sprintf("Error: %s", err))
|
|
return
|
|
}
|
|
close(gracefulCh)
|
|
}()
|
|
|
|
// Wait for leave or another signal
|
|
select {
|
|
case <-signalCh:
|
|
return 1
|
|
case <-time.After(gracefulTimeout):
|
|
return 1
|
|
case <-gracefulCh:
|
|
return 0
|
|
}
|
|
}
|
|
|
|
// handleReload is invoked when we should reload our configs, e.g. SIGHUP
|
|
func (c *Command) handleReload(config *Config) *Config {
|
|
c.Ui.Output("Reloading configuration...")
|
|
newConf := c.readConfig()
|
|
if newConf == nil {
|
|
c.Ui.Error(fmt.Sprintf("Failed to reload configs"))
|
|
return config
|
|
}
|
|
|
|
// Change the log level
|
|
minLevel := logutils.LogLevel(strings.ToUpper(newConf.LogLevel))
|
|
if ValidateLevelFilter(minLevel, c.logFilter) {
|
|
c.logFilter.SetMinLevel(minLevel)
|
|
} else {
|
|
c.Ui.Error(fmt.Sprintf(
|
|
"Invalid log level: %s. Valid log levels are: %v",
|
|
minLevel, c.logFilter.Levels))
|
|
|
|
// Keep the current log level
|
|
newConf.LogLevel = config.LogLevel
|
|
}
|
|
return newConf
|
|
}
|
|
|
|
// setupTelementry is used ot setup the telemetry sub-systems
|
|
func (c *Command) setupTelementry(config *Config) error {
|
|
/* Setup telemetry
|
|
Aggregate on 10 second intervals for 1 minute. Expose the
|
|
metrics over stderr when there is a SIGUSR1 received.
|
|
*/
|
|
inm := metrics.NewInmemSink(10*time.Second, time.Minute)
|
|
metrics.DefaultInmemSignal(inm)
|
|
|
|
var telConfig *Telemetry
|
|
if config.Telemetry == nil {
|
|
telConfig = &Telemetry{}
|
|
} else {
|
|
telConfig = config.Telemetry
|
|
}
|
|
|
|
metricsConf := metrics.DefaultConfig("nomad")
|
|
metricsConf.EnableHostname = !telConfig.DisableHostname
|
|
|
|
// Configure the statsite sink
|
|
var fanout metrics.FanoutSink
|
|
if telConfig.StatsiteAddr != "" {
|
|
sink, err := metrics.NewStatsiteSink(telConfig.StatsiteAddr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
fanout = append(fanout, sink)
|
|
}
|
|
|
|
// Configure the statsd sink
|
|
if telConfig.StatsdAddr != "" {
|
|
sink, err := metrics.NewStatsdSink(telConfig.StatsdAddr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
fanout = append(fanout, sink)
|
|
}
|
|
|
|
// Initialize the global sink
|
|
if len(fanout) > 0 {
|
|
fanout = append(fanout, inm)
|
|
metrics.NewGlobal(metricsConf, fanout)
|
|
} else {
|
|
metricsConf.EnableHostname = false
|
|
metrics.NewGlobal(metricsConf, inm)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// setupSCADA is used to start a new SCADA provider and listener,
|
|
// replacing any existing listeners.
|
|
func (c *Command) setupSCADA(config *Config) error {
|
|
// Shut down existing SCADA listeners
|
|
if c.scadaProvider != nil {
|
|
c.scadaProvider.Shutdown()
|
|
}
|
|
if c.scadaHttp != nil {
|
|
c.scadaHttp.Shutdown()
|
|
}
|
|
|
|
// No-op if we don't have an infrastructure
|
|
if config.Atlas == nil || config.Atlas.Infrastructure == "" {
|
|
return nil
|
|
}
|
|
|
|
// Create the new provider and listener
|
|
c.Ui.Output("Connecting to Atlas: " + config.Atlas.Infrastructure)
|
|
provider, list, err := NewProvider(config, c.logOutput)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.scadaProvider = provider
|
|
c.scadaHttp = newScadaHttp(c.agent, list)
|
|
return nil
|
|
}
|
|
|
|
func (c *Command) Synopsis() string {
|
|
return "Runs a Nomad agent"
|
|
}
|
|
|
|
func (c *Command) Help() string {
|
|
helpText := `
|
|
Usage: nomad agent [options]
|
|
|
|
Starts the Nomad agent and runs until an interrupt is received.
|
|
The agent may be a client and/or server.
|
|
|
|
Options:
|
|
|
|
-advertise=addr Sets the advertise address to use
|
|
-atlas=org/name Sets the Atlas infrastructure name, enables SCADA.
|
|
-atlas-join Enables auto-joining the Atlas cluster
|
|
-atlas-token=token Provides the Atlas API token
|
|
-bootstrap Sets server to bootstrap mode
|
|
-bind=0.0.0.0 Sets the bind address for cluster communication
|
|
-bootstrap-expect=0 Sets server to expect bootstrap mode.
|
|
-client=127.0.0.1 Sets the address to bind for client access.
|
|
This includes RPC, DNS, HTTP and HTTPS (if configured)
|
|
-config-file=foo Path to a JSON file to read configuration from.
|
|
This can be specified multiple times.
|
|
-config-dir=foo Path to a directory to read configuration files
|
|
from. This will read every file ending in ".json"
|
|
as configuration in this directory in alphabetical
|
|
order. This can be specified multiple times.
|
|
-data-dir=path Path to a data directory to store agent state
|
|
-recursor=1.2.3.4 Address of an upstream DNS server.
|
|
Can be specified multiple times.
|
|
-dc=east-aws Datacenter of the agent
|
|
-encrypt=key Provides the gossip encryption key
|
|
-join=1.2.3.4 Address of an agent to join at start time.
|
|
Can be specified multiple times.
|
|
-join-wan=1.2.3.4 Address of an agent to join -wan at start time.
|
|
Can be specified multiple times.
|
|
-retry-join=1.2.3.4 Address of an agent to join at start time with
|
|
retries enabled. Can be specified multiple times.
|
|
-retry-interval=30s Time to wait between join attempts.
|
|
-retry-max=0 Maximum number of join attempts. Defaults to 0, which
|
|
will retry indefinitely.
|
|
-retry-join-wan=1.2.3.4 Address of an agent to join -wan at start time with
|
|
retries enabled. Can be specified multiple times.
|
|
-retry-interval-wan=30s Time to wait between join -wan attempts.
|
|
-retry-max-wan=0 Maximum number of join -wan attempts. Defaults to 0, which
|
|
will retry indefinitely.
|
|
-log-level=info Log level of the agent.
|
|
-node=hostname Name of this node. Must be unique in the cluster
|
|
-protocol=N Sets the protocol version. Defaults to latest.
|
|
-rejoin Ignores a previous leave and attempts to rejoin the cluster.
|
|
-server Switches agent to server mode.
|
|
-syslog Enables logging to syslog
|
|
-ui-dir=path Path to directory containing the Web UI resources
|
|
-pid-file=path Path to file to store agent PID
|
|
`
|
|
return strings.TrimSpace(helpText)
|
|
}
|