open-consul/agent/retry_join.go
cskh 647f9787f8
fix: shadowed err in retryJoin() (#14112)
- err value will be used later to surface the error message
  if r.join() returns any err.
2022-08-10 10:53:57 -04:00

275 lines
7.2 KiB
Go

package agent
import (
"fmt"
"strings"
"time"
discover "github.com/hashicorp/go-discover"
discoverk8s "github.com/hashicorp/go-discover/provider/k8s"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/lib"
)
func (a *Agent) retryJoinLAN() {
r := &retryJoiner{
variant: retryJoinSerfVariant,
cluster: "LAN",
addrs: a.config.RetryJoinLAN,
maxAttempts: a.config.RetryJoinMaxAttemptsLAN,
interval: a.config.RetryJoinIntervalLAN,
join: func(addrs []string) (int, error) {
// NOTE: For partitioned servers you are only capable of using retry join
// to join nodes in the default partition.
return a.JoinLAN(addrs, a.AgentEnterpriseMeta())
},
logger: a.logger.With("cluster", "LAN"),
}
if err := r.retryJoin(); err != nil {
a.retryJoinCh <- err
}
}
func (a *Agent) retryJoinWAN() {
if !a.config.ServerMode {
a.logger.Warn("(WAN) couldn't join: Err: Must be a server to join WAN cluster")
return
}
isPrimary := a.config.PrimaryDatacenter == a.config.Datacenter
var joinAddrs []string
if a.config.ConnectMeshGatewayWANFederationEnabled {
// When wanfed is activated each datacenter 100% relies upon flood-join
// to replicate the LAN members in a dc into the WAN pool. We
// completely hijack whatever the user configured to correctly
// implement the star-join.
//
// Elsewhere we enforce that start-join-wan and retry-join-wan cannot
// be set if wanfed is enabled so we don't have to emit any warnings
// related to that here.
if isPrimary {
// Wanfed requires that secondaries join TO the primary and the
// primary doesn't explicitly join down to the secondaries, so as
// such in the primary a retry-join operation is a no-op.
return
}
// First get a handle on dialing the primary
a.refreshPrimaryGatewayFallbackAddresses()
// Then "retry join" a special address via the gateway which is
// load balanced to all servers in the primary datacenter
//
// Since this address is merely a placeholder we use an address from the
// TEST-NET-1 block as described in https://tools.ietf.org/html/rfc5735#section-3
const placeholderIPAddress = "192.0.2.2"
joinAddrs = []string{
fmt.Sprintf("*.%s/%s", a.config.PrimaryDatacenter, placeholderIPAddress),
}
} else {
joinAddrs = a.config.RetryJoinWAN
}
r := &retryJoiner{
variant: retryJoinSerfVariant,
cluster: "WAN",
addrs: joinAddrs,
maxAttempts: a.config.RetryJoinMaxAttemptsWAN,
interval: a.config.RetryJoinIntervalWAN,
join: a.JoinWAN,
logger: a.logger.With("cluster", "WAN"),
}
if err := r.retryJoin(); err != nil {
a.retryJoinCh <- err
}
}
func (a *Agent) refreshPrimaryGatewayFallbackAddresses() {
r := &retryJoiner{
variant: retryJoinMeshGatewayVariant,
cluster: "primary",
addrs: a.config.PrimaryGateways,
maxAttempts: 0,
interval: a.config.PrimaryGatewaysInterval,
join: func(addrs []string) (int, error) {
if err := a.RefreshPrimaryGatewayFallbackAddresses(addrs); err != nil {
return 0, err
}
return len(addrs), nil
},
logger: a.logger,
stopCh: a.PrimaryMeshGatewayAddressesReadyCh(),
}
if err := r.retryJoin(); err != nil {
a.retryJoinCh <- err
}
}
func newDiscover() (*discover.Discover, error) {
providers := make(map[string]discover.Provider)
for k, v := range discover.Providers {
providers[k] = v
}
providers["k8s"] = &discoverk8s.Provider{}
return discover.New(
discover.WithUserAgent(lib.UserAgent()),
discover.WithProviders(providers),
)
}
func retryJoinAddrs(disco *discover.Discover, variant, cluster string, retryJoin []string, logger hclog.Logger) []string {
addrs := []string{}
if disco == nil {
return addrs
}
for _, addr := range retryJoin {
switch {
case strings.Contains(addr, "provider="):
servers, err := disco.Addrs(addr, logger.StandardLogger(&hclog.StandardLoggerOptions{
InferLevels: true,
}))
if err != nil {
if logger != nil {
logger.Error("Cannot discover address",
"address", addr,
"error", err,
)
}
} else {
addrs = append(addrs, servers...)
if logger != nil {
if variant == retryJoinMeshGatewayVariant {
logger.Info("Discovered mesh gateways",
"cluster", cluster,
"mesh_gateways", strings.Join(servers, " "),
)
} else {
logger.Info("Discovered servers",
"cluster", cluster,
"servers", strings.Join(servers, " "),
)
}
}
}
default:
addrs = append(addrs, addr)
}
}
return addrs
}
const (
retryJoinSerfVariant = "serf"
retryJoinMeshGatewayVariant = "mesh-gateway"
)
// retryJoiner is used to handle retrying a join until it succeeds or all
// retries are exhausted.
type retryJoiner struct {
// variant is either "serf" or "mesh-gateway" and just adjusts the log messaging
// emitted
variant string
// cluster is the name of the serf cluster, e.g. "LAN" or "WAN".
cluster string
// addrs is the list of servers or go-discover configurations
// to join with.
addrs []string
// maxAttempts is the number of join attempts before giving up.
maxAttempts int
// interval is the time between two join attempts.
interval time.Duration
// join adds the discovered or configured servers to the given
// serf cluster.
join func([]string) (int, error)
// stopCh is an optional stop channel to exit the retry loop early
stopCh <-chan struct{}
// logger is the agent logger.
logger hclog.Logger
}
func (r *retryJoiner) retryJoin() error {
if len(r.addrs) == 0 {
return nil
}
disco, err := newDiscover()
if err != nil {
return err
}
if r.variant == retryJoinMeshGatewayVariant {
r.logger.Info("Refreshing mesh gateways is supported for the following discovery methods",
"discovery_methods", strings.Join(disco.Names(), " "),
)
r.logger.Info("Refreshing mesh gateways...")
} else {
r.logger.Info("Retry join is supported for the following discovery methods",
"discovery_methods", strings.Join(disco.Names(), " "),
)
r.logger.Info("Joining cluster...")
}
attempt := 0
for {
addrs := retryJoinAddrs(disco, r.variant, r.cluster, r.addrs, r.logger)
if len(addrs) > 0 {
n := 0
n, err = r.join(addrs)
if err == nil {
if r.variant == retryJoinMeshGatewayVariant {
r.logger.Info("Refreshing mesh gateways completed")
} else {
r.logger.Info("Join cluster completed. Synced with initial agents", "num_agents", n)
}
return nil
}
} else if len(addrs) == 0 {
if r.variant == retryJoinMeshGatewayVariant {
err = fmt.Errorf("No mesh gateways found")
} else {
err = fmt.Errorf("No servers to join")
}
}
attempt++
if r.maxAttempts > 0 && attempt > r.maxAttempts {
if r.variant == retryJoinMeshGatewayVariant {
return fmt.Errorf("agent: max refresh of %s mesh gateways retry exhausted, exiting", r.cluster)
} else {
return fmt.Errorf("agent: max join %s retry exhausted, exiting", r.cluster)
}
}
if r.variant == retryJoinMeshGatewayVariant {
r.logger.Warn("Refreshing mesh gateways failed, will retry",
"retry_interval", r.interval,
"error", err,
)
} else {
r.logger.Warn("Join cluster failed, will retry",
"retry_interval", r.interval,
"error", err,
)
}
select {
case <-time.After(r.interval):
case <-r.stopCh:
return nil
}
}
}