package agent import ( "flag" "fmt" "io" "os" "os/signal" "path/filepath" "sort" "strconv" "strings" "syscall" "time" "github.com/armon/go-metrics" "github.com/hashicorp/go-checkpoint" "github.com/hashicorp/go-syslog" "github.com/hashicorp/logutils" "github.com/hashicorp/nomad/helper/flag-slice" "github.com/hashicorp/nomad/helper/gated-writer" scada "github.com/hashicorp/scada-client" "github.com/mitchellh/cli" ) // gracefulTimeout controls how long we wait before forcefully terminating const gracefulTimeout = 5 * time.Second // Command is a Command implementation that runs a Nomad agent. // The command will not end unless a shutdown message is sent on the // ShutdownCh. If two messages are sent on the ShutdownCh it will forcibly // exit. type Command struct { Revision string Version string VersionPrerelease string Ui cli.Ui ShutdownCh <-chan struct{} args []string agent *Agent httpServer *HTTPServer logFilter *logutils.LevelFilter logOutput io.Writer scadaProvider *scada.Provider scadaHttp *HTTPServer } func (c *Command) readConfig() *Config { var dev bool var configPath []string var logLevel string flags := flag.NewFlagSet("agent", flag.ContinueOnError) flags.BoolVar(&dev, "dev", false, "") flags.StringVar(&logLevel, "log-level", "info", "") flags.Usage = func() { c.Ui.Error(c.Help()) } flags.Var((*sliceflag.StringFlag)(&configPath), "config", "config") if err := flags.Parse(c.args); err != nil { return nil } // Load the configuration var config *Config if dev { config = DevConfig() } else { config = DefaultConfig() } for _, path := range configPath { current, err := LoadConfig(path) if err != nil { c.Ui.Error(fmt.Sprintf( "Error loading configuration from %s: %s", path, err)) return nil } if config == nil { config = current } else { config = config.Merge(current) } } // Ensure the sub-structs at least exist if config.Client == nil { config.Client = &ClientConfig{} } if config.Server == nil { config.Server = &ServerConfig{} } // Set the version info config.Revision = c.Revision config.Version = c.Version config.VersionPrerelease = c.VersionPrerelease return config } // setupLoggers is used to setup the logGate, logWriter, and our logOutput func (c *Command) setupLoggers(config *Config) (*gatedwriter.Writer, *logWriter, io.Writer) { // Setup logging. First create the gated log writer, which will // store logs until we're ready to show them. Then create the level // filter, filtering logs of the specified level. logGate := &gatedwriter.Writer{ Writer: &cli.UiWriter{Ui: c.Ui}, } c.logFilter = LevelFilter() c.logFilter.MinLevel = logutils.LogLevel(strings.ToUpper(config.LogLevel)) c.logFilter.Writer = logGate if !ValidateLevelFilter(c.logFilter.MinLevel, c.logFilter) { c.Ui.Error(fmt.Sprintf( "Invalid log level: %s. Valid log levels are: %v", c.logFilter.MinLevel, c.logFilter.Levels)) return nil, nil, nil } // Check if syslog is enabled var syslog io.Writer if config.EnableSyslog { l, err := gsyslog.NewLogger(gsyslog.LOG_NOTICE, config.SyslogFacility, "nomad") if err != nil { c.Ui.Error(fmt.Sprintf("Syslog setup failed: %v", err)) return nil, nil, nil } syslog = &SyslogWrapper{l, c.logFilter} } // Create a log writer, and wrap a logOutput around it logWriter := NewLogWriter(512) var logOutput io.Writer if syslog != nil { logOutput = io.MultiWriter(c.logFilter, logWriter, syslog) } else { logOutput = io.MultiWriter(c.logFilter, logWriter) } c.logOutput = logOutput return logGate, logWriter, logOutput } // setupAgent is used to start the agent and various interfaces func (c *Command) setupAgent(config *Config, logOutput io.Writer) error { c.Ui.Output("Starting Nomad agent...") agent, err := NewAgent(config, logOutput) if err != nil { c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err)) return err } c.agent = agent // Enable the SCADA integration if err := c.setupSCADA(config); err != nil { agent.Shutdown() c.Ui.Error(fmt.Sprintf("Error starting SCADA: %s", err)) return err } // Setup the HTTP server http, err := NewHTTPServer(agent, config, logOutput) if err != nil { agent.Shutdown() c.Ui.Error(fmt.Sprintf("Error starting http server: %s", err)) return err } c.httpServer = http // Setup update checking if !config.DisableUpdateCheck { version := config.Version if config.VersionPrerelease != "" { version += fmt.Sprintf("-%s", config.VersionPrerelease) } updateParams := &checkpoint.CheckParams{ Product: "nomad", Version: version, } if !config.DisableAnonymousSignature { updateParams.SignatureFile = filepath.Join(config.DataDir, "checkpoint-signature") } // Schedule a periodic check with expected interval of 24 hours checkpoint.CheckInterval(updateParams, 24*time.Hour, c.checkpointResults) // Do an immediate check within the next 30 seconds go func() { time.Sleep(randomStagger(30 * time.Second)) c.checkpointResults(checkpoint.Check(updateParams)) }() } return nil } // checkpointResults is used to handler periodic results from our update checker func (c *Command) checkpointResults(results *checkpoint.CheckResponse, err error) { if err != nil { c.Ui.Error(fmt.Sprintf("Failed to check for updates: %v", err)) return } if results.Outdated { c.Ui.Error(fmt.Sprintf("Newer Nomad version available: %s", results.CurrentVersion)) } for _, alert := range results.Alerts { switch alert.Level { case "info": c.Ui.Info(fmt.Sprintf("Bulletin [%s]: %s (%s)", alert.Level, alert.Message, alert.URL)) default: c.Ui.Error(fmt.Sprintf("Bulletin [%s]: %s (%s)", alert.Level, alert.Message, alert.URL)) } } } func (c *Command) Run(args []string) int { c.Ui = &cli.PrefixedUi{ OutputPrefix: "==> ", InfoPrefix: " ", ErrorPrefix: "==> ", Ui: c.Ui, } // Parse our configs c.args = args config := c.readConfig() if config == nil { return 1 } // Setup the log outputs logGate, _, logOutput := c.setupLoggers(config) if logGate == nil { return 1 } // Initialize the telemetry if err := c.setupTelementry(config); err != nil { c.Ui.Error(fmt.Sprintf("Error initializing telemetry: %s", err)) return 1 } // Create the agent if err := c.setupAgent(config, logOutput); err != nil { return 1 } defer c.agent.Shutdown() // Check and shut down the SCADA listeners at the end defer func() { if c.httpServer != nil { c.httpServer.Shutdown() } if c.scadaHttp != nil { c.scadaHttp.Shutdown() } if c.scadaProvider != nil { c.scadaProvider.Shutdown() } }() // Compile agent information for output later info := make(map[string]string) info["client"] = strconv.FormatBool(config.Client.Enabled) info["log level"] = config.LogLevel info["server"] = strconv.FormatBool(config.Server.Enabled) info["region"] = fmt.Sprintf("%s (DC: %s)", config.Region, config.Datacenter) if config.Atlas != nil && config.Atlas.Infrastructure != "" { info["atlas"] = fmt.Sprintf("(Infrastructure: '%s' Join: %v)", config.Atlas.Infrastructure, config.Atlas.Join) } else { info["atlas"] = "" } // Sort the keys for output infoKeys := make([]string, 0, len(info)) for key := range info { infoKeys = append(infoKeys, key) } sort.Strings(infoKeys) // Agent configuration output padding := 18 c.Ui.Output("Nomad agent configuration:\n") for _, k := range infoKeys { c.Ui.Info(fmt.Sprintf( "%s%s: %s", strings.Repeat(" ", padding-len(k)), strings.Title(k), info[k])) } c.Ui.Output("") // Output the header that the server has started c.Ui.Output("Nomad agent started! Log data will stream in below:\n") // Enable log streaming logGate.Flush() // Wait for exit return c.handleSignals(config) } // handleSignals blocks until we get an exit-causing signal func (c *Command) handleSignals(config *Config) int { signalCh := make(chan os.Signal, 4) signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) // Wait for a signal WAIT: var sig os.Signal select { case s := <-signalCh: sig = s case <-c.ShutdownCh: sig = os.Interrupt } c.Ui.Output(fmt.Sprintf("Caught signal: %v", sig)) // Check if this is a SIGHUP if sig == syscall.SIGHUP { if conf := c.handleReload(config); conf != nil { config = conf } goto WAIT } // Check if we should do a graceful leave graceful := false if sig == os.Interrupt && !config.LeaveOnInt { graceful = true } else if sig == syscall.SIGTERM && config.LeaveOnTerm { graceful = true } // Bail fast if not doing a graceful leave if !graceful { return 1 } // Attempt a graceful leave gracefulCh := make(chan struct{}) c.Ui.Output("Gracefully shutting down agent...") go func() { if err := c.agent.Leave(); err != nil { c.Ui.Error(fmt.Sprintf("Error: %s", err)) return } close(gracefulCh) }() // Wait for leave or another signal select { case <-signalCh: return 1 case <-time.After(gracefulTimeout): return 1 case <-gracefulCh: return 0 } } // handleReload is invoked when we should reload our configs, e.g. SIGHUP func (c *Command) handleReload(config *Config) *Config { c.Ui.Output("Reloading configuration...") newConf := c.readConfig() if newConf == nil { c.Ui.Error(fmt.Sprintf("Failed to reload configs")) return config } // Change the log level minLevel := logutils.LogLevel(strings.ToUpper(newConf.LogLevel)) if ValidateLevelFilter(minLevel, c.logFilter) { c.logFilter.SetMinLevel(minLevel) } else { c.Ui.Error(fmt.Sprintf( "Invalid log level: %s. Valid log levels are: %v", minLevel, c.logFilter.Levels)) // Keep the current log level newConf.LogLevel = config.LogLevel } return newConf } // setupTelementry is used ot setup the telemetry sub-systems func (c *Command) setupTelementry(config *Config) error { /* Setup telemetry Aggregate on 10 second intervals for 1 minute. Expose the metrics over stderr when there is a SIGUSR1 received. */ inm := metrics.NewInmemSink(10*time.Second, time.Minute) metrics.DefaultInmemSignal(inm) var telConfig *Telemetry if config.Telemetry == nil { telConfig = &Telemetry{} } else { telConfig = config.Telemetry } metricsConf := metrics.DefaultConfig("nomad") metricsConf.EnableHostname = !telConfig.DisableHostname // Configure the statsite sink var fanout metrics.FanoutSink if telConfig.StatsiteAddr != "" { sink, err := metrics.NewStatsiteSink(telConfig.StatsiteAddr) if err != nil { return err } fanout = append(fanout, sink) } // Configure the statsd sink if telConfig.StatsdAddr != "" { sink, err := metrics.NewStatsdSink(telConfig.StatsdAddr) if err != nil { return err } fanout = append(fanout, sink) } // Initialize the global sink if len(fanout) > 0 { fanout = append(fanout, inm) metrics.NewGlobal(metricsConf, fanout) } else { metricsConf.EnableHostname = false metrics.NewGlobal(metricsConf, inm) } return nil } // setupSCADA is used to start a new SCADA provider and listener, // replacing any existing listeners. func (c *Command) setupSCADA(config *Config) error { // Shut down existing SCADA listeners if c.scadaProvider != nil { c.scadaProvider.Shutdown() } if c.scadaHttp != nil { c.scadaHttp.Shutdown() } // No-op if we don't have an infrastructure if config.Atlas == nil || config.Atlas.Infrastructure == "" { return nil } // Create the new provider and listener c.Ui.Output("Connecting to Atlas: " + config.Atlas.Infrastructure) provider, list, err := NewProvider(config, c.logOutput) if err != nil { return err } c.scadaProvider = provider c.scadaHttp = newScadaHttp(c.agent, list) return nil } func (c *Command) Synopsis() string { return "Runs a Nomad agent" } func (c *Command) Help() string { helpText := ` Usage: nomad agent [options] Starts the Nomad agent and runs until an interrupt is received. The agent may be a client and/or server. Options: -advertise=addr Sets the advertise address to use -atlas=org/name Sets the Atlas infrastructure name, enables SCADA. -atlas-join Enables auto-joining the Atlas cluster -atlas-token=token Provides the Atlas API token -bootstrap Sets server to bootstrap mode -bind=0.0.0.0 Sets the bind address for cluster communication -bootstrap-expect=0 Sets server to expect bootstrap mode. -client=127.0.0.1 Sets the address to bind for client access. This includes RPC, DNS, HTTP and HTTPS (if configured) -config-file=foo Path to a JSON file to read configuration from. This can be specified multiple times. -config-dir=foo Path to a directory to read configuration files from. This will read every file ending in ".json" as configuration in this directory in alphabetical order. This can be specified multiple times. -data-dir=path Path to a data directory to store agent state -recursor=1.2.3.4 Address of an upstream DNS server. Can be specified multiple times. -dc=east-aws Datacenter of the agent -encrypt=key Provides the gossip encryption key -join=1.2.3.4 Address of an agent to join at start time. Can be specified multiple times. -join-wan=1.2.3.4 Address of an agent to join -wan at start time. Can be specified multiple times. -retry-join=1.2.3.4 Address of an agent to join at start time with retries enabled. Can be specified multiple times. -retry-interval=30s Time to wait between join attempts. -retry-max=0 Maximum number of join attempts. Defaults to 0, which will retry indefinitely. -retry-join-wan=1.2.3.4 Address of an agent to join -wan at start time with retries enabled. Can be specified multiple times. -retry-interval-wan=30s Time to wait between join -wan attempts. -retry-max-wan=0 Maximum number of join -wan attempts. Defaults to 0, which will retry indefinitely. -log-level=info Log level of the agent. -node=hostname Name of this node. Must be unique in the cluster -protocol=N Sets the protocol version. Defaults to latest. -rejoin Ignores a previous leave and attempts to rejoin the cluster. -server Switches agent to server mode. -syslog Enables logging to syslog -ui-dir=path Path to directory containing the Web UI resources -pid-file=path Path to file to store agent PID ` return strings.TrimSpace(helpText) }