commands: move agent command to separate pkg

This commit is contained in:
Frank Schroeder 2017-10-17 12:55:04 +02:00 committed by Frank Schröder
parent 127bd3d295
commit a0b017d976
3 changed files with 145 additions and 192 deletions

View File

@ -1,14 +1,13 @@
package command
package agent
import (
"encoding/json"
"flag"
"fmt"
"io"
"log"
"os"
"os/signal"
"path/filepath"
"regexp"
"strings"
"syscall"
"time"
@ -18,6 +17,7 @@ import (
"github.com/armon/go-metrics/datadog"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/command/flags"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/go-checkpoint"
@ -26,77 +26,115 @@ import (
"github.com/mitchellh/cli"
)
// validDatacenter is used to validate a datacenter
var validDatacenter = regexp.MustCompile("^[a-zA-Z0-9_-]+$")
func New(ui cli.Ui, revision, version, versionPre, versionHuman string, shutdownCh <-chan struct{}) *cmd {
ui = &cli.PrefixedUi{
OutputPrefix: "==> ",
InfoPrefix: " ",
ErrorPrefix: "==> ",
Ui: ui,
}
c := &cmd{
UI: ui,
revision: revision,
version: version,
versionPrerelease: versionPre,
versionHuman: versionHuman,
shutdownCh: shutdownCh,
}
c.init()
return c
}
// AgentCommand is a Command implementation that runs a Consul agent.
// The command will not end unless a shutdown message is sent on the
// ShutdownCh. If two messages are sent on the ShutdownCh it will forcibly
// exit.
type AgentCommand struct {
BaseCommand
Revision string
Version string
VersionPrerelease string
HumanVersion string
ShutdownCh <-chan struct{}
type cmd struct {
UI cli.Ui
flags *flag.FlagSet
http *flags.HTTPFlags
usage string
revision string
version string
versionPrerelease string
versionHuman string
shutdownCh <-chan struct{}
args []string
flagArgs config.Flags
logFilter *logutils.LevelFilter
logOutput io.Writer
logger *log.Logger
}
func (c *cmd) init() {
c.flags = flag.NewFlagSet("", flag.ContinueOnError)
config.AddFlags(c.flags, &c.flagArgs)
c.usage = flags.Usage(usage, c.flags, nil, nil)
}
func (c *cmd) Synopsis() string {
return "Runs a Consul agent"
}
func (c *cmd) Help() string {
return c.usage
}
func (c *cmd) Run(args []string) int {
code := c.run(args)
if c.logger != nil {
c.logger.Println("[INFO] Exit code:", code)
}
return code
}
// readConfig is responsible for setup of our configuration using
// the command line and any file configs
func (cmd *AgentCommand) readConfig() *config.RuntimeConfig {
cmd.InitFlagSet()
var flags config.Flags
config.AddFlags(cmd.FlagSet, &flags)
if err := cmd.FlagSet.Parse(cmd.args); err != nil {
func (c *cmd) readConfig() *config.RuntimeConfig {
if err := c.flags.Parse(c.args); err != nil {
if !strings.Contains(err.Error(), "help requested") {
cmd.UI.Error(fmt.Sprintf("error parsing flags: %v", err))
c.UI.Error(fmt.Sprintf("error parsing flags: %v", err))
}
return nil
}
b, err := config.NewBuilder(flags)
b, err := config.NewBuilder(c.flagArgs)
if err != nil {
cmd.UI.Error(err.Error())
c.UI.Error(err.Error())
return nil
}
cfg, err := b.BuildAndValidate()
if err != nil {
cmd.UI.Error(err.Error())
c.UI.Error(err.Error())
return nil
}
for _, w := range b.Warnings {
cmd.UI.Warn(w)
c.UI.Warn(w)
}
return &cfg
}
// checkpointResults is used to handler periodic results from our update checker
func (cmd *AgentCommand) checkpointResults(results *checkpoint.CheckResponse, err error) {
func (c *cmd) checkpointResults(results *checkpoint.CheckResponse, err error) {
if err != nil {
cmd.UI.Error(fmt.Sprintf("Failed to check for updates: %v", err))
c.UI.Error(fmt.Sprintf("Failed to check for updates: %v", err))
return
}
if results.Outdated {
cmd.UI.Error(fmt.Sprintf("Newer Consul version available: %s (currently running: %s)", results.CurrentVersion, cmd.Version))
c.UI.Error(fmt.Sprintf("Newer Consul version available: %s (currently running: %s)", results.CurrentVersion, c.version))
}
for _, alert := range results.Alerts {
switch alert.Level {
case "info":
cmd.UI.Info(fmt.Sprintf("Bulletin [%s]: %s (%s)", alert.Level, alert.Message, alert.URL))
c.UI.Info(fmt.Sprintf("Bulletin [%s]: %s (%s)", alert.Level, alert.Message, alert.URL))
default:
cmd.UI.Error(fmt.Sprintf("Bulletin [%s]: %s (%s)", alert.Level, alert.Message, alert.URL))
c.UI.Error(fmt.Sprintf("Bulletin [%s]: %s (%s)", alert.Level, alert.Message, alert.URL))
}
}
}
func (cmd *AgentCommand) startupUpdateCheck(config *config.RuntimeConfig) {
func (c *cmd) startupUpdateCheck(config *config.RuntimeConfig) {
version := config.Version
if config.VersionPrerelease != "" {
version += fmt.Sprintf("-%s", config.VersionPrerelease)
@ -110,44 +148,44 @@ func (cmd *AgentCommand) startupUpdateCheck(config *config.RuntimeConfig) {
}
// Schedule a periodic check with expected interval of 24 hours
checkpoint.CheckInterval(updateParams, 24*time.Hour, cmd.checkpointResults)
checkpoint.CheckInterval(updateParams, 24*time.Hour, c.checkpointResults)
// Do an immediate check within the next 30 seconds
go func() {
time.Sleep(lib.RandomStagger(30 * time.Second))
cmd.checkpointResults(checkpoint.Check(updateParams))
c.checkpointResults(checkpoint.Check(updateParams))
}()
}
// startupJoin is invoked to handle any joins specified to take place at start time
func (cmd *AgentCommand) startupJoin(agent *agent.Agent, cfg *config.RuntimeConfig) error {
func (c *cmd) startupJoin(agent *agent.Agent, cfg *config.RuntimeConfig) error {
if len(cfg.StartJoinAddrsLAN) == 0 {
return nil
}
cmd.UI.Output("Joining cluster...")
c.UI.Output("Joining cluster...")
n, err := agent.JoinLAN(cfg.StartJoinAddrsLAN)
if err != nil {
return err
}
cmd.UI.Info(fmt.Sprintf("Join completed. Synced with %d initial agents", n))
c.UI.Info(fmt.Sprintf("Join completed. Synced with %d initial agents", n))
return nil
}
// startupJoinWan is invoked to handle any joins -wan specified to take place at start time
func (cmd *AgentCommand) startupJoinWan(agent *agent.Agent, cfg *config.RuntimeConfig) error {
func (c *cmd) startupJoinWan(agent *agent.Agent, cfg *config.RuntimeConfig) error {
if len(cfg.StartJoinAddrsWAN) == 0 {
return nil
}
cmd.UI.Output("Joining -wan cluster...")
c.UI.Output("Joining -wan cluster...")
n, err := agent.JoinWAN(cfg.StartJoinAddrsWAN)
if err != nil {
return err
}
cmd.UI.Info(fmt.Sprintf("Join -wan completed. Synced with %d initial agents", n))
c.UI.Info(fmt.Sprintf("Join -wan completed. Synced with %d initial agents", n))
return nil
}
@ -264,25 +302,10 @@ func startupTelemetry(conf *config.RuntimeConfig) (*metrics.InmemSink, error) {
return memSink, nil
}
func (cmd *AgentCommand) Run(args []string) int {
code := cmd.run(args)
if cmd.logger != nil {
cmd.logger.Println("[INFO] Exit code:", code)
}
return code
}
func (cmd *AgentCommand) run(args []string) int {
cmd.UI = &cli.PrefixedUi{
OutputPrefix: "==> ",
InfoPrefix: " ",
ErrorPrefix: "==> ",
Ui: cmd.UI,
}
func (c *cmd) run(args []string) int {
// Parse our configs
cmd.args = args
config := cmd.readConfig()
c.args = args
config := c.readConfig()
if config == nil {
return 1
}
@ -293,25 +316,25 @@ func (cmd *AgentCommand) run(args []string) int {
EnableSyslog: config.EnableSyslog,
SyslogFacility: config.SyslogFacility,
}
logFilter, logGate, logWriter, logOutput, ok := logger.Setup(logConfig, cmd.UI)
logFilter, logGate, logWriter, logOutput, ok := logger.Setup(logConfig, c.UI)
if !ok {
return 1
}
cmd.logFilter = logFilter
cmd.logOutput = logOutput
cmd.logger = log.New(logOutput, "", log.LstdFlags)
c.logFilter = logFilter
c.logOutput = logOutput
c.logger = log.New(logOutput, "", log.LstdFlags)
memSink, err := startupTelemetry(config)
if err != nil {
cmd.UI.Error(err.Error())
c.UI.Error(err.Error())
return 1
}
// Create the agent
cmd.UI.Output("Starting Consul agent...")
c.UI.Output("Starting Consul agent...")
agent, err := agent.New(config)
if err != nil {
cmd.UI.Error(fmt.Sprintf("Error creating agent: %s", err))
c.UI.Error(fmt.Sprintf("Error creating agent: %s", err))
return 1
}
agent.LogOutput = logOutput
@ -319,7 +342,7 @@ func (cmd *AgentCommand) run(args []string) int {
agent.MemSink = memSink
if err := agent.Start(); err != nil {
cmd.UI.Error(fmt.Sprintf("Error starting agent: %s", err))
c.UI.Error(fmt.Sprintf("Error starting agent: %s", err))
return 1
}
@ -328,16 +351,16 @@ func (cmd *AgentCommand) run(args []string) int {
defer agent.ShutdownAgent()
if !config.DisableUpdateCheck {
cmd.startupUpdateCheck(config)
c.startupUpdateCheck(config)
}
if err := cmd.startupJoin(agent, config); err != nil {
cmd.UI.Error(err.Error())
if err := c.startupJoin(agent, config); err != nil {
c.UI.Error(err.Error())
return 1
}
if err := cmd.startupJoinWan(agent, config); err != nil {
cmd.UI.Error(err.Error())
if err := c.startupJoinWan(agent, config); err != nil {
c.UI.Error(err.Error())
return 1
}
@ -349,22 +372,22 @@ func (cmd *AgentCommand) run(args []string) int {
segment = "<all>"
}
cmd.UI.Output("Consul agent running!")
cmd.UI.Info(fmt.Sprintf(" Version: '%s'", cmd.HumanVersion))
cmd.UI.Info(fmt.Sprintf(" Node ID: '%s'", config.NodeID))
cmd.UI.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName))
cmd.UI.Info(fmt.Sprintf(" Datacenter: '%s' (Segment: '%s')", config.Datacenter, segment))
cmd.UI.Info(fmt.Sprintf(" Server: %v (Bootstrap: %v)", config.ServerMode, config.Bootstrap))
cmd.UI.Info(fmt.Sprintf(" Client Addr: %v (HTTP: %d, HTTPS: %d, DNS: %d)", config.ClientAddrs,
c.UI.Output("Consul agent running!")
c.UI.Info(fmt.Sprintf(" Version: '%s'", c.versionHuman))
c.UI.Info(fmt.Sprintf(" Node ID: '%s'", config.NodeID))
c.UI.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName))
c.UI.Info(fmt.Sprintf(" Datacenter: '%s' (Segment: '%s')", config.Datacenter, segment))
c.UI.Info(fmt.Sprintf(" Server: %v (Bootstrap: %v)", config.ServerMode, config.Bootstrap))
c.UI.Info(fmt.Sprintf(" Client Addr: %v (HTTP: %d, HTTPS: %d, DNS: %d)", config.ClientAddrs,
config.HTTPPort, config.HTTPSPort, config.DNSPort))
cmd.UI.Info(fmt.Sprintf(" Cluster Addr: %v (LAN: %d, WAN: %d)", config.AdvertiseAddrLAN,
c.UI.Info(fmt.Sprintf(" Cluster Addr: %v (LAN: %d, WAN: %d)", config.AdvertiseAddrLAN,
config.SerfPortLAN, config.SerfPortWAN))
cmd.UI.Info(fmt.Sprintf(" Encrypt: Gossip: %v, TLS-Outgoing: %v, TLS-Incoming: %v",
c.UI.Info(fmt.Sprintf(" Encrypt: Gossip: %v, TLS-Outgoing: %v, TLS-Incoming: %v",
agent.GossipEncrypted(), config.VerifyOutgoing, config.VerifyIncoming))
// Enable log streaming
cmd.UI.Info("")
cmd.UI.Output("Log data will now stream in as it occurs:\n")
c.UI.Info("")
c.UI.Output("Log data will now stream in as it occurs:\n")
logGate.Flush()
// wait for signal
@ -381,10 +404,10 @@ func (cmd *AgentCommand) run(args []string) int {
case ch := <-agent.ReloadCh():
sig = syscall.SIGHUP
reloadErrCh = ch
case <-cmd.ShutdownCh:
case <-c.shutdownCh:
sig = os.Interrupt
case err := <-agent.RetryJoinCh():
cmd.logger.Println("[ERR] Retry join failed: ", err)
c.logger.Println("[ERR] Retry join failed: ", err)
return 1
case <-agent.ShutdownCh():
// agent is already down!
@ -396,14 +419,14 @@ func (cmd *AgentCommand) run(args []string) int {
continue
case syscall.SIGHUP:
cmd.logger.Println("[INFO] Caught signal: ", sig)
c.logger.Println("[INFO] Caught signal: ", sig)
conf, err := cmd.handleReload(agent, config)
conf, err := c.handleReload(agent, config)
if conf != nil {
config = conf
}
if err != nil {
cmd.logger.Println("[ERR] Reload config failed: ", err)
c.logger.Println("[ERR] Reload config failed: ", err)
}
// Send result back if reload was called via HTTP
if reloadErrCh != nil {
@ -411,19 +434,19 @@ func (cmd *AgentCommand) run(args []string) int {
}
default:
cmd.logger.Println("[INFO] Caught signal: ", sig)
c.logger.Println("[INFO] Caught signal: ", sig)
graceful := (sig == os.Interrupt && !(config.SkipLeaveOnInt)) || (sig == syscall.SIGTERM && (config.LeaveOnTerm))
if !graceful {
cmd.logger.Println("[INFO] Graceful shutdown disabled. Exiting")
c.logger.Println("[INFO] Graceful shutdown disabled. Exiting")
return 1
}
cmd.logger.Println("[INFO] Gracefully shutting down agent...")
c.logger.Println("[INFO] Gracefully shutting down agent...")
gracefulCh := make(chan struct{})
go func() {
if err := agent.Leave(); err != nil {
cmd.logger.Println("[ERR] Error on leave:", err)
c.logger.Println("[ERR] Error on leave:", err)
return
}
close(gracefulCh)
@ -432,13 +455,13 @@ func (cmd *AgentCommand) run(args []string) int {
gracefulTimeout := 15 * time.Second
select {
case <-signalCh:
cmd.logger.Printf("[INFO] Caught second signal %v. Exiting\n", sig)
c.logger.Printf("[INFO] Caught second signal %v. Exiting\n", sig)
return 1
case <-time.After(gracefulTimeout):
cmd.logger.Println("[INFO] Timeout on graceful leave. Exiting")
c.logger.Println("[INFO] Timeout on graceful leave. Exiting")
return 1
case <-gracefulCh:
cmd.logger.Println("[INFO] Graceful exit completed")
c.logger.Println("[INFO] Graceful exit completed")
return 0
}
}
@ -446,10 +469,10 @@ func (cmd *AgentCommand) run(args []string) int {
}
// handleReload is invoked when we should reload our configs, e.g. SIGHUP
func (cmd *AgentCommand) handleReload(agent *agent.Agent, cfg *config.RuntimeConfig) (*config.RuntimeConfig, error) {
cmd.logger.Println("[INFO] Reloading configuration...")
func (c *cmd) handleReload(agent *agent.Agent, cfg *config.RuntimeConfig) (*config.RuntimeConfig, error) {
c.logger.Println("[INFO] Reloading configuration...")
var errs error
newCfg := cmd.readConfig()
newCfg := c.readConfig()
if newCfg == nil {
errs = multierror.Append(errs, fmt.Errorf("Failed to reload configs"))
return cfg, errs
@ -457,12 +480,12 @@ func (cmd *AgentCommand) handleReload(agent *agent.Agent, cfg *config.RuntimeCon
// Change the log level
minLevel := logutils.LogLevel(strings.ToUpper(newCfg.LogLevel))
if logger.ValidateLevelFilter(minLevel, cmd.logFilter) {
cmd.logFilter.SetMinLevel(minLevel)
if logger.ValidateLevelFilter(minLevel, c.logFilter) {
c.logFilter.SetMinLevel(minLevel)
} else {
errs = multierror.Append(fmt.Errorf(
"Invalid log level: %s. Valid log levels are: %v",
minLevel, cmd.logFilter.Levels))
minLevel, c.logFilter.Levels))
// Keep the current log level
newCfg.LogLevel = cfg.LogLevel
@ -476,28 +499,7 @@ func (cmd *AgentCommand) handleReload(agent *agent.Agent, cfg *config.RuntimeCon
return cfg, errs
}
func (cmd *AgentCommand) Synopsis() string {
return "Runs a Consul agent"
}
func (cmd *AgentCommand) Help() string {
cmd.InitFlagSet()
config.AddFlags(cmd.FlagSet, &config.Flags{})
return cmd.HelpCommand(`
Usage: consul agent [options]
const usage = `Usage: consul agent [options]
Starts the Consul agent and runs until an interrupt is received. The
agent represents a single node in a cluster.
`)
}
func printJSON(name string, v interface{}) {
fmt.Println(name)
b, err := json.MarshalIndent(v, "", " ")
if err != nil {
fmt.Printf("%#v\n", v)
return
}
fmt.Println(string(b))
}
agent represents a single node in a cluster.`

View File

@ -1,4 +1,4 @@
package command
package agent
import (
"fmt"
@ -16,42 +16,6 @@ import (
"github.com/mitchellh/cli"
)
func baseCommand(ui *cli.MockUi) BaseCommand {
return BaseCommand{
Flags: FlagSetNone,
UI: ui,
}
}
func TestCommand_implements(t *testing.T) {
t.Parallel()
var _ cli.Command = new(AgentCommand)
}
func TestValidDatacenter(t *testing.T) {
t.Parallel()
shouldMatch := []string{
"dc1",
"east-aws-001",
"PROD_aws01-small",
}
noMatch := []string{
"east.aws",
"east!aws",
"first,second",
}
for _, m := range shouldMatch {
if !validDatacenter.MatchString(m) {
t.Fatalf("expected match: %s", m)
}
}
for _, m := range noMatch {
if validDatacenter.MatchString(m) {
t.Fatalf("expected no match: %s", m)
}
}
}
// TestConfigFail should test command line flags that lead to an immediate error.
func TestConfigFail(t *testing.T) {
t.Parallel()
@ -129,11 +93,8 @@ func TestRetryJoin(t *testing.T) {
<-doneCh
}()
cmd := &AgentCommand{
Version: version.Version,
ShutdownCh: shutdownCh,
BaseCommand: baseCommand(cli.NewMockUi()),
}
ui := cli.NewMockUi()
cmd := New(ui, "", version.Version, "", "", shutdownCh)
args := []string{
"-server",
@ -173,10 +134,8 @@ func TestRetryJoinFail(t *testing.T) {
shutdownCh := make(chan struct{})
defer close(shutdownCh)
cmd := &AgentCommand{
ShutdownCh: shutdownCh,
BaseCommand: baseCommand(cli.NewMockUi()),
}
ui := cli.NewMockUi()
cmd := New(ui, "", "", "", "", shutdownCh)
args := []string{
"-bind", cfg.BindAddr.String(),
@ -201,10 +160,8 @@ func TestRetryJoinWanFail(t *testing.T) {
shutdownCh := make(chan struct{})
defer close(shutdownCh)
cmd := &AgentCommand{
ShutdownCh: shutdownCh,
BaseCommand: baseCommand(cli.NewMockUi()),
}
ui := cli.NewMockUi()
cmd := New(ui, "", "", "", "", shutdownCh)
args := []string{
"-server",
@ -245,11 +202,9 @@ func TestProtectDataDir(t *testing.T) {
}
ui := cli.NewMockUi()
cmd := &AgentCommand{
BaseCommand: baseCommand(ui),
args: []string{"-config-file=" + cfgFile.Name()},
}
if conf := cmd.readConfig(); conf != nil {
cmd := New(ui, "", "", "", "", nil)
args := []string{"-config-file=" + cfgFile.Name()}
if code := cmd.Run(args); code == 0 {
t.Fatalf("should fail")
}
if out := ui.ErrorWriter.String(); !strings.Contains(out, dir) {
@ -269,11 +224,9 @@ func TestBadDataDirPermissions(t *testing.T) {
defer os.RemoveAll(dataDir)
ui := cli.NewMockUi()
cmd := &AgentCommand{
BaseCommand: baseCommand(ui),
args: []string{"-data-dir=" + dataDir, "-server=true", "-bind=10.0.0.1"},
}
if conf := cmd.readConfig(); conf != nil {
cmd := New(ui, "", "", "", "", nil)
args := []string{"-data-dir=" + dataDir, "-server=true", "-bind=10.0.0.1"}
if code := cmd.Run(args); code == 0 {
t.Fatalf("Should fail with bad data directory permissions")
}
if out := ui.ErrorWriter.String(); !strings.Contains(out, "Permission denied") {

View File

@ -5,6 +5,7 @@ import (
"os/signal"
"syscall"
agentcmd "github.com/hashicorp/consul/command/agent"
"github.com/hashicorp/consul/command/cat"
"github.com/hashicorp/consul/command/catlistdc"
"github.com/hashicorp/consul/command/catlistnodes"
@ -55,17 +56,14 @@ func init() {
Commands = map[string]cli.CommandFactory{
"agent": func() (cli.Command, error) {
return &AgentCommand{
BaseCommand: BaseCommand{
Flags: FlagSetNone,
UI: ui,
},
Revision: version.GitCommit,
Version: version.Version,
VersionPrerelease: version.VersionPrerelease,
HumanVersion: version.GetHumanVersion(),
ShutdownCh: make(chan struct{}),
}, nil
return agentcmd.New(
ui,
version.GitCommit,
version.Version,
version.VersionPrerelease,
version.GetHumanVersion(),
make(chan struct{}),
), nil
},
"catalog": func() (cli.Command, error) {