open-consul/command/agent/command.go

1089 lines
34 KiB
Go
Raw Normal View History

2013-12-19 20:18:06 +00:00
package agent
import (
2013-12-21 00:39:32 +00:00
"flag"
2013-12-20 01:14:46 +00:00
"fmt"
2013-12-21 00:39:32 +00:00
"io"
2013-12-30 23:27:41 +00:00
"net"
2013-12-21 00:39:32 +00:00
"os"
"os/signal"
2014-09-02 21:23:43 +00:00
"path/filepath"
"regexp"
2013-12-19 20:18:06 +00:00
"strings"
2013-12-21 00:39:32 +00:00
"syscall"
"time"
"github.com/armon/go-metrics"
2015-06-16 22:05:55 +00:00
"github.com/armon/go-metrics/datadog"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/watch"
2014-09-02 21:23:43 +00:00
"github.com/hashicorp/go-checkpoint"
"github.com/hashicorp/go-reap"
"github.com/hashicorp/go-syslog"
"github.com/hashicorp/logutils"
2015-02-05 02:06:36 +00:00
scada "github.com/hashicorp/scada-client"
"github.com/mitchellh/cli"
2013-12-19 20:18:06 +00:00
)
2013-12-21 00:39:32 +00:00
var (
	// gracefulTimeout bounds how long a graceful leave may take before the
	// command gives up and terminates forcefully.
	gracefulTimeout = 5 * time.Second

	// validDatacenter matches acceptable datacenter names: one or more
	// ASCII letters, digits, underscores or hyphens.
	validDatacenter = regexp.MustCompile("^[a-zA-Z0-9_-]+$")
)
// Command is a Command implementation that runs a Consul agent.
2013-12-19 20:18:06 +00:00
// The command will not end unless a shutdown message is sent on the
// ShutdownCh. If two messages are sent on the ShutdownCh it will forcibly
// exit.
type Command struct {
2014-06-06 21:40:22 +00:00
Revision string
Version string
VersionPrerelease string
Ui cli.Ui
ShutdownCh <-chan struct{}
args []string
logFilter *logutils.LevelFilter
logOutput io.Writer
2014-06-06 21:40:22 +00:00
agent *Agent
rpcServer *AgentRPC
httpServers []*HTTPServer
2014-06-06 21:40:22 +00:00
dnsServer *DNSServer
2015-02-05 02:06:36 +00:00
scadaProvider *scada.Provider
scadaHttp *HTTPServer
2013-12-21 00:39:32 +00:00
}
// readConfig is responsible for setup of our configuration using
// the command line and any file configs
func (c *Command) readConfig() *Config {
var cmdConfig Config
var configFiles []string
2014-10-12 17:50:15 +00:00
var retryInterval string
var retryIntervalWan string
2015-02-21 06:45:52 +00:00
var dnsRecursors []string
2015-11-29 04:40:05 +00:00
var dev bool
2013-12-21 00:39:32 +00:00
cmdFlags := flag.NewFlagSet("agent", flag.ContinueOnError)
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
2014-04-11 22:22:35 +00:00
cmdFlags.Var((*AppendSliceValue)(&configFiles), "config-file", "json file to read config from")
cmdFlags.Var((*AppendSliceValue)(&configFiles), "config-dir", "directory of json files to read")
2015-02-21 06:45:52 +00:00
cmdFlags.Var((*AppendSliceValue)(&dnsRecursors), "recursor", "address of an upstream DNS server")
2015-11-29 04:40:05 +00:00
cmdFlags.BoolVar(&dev, "dev", false, "development server mode")
2014-04-11 22:22:35 +00:00
2013-12-21 00:39:32 +00:00
cmdFlags.StringVar(&cmdConfig.LogLevel, "log-level", "", "log level")
cmdFlags.StringVar(&cmdConfig.NodeName, "node", "", "node name")
cmdFlags.StringVar(&cmdConfig.Datacenter, "dc", "", "node datacenter")
2014-04-11 22:22:35 +00:00
cmdFlags.StringVar(&cmdConfig.DataDir, "data-dir", "", "path to the data directory")
2015-12-22 17:30:19 +00:00
cmdFlags.BoolVar(&cmdConfig.EnableUi, "ui", false, "enable the built-in web UI")
2014-04-23 19:37:53 +00:00
cmdFlags.StringVar(&cmdConfig.UiDir, "ui-dir", "", "path to the web UI directory")
2014-05-06 03:29:50 +00:00
cmdFlags.StringVar(&cmdConfig.PidFile, "pid-file", "", "path to file to store PID")
cmdFlags.StringVar(&cmdConfig.EncryptKey, "encrypt", "", "gossip encryption key")
2014-09-15 00:31:44 +00:00
cmdFlags.BoolVar(&cmdConfig.Server, "server", false, "run agent as server")
cmdFlags.BoolVar(&cmdConfig.Bootstrap, "bootstrap", false, "enable server bootstrap mode")
cmdFlags.IntVar(&cmdConfig.BootstrapExpect, "bootstrap-expect", 0, "enable automatic bootstrap via expect mode")
cmdFlags.StringVar(&cmdConfig.Domain, "domain", "", "domain to use for DNS interface")
2014-04-11 22:22:35 +00:00
cmdFlags.StringVar(&cmdConfig.ClientAddr, "client", "", "address to bind client listeners to (DNS, HTTP, HTTPS, RPC)")
2014-04-11 22:22:35 +00:00
cmdFlags.StringVar(&cmdConfig.BindAddr, "bind", "", "address to bind server listeners to")
2015-08-20 18:27:20 +00:00
cmdFlags.IntVar(&cmdConfig.Ports.HTTP, "http-port", 0, "http port to use")
cmdFlags.StringVar(&cmdConfig.AdvertiseAddr, "advertise", "", "address to advertise instead of bind addr")
2015-03-28 14:48:06 +00:00
cmdFlags.StringVar(&cmdConfig.AdvertiseAddrWan, "advertise-wan", "", "address to advertise on wan instead of bind or advertise addr")
2014-04-11 22:22:35 +00:00
cmdFlags.StringVar(&cmdConfig.AtlasInfrastructure, "atlas", "", "infrastructure name in Atlas")
2015-02-05 01:23:42 +00:00
cmdFlags.StringVar(&cmdConfig.AtlasToken, "atlas-token", "", "authentication token for Atlas")
2015-02-05 02:45:08 +00:00
cmdFlags.BoolVar(&cmdConfig.AtlasJoin, "atlas-join", false, "auto-join with Atlas")
2015-08-27 18:11:05 +00:00
cmdFlags.StringVar(&cmdConfig.AtlasEndpoint, "atlas-endpoint", "", "endpoint for Atlas integration")
2015-02-05 01:23:42 +00:00
cmdFlags.IntVar(&cmdConfig.Protocol, "protocol", -1, "protocol version")
2014-04-11 22:22:35 +00:00
2014-05-21 19:06:03 +00:00
cmdFlags.BoolVar(&cmdConfig.EnableSyslog, "syslog", false,
"enable logging to syslog facility")
cmdFlags.BoolVar(&cmdConfig.RejoinAfterLeave, "rejoin", false,
"enable re-joining after a previous leave")
cmdFlags.Var((*AppendSliceValue)(&cmdConfig.StartJoin), "join",
"address of agent to join on startup")
cmdFlags.Var((*AppendSliceValue)(&cmdConfig.StartJoinWan), "join-wan",
"address of agent to join -wan on startup")
2014-10-12 17:50:15 +00:00
cmdFlags.Var((*AppendSliceValue)(&cmdConfig.RetryJoin), "retry-join",
"address of agent to join on startup with retry")
cmdFlags.IntVar(&cmdConfig.RetryMaxAttempts, "retry-max", 0,
"number of retries for joining")
cmdFlags.StringVar(&retryInterval, "retry-interval", "",
"interval between join attempts")
cmdFlags.Var((*AppendSliceValue)(&cmdConfig.RetryJoinWan), "retry-join-wan",
"address of agent to join -wan on startup with retry")
cmdFlags.IntVar(&cmdConfig.RetryMaxAttemptsWan, "retry-max-wan", 0,
"number of retries for joining -wan")
cmdFlags.StringVar(&retryIntervalWan, "retry-interval-wan", "",
"interval between join -wan attempts")
2013-12-21 00:39:32 +00:00
if err := cmdFlags.Parse(c.args); err != nil {
return nil
}
2014-10-12 17:50:15 +00:00
if retryInterval != "" {
dur, err := time.ParseDuration(retryInterval)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error: %s", err))
return nil
}
cmdConfig.RetryInterval = dur
}
if retryIntervalWan != "" {
dur, err := time.ParseDuration(retryIntervalWan)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error: %s", err))
return nil
}
cmdConfig.RetryIntervalWan = dur
}
2015-11-29 04:40:05 +00:00
var config *Config
if dev {
config = DevConfig()
} else {
config = DefaultConfig()
}
2013-12-21 00:39:32 +00:00
if len(configFiles) > 0 {
fileConfig, err := ReadConfigPaths(configFiles)
if err != nil {
c.Ui.Error(err.Error())
return nil
}
config = MergeConfig(config, fileConfig)
}
2015-02-21 06:45:52 +00:00
cmdConfig.DNSRecursors = append(cmdConfig.DNSRecursors, dnsRecursors...)
2013-12-21 00:39:32 +00:00
config = MergeConfig(config, &cmdConfig)
if config.NodeName == "" {
hostname, err := os.Hostname()
if err != nil {
2016-03-31 21:47:55 +00:00
c.Ui.Error(fmt.Sprintf("Error determining node name: %s", err))
return nil
}
2013-12-21 00:39:32 +00:00
config.NodeName = hostname
}
config.NodeName = strings.TrimSpace(config.NodeName)
if config.NodeName == "" {
c.Ui.Error("Node name can not be empty")
return nil
}
2013-12-21 00:39:32 +00:00
// Make sure SkipLeaveOnInt is set to the right default based on the
// agent's mode (client or server)
if config.SkipLeaveOnInt == nil {
config.SkipLeaveOnInt = new(bool)
if config.Server {
*config.SkipLeaveOnInt = true
} else {
*config.SkipLeaveOnInt = false
}
}
// Ensure we have a data directory
2015-11-29 04:40:05 +00:00
if config.DataDir == "" && !dev {
c.Ui.Error("Must specify data directory using -data-dir")
return nil
}
// Ensure all endpoints are unique
if err := config.verifyUniqueListeners(); err != nil {
c.Ui.Error(fmt.Sprintf("All listening endpoints must be unique: %s", err))
return nil
}
// Check the data dir for signs of an un-migrated Consul 0.5.x or older
// server. Consul refuses to start if this is present to protect a server
// with existing data from starting on a fresh data set.
if config.Server {
2015-10-15 21:21:35 +00:00
mdbPath := filepath.Join(config.DataDir, "mdb")
if _, err := os.Stat(mdbPath); !os.IsNotExist(err) {
if os.IsPermission(err) {
c.Ui.Error(fmt.Sprintf("CRITICAL: Permission denied for data folder at %q!", mdbPath))
c.Ui.Error("Consul will refuse to boot without access to this directory.")
c.Ui.Error("Please correct permissions and try starting again.")
return nil
}
2015-10-15 21:21:35 +00:00
c.Ui.Error(fmt.Sprintf("CRITICAL: Deprecated data folder found at %q!", mdbPath))
c.Ui.Error("Consul will refuse to boot with this directory present.")
2016-01-13 22:44:01 +00:00
c.Ui.Error("See https://www.consul.io/docs/upgrade-specific.html for more information.")
return nil
}
}
// Verify DNS settings
if config.DNSConfig.UDPAnswerLimit < 1 {
c.Ui.Error(fmt.Sprintf("dns_config.udp_answer_limit %d too low, must always be greater than zero", config.DNSConfig.UDPAnswerLimit))
}
2014-09-15 00:31:44 +00:00
if config.EncryptKey != "" {
if _, err := config.EncryptBytes(); err != nil {
c.Ui.Error(fmt.Sprintf("Invalid encryption key: %s", err))
return nil
}
keyfileLAN := filepath.Join(config.DataDir, serfLANKeyring)
if _, err := os.Stat(keyfileLAN); err == nil {
c.Ui.Error("WARNING: LAN keyring exists but -encrypt given, using keyring")
}
if config.Server {
keyfileWAN := filepath.Join(config.DataDir, serfWANKeyring)
if _, err := os.Stat(keyfileWAN); err == nil {
c.Ui.Error("WARNING: WAN keyring exists but -encrypt given, using keyring")
}
}
}
// Ensure the datacenter is always lowercased. The DNS endpoints automatically
// lowercase all queries, and internally we expect DC1 and dc1 to be the same.
config.Datacenter = strings.ToLower(config.Datacenter)
2015-02-19 22:45:47 +00:00
// Verify datacenter is valid
if !validDatacenter.MatchString(config.Datacenter) {
c.Ui.Error("Datacenter must be alpha-numeric with underscores and hypens only")
return nil
}
// Only allow bootstrap mode when acting as a server
if config.Bootstrap && !config.Server {
c.Ui.Error("Bootstrap mode cannot be enabled when server mode is not enabled")
return nil
}
// Expect can only work when acting as a server
if config.BootstrapExpect != 0 && !config.Server {
c.Ui.Error("Expect mode cannot be enabled when server mode is not enabled")
return nil
}
// Expect & Bootstrap are mutually exclusive
if config.BootstrapExpect != 0 && config.Bootstrap {
c.Ui.Error("Bootstrap cannot be provided with an expected server count")
return nil
}
// Compile all the watches
for _, params := range config.Watches {
// Parse the watches, excluding the handler
wp, err := watch.ParseExempt(params, []string{"handler"})
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse watch (%#v): %v", params, err))
return nil
}
// Get the handler
if err := verifyWatchHandler(wp.Exempt["handler"]); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to setup watch handler (%#v): %v", params, err))
return nil
}
// Store the watch plan
config.WatchPlans = append(config.WatchPlans, wp)
}
// Warn if we are in expect mode
if config.BootstrapExpect == 1 {
c.Ui.Error("WARNING: BootstrapExpect Mode is specified as 1; this is the same as Bootstrap mode.")
config.BootstrapExpect = 0
config.Bootstrap = true
} else if config.BootstrapExpect > 0 {
c.Ui.Error(fmt.Sprintf("WARNING: Expect Mode enabled, expecting %d servers", config.BootstrapExpect))
}
// Warn if we are in bootstrap mode
if config.Bootstrap {
c.Ui.Error("WARNING: Bootstrap mode enabled! Do not enable unless necessary")
}
2014-06-06 21:40:22 +00:00
// Set the version info
config.Revision = c.Revision
config.Version = c.Version
config.VersionPrerelease = c.VersionPrerelease
2013-12-21 00:39:32 +00:00
return config
}
// verifyUniqueListeners checks to see if an address was used more than once in
// the config.
//
// Each listener is identified by its (host, port) pair. An empty host is
// treated as "0.0.0.0". Hosts starting with "unix" are compared by host
// only, since the port is meaningless for them. Any other listener whose
// port is not positive is skipped: it cannot clash with another listener on
// a fixed port, whatever host it is configured with.
//
// It returns an error naming the two clashing listeners, or nil when all
// listeners are distinct.
func (config *Config) verifyUniqueListeners() error {
	listeners := []struct {
		host  string
		port  int
		descr string
	}{
		{config.Addresses.RPC, config.Ports.RPC, "RPC"},
		{config.Addresses.DNS, config.Ports.DNS, "DNS"},
		{config.Addresses.HTTP, config.Ports.HTTP, "HTTP"},
		{config.Addresses.HTTPS, config.Ports.HTTPS, "HTTPS"},
		{config.AdvertiseAddr, config.Ports.Server, "Server RPC"},
		{config.AdvertiseAddr, config.Ports.SerfLan, "Serf LAN"},
		{config.AdvertiseAddr, config.Ports.SerfWan, "Serf WAN"},
	}

	type key struct {
		host string
		port int
	}
	m := make(map[key]string, len(listeners))

	for _, l := range listeners {
		isSocket := strings.HasPrefix(l.host, "unix")
		switch {
		case l.host == "":
			l.host = "0.0.0.0"
		case isSocket:
			// Don't compare ports on unix sockets
			l.port = 0
		}

		// A non-socket listener without a positive port cannot collide with
		// anything, regardless of the host it was configured with.
		if !isSocket && l.port <= 0 {
			continue
		}

		k := key{l.host, l.port}
		if v, ok := m[k]; ok {
			return fmt.Errorf("%s address already configured for %s", l.descr, v)
		}
		m[k] = l.descr
	}
	return nil
}
2013-12-21 00:39:32 +00:00
// setupLoggers is used to setup the logGate, logWriter, and our logOutput
func (c *Command) setupLoggers(config *Config) (*GatedWriter, *logWriter, io.Writer) {
// Setup logging. First create the gated log writer, which will
// store logs until we're ready to show them. Then create the level
// filter, filtering logs of the specified level.
logGate := &GatedWriter{
Writer: &cli.UiWriter{Ui: c.Ui},
}
c.logFilter = LevelFilter()
c.logFilter.MinLevel = logutils.LogLevel(strings.ToUpper(config.LogLevel))
c.logFilter.Writer = logGate
if !ValidateLevelFilter(c.logFilter.MinLevel, c.logFilter) {
c.Ui.Error(fmt.Sprintf(
"Invalid log level: %s. Valid log levels are: %v",
c.logFilter.MinLevel, c.logFilter.Levels))
return nil, nil, nil
}
2014-05-21 19:06:03 +00:00
// Check if syslog is enabled
var syslog io.Writer
if config.EnableSyslog {
l, err := gsyslog.NewLogger(gsyslog.LOG_NOTICE, config.SyslogFacility, "consul")
2014-05-21 19:06:03 +00:00
if err != nil {
c.Ui.Error(fmt.Sprintf("Syslog setup failed: %v", err))
return nil, nil, nil
}
syslog = &SyslogWrapper{l, c.logFilter}
2014-05-21 19:06:03 +00:00
}
2013-12-21 00:39:32 +00:00
// Create a log writer, and wrap a logOutput around it
logWriter := NewLogWriter(512)
2014-05-21 19:06:03 +00:00
var logOutput io.Writer
if syslog != nil {
logOutput = io.MultiWriter(c.logFilter, logWriter, syslog)
} else {
logOutput = io.MultiWriter(c.logFilter, logWriter)
}
c.logOutput = logOutput
2013-12-21 00:39:32 +00:00
return logGate, logWriter, logOutput
2013-12-19 20:18:06 +00:00
}
2013-12-23 19:38:51 +00:00
// setupAgent is used to start the agent and various interfaces
2013-12-30 23:27:41 +00:00
func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *logWriter) error {
2013-12-23 19:38:51 +00:00
c.Ui.Output("Starting Consul agent...")
agent, err := Create(config, logOutput)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err))
return err
}
c.agent = agent
2013-12-30 23:27:41 +00:00
// Setup the RPC listener
rpcAddr, err := config.ClientListener(config.Addresses.RPC, config.Ports.RPC)
2014-04-11 22:22:35 +00:00
if err != nil {
c.Ui.Error(fmt.Sprintf("Invalid RPC bind address: %s", err))
return err
}
// Clear the domain socket file if it exists
socketPath, isSocket := unixSocketAddr(config.Addresses.RPC)
if isSocket {
if _, err := os.Stat(socketPath); !os.IsNotExist(err) {
agent.logger.Printf("[WARN] agent: Replacing socket %q", socketPath)
}
if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) {
c.Ui.Output(fmt.Sprintf("Error removing socket file: %s", err))
return err
2015-01-16 01:24:15 +00:00
}
}
rpcListener, err := net.Listen(rpcAddr.Network(), rpcAddr.String())
2013-12-30 23:27:41 +00:00
if err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error starting RPC listener: %s", err))
return err
}
// Set up ownership/permission bits on the socket file
if isSocket {
if err := setFilePermissions(socketPath, config.UnixSockets); err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error setting up socket: %s", err))
return err
}
}
2013-12-30 23:27:41 +00:00
// Start the IPC layer
c.Ui.Output("Starting Consul agent RPC...")
2013-12-30 23:27:41 +00:00
c.rpcServer = NewAgentRPC(agent, rpcListener, logOutput, logWriter)
2015-02-05 02:17:45 +00:00
// Enable the SCADA integration
if err := c.setupScadaConn(config); err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error starting SCADA connection: %s", err))
return err
2015-02-05 02:17:45 +00:00
}
if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 {
servers, err := NewHTTPServers(agent, config, logOutput)
2013-12-23 19:38:51 +00:00
if err != nil {
agent.Shutdown()
2014-11-25 19:06:33 +00:00
c.Ui.Error(fmt.Sprintf("Error starting http servers: %s", err))
2013-12-23 19:38:51 +00:00
return err
}
c.httpServers = servers
2013-12-23 19:38:51 +00:00
}
2014-04-11 22:22:35 +00:00
if config.Ports.DNS > 0 {
dnsAddr, err := config.ClientListener(config.Addresses.DNS, config.Ports.DNS)
2014-04-11 22:22:35 +00:00
if err != nil {
agent.Shutdown()
2014-04-11 22:22:35 +00:00
c.Ui.Error(fmt.Sprintf("Invalid DNS bind address: %s", err))
return err
}
server, err := NewDNSServer(agent, &config.DNSConfig, logOutput,
config.Domain, dnsAddr.String(), config.DNSRecursors)
if err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error starting dns server: %s", err))
return err
}
c.dnsServer = server
}
2014-09-02 21:23:43 +00:00
// Setup update checking
if !config.DisableUpdateCheck {
version := config.Version
if config.VersionPrerelease != "" {
version += fmt.Sprintf("-%s", config.VersionPrerelease)
}
2014-09-02 21:23:43 +00:00
updateParams := &checkpoint.CheckParams{
Product: "consul",
Version: version,
2014-09-02 21:23:43 +00:00
}
if !config.DisableAnonymousSignature {
updateParams.SignatureFile = filepath.Join(config.DataDir, "checkpoint-signature")
}
// Schedule a periodic check with expected interval of 24 hours
checkpoint.CheckInterval(updateParams, 24*time.Hour, c.checkpointResults)
// Do an immediate check within the next 30 seconds
go func() {
time.Sleep(lib.RandomStagger(30 * time.Second))
2014-09-02 21:23:43 +00:00
c.checkpointResults(checkpoint.Check(updateParams))
}()
}
2013-12-23 19:38:51 +00:00
return nil
}
2014-09-02 21:23:43 +00:00
// checkpointResults is used to handler periodic results from our update checker
func (c *Command) checkpointResults(results *checkpoint.CheckResponse, err error) {
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to check for updates: %v", err))
return
}
if results.Outdated {
versionStr := c.Version
if c.VersionPrerelease != "" {
versionStr += fmt.Sprintf("-%s", c.VersionPrerelease)
}
c.Ui.Error(fmt.Sprintf("Newer Consul version available: %s (currently running: %s)", results.CurrentVersion, versionStr))
2014-09-02 21:23:43 +00:00
}
for _, alert := range results.Alerts {
switch alert.Level {
case "info":
c.Ui.Info(fmt.Sprintf("Bulletin [%s]: %s (%s)", alert.Level, alert.Message, alert.URL))
default:
c.Ui.Error(fmt.Sprintf("Bulletin [%s]: %s (%s)", alert.Level, alert.Message, alert.URL))
}
}
}
// startupJoin performs the LAN joins requested for start time (the
// StartJoin addresses of the config). It is a no-op when none were given,
// and returns the join error otherwise.
func (c *Command) startupJoin(config *Config) error {
	addrs := config.StartJoin
	if len(addrs) == 0 {
		return nil
	}

	c.Ui.Output("Joining cluster...")
	joined, err := c.agent.JoinLAN(addrs)
	if err != nil {
		return err
	}

	c.Ui.Info(fmt.Sprintf("Join completed. Synced with %d initial agents", joined))
	return nil
}
// startupJoinWan performs the WAN joins requested for start time (the
// StartJoinWan addresses of the config). It is a no-op when none were
// given, and returns the join error otherwise.
func (c *Command) startupJoinWan(config *Config) error {
	addrs := config.StartJoinWan
	if len(addrs) == 0 {
		return nil
	}

	c.Ui.Output("Joining -wan cluster...")
	joined, err := c.agent.JoinWAN(addrs)
	if err != nil {
		return err
	}

	c.Ui.Info(fmt.Sprintf("Join -wan completed. Synced with %d initial agents", joined))
	return nil
}
2014-10-12 19:45:40 +00:00
// retryJoin is used to handle retrying a join until it succeeds or all
// retries are exhausted.
2014-10-12 17:50:15 +00:00
func (c *Command) retryJoin(config *Config, errCh chan<- struct{}) {
if len(config.RetryJoin) == 0 {
return
}
logger := c.agent.logger
logger.Printf("[INFO] agent: Joining cluster...")
attempt := 0
for {
n, err := c.agent.JoinLAN(config.RetryJoin)
if err == nil {
logger.Printf("[INFO] agent: Join completed. Synced with %d initial agents", n)
return
}
attempt++
if config.RetryMaxAttempts > 0 && attempt > config.RetryMaxAttempts {
logger.Printf("[ERROR] agent: max join retry exhausted, exiting")
close(errCh)
return
}
logger.Printf("[WARN] agent: Join failed: %v, retrying in %v", err,
config.RetryInterval)
time.Sleep(config.RetryInterval)
}
}
// retryJoinWan keeps attempting a WAN join of the RetryJoinWan addresses
// until one attempt succeeds or the retries are exhausted. When
// RetryMaxAttemptsWan is positive and the number of failed attempts exceeds
// it, errCh is closed to signal the failure. Attempts are RetryIntervalWan
// apart.
func (c *Command) retryJoinWan(config *Config, errCh chan<- struct{}) {
	if len(config.RetryJoinWan) == 0 {
		return
	}

	logger := c.agent.logger
	logger.Printf("[INFO] agent: Joining WAN cluster...")

	// failures counts the failed attempts, including the current one
	for failures := 1; ; failures++ {
		n, err := c.agent.JoinWAN(config.RetryJoinWan)
		if err == nil {
			logger.Printf("[INFO] agent: Join -wan completed. Synced with %d initial agents", n)
			return
		}

		if config.RetryMaxAttemptsWan > 0 && failures > config.RetryMaxAttemptsWan {
			logger.Printf("[ERROR] agent: max join -wan retry exhausted, exiting")
			close(errCh)
			return
		}

		logger.Printf("[WARN] agent: Join -wan failed: %v, retrying in %v", err,
			config.RetryIntervalWan)
		time.Sleep(config.RetryIntervalWan)
	}
}
// gossipEncrypted reports whether the consul instance is using symmetric
// encryption keys to protect gossip protocol messages: either an encrypt
// key is configured, or the running server/client has a key manager.
func (c *Command) gossipEncrypted() bool {
	switch {
	case c.agent.config.EncryptKey != "":
		return true
	case c.agent.server != nil:
		srv := c.agent.server
		return srv.KeyManagerLAN() != nil || srv.KeyManagerWAN() != nil
	case c.agent.client != nil:
		return c.agent.client.KeyManagerLAN() != nil
	default:
		return false
	}
}
2013-12-19 20:18:06 +00:00
func (c *Command) Run(args []string) int {
2013-12-20 01:14:46 +00:00
c.Ui = &cli.PrefixedUi{
OutputPrefix: "==> ",
InfoPrefix: " ",
ErrorPrefix: "==> ",
Ui: c.Ui,
}
2013-12-21 00:39:32 +00:00
// Parse our configs
c.args = args
config := c.readConfig()
if config == nil {
return 1
}
// Setup the log outputs
logGate, logWriter, logOutput := c.setupLoggers(config)
if logWriter == nil {
return 1
}
/* Setup telemetry
Aggregate on 10 second intervals for 1 minute. Expose the
metrics over stderr when there is a SIGUSR1 received.
*/
inm := metrics.NewInmemSink(10*time.Second, time.Minute)
metrics.DefaultInmemSignal(inm)
metricsConf := metrics.DefaultConfig(config.Telemetry.StatsitePrefix)
metricsConf.EnableHostname = !config.Telemetry.DisableHostname
2014-09-02 18:26:08 +00:00
// Configure the statsite sink
var fanout metrics.FanoutSink
if config.Telemetry.StatsiteAddr != "" {
sink, err := metrics.NewStatsiteSink(config.Telemetry.StatsiteAddr)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to start statsite sink. Got: %s", err))
return 1
}
2014-09-02 18:26:08 +00:00
fanout = append(fanout, sink)
}
// Configure the statsd sink
if config.Telemetry.StatsdAddr != "" {
sink, err := metrics.NewStatsdSink(config.Telemetry.StatsdAddr)
2014-09-02 18:26:08 +00:00
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to start statsd sink. Got: %s", err))
return 1
}
fanout = append(fanout, sink)
2015-06-16 22:05:55 +00:00
}
// Configure the DogStatsd sink
if config.Telemetry.DogStatsdAddr != "" {
2015-06-16 22:05:55 +00:00
var tags []string
if config.Telemetry.DogStatsdTags != nil {
tags = config.Telemetry.DogStatsdTags
2015-06-16 22:05:55 +00:00
}
sink, err := datadog.NewDogStatsdSink(config.Telemetry.DogStatsdAddr, metricsConf.HostName)
2015-06-16 22:05:55 +00:00
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to start DogStatsd sink. Got: %s", err))
return 1
}
sink.SetTags(tags)
fanout = append(fanout, sink)
2014-09-02 18:26:08 +00:00
}
2014-09-02 18:26:08 +00:00
// Initialize the global sink
if len(fanout) > 0 {
fanout = append(fanout, inm)
metrics.NewGlobal(metricsConf, fanout)
} else {
metricsConf.EnableHostname = false
metrics.NewGlobal(metricsConf, inm)
}
2013-12-21 00:39:32 +00:00
// Create the agent
2013-12-30 23:27:41 +00:00
if err := c.setupAgent(config, logOutput, logWriter); err != nil {
2013-12-20 01:14:46 +00:00
return 1
}
2013-12-23 19:38:51 +00:00
defer c.agent.Shutdown()
2013-12-30 23:27:41 +00:00
if c.rpcServer != nil {
defer c.rpcServer.Shutdown()
}
if c.dnsServer != nil {
defer c.dnsServer.Shutdown()
}
for _, server := range c.httpServers {
defer server.Shutdown()
2013-12-23 19:38:51 +00:00
}
// Enable child process reaping
if (config.Reap != nil && *config.Reap) || (config.Reap == nil && os.Getpid() == 1) {
if !reap.IsSupported() {
c.Ui.Error("Child process reaping is not supported on this platform (set reap=false)")
return 1
} else {
logger := c.agent.logger
logger.Printf("[DEBUG] Automatically reaping child processes")
pids := make(reap.PidCh, 1)
errors := make(reap.ErrorCh, 1)
go func() {
for {
select {
case pid := <-pids:
logger.Printf("[DEBUG] Reaped child process %d", pid)
case err := <-errors:
logger.Printf("[ERR] Error reaping child process: %v", err)
case <-c.agent.shutdownCh:
return
}
}
}()
2016-01-13 05:10:25 +00:00
go reap.ReapChildren(pids, errors, c.agent.shutdownCh, &c.agent.reapLock)
}
}
// Check and shut down the SCADA listeners at the end
defer func() {
if c.scadaHttp != nil {
c.scadaHttp.Shutdown()
}
if c.scadaProvider != nil {
c.scadaProvider.Shutdown()
}
}()
2013-12-20 01:14:46 +00:00
// Join startup nodes if specified
if err := c.startupJoin(config); err != nil {
c.Ui.Error(err.Error())
return 1
}
// Join startup nodes if specified
if err := c.startupJoinWan(config); err != nil {
c.Ui.Error(err.Error())
return 1
}
// Get the new client http listener addr
httpAddr, err := config.ClientListener(config.Addresses.HTTP, config.Ports.HTTP)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err))
}
// Register the watches
for _, wp := range config.WatchPlans {
go func(wp *watch.WatchPlan) {
2016-01-13 05:10:25 +00:00
wp.Handler = makeWatchHandler(logOutput, wp.Exempt["handler"], &c.agent.reapLock)
wp.LogOutput = c.logOutput
if err := wp.Run(httpAddr.String()); err != nil {
c.Ui.Error(fmt.Sprintf("Error running watch: %v", err))
}
}(wp)
}
// Figure out if gossip is encrypted
2014-10-12 00:29:24 +00:00
var gossipEncrypted bool
if config.Server {
gossipEncrypted = c.agent.server.Encrypted()
} else {
gossipEncrypted = c.agent.client.Encrypted()
}
2015-02-05 02:06:36 +00:00
// Determine the Atlas cluster
atlas := "<disabled>"
if config.AtlasInfrastructure != "" {
atlas = fmt.Sprintf("(Infrastructure: '%s' Join: %v)", config.AtlasInfrastructure, config.AtlasJoin)
2015-02-05 02:06:36 +00:00
}
// Let the agent know we've finished registration
2014-01-21 19:52:25 +00:00
c.agent.StartSync()
2013-12-21 00:39:32 +00:00
c.Ui.Output("Consul agent running!")
c.Ui.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName))
c.Ui.Info(fmt.Sprintf(" Datacenter: '%s'", config.Datacenter))
c.Ui.Info(fmt.Sprintf(" Server: %v (bootstrap: %v)", config.Server, config.Bootstrap))
c.Ui.Info(fmt.Sprintf(" Client Addr: %v (HTTP: %d, HTTPS: %d, DNS: %d, RPC: %d)", config.ClientAddr,
config.Ports.HTTP, config.Ports.HTTPS, config.Ports.DNS, config.Ports.RPC))
c.Ui.Info(fmt.Sprintf(" Cluster Addr: %v (LAN: %d, WAN: %d)", config.AdvertiseAddr,
2014-04-11 22:54:03 +00:00
config.Ports.SerfLan, config.Ports.SerfWan))
c.Ui.Info(fmt.Sprintf("Gossip encrypt: %v, RPC-TLS: %v, TLS-Incoming: %v",
gossipEncrypted, config.VerifyOutgoing, config.VerifyIncoming))
c.Ui.Info(fmt.Sprintf(" Atlas: %s", atlas))
2013-12-21 00:39:32 +00:00
// Enable log streaming
c.Ui.Info("")
c.Ui.Output("Log data will now stream in as it occurs:\n")
logGate.Flush()
2014-10-12 17:50:15 +00:00
// Start retry join process
errCh := make(chan struct{})
go c.retryJoin(config, errCh)
// Start retry -wan join process
errWanCh := make(chan struct{})
go c.retryJoinWan(config, errWanCh)
2013-12-21 00:39:32 +00:00
// Wait for exit
return c.handleSignals(config, errCh, errWanCh)
2013-12-21 00:39:32 +00:00
}
// handleSignals blocks until we get an exit-causing signal
func (c *Command) handleSignals(config *Config, retryJoin <-chan struct{}, retryJoinWan <-chan struct{}) int {
2013-12-21 00:39:32 +00:00
signalCh := make(chan os.Signal, 4)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
// Wait for a signal
WAIT:
var sig os.Signal
2013-12-20 01:14:46 +00:00
select {
2013-12-21 00:39:32 +00:00
case s := <-signalCh:
sig = s
case <-c.rpcServer.ReloadCh():
sig = syscall.SIGHUP
2013-12-20 01:14:46 +00:00
case <-c.ShutdownCh:
2013-12-21 00:39:32 +00:00
sig = os.Interrupt
2014-10-12 17:50:15 +00:00
case <-retryJoin:
return 1
case <-retryJoinWan:
return 1
2013-12-23 19:38:51 +00:00
case <-c.agent.ShutdownCh():
2013-12-21 00:39:32 +00:00
// Agent is already shutdown!
2013-12-20 01:14:46 +00:00
return 0
}
2013-12-21 00:39:32 +00:00
c.Ui.Output(fmt.Sprintf("Caught signal: %v", sig))
// Check if this is a SIGHUP
if sig == syscall.SIGHUP {
if conf := c.handleReload(config); conf != nil {
config = conf
}
2013-12-21 00:39:32 +00:00
goto WAIT
}
// Check if we should do a graceful leave
graceful := false
if sig == os.Interrupt && !(*config.SkipLeaveOnInt) {
2013-12-21 00:39:32 +00:00
graceful = true
} else if sig == syscall.SIGTERM && config.LeaveOnTerm {
graceful = true
}
// Bail fast if not doing a graceful leave
if !graceful {
return 1
}
// Attempt a graceful leave
gracefulCh := make(chan struct{})
c.Ui.Output("Gracefully shutting down agent...")
go func() {
2013-12-23 19:38:51 +00:00
if err := c.agent.Leave(); err != nil {
2013-12-21 00:39:32 +00:00
c.Ui.Error(fmt.Sprintf("Error: %s", err))
return
}
close(gracefulCh)
}()
// Wait for leave or another signal
select {
case <-signalCh:
return 1
case <-time.After(gracefulTimeout):
return 1
case <-gracefulCh:
return 0
}
}
// handleReload is invoked when we should reload our configs, e.g. SIGHUP
2013-12-23 19:38:51 +00:00
func (c *Command) handleReload(config *Config) *Config {
2013-12-21 00:39:32 +00:00
c.Ui.Output("Reloading configuration...")
newConf := c.readConfig()
if newConf == nil {
c.Ui.Error(fmt.Sprintf("Failed to reload configs"))
return config
}
// Change the log level
minLevel := logutils.LogLevel(strings.ToUpper(newConf.LogLevel))
if ValidateLevelFilter(minLevel, c.logFilter) {
c.logFilter.SetMinLevel(minLevel)
} else {
c.Ui.Error(fmt.Sprintf(
"Invalid log level: %s. Valid log levels are: %v",
minLevel, c.logFilter.Levels))
// Keep the current log level
newConf.LogLevel = config.LogLevel
}
// Bulk update the services and checks
c.agent.PauseSync()
defer c.agent.ResumeSync()
// Snapshot the current state, and restore it afterwards
snap := c.agent.snapshotCheckState()
defer c.agent.restoreCheckState(snap)
// First unload all checks and services. This lets us begin the reload
// with a clean slate.
if err := c.agent.unloadServices(); err != nil {
c.Ui.Error(fmt.Sprintf("Failed unloading services: %s", err))
return nil
}
if err := c.agent.unloadChecks(); err != nil {
c.Ui.Error(fmt.Sprintf("Failed unloading checks: %s", err))
return nil
}
// Reload services and check definitions.
if err := c.agent.loadServices(newConf); err != nil {
c.Ui.Error(fmt.Sprintf("Failed reloading services: %s", err))
return nil
}
if err := c.agent.loadChecks(newConf); err != nil {
c.Ui.Error(fmt.Sprintf("Failed reloading checks: %s", err))
return nil
}
// Get the new client listener addr
httpAddr, err := newConf.ClientListener(config.Addresses.HTTP, config.Ports.HTTP)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err))
}
// Deregister the old watches
for _, wp := range config.WatchPlans {
wp.Stop()
}
// Register the new watches
for _, wp := range newConf.WatchPlans {
go func(wp *watch.WatchPlan) {
2016-01-13 05:10:25 +00:00
wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"], &c.agent.reapLock)
wp.LogOutput = c.logOutput
if err := wp.Run(httpAddr.String()); err != nil {
c.Ui.Error(fmt.Sprintf("Error running watch: %v", err))
}
}(wp)
}
// Reload SCADA client if we have a change
if newConf.AtlasInfrastructure != config.AtlasInfrastructure ||
newConf.AtlasToken != config.AtlasToken ||
newConf.AtlasEndpoint != config.AtlasEndpoint {
if err := c.setupScadaConn(newConf); err != nil {
c.Ui.Error(fmt.Sprintf("Failed reloading SCADA client: %s", err))
return nil
}
}
return newConf
}
// startScadaClient is used to start a new SCADA provider and listener,
// replacing any existing listeners.
func (c *Command) setupScadaConn(config *Config) error {
// Shut down existing SCADA listeners
if c.scadaProvider != nil {
c.scadaProvider.Shutdown()
}
if c.scadaHttp != nil {
c.scadaHttp.Shutdown()
}
// No-op if we don't have an infrastructure
if config.AtlasInfrastructure == "" {
return nil
}
// Create the new provider and listener
c.Ui.Output("Connecting to Atlas: " + config.AtlasInfrastructure)
provider, list, err := NewProvider(config, c.logOutput)
if err != nil {
return err
}
c.scadaProvider = provider
c.scadaHttp = newScadaHttp(c.agent, list)
return nil
2013-12-19 20:18:06 +00:00
}
// Synopsis returns a one-line summary of what the agent command does.
func (c *Command) Synopsis() string {
	const synopsis = "Runs a Consul agent"
	return synopsis
}
// Help returns the usage text for the agent command: the usage line, a short
// description, and the list of supported command-line options. The text is
// kept in a raw string literal and returned with its surrounding whitespace
// trimmed.
//
// NOTE(review): the bare date-stamp lines inside the literal (and the one just
// before the return) look like residue from a blame view rather than help
// text; the column alignment of the option descriptions also appears to have
// been collapsed. Confirm against the upstream file.
func (c *Command) Help() string {
helpText := `
Usage: consul agent [options]
Starts the Consul agent and runs until an interrupt is received. The
2014-04-11 22:22:35 +00:00
agent represents a single node in a cluster.
2013-12-19 20:18:06 +00:00
Options:
-advertise=addr Sets the advertise address to use
-advertise-wan=addr Sets address to advertise on wan instead of advertise addr
-atlas=org/name Sets the Atlas infrastructure name, enables SCADA.
2015-02-05 02:45:08 +00:00
-atlas-join Enables auto-joining the Atlas cluster
2015-02-05 01:23:42 +00:00
-atlas-token=token Provides the Atlas API token
2015-08-27 18:11:05 +00:00
-atlas-endpoint=1.2.3.4 The address of the endpoint for Atlas integration.
2014-04-11 22:22:35 +00:00
-bootstrap Sets server to bootstrap mode
-bind=0.0.0.0 Sets the bind address for cluster communication
2015-08-20 18:19:35 +00:00
-http-port=8500 Sets the HTTP API port to listen on
-bootstrap-expect=0 Sets server to expect bootstrap mode.
2014-04-11 22:22:35 +00:00
-client=127.0.0.1 Sets the address to bind for client access.
This includes RPC, DNS, HTTP and HTTPS (if configured)
2014-04-11 22:22:35 +00:00
-config-file=foo Path to a JSON file to read configuration from.
This can be specified multiple times.
-config-dir=foo Path to a directory to read configuration files
from. This will read every file ending in ".json"
as configuration in this directory in alphabetical
order. This can be specified multiple times.
2014-04-11 22:22:35 +00:00
-data-dir=path Path to a data directory to store agent state
2016-03-21 11:57:57 +00:00
-dev Starts the agent in development mode.
2015-02-23 17:32:09 +00:00
-recursor=1.2.3.4 Address of an upstream DNS server.
Can be specified multiple times.
2014-04-11 22:22:35 +00:00
-dc=east-aws Datacenter of the agent
-encrypt=key Provides the gossip encryption key
-join=1.2.3.4 Address of an agent to join at start time.
Can be specified multiple times.
-join-wan=1.2.3.4 Address of an agent to join -wan at start time.
Can be specified multiple times.
2014-10-12 19:35:25 +00:00
-retry-join=1.2.3.4 Address of an agent to join at start time with
retries enabled. Can be specified multiple times.
-retry-interval=30s Time to wait between join attempts.
-retry-max=0 Maximum number of join attempts. Defaults to 0, which
will retry indefinitely.
-retry-join-wan=1.2.3.4 Address of an agent to join -wan at start time with
retries enabled. Can be specified multiple times.
-retry-interval-wan=30s Time to wait between join -wan attempts.
-retry-max-wan=0 Maximum number of join -wan attempts. Defaults to 0, which
will retry indefinitely.
2014-04-11 22:22:35 +00:00
-log-level=info Log level of the agent.
-node=hostname Name of this node. Must be unique in the cluster
-protocol=N Sets the protocol version. Defaults to latest.
-rejoin Ignores a previous leave and attempts to rejoin the cluster.
2014-04-11 22:22:35 +00:00
-server Switches agent to server mode.
2014-05-21 19:06:03 +00:00
-syslog Enables logging to syslog
2015-12-22 17:30:19 +00:00
-ui Enables the built-in static web UI server
2014-04-23 19:37:53 +00:00
-ui-dir=path Path to directory containing the Web UI resources
-pid-file=path Path to file to store agent PID
2014-04-11 22:22:35 +00:00
`
2013-12-19 20:18:06 +00:00
// The literal starts and ends with a newline; trim them so the returned
// text begins at the usage line.
return strings.TrimSpace(helpText)
}