Merge pull request #396 from ryanuber/f-retry-join
agent: Retry failed joins on agent start
This commit is contained in:
commit
2c1add1c67
|
@ -52,6 +52,7 @@ type Command struct {
|
|||
func (c *Command) readConfig() *Config {
|
||||
var cmdConfig Config
|
||||
var configFiles []string
|
||||
var retryInterval string
|
||||
cmdFlags := flag.NewFlagSet("agent", flag.ContinueOnError)
|
||||
cmdFlags.Usage = func() { c.Ui.Output(c.Help()) }
|
||||
|
||||
|
@ -82,11 +83,26 @@ func (c *Command) readConfig() *Config {
|
|||
"enable re-joining after a previous leave")
|
||||
cmdFlags.Var((*AppendSliceValue)(&cmdConfig.StartJoin), "join",
|
||||
"address of agent to join on startup")
|
||||
cmdFlags.Var((*AppendSliceValue)(&cmdConfig.RetryJoin), "retry-join",
|
||||
"address of agent to join on startup with retry")
|
||||
cmdFlags.IntVar(&cmdConfig.RetryMaxAttempts, "retry-max", 0,
|
||||
"number of retries for joining")
|
||||
cmdFlags.StringVar(&retryInterval, "retry-interval", "",
|
||||
"interval between join attempts")
|
||||
|
||||
if err := cmdFlags.Parse(c.args); err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if retryInterval != "" {
|
||||
dur, err := time.ParseDuration(retryInterval)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error: %s", err))
|
||||
return nil
|
||||
}
|
||||
cmdConfig.RetryInterval = dur
|
||||
}
|
||||
|
||||
config := DefaultConfig()
|
||||
if len(configFiles) > 0 {
|
||||
fileConfig, err := ReadConfigPaths(configFiles)
|
||||
|
@ -353,6 +369,37 @@ func (c *Command) startupJoin(config *Config) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// retryJoin is used to handle retrying a join until it succeeds or all
|
||||
// retries are exhausted.
|
||||
func (c *Command) retryJoin(config *Config, errCh chan<- struct{}) {
|
||||
if len(config.RetryJoin) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
logger := c.agent.logger
|
||||
logger.Printf("[INFO] agent: Joining cluster...")
|
||||
|
||||
attempt := 0
|
||||
for {
|
||||
n, err := c.agent.JoinLAN(config.RetryJoin)
|
||||
if err == nil {
|
||||
logger.Printf("[INFO] agent: Join completed. Synced with %d initial agents", n)
|
||||
return
|
||||
}
|
||||
|
||||
attempt++
|
||||
if config.RetryMaxAttempts > 0 && attempt > config.RetryMaxAttempts {
|
||||
logger.Printf("[ERROR] agent: max join retry exhausted, exiting")
|
||||
close(errCh)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Printf("[WARN] agent: Join failed: %v, retrying in %v", err,
|
||||
config.RetryInterval)
|
||||
time.Sleep(config.RetryInterval)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Command) Run(args []string) int {
|
||||
c.Ui = &cli.PrefixedUi{
|
||||
OutputPrefix: "==> ",
|
||||
|
@ -491,12 +538,16 @@ func (c *Command) Run(args []string) int {
|
|||
c.Ui.Output("Log data will now stream in as it occurs:\n")
|
||||
logGate.Flush()
|
||||
|
||||
// Start retry join process
|
||||
errCh := make(chan struct{})
|
||||
go c.retryJoin(config, errCh)
|
||||
|
||||
// Wait for exit
|
||||
return c.handleSignals(config)
|
||||
return c.handleSignals(config, errCh)
|
||||
}
|
||||
|
||||
// handleSignals blocks until we get an exit-causing signal
|
||||
func (c *Command) handleSignals(config *Config) int {
|
||||
func (c *Command) handleSignals(config *Config, retryJoin <-chan struct{}) int {
|
||||
signalCh := make(chan os.Signal, 4)
|
||||
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
|
||||
|
||||
|
@ -510,6 +561,8 @@ WAIT:
|
|||
sig = syscall.SIGHUP
|
||||
case <-c.ShutdownCh:
|
||||
sig = os.Interrupt
|
||||
case <-retryJoin:
|
||||
return 1
|
||||
case <-c.agent.ShutdownCh():
|
||||
// Agent is already shutdown!
|
||||
return 0
|
||||
|
@ -668,6 +721,11 @@ Options:
|
|||
-encrypt=key Provides the gossip encryption key
|
||||
-join=1.2.3.4 Address of an agent to join at start time.
|
||||
Can be specified multiple times.
|
||||
-retry-join=1.2.3.4 Address of an agent to join at start time with
|
||||
retries enabled. Can be specified multiple times.
|
||||
-retry-interval=30s Time to wait between join attempts.
|
||||
-retry-max=0 Maximum number of join attempts. Defaults to 0, which
|
||||
will retry indefinitely.
|
||||
-log-level=info Log level of the agent.
|
||||
-node=hostname Name of this node. Must be unique in the cluster
|
||||
-protocol=N Sets the protocol version. Defaults to latest.
|
||||
|
|
|
@ -1,8 +1,14 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"github.com/mitchellh/cli"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
func TestCommand_implements(t *testing.T) {
|
||||
|
@ -31,3 +37,87 @@ func TestValidDatacenter(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRetryJoin(t *testing.T) {
|
||||
dir, agent := makeAgent(t, nextConfig())
|
||||
defer os.RemoveAll(dir)
|
||||
defer agent.Shutdown()
|
||||
|
||||
conf2 := nextConfig()
|
||||
tmpDir, err := ioutil.TempDir("", "consul")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
shutdownCh := make(chan struct{})
|
||||
|
||||
defer func() {
|
||||
close(shutdownCh)
|
||||
<-doneCh
|
||||
}()
|
||||
|
||||
cmd := &Command{
|
||||
ShutdownCh: shutdownCh,
|
||||
Ui: new(cli.MockUi),
|
||||
}
|
||||
|
||||
serfAddr := fmt.Sprintf(
|
||||
"%s:%d",
|
||||
agent.config.BindAddr,
|
||||
agent.config.Ports.SerfLan)
|
||||
|
||||
args := []string{
|
||||
"-data-dir", tmpDir,
|
||||
"-node", conf2.NodeName,
|
||||
"-retry-join", serfAddr,
|
||||
"-retry-interval", "1s",
|
||||
}
|
||||
|
||||
go func() {
|
||||
if code := cmd.Run(args); code != 0 {
|
||||
log.Printf("bad: %d", code)
|
||||
}
|
||||
close(doneCh)
|
||||
}()
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
mem := agent.LANMembers()
|
||||
if len(mem) != 2 {
|
||||
return false, fmt.Errorf("bad: %#v", mem)
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf(err.Error())
|
||||
})
|
||||
}
|
||||
|
||||
func TestRetryJoinFail(t *testing.T) {
|
||||
conf := nextConfig()
|
||||
tmpDir, err := ioutil.TempDir("", "consul")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
defer os.RemoveAll(tmpDir)
|
||||
|
||||
shutdownCh := make(chan struct{})
|
||||
defer close(shutdownCh)
|
||||
|
||||
cmd := &Command{
|
||||
ShutdownCh: shutdownCh,
|
||||
Ui: new(cli.MockUi),
|
||||
}
|
||||
|
||||
serfAddr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.Ports.SerfLan)
|
||||
|
||||
args := []string{
|
||||
"-data-dir", tmpDir,
|
||||
"-retry-join", serfAddr,
|
||||
"-retry-max", "1",
|
||||
}
|
||||
|
||||
if code := cmd.Run(args); code == 0 {
|
||||
t.Fatalf("bad: %d", code)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -189,6 +189,20 @@ type Config struct {
|
|||
// addresses, then the agent will error and exit.
|
||||
StartJoin []string `mapstructure:"start_join"`
|
||||
|
||||
// RetryJoin is a list of addresses to join with retry enabled.
|
||||
RetryJoin []string `mapstructure:"retry_join"`
|
||||
|
||||
// RetryMaxAttempts specifies the maximum number of times to retry joining a
|
||||
// host on startup. This is useful for cases where we know the node will be
|
||||
// online eventually.
|
||||
RetryMaxAttempts int `mapstructure:"retry_max"`
|
||||
|
||||
// RetryInterval specifies the amount of time to wait in between join
|
||||
// attempts on agent start. The minimum allowed value is 1 second and
|
||||
// the default is 30s.
|
||||
RetryInterval time.Duration `mapstructure:"-" json:"-"`
|
||||
RetryIntervalRaw string `mapstructure:"retry_interval"`
|
||||
|
||||
// UiDir is the directory containing the Web UI resources.
|
||||
// If provided, the UI endpoints will be enabled.
|
||||
UiDir string `mapstructure:"ui_dir"`
|
||||
|
@ -328,6 +342,7 @@ func DefaultConfig() *Config {
|
|||
ACLTTL: 30 * time.Second,
|
||||
ACLDownPolicy: "extend-cache",
|
||||
ACLDefaultPolicy: "allow",
|
||||
RetryInterval: 30 * time.Second,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -450,6 +465,14 @@ func DecodeConfig(r io.Reader) (*Config, error) {
|
|||
result.ACLTTL = dur
|
||||
}
|
||||
|
||||
if raw := result.RetryIntervalRaw; raw != "" {
|
||||
dur, err := time.ParseDuration(raw)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("RetryInterval invalid: %v", err)
|
||||
}
|
||||
result.RetryInterval = dur
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
|
@ -681,6 +704,12 @@ func MergeConfig(a, b *Config) *Config {
|
|||
if b.RejoinAfterLeave {
|
||||
result.RejoinAfterLeave = true
|
||||
}
|
||||
if b.RetryMaxAttempts != 0 {
|
||||
result.RetryMaxAttempts = b.RetryMaxAttempts
|
||||
}
|
||||
if b.RetryInterval != 0 {
|
||||
result.RetryInterval = b.RetryInterval
|
||||
}
|
||||
if b.DNSConfig.NodeTTL != 0 {
|
||||
result.DNSConfig.NodeTTL = b.DNSConfig.NodeTTL
|
||||
}
|
||||
|
@ -747,6 +776,11 @@ func MergeConfig(a, b *Config) *Config {
|
|||
result.StartJoin = append(result.StartJoin, a.StartJoin...)
|
||||
result.StartJoin = append(result.StartJoin, b.StartJoin...)
|
||||
|
||||
// Copy the retry join addresses
|
||||
result.RetryJoin = make([]string, 0, len(a.RetryJoin)+len(b.RetryJoin))
|
||||
result.RetryJoin = append(result.RetryJoin, a.RetryJoin...)
|
||||
result.RetryJoin = append(result.RetryJoin, b.RetryJoin...)
|
||||
|
||||
return &result
|
||||
}
|
||||
|
||||
|
|
|
@ -265,6 +265,48 @@ func TestDecodeConfig(t *testing.T) {
|
|||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
|
||||
// Retry join
|
||||
input = `{"retry_join": ["1.1.1.1", "2.2.2.2"]}`
|
||||
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
if len(config.RetryJoin) != 2 {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
if config.RetryJoin[0] != "1.1.1.1" {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
if config.RetryJoin[1] != "2.2.2.2" {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
|
||||
// Retry interval
|
||||
input = `{"retry_interval": "10s"}`
|
||||
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
if config.RetryIntervalRaw != "10s" {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
if config.RetryInterval.String() != "10s" {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
|
||||
// Retry Max
|
||||
input = `{"retry_max": 3}`
|
||||
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
if config.RetryMaxAttempts != 3 {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
|
||||
// UI Dir
|
||||
input = `{"ui_dir": "/opt/consul-ui"}`
|
||||
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
|
||||
|
@ -572,6 +614,7 @@ func TestMergeConfig(t *testing.T) {
|
|||
SkipLeaveOnInt: false,
|
||||
EnableDebug: false,
|
||||
CheckUpdateIntervalRaw: "8m",
|
||||
RetryIntervalRaw: "10s",
|
||||
}
|
||||
|
||||
b := &Config{
|
||||
|
@ -623,6 +666,9 @@ func TestMergeConfig(t *testing.T) {
|
|||
UiDir: "/opt/consul-ui",
|
||||
EnableSyslog: true,
|
||||
RejoinAfterLeave: true,
|
||||
RetryJoin: []string{"1.1.1.1"},
|
||||
RetryIntervalRaw: "10s",
|
||||
RetryInterval: 10 * time.Second,
|
||||
CheckUpdateInterval: 8 * time.Minute,
|
||||
CheckUpdateIntervalRaw: "8m",
|
||||
ACLToken: "1234",
|
||||
|
|
|
@ -91,6 +91,16 @@ The options below are all specified on the command-line.
|
|||
unable to join with any of the specified addresses, agent startup will
|
||||
fail. By default, the agent won't join any nodes when it starts up.
|
||||
|
||||
* `-retry-join` - Similar to `-join`, but allows retrying a join if the first
|
||||
attempt fails. This is useful for cases where we know the address will become
|
||||
available eventually.
|
||||
|
||||
* `-retry-interval` - Time to wait between join attempts. Defaults to 30s.
|
||||
|
||||
* `-retry-max` - The maximum number of join attempts to be made before exiting
|
||||
with return code 1. By default, this is set to 0, which will continue to
|
||||
retry the join indefinitely.
|
||||
|
||||
* `-log-level` - The level of logging to show after the Consul agent has
|
||||
started. This defaults to "info". The available log levels are "trace",
|
||||
"debug", "info", "warn", "err". This is the log level that will be shown
|
||||
|
|
Loading…
Reference in New Issue