add server join info to server and client

This commit is contained in:
Chelsea Holland Komlo 2018-05-11 15:52:05 -04:00 committed by Alex Dadgar
parent ba16cf7f51
commit 064b5481e0
10 changed files with 274 additions and 65 deletions

View File

@ -283,7 +283,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
// Set the preconfigured list of static servers
c.configLock.RLock()
if len(c.configCopy.Servers) > 0 {
if err := c.setServersImpl(c.configCopy.Servers, true); err != nil {
if _, err := c.setServersImpl(c.configCopy.Servers, true); err != nil {
logger.Printf("[WARN] client: None of the configured servers are valid: %v", err)
}
}
@ -623,7 +623,7 @@ func (c *Client) GetServers() []string {
// SetServers sets a new list of nomad servers to connect to. As long as one
// server is resolvable no error is returned.
func (c *Client) SetServers(in []string) error {
func (c *Client) SetServers(in []string) (int, error) {
return c.setServersImpl(in, false)
}
@ -633,7 +633,7 @@ func (c *Client) SetServers(in []string) error {
//
// Force should be used when setting the servers from the initial configuration
// since the server may be starting up in parallel and initial pings may fail.
func (c *Client) setServersImpl(in []string, force bool) error {
func (c *Client) setServersImpl(in []string, force bool) (int, error) {
var mu sync.Mutex
var wg sync.WaitGroup
var merr multierror.Error
@ -673,13 +673,13 @@ func (c *Client) setServersImpl(in []string, force bool) error {
// Only return errors if no servers are valid
if len(endpoints) == 0 {
if len(merr.Errors) > 0 {
return merr.ErrorOrNil()
return 0, merr.ErrorOrNil()
}
return noServersErr
return 0, noServersErr
}
c.servers.SetServers(endpoints)
return nil
return len(endpoints), nil
}
// restoreState is used to restore our state from the data dir

View File

@ -975,13 +975,13 @@ func TestClient_ServerList(t *testing.T) {
if s := client.GetServers(); len(s) != 0 {
t.Fatalf("expected server lit to be empty but found: %+q", s)
}
if err := client.SetServers(nil); err != noServersErr {
if _, err := client.SetServers(nil); err != noServersErr {
t.Fatalf("expected setting an empty list to return a 'no servers' error but received %v", err)
}
if err := client.SetServers([]string{"123.456.13123.123.13:80"}); err == nil {
if _, err := client.SetServers([]string{"123.456.13123.123.13:80"}); err == nil {
t.Fatalf("expected setting a bad server to return an error")
}
if err := client.SetServers([]string{"123.456.13123.123.13:80", "127.0.0.1:1234", "127.0.0.1"}); err == nil {
if _, err := client.SetServers([]string{"123.456.13123.123.13:80", "127.0.0.1:1234", "127.0.0.1"}); err == nil {
t.Fatalf("expected setting at least one good server to succeed but received: %v", err)
}
s := client.GetServers()

View File

@ -222,7 +222,7 @@ func (s *HTTPServer) updateServers(resp http.ResponseWriter, req *http.Request)
// Set the servers list into the client
s.agent.logger.Printf("[TRACE] Adding servers %+q to the client's primary server list", servers)
if err := client.SetServers(servers); err != nil {
if _, err := client.SetServers(servers); err != nil {
s.agent.logger.Printf("[ERR] Attempt to add servers %q to client failed: %v", servers, err)
//TODO is this the right error to return?
return nil, CodedError(400, err.Error())

View File

@ -549,13 +549,49 @@ func (c *Command) Run(args []string) int {
// Start retry join process
c.retryJoinErrCh = make(chan struct{})
if config.Server.Enabled && len(config.Server.RetryJoin) != 0 {
joiner := retryJoiner{
join: c.agent.server.Join,
discover: &discover.Discover{},
errCh: c.retryJoinErrCh,
logger: c.agent.logger,
serverJoin: c.agent.server.Join,
serverEnabled: true,
}
// This is for backwards compatibility, this should be removed in Nomad
// 0.10 and only ServerJoin should be declared on the server
serverJoinInfo := &ServerJoin{
RetryJoin: config.Server.RetryJoin,
StartJoin: config.Server.StartJoin,
RetryMaxAttempts: config.Server.RetryMaxAttempts,
RetryInterval: config.Server.RetryInterval,
}
go joiner.RetryJoin(serverJoinInfo)
}
if config.Server.Enabled && config.Server.ServerJoin != nil {
joiner := retryJoiner{
discover: &discover.Discover{},
errCh: c.retryJoinErrCh,
logger: c.agent.logger,
serverJoin: c.agent.server.Join,
serverEnabled: true,
}
go joiner.RetryJoin(config.Server.ServerJoin)
}
if config.Client.Enabled && config.Client.ServerJoin != nil {
joiner := retryJoiner{
discover: &discover.Discover{},
errCh: c.retryJoinErrCh,
logger: c.agent.logger,
clientJoin: c.agent.client.SetServers,
clientEnabled: true,
}
go joiner.RetryJoin(config.Client.ServerJoin)
}
go joiner.RetryJoin(config)
// Wait for exit
return c.handleSignals()

View File

@ -19,6 +19,7 @@ advertise {
rpc = "127.0.0.3"
serf = "127.0.0.4"
}
client {
enabled = true
state_dir = "/tmp/client-state"
@ -29,6 +30,13 @@ client {
foo = "bar"
baz = "zip"
}
server_join_info {
retry_join = [ "1.1.1.1", "2.2.2.2" ]
start_join = [ "1.1.1.1", "2.2.2.2" ]
retry_max = 3
retry_interval = "15s"
}
options {
foo = "bar"
baz = "zip"
@ -86,6 +94,12 @@ server {
redundancy_zone = "foo"
upgrade_version = "0.8.0"
encrypt = "abc"
server_join_info {
retry_join = [ "1.1.1.1", "2.2.2.2" ]
start_join = [ "1.1.1.1", "2.2.2.2" ]
retry_max = 3
retry_interval = "15s"
}
}
acl {
enabled = true

View File

@ -217,6 +217,9 @@ type ClientConfig struct {
// NoHostUUID disables using the host's UUID and will force generation of a
// random UUID.
NoHostUUID *bool `mapstructure:"no_host_uuid"`
// ServerJoin contains information that is used to attempt to join servers
ServerJoin *ServerJoin `mapstructure:"server_join_info"`
}
// ACLConfig is configuration specific to the ACL system
@ -311,19 +314,23 @@ type ServerConfig struct {
// StartJoin is a list of addresses to attempt to join when the
// agent starts. If Serf is unable to communicate with any of these
// addresses, then the agent will error and exit.
// Deprecated in Nomad 0.10
StartJoin []string `mapstructure:"start_join"`
// RetryJoin is a list of addresses to join with retry enabled.
// Deprecated in Nomad 0.10
RetryJoin []string `mapstructure:"retry_join"`
// RetryMaxAttempts specifies the maximum number of times to retry joining a
// host on startup. This is useful for cases where we know the node will be
// online eventually.
// Deprecated in Nomad 0.10
RetryMaxAttempts int `mapstructure:"retry_max"`
// RetryInterval specifies the amount of time to wait in between join
// attempts on agent start. The minimum allowed value is 1 second and
// the default is 30s.
// Deprecated in Nomad 0.10
RetryInterval string `mapstructure:"retry_interval"`
retryInterval time.Duration `mapstructure:"-"`
@ -346,6 +353,32 @@ type ServerConfig struct {
// Encryption key to use for the Serf communication
EncryptKey string `mapstructure:"encrypt" json:"-"`
// ServerJoin contains information that is used to attempt to join servers
ServerJoin *ServerJoin `mapstructure:"server_join_info"`
}
// ServerJoin is used in both clients and servers to bootstrap connections to
// servers
type ServerJoin struct {
// StartJoin is a list of addresses to attempt to join when the
// agent starts. If Serf is unable to communicate with any of these
// addresses, then the agent will error and exit.
StartJoin []string `mapstructure:"start_join"`
// RetryJoin is a list of addresses to join with retry enabled.
RetryJoin []string `mapstructure:"retry_join"`
// RetryMaxAttempts specifies the maximum number of times to retry joining a
// host on startup. This is useful for cases where we know the node will be
// online eventually.
RetryMaxAttempts int `mapstructure:"retry_max"`
// RetryInterval specifies the amount of time to wait in between join
// attempts on agent start. The minimum allowed value is 1 second and
// the default is 30s.
RetryInterval string `mapstructure:"retry_interval"`
retryInterval time.Duration `mapstructure:"-"`
}
// EncryptBytes returns the encryption key configured.
@ -1055,6 +1088,9 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.EncryptKey != "" {
result.EncryptKey = b.EncryptKey
}
if b.ServerJoin != nil {
result.ServerJoin = b.ServerJoin
}
// Add the schedulers
result.EnabledSchedulers = append(result.EnabledSchedulers, b.EnabledSchedulers...)
@ -1162,6 +1198,10 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
result.ChrootEnv[k] = v
}
if b.ServerJoin != nil {
result.ServerJoin = b.ServerJoin
}
return &result
}

View File

@ -370,6 +370,7 @@ func parseClient(result **ClientConfig, list *ast.ObjectList) error {
"gc_parallel_destroys",
"gc_max_allocs",
"no_host_uuid",
"server_join_info",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return err
@ -385,6 +386,7 @@ func parseClient(result **ClientConfig, list *ast.ObjectList) error {
delete(m, "chroot_env")
delete(m, "reserved")
delete(m, "stats")
delete(m, "server_join_info")
var config ClientConfig
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
@ -448,6 +450,13 @@ func parseClient(result **ClientConfig, list *ast.ObjectList) error {
}
}
// Parse ServerJoin config
if o := listVal.Filter("server_join_info"); len(o.Items) > 0 {
if err := parseServerJoin(&config.ServerJoin, o); err != nil {
return multierror.Prefix(err, "server_join_info->")
}
}
*result = &config
return nil
}
@ -531,16 +540,20 @@ func parseServer(result **ServerConfig, list *ast.ObjectList) error {
"heartbeat_grace",
"min_heartbeat_ttl",
"max_heartbeats_per_second",
"start_join",
"retry_join",
"retry_max",
"retry_interval",
"rejoin_after_leave",
"encrypt",
"authoritative_region",
"non_voting_server",
"redundancy_zone",
"upgrade_version",
"server_join_info",
// For backwards compatibility
"start_join",
"retry_join",
"retry_max",
"retry_interval",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return err
@ -551,6 +564,8 @@ func parseServer(result **ServerConfig, list *ast.ObjectList) error {
return err
}
delete(m, "server_join_info")
var config ServerConfig
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
@ -570,10 +585,59 @@ func parseServer(result **ServerConfig, list *ast.ObjectList) error {
}
}
// Parse ServerJoin config
if o := listVal.Filter("server_join_info"); len(o.Items) > 0 {
if err := parseServerJoin(&config.ServerJoin, o); err != nil {
return multierror.Prefix(err, "server_join_info->")
}
}
*result = &config
return nil
}
func parseServerJoin(result **ServerJoin, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
return fmt.Errorf("only one 'server_info_join' block allowed")
}
// Get our object
listVal := list.Items[0].Val
// Check for invalid keys
valid := []string{
"start_join",
"retry_join",
"retry_max",
"retry_interval",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return err
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, listVal); err != nil {
return err
}
var serverJoinInfo ServerJoin
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &serverJoinInfo,
})
if err != nil {
return err
}
if err := dec.Decode(m); err != nil {
return err
}
*result = &serverJoinInfo
return nil
}
func parseACL(result **ACLConfig, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {

View File

@ -47,6 +47,12 @@ func TestConfig_Parse(t *testing.T) {
AllocDir: "/tmp/alloc",
Servers: []string{"a.b.c:80", "127.0.0.1:1234"},
NodeClass: "linux-medium-64bit",
ServerJoin: &ServerJoin{
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
StartJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: "15s",
RetryMaxAttempts: 3,
},
Meta: map[string]string{
"foo": "bar",
"baz": "zip",
@ -106,6 +112,12 @@ func TestConfig_Parse(t *testing.T) {
RedundancyZone: "foo",
UpgradeVersion: "0.8.0",
EncryptKey: "abc",
ServerJoin: &ServerJoin{
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
StartJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: "15s",
RetryMaxAttempts: 3,
},
},
ACL: &ACLConfig{
Enabled: true,

View File

@ -27,8 +27,17 @@ type DiscoverInterface interface {
// retryJoiner is used to handle retrying a join until it succeeds or all of
// its tries are exhausted.
type retryJoiner struct {
// join adds the specified servers to the serf cluster
join func([]string) (int, error)
// serverJoin adds the specified servers to the serf cluster
serverJoin func([]string) (int, error)
// serverEnabled indicates whether the nomad agent will run in server mode
serverEnabled bool
// clientJoin adds the specified servers to the serf cluster
clientJoin func([]string) (int, error)
// clientEnabled indicates whether the nomad agent will run in client mode
clientEnabled bool
// discover is of type Discover, where this is either the go-discover
// implementation or a mock used for testing
@ -44,21 +53,21 @@ type retryJoiner struct {
// retryJoin is used to handle retrying a join until it succeeds or all retries
// are exhausted.
func (r *retryJoiner) RetryJoin(config *Config) {
if len(config.Server.RetryJoin) == 0 || !config.Server.Enabled {
func (r *retryJoiner) RetryJoin(serverJoin *ServerJoin) {
if len(serverJoin.RetryJoin) == 0 {
return
}
attempt := 0
addrsToJoin := strings.Join(config.Server.RetryJoin, " ")
addrsToJoin := strings.Join(serverJoin.RetryJoin, " ")
r.logger.Printf("[INFO] agent: Joining cluster... %s", addrsToJoin)
for {
var addrs []string
var err error
for _, addr := range config.Server.RetryJoin {
for _, addr := range serverJoin.RetryJoin {
switch {
case strings.HasPrefix(addr, "provider="):
servers, err := r.discover.Addrs(addr, r.logger)
@ -73,14 +82,24 @@ func (r *retryJoiner) RetryJoin(config *Config) {
}
if len(addrs) > 0 {
n, err := r.join(addrs)
if r.serverEnabled && r.serverJoin != nil {
n, err := r.serverJoin(addrs)
if err == nil {
r.logger.Printf("[INFO] agent: Join completed. Synced with %d initial agents", n)
return
}
}
if r.clientEnabled && r.clientJoin != nil {
n, err := r.clientJoin(addrs)
if err == nil {
r.logger.Printf("[INFO] agent: Join completed. Synced with %d initial agents", n)
return
}
}
}
attempt++
if config.Server.RetryMaxAttempts > 0 && attempt > config.Server.RetryMaxAttempts {
if serverJoin.RetryMaxAttempts > 0 && attempt > serverJoin.RetryMaxAttempts {
r.logger.Printf("[ERR] agent: max join retry exhausted, exiting")
close(r.errCh)
return
@ -88,8 +107,8 @@ func (r *retryJoiner) RetryJoin(config *Config) {
if err != nil {
r.logger.Printf("[WARN] agent: Join failed: %v, retrying in %v", err,
config.Server.RetryInterval)
serverJoin.RetryInterval)
}
time.Sleep(config.Server.retryInterval)
time.Sleep(serverJoin.retryInterval)
}
}

View File

@ -82,12 +82,9 @@ func TestRetryJoin_NonCloud(t *testing.T) {
t.Parallel()
require := require.New(t)
newConfig := &Config{
Server: &ServerConfig{
serverJoin := &ServerJoin{
RetryMaxAttempts: 1,
RetryJoin: []string{"127.0.0.1"},
Enabled: true,
},
}
var output []string
@ -99,12 +96,13 @@ func TestRetryJoin_NonCloud(t *testing.T) {
joiner := retryJoiner{
discover: &MockDiscover{},
join: mockJoin,
serverJoin: mockJoin,
serverEnabled: true,
logger: log.New(ioutil.Discard, "", 0),
errCh: make(chan struct{}),
}
joiner.RetryJoin(newConfig)
joiner.RetryJoin(serverJoin)
require.Equal(1, len(output))
require.Equal(stubAddress, output[0])
@ -114,12 +112,9 @@ func TestRetryJoin_Cloud(t *testing.T) {
t.Parallel()
require := require.New(t)
newConfig := &Config{
Server: &ServerConfig{
serverJoin := &ServerJoin{
RetryMaxAttempts: 1,
RetryJoin: []string{"provider=aws, tag_value=foo"},
Enabled: true,
},
}
var output []string
@ -132,12 +127,13 @@ func TestRetryJoin_Cloud(t *testing.T) {
mockDiscover := &MockDiscover{}
joiner := retryJoiner{
discover: mockDiscover,
join: mockJoin,
serverJoin: mockJoin,
serverEnabled: true,
logger: log.New(ioutil.Discard, "", 0),
errCh: make(chan struct{}),
}
joiner.RetryJoin(newConfig)
joiner.RetryJoin(serverJoin)
require.Equal(1, len(output))
require.Equal("provider=aws, tag_value=foo", mockDiscover.ReceivedAddrs)
@ -148,12 +144,9 @@ func TestRetryJoin_MixedProvider(t *testing.T) {
t.Parallel()
require := require.New(t)
newConfig := &Config{
Server: &ServerConfig{
serverJoin := &ServerJoin{
RetryMaxAttempts: 1,
RetryJoin: []string{"provider=aws, tag_value=foo", "127.0.0.1"},
Enabled: true,
},
}
var output []string
@ -166,14 +159,45 @@ func TestRetryJoin_MixedProvider(t *testing.T) {
mockDiscover := &MockDiscover{}
joiner := retryJoiner{
discover: mockDiscover,
join: mockJoin,
serverJoin: mockJoin,
serverEnabled: true,
logger: log.New(ioutil.Discard, "", 0),
errCh: make(chan struct{}),
}
joiner.RetryJoin(newConfig)
joiner.RetryJoin(serverJoin)
require.Equal(2, len(output))
require.Equal("provider=aws, tag_value=foo", mockDiscover.ReceivedAddrs)
require.Equal(stubAddress, output[0])
}
func TestRetryJoin_Client(t *testing.T) {
t.Parallel()
require := require.New(t)
serverJoin := &ServerJoin{
RetryMaxAttempts: 1,
RetryJoin: []string{"127.0.0.1"},
}
var output []string
mockJoin := func(s []string) (int, error) {
output = s
return 0, nil
}
joiner := retryJoiner{
discover: &MockDiscover{},
clientJoin: mockJoin,
clientEnabled: true,
logger: log.New(ioutil.Discard, "", 0),
errCh: make(chan struct{}),
}
joiner.RetryJoin(serverJoin)
require.Equal(1, len(output))
require.Equal(stubAddress, output[0])
}