7568a19433
This patch refactors the commands that use the mitchellh/cli library to populate the command line flag set in both the Run() and the Help() method. Earlier versions of the mitchellh/cli library relied on the Run() method to populuate the flagset for generating the usage screen. This has changed in later versions and was previously solved with a small monkey patch to the library to restore the old behavior. However, this makes upgrading the library difficult since the patch has to be restored every time. This patch addresses this by moving the command line flags into an initFlags() method where appropriate and also moving all variables for the flags from the Run() method into the command itself. Fixes #3536
504 lines
14 KiB
Go
504 lines
14 KiB
Go
package command
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"regexp"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/armon/go-metrics"
|
|
"github.com/armon/go-metrics/circonus"
|
|
"github.com/armon/go-metrics/datadog"
|
|
"github.com/hashicorp/consul/agent"
|
|
"github.com/hashicorp/consul/agent/config"
|
|
"github.com/hashicorp/consul/lib"
|
|
"github.com/hashicorp/consul/logger"
|
|
"github.com/hashicorp/go-checkpoint"
|
|
multierror "github.com/hashicorp/go-multierror"
|
|
"github.com/hashicorp/logutils"
|
|
"github.com/mitchellh/cli"
|
|
)
|
|
|
|
// validDatacenter is used to validate a datacenter
|
|
var validDatacenter = regexp.MustCompile("^[a-zA-Z0-9_-]+$")
|
|
|
|
// AgentCommand is a Command implementation that runs a Consul agent.
|
|
// The command will not end unless a shutdown message is sent on the
|
|
// ShutdownCh. If two messages are sent on the ShutdownCh it will forcibly
|
|
// exit.
|
|
type AgentCommand struct {
|
|
BaseCommand
|
|
Revision string
|
|
Version string
|
|
VersionPrerelease string
|
|
HumanVersion string
|
|
ShutdownCh <-chan struct{}
|
|
args []string
|
|
logFilter *logutils.LevelFilter
|
|
logOutput io.Writer
|
|
logger *log.Logger
|
|
}
|
|
|
|
// readConfig is responsible for setup of our configuration using
|
|
// the command line and any file configs
|
|
func (cmd *AgentCommand) readConfig() *config.RuntimeConfig {
|
|
cmd.InitFlagSet()
|
|
|
|
var flags config.Flags
|
|
config.AddFlags(cmd.FlagSet, &flags)
|
|
|
|
if err := cmd.FlagSet.Parse(cmd.args); err != nil {
|
|
if !strings.Contains(err.Error(), "help requested") {
|
|
cmd.UI.Error(fmt.Sprintf("error parsing flags: %v", err))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
b, err := config.NewBuilder(flags)
|
|
if err != nil {
|
|
cmd.UI.Error(err.Error())
|
|
return nil
|
|
}
|
|
cfg, err := b.BuildAndValidate()
|
|
if err != nil {
|
|
cmd.UI.Error(err.Error())
|
|
return nil
|
|
}
|
|
for _, w := range b.Warnings {
|
|
cmd.UI.Warn(w)
|
|
}
|
|
return &cfg
|
|
}
|
|
|
|
// checkpointResults is used to handler periodic results from our update checker
|
|
func (cmd *AgentCommand) checkpointResults(results *checkpoint.CheckResponse, err error) {
|
|
if err != nil {
|
|
cmd.UI.Error(fmt.Sprintf("Failed to check for updates: %v", err))
|
|
return
|
|
}
|
|
if results.Outdated {
|
|
cmd.UI.Error(fmt.Sprintf("Newer Consul version available: %s (currently running: %s)", results.CurrentVersion, cmd.Version))
|
|
}
|
|
for _, alert := range results.Alerts {
|
|
switch alert.Level {
|
|
case "info":
|
|
cmd.UI.Info(fmt.Sprintf("Bulletin [%s]: %s (%s)", alert.Level, alert.Message, alert.URL))
|
|
default:
|
|
cmd.UI.Error(fmt.Sprintf("Bulletin [%s]: %s (%s)", alert.Level, alert.Message, alert.URL))
|
|
}
|
|
}
|
|
}
|
|
|
|
func (cmd *AgentCommand) startupUpdateCheck(config *config.RuntimeConfig) {
|
|
version := config.Version
|
|
if config.VersionPrerelease != "" {
|
|
version += fmt.Sprintf("-%s", config.VersionPrerelease)
|
|
}
|
|
updateParams := &checkpoint.CheckParams{
|
|
Product: "consul",
|
|
Version: version,
|
|
}
|
|
if !config.DisableAnonymousSignature {
|
|
updateParams.SignatureFile = filepath.Join(config.DataDir, "checkpoint-signature")
|
|
}
|
|
|
|
// Schedule a periodic check with expected interval of 24 hours
|
|
checkpoint.CheckInterval(updateParams, 24*time.Hour, cmd.checkpointResults)
|
|
|
|
// Do an immediate check within the next 30 seconds
|
|
go func() {
|
|
time.Sleep(lib.RandomStagger(30 * time.Second))
|
|
cmd.checkpointResults(checkpoint.Check(updateParams))
|
|
}()
|
|
}
|
|
|
|
// startupJoin is invoked to handle any joins specified to take place at start time
|
|
func (cmd *AgentCommand) startupJoin(agent *agent.Agent, cfg *config.RuntimeConfig) error {
|
|
if len(cfg.StartJoinAddrsLAN) == 0 {
|
|
return nil
|
|
}
|
|
|
|
cmd.UI.Output("Joining cluster...")
|
|
n, err := agent.JoinLAN(cfg.StartJoinAddrsLAN)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cmd.UI.Info(fmt.Sprintf("Join completed. Synced with %d initial agents", n))
|
|
return nil
|
|
}
|
|
|
|
// startupJoinWan is invoked to handle any joins -wan specified to take place at start time
|
|
func (cmd *AgentCommand) startupJoinWan(agent *agent.Agent, cfg *config.RuntimeConfig) error {
|
|
if len(cfg.StartJoinAddrsWAN) == 0 {
|
|
return nil
|
|
}
|
|
|
|
cmd.UI.Output("Joining -wan cluster...")
|
|
n, err := agent.JoinWAN(cfg.StartJoinAddrsWAN)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cmd.UI.Info(fmt.Sprintf("Join -wan completed. Synced with %d initial agents", n))
|
|
return nil
|
|
}
|
|
|
|
func statsiteSink(config *config.RuntimeConfig, hostname string) (metrics.MetricSink, error) {
|
|
if config.TelemetryStatsiteAddr == "" {
|
|
return nil, nil
|
|
}
|
|
return metrics.NewStatsiteSink(config.TelemetryStatsiteAddr)
|
|
}
|
|
|
|
func statsdSink(config *config.RuntimeConfig, hostname string) (metrics.MetricSink, error) {
|
|
if config.TelemetryStatsdAddr == "" {
|
|
return nil, nil
|
|
}
|
|
return metrics.NewStatsdSink(config.TelemetryStatsdAddr)
|
|
}
|
|
|
|
func dogstatdSink(config *config.RuntimeConfig, hostname string) (metrics.MetricSink, error) {
|
|
if config.TelemetryDogstatsdAddr == "" {
|
|
return nil, nil
|
|
}
|
|
sink, err := datadog.NewDogStatsdSink(config.TelemetryDogstatsdAddr, hostname)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sink.SetTags(config.TelemetryDogstatsdTags)
|
|
return sink, nil
|
|
}
|
|
|
|
func circonusSink(config *config.RuntimeConfig, hostname string) (metrics.MetricSink, error) {
|
|
if config.TelemetryCirconusAPIToken == "" && config.TelemetryCirconusSubmissionURL == "" {
|
|
return nil, nil
|
|
}
|
|
|
|
cfg := &circonus.Config{}
|
|
cfg.Interval = config.TelemetryCirconusSubmissionInterval
|
|
cfg.CheckManager.API.TokenKey = config.TelemetryCirconusAPIToken
|
|
cfg.CheckManager.API.TokenApp = config.TelemetryCirconusAPIApp
|
|
cfg.CheckManager.API.URL = config.TelemetryCirconusAPIURL
|
|
cfg.CheckManager.Check.SubmissionURL = config.TelemetryCirconusSubmissionURL
|
|
cfg.CheckManager.Check.ID = config.TelemetryCirconusCheckID
|
|
cfg.CheckManager.Check.ForceMetricActivation = config.TelemetryCirconusCheckForceMetricActivation
|
|
cfg.CheckManager.Check.InstanceID = config.TelemetryCirconusCheckInstanceID
|
|
cfg.CheckManager.Check.SearchTag = config.TelemetryCirconusCheckSearchTag
|
|
cfg.CheckManager.Check.DisplayName = config.TelemetryCirconusCheckDisplayName
|
|
cfg.CheckManager.Check.Tags = config.TelemetryCirconusCheckTags
|
|
cfg.CheckManager.Broker.ID = config.TelemetryCirconusBrokerID
|
|
cfg.CheckManager.Broker.SelectTag = config.TelemetryCirconusBrokerSelectTag
|
|
|
|
if cfg.CheckManager.Check.DisplayName == "" {
|
|
cfg.CheckManager.Check.DisplayName = "Consul"
|
|
}
|
|
|
|
if cfg.CheckManager.API.TokenApp == "" {
|
|
cfg.CheckManager.API.TokenApp = "consul"
|
|
}
|
|
|
|
if cfg.CheckManager.Check.SearchTag == "" {
|
|
cfg.CheckManager.Check.SearchTag = "service:consul"
|
|
}
|
|
|
|
sink, err := circonus.NewCirconusSink(cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sink.Start()
|
|
return sink, nil
|
|
}
|
|
|
|
func startupTelemetry(conf *config.RuntimeConfig) (*metrics.InmemSink, error) {
|
|
// Setup telemetry
|
|
// Aggregate on 10 second intervals for 1 minute. Expose the
|
|
// metrics over stderr when there is a SIGUSR1 received.
|
|
memSink := metrics.NewInmemSink(10*time.Second, time.Minute)
|
|
metrics.DefaultInmemSignal(memSink)
|
|
metricsConf := metrics.DefaultConfig(conf.TelemetryMetricsPrefix)
|
|
metricsConf.EnableHostname = !conf.TelemetryDisableHostname
|
|
metricsConf.FilterDefault = conf.TelemetryFilterDefault
|
|
metricsConf.AllowedPrefixes = conf.TelemetryAllowedPrefixes
|
|
metricsConf.BlockedPrefixes = conf.TelemetryBlockedPrefixes
|
|
|
|
var sinks metrics.FanoutSink
|
|
addSink := func(name string, fn func(*config.RuntimeConfig, string) (metrics.MetricSink, error)) error {
|
|
s, err := fn(conf, metricsConf.HostName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if s != nil {
|
|
sinks = append(sinks, s)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if err := addSink("statsite", statsiteSink); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := addSink("statsd", statsdSink); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := addSink("dogstatd", dogstatdSink); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := addSink("circonus", circonusSink); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(sinks) > 0 {
|
|
sinks = append(sinks, memSink)
|
|
metrics.NewGlobal(metricsConf, sinks)
|
|
} else {
|
|
metricsConf.EnableHostname = false
|
|
metrics.NewGlobal(metricsConf, memSink)
|
|
}
|
|
return memSink, nil
|
|
}
|
|
|
|
func (cmd *AgentCommand) Run(args []string) int {
|
|
code := cmd.run(args)
|
|
if cmd.logger != nil {
|
|
cmd.logger.Println("[INFO] Exit code:", code)
|
|
}
|
|
return code
|
|
}
|
|
|
|
func (cmd *AgentCommand) run(args []string) int {
|
|
cmd.UI = &cli.PrefixedUi{
|
|
OutputPrefix: "==> ",
|
|
InfoPrefix: " ",
|
|
ErrorPrefix: "==> ",
|
|
Ui: cmd.UI,
|
|
}
|
|
|
|
// Parse our configs
|
|
cmd.args = args
|
|
config := cmd.readConfig()
|
|
if config == nil {
|
|
return 1
|
|
}
|
|
|
|
// Setup the log outputs
|
|
logConfig := &logger.Config{
|
|
LogLevel: config.LogLevel,
|
|
EnableSyslog: config.EnableSyslog,
|
|
SyslogFacility: config.SyslogFacility,
|
|
}
|
|
logFilter, logGate, logWriter, logOutput, ok := logger.Setup(logConfig, cmd.UI)
|
|
if !ok {
|
|
return 1
|
|
}
|
|
cmd.logFilter = logFilter
|
|
cmd.logOutput = logOutput
|
|
cmd.logger = log.New(logOutput, "", log.LstdFlags)
|
|
|
|
memSink, err := startupTelemetry(config)
|
|
if err != nil {
|
|
cmd.UI.Error(err.Error())
|
|
return 1
|
|
}
|
|
|
|
// Create the agent
|
|
cmd.UI.Output("Starting Consul agent...")
|
|
agent, err := agent.New(config)
|
|
if err != nil {
|
|
cmd.UI.Error(fmt.Sprintf("Error creating agent: %s", err))
|
|
return 1
|
|
}
|
|
agent.LogOutput = logOutput
|
|
agent.LogWriter = logWriter
|
|
agent.MemSink = memSink
|
|
|
|
if err := agent.Start(); err != nil {
|
|
cmd.UI.Error(fmt.Sprintf("Error starting agent: %s", err))
|
|
return 1
|
|
}
|
|
|
|
// shutdown agent before endpoints
|
|
defer agent.ShutdownEndpoints()
|
|
defer agent.ShutdownAgent()
|
|
|
|
if !config.DisableUpdateCheck {
|
|
cmd.startupUpdateCheck(config)
|
|
}
|
|
|
|
if err := cmd.startupJoin(agent, config); err != nil {
|
|
cmd.UI.Error(err.Error())
|
|
return 1
|
|
}
|
|
|
|
if err := cmd.startupJoinWan(agent, config); err != nil {
|
|
cmd.UI.Error(err.Error())
|
|
return 1
|
|
}
|
|
|
|
// Let the agent know we've finished registration
|
|
agent.StartSync()
|
|
|
|
segment := config.SegmentName
|
|
if config.ServerMode {
|
|
segment = "<all>"
|
|
}
|
|
|
|
cmd.UI.Output("Consul agent running!")
|
|
cmd.UI.Info(fmt.Sprintf(" Version: '%s'", cmd.HumanVersion))
|
|
cmd.UI.Info(fmt.Sprintf(" Node ID: '%s'", config.NodeID))
|
|
cmd.UI.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName))
|
|
cmd.UI.Info(fmt.Sprintf(" Datacenter: '%s' (Segment: '%s')", config.Datacenter, segment))
|
|
cmd.UI.Info(fmt.Sprintf(" Server: %v (Bootstrap: %v)", config.ServerMode, config.Bootstrap))
|
|
cmd.UI.Info(fmt.Sprintf(" Client Addr: %v (HTTP: %d, HTTPS: %d, DNS: %d)", config.ClientAddrs,
|
|
config.HTTPPort, config.HTTPSPort, config.DNSPort))
|
|
cmd.UI.Info(fmt.Sprintf(" Cluster Addr: %v (LAN: %d, WAN: %d)", config.AdvertiseAddrLAN,
|
|
config.SerfPortLAN, config.SerfPortWAN))
|
|
cmd.UI.Info(fmt.Sprintf(" Encrypt: Gossip: %v, TLS-Outgoing: %v, TLS-Incoming: %v",
|
|
agent.GossipEncrypted(), config.VerifyOutgoing, config.VerifyIncoming))
|
|
|
|
// Enable log streaming
|
|
cmd.UI.Info("")
|
|
cmd.UI.Output("Log data will now stream in as it occurs:\n")
|
|
logGate.Flush()
|
|
|
|
// wait for signal
|
|
signalCh := make(chan os.Signal, 10)
|
|
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
|
|
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGPIPE)
|
|
|
|
for {
|
|
var sig os.Signal
|
|
var reloadErrCh chan error
|
|
select {
|
|
case s := <-signalCh:
|
|
sig = s
|
|
case ch := <-agent.ReloadCh():
|
|
sig = syscall.SIGHUP
|
|
reloadErrCh = ch
|
|
case <-cmd.ShutdownCh:
|
|
sig = os.Interrupt
|
|
case err := <-agent.RetryJoinCh():
|
|
cmd.logger.Println("[ERR] Retry join failed: ", err)
|
|
return 1
|
|
case <-agent.ShutdownCh():
|
|
// agent is already down!
|
|
return 0
|
|
}
|
|
|
|
switch sig {
|
|
case syscall.SIGPIPE:
|
|
continue
|
|
|
|
case syscall.SIGHUP:
|
|
cmd.logger.Println("[INFO] Caught signal: ", sig)
|
|
|
|
conf, err := cmd.handleReload(agent, config)
|
|
if conf != nil {
|
|
config = conf
|
|
}
|
|
if err != nil {
|
|
cmd.logger.Println("[ERR] Reload config failed: ", err)
|
|
}
|
|
// Send result back if reload was called via HTTP
|
|
if reloadErrCh != nil {
|
|
reloadErrCh <- err
|
|
}
|
|
|
|
default:
|
|
cmd.logger.Println("[INFO] Caught signal: ", sig)
|
|
|
|
graceful := (sig == os.Interrupt && !(config.SkipLeaveOnInt)) || (sig == syscall.SIGTERM && (config.LeaveOnTerm))
|
|
if !graceful {
|
|
cmd.logger.Println("[INFO] Graceful shutdown disabled. Exiting")
|
|
return 1
|
|
}
|
|
|
|
cmd.logger.Println("[INFO] Gracefully shutting down agent...")
|
|
gracefulCh := make(chan struct{})
|
|
go func() {
|
|
if err := agent.Leave(); err != nil {
|
|
cmd.logger.Println("[ERR] Error on leave:", err)
|
|
return
|
|
}
|
|
close(gracefulCh)
|
|
}()
|
|
|
|
gracefulTimeout := 15 * time.Second
|
|
select {
|
|
case <-signalCh:
|
|
cmd.logger.Printf("[INFO] Caught second signal %v. Exiting\n", sig)
|
|
return 1
|
|
case <-time.After(gracefulTimeout):
|
|
cmd.logger.Println("[INFO] Timeout on graceful leave. Exiting")
|
|
return 1
|
|
case <-gracefulCh:
|
|
cmd.logger.Println("[INFO] Graceful exit completed")
|
|
return 0
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleReload is invoked when we should reload our configs, e.g. SIGHUP
|
|
func (cmd *AgentCommand) handleReload(agent *agent.Agent, cfg *config.RuntimeConfig) (*config.RuntimeConfig, error) {
|
|
cmd.logger.Println("[INFO] Reloading configuration...")
|
|
var errs error
|
|
newCfg := cmd.readConfig()
|
|
if newCfg == nil {
|
|
errs = multierror.Append(errs, fmt.Errorf("Failed to reload configs"))
|
|
return cfg, errs
|
|
}
|
|
|
|
// Change the log level
|
|
minLevel := logutils.LogLevel(strings.ToUpper(newCfg.LogLevel))
|
|
if logger.ValidateLevelFilter(minLevel, cmd.logFilter) {
|
|
cmd.logFilter.SetMinLevel(minLevel)
|
|
} else {
|
|
errs = multierror.Append(fmt.Errorf(
|
|
"Invalid log level: %s. Valid log levels are: %v",
|
|
minLevel, cmd.logFilter.Levels))
|
|
|
|
// Keep the current log level
|
|
newCfg.LogLevel = cfg.LogLevel
|
|
}
|
|
|
|
if err := agent.ReloadConfig(newCfg); err != nil {
|
|
errs = multierror.Append(fmt.Errorf(
|
|
"Failed to reload configs: %v", err))
|
|
}
|
|
|
|
return cfg, errs
|
|
}
|
|
|
|
func (cmd *AgentCommand) Synopsis() string {
|
|
return "Runs a Consul agent"
|
|
}
|
|
|
|
func (cmd *AgentCommand) Help() string {
|
|
cmd.InitFlagSet()
|
|
config.AddFlags(cmd.FlagSet, &config.Flags{})
|
|
return cmd.HelpCommand(`
|
|
Usage: consul agent [options]
|
|
|
|
Starts the Consul agent and runs until an interrupt is received. The
|
|
agent represents a single node in a cluster.
|
|
|
|
`)
|
|
}
|
|
|
|
func printJSON(name string, v interface{}) {
|
|
fmt.Println(name)
|
|
b, err := json.MarshalIndent(v, "", " ")
|
|
if err != nil {
|
|
fmt.Printf("%#v\n", v)
|
|
return
|
|
}
|
|
fmt.Println(string(b))
|
|
}
|