diff --git a/.changelog/13091.txt b/.changelog/13091.txt new file mode 100644 index 000000000..54b642f3d --- /dev/null +++ b/.changelog/13091.txt @@ -0,0 +1,5 @@ +```release-note:improvement +config: introduce `telemetry.retry_failed_connection` in agent configuration to +retry on failed connection to any telemetry backend. This prevents the agent from +exiting if the given DogStatsD DNS name is unresolvable, for example. +``` \ No newline at end of file diff --git a/agent/acl_test.go b/agent/acl_test.go index 995a3b6e6..3ed7ce325 100644 --- a/agent/acl_test.go +++ b/agent/acl_test.go @@ -71,7 +71,9 @@ func NewTestACLAgent(t *testing.T, name string, hcl string, resolveAuthz authzRe Output: logBuffer, TimeFormat: "04:05.000", }) - bd.MetricsHandler = metrics.NewInmemSink(1*time.Second, time.Minute) + bd.MetricsConfig = &lib.MetricsConfig{ + Handler: metrics.NewInmemSink(1*time.Second, time.Minute), + } agent, err := New(bd) require.NoError(t, err) diff --git a/agent/agent.go b/agent/agent.go index b1e5fd4aa..86917c53d 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1429,6 +1429,7 @@ func (a *Agent) ShutdownAgent() error { // this would be cancelled anyways (by the closing of the shutdown ch) but // this should help them to be stopped more quickly a.baseDeps.AutoConfig.Stop() + a.baseDeps.MetricsConfig.Cancel() a.stateLock.Lock() defer a.stateLock.Unlock() diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 385e08bfd..87805a24a 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -173,7 +173,7 @@ func (s *HTTPHandlers) AgentMetrics(resp http.ResponseWriter, req *http.Request) handler.ServeHTTP(resp, req) return nil, nil } - return s.agent.baseDeps.MetricsHandler.DisplayMetrics(resp, req) + return s.agent.baseDeps.MetricsConfig.Handler.DisplayMetrics(resp, req) } func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) { @@ -210,7 +210,7 @@ func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Re flusher: flusher, } enc.encoder.SetIndent("", " ") - s.agent.baseDeps.MetricsHandler.Stream(req.Context(), enc) + s.agent.baseDeps.MetricsConfig.Handler.Stream(req.Context(), enc) return nil, nil } diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index 8fccd2093..05c3e59ac 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -39,6 +39,7 @@ import ( tokenStore "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/xds/proxysupport" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" @@ -1563,7 +1564,9 @@ func TestHTTPHandlers_AgentMetricsStream_ACLDeny(t *testing.T) { bd := BaseDeps{} bd.Tokens = new(tokenStore.Store) sink := metrics.NewInmemSink(30*time.Millisecond, time.Second) - bd.MetricsHandler = sink + bd.MetricsConfig = &lib.MetricsConfig{ + Handler: sink, + } d := fakeResolveTokenDelegate{authorizer: acl.DenyAll()} agent := &Agent{ baseDeps: bd, @@ -1590,7 +1593,9 @@ func TestHTTPHandlers_AgentMetricsStream(t *testing.T) { bd := BaseDeps{} bd.Tokens = new(tokenStore.Store) sink := metrics.NewInmemSink(20*time.Millisecond, time.Second) - bd.MetricsHandler = sink + bd.MetricsConfig = &lib.MetricsConfig{ + Handler: sink, + } d := fakeResolveTokenDelegate{authorizer: acl.ManageAll()} agent := &Agent{ baseDeps: bd, diff --git a/agent/config/builder.go b/agent/config/builder.go index 9a814bcb5..741aa06b1 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -917,6 +917,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) { DisableHostname: boolVal(c.Telemetry.DisableHostname), DogstatsdAddr: stringVal(c.Telemetry.DogstatsdAddr), DogstatsdTags: c.Telemetry.DogstatsdTags, + RetryFailedConfiguration: boolVal(c.Telemetry.RetryFailedConfiguration), FilterDefault: boolVal(c.Telemetry.FilterDefault), AllowedPrefixes: telemetryAllowedPrefixes, BlockedPrefixes: telemetryBlockedPrefixes, diff --git a/agent/config/config.go b/agent/config/config.go index d78e7098d..0bb16cda5 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -674,6 +674,7 @@ type Telemetry struct { DisableHostname *bool `mapstructure:"disable_hostname"` DogstatsdAddr *string `mapstructure:"dogstatsd_addr"` DogstatsdTags []string `mapstructure:"dogstatsd_tags"` + RetryFailedConfiguration *bool `mapstructure:"retry_failed_connection"` FilterDefault *bool `mapstructure:"filter_default"` PrefixFilter []string `mapstructure:"prefix_filter"` MetricsPrefix *string `mapstructure:"metrics_prefix"` diff --git a/agent/config/default.go b/agent/config/default.go index 9355ec7ff..8d1846e99 100644 --- a/agent/config/default.go +++ b/agent/config/default.go @@ -128,6 +128,7 @@ func DefaultSource() Source { metrics_prefix = "consul" filter_default = true prefix_filter = [] + retry_failed_connection = true } raft_snapshot_threshold = ` + strconv.Itoa(int(cfg.RaftConfig.SnapshotThreshold)) + ` raft_snapshot_interval = "` + cfg.RaftConfig.SnapshotInterval.String() + `" diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index 719d8ddeb..cdbcb50bc 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -6306,6 +6306,7 @@ func TestLoad_FullConfig(t *testing.T) { DisableHostname: true, DogstatsdAddr: "0wSndumK", DogstatsdTags: []string{"3N81zSUB", "Xtj8AnXZ"}, + RetryFailedConfiguration: true, FilterDefault: true, AllowedPrefixes: []string{"oJotS8XJ"}, BlockedPrefixes: []string{"cazlEhGn", "ftO6DySn.rpc.server.call"}, diff --git a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden index 6cff27c9b..159f600d2 100644 --- a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden +++ b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden @@ -418,6 +418,7 @@ "DisableHostname": false, "DogstatsdAddr": "", "DogstatsdTags": [], + "RetryFailedConfiguration": false, "FilterDefault": false, "MetricsPrefix": "", "PrometheusOpts": { diff --git a/agent/config/testdata/full-config.hcl b/agent/config/testdata/full-config.hcl index c52488751..670d30a16 100644 --- a/agent/config/testdata/full-config.hcl +++ b/agent/config/testdata/full-config.hcl @@ -647,6 +647,7 @@ telemetry { disable_hostname = true dogstatsd_addr = "0wSndumK" dogstatsd_tags = [ "3N81zSUB","Xtj8AnXZ" ] + retry_failed_connection = true filter_default = true prefix_filter = [ "+oJotS8XJ","-cazlEhGn" ] metrics_prefix = "ftO6DySn" diff --git a/agent/config/testdata/full-config.json b/agent/config/testdata/full-config.json index f051b9d81..3b08e0f1c 100644 --- a/agent/config/testdata/full-config.json +++ b/agent/config/testdata/full-config.json @@ -644,6 +644,7 @@ "disable_hostname": true, "dogstatsd_addr": "0wSndumK", "dogstatsd_tags": [ "3N81zSUB","Xtj8AnXZ" ], + "retry_failed_connection": true, "filter_default": true, "prefix_filter": [ "+oJotS8XJ","-cazlEhGn" ], "metrics_prefix": "ftO6DySn", diff --git a/agent/setup.go b/agent/setup.go index bbe54ae06..6e96f468f 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -1,15 +1,12 @@ package agent import ( - "context" "fmt" "io" "net" - "net/http" "sync" "time" - "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/go-hclog" "google.golang.org/grpc/grpclog" @@ -41,18 +38,12 @@ import ( type BaseDeps struct { consul.Deps // TODO: un-embed - RuntimeConfig *config.RuntimeConfig - MetricsHandler MetricsHandler - AutoConfig *autoconf.AutoConfig // TODO: use an interface - Cache *cache.Cache - ViewStore *submatview.Store - WatchedFiles []string -} - -// MetricsHandler provides an http.Handler for displaying metrics. -type MetricsHandler interface { - DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) - Stream(ctx context.Context, encoder metrics.Encoder) + RuntimeConfig *config.RuntimeConfig + MetricsConfig *lib.MetricsConfig + AutoConfig *autoconf.AutoConfig // TODO: use an interface + Cache *cache.Cache + ViewStore *submatview.Store + WatchedFiles []string } type ConfigLoader func(source config.Source) (config.LoadResult, error) @@ -90,7 +81,8 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) cfg.Telemetry.PrometheusOpts.GaugeDefinitions = gauges cfg.Telemetry.PrometheusOpts.CounterDefinitions = counters cfg.Telemetry.PrometheusOpts.SummaryDefinitions = summaries - d.MetricsHandler, err = lib.InitTelemetry(cfg.Telemetry) + + d.MetricsConfig, err = lib.InitTelemetry(cfg.Telemetry, d.Logger) if err != nil { return d, fmt.Errorf("failed to initialize telemetry: %w", err) } diff --git a/agent/testagent.go b/agent/testagent.go index 4dbf859bc..6b1c2ed51 100644 --- a/agent/testagent.go +++ b/agent/testagent.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/sdk/freeport" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" @@ -216,7 +217,9 @@ func (a *TestAgent) Start(t *testing.T) error { bd.Logger = logger // if we are not testing telemetry things, let's use a "mock" sink for metrics if bd.RuntimeConfig.Telemetry.Disable { - bd.MetricsHandler = metrics.NewInmemSink(1*time.Second, time.Minute) + bd.MetricsConfig = &lib.MetricsConfig{ + Handler: metrics.NewInmemSink(1*time.Second, time.Minute), + } } if a.Config != nil && bd.RuntimeConfig.AutoReloadConfigCoalesceInterval == 0 { diff --git a/connect/proxy/proxy.go b/connect/proxy/proxy.go index 86cc24d46..f77d96ca9 100644 --- a/connect/proxy/proxy.go +++ b/connect/proxy/proxy.go @@ -57,7 +57,7 @@ func (p *Proxy) Serve() error { // Setup telemetry if configured // NOTE(kit): As far as I can tell, all of the metrics in the proxy are generated at runtime, so we // don't have any static metrics we initialize at start. - _, err := lib.InitTelemetry(newCfg.Telemetry) + _, err := lib.InitTelemetry(newCfg.Telemetry, p.logger) if err != nil { p.logger.Error("proxy telemetry config error", "error", err) } diff --git a/lib/telemetry.go b/lib/telemetry.go index d74edb37c..6183c6fac 100644 --- a/lib/telemetry.go +++ b/lib/telemetry.go @@ -1,12 +1,20 @@ package lib import ( + "context" + "errors" + "net" + "net/http" + "sync" "time" "github.com/armon/go-metrics" "github.com/armon/go-metrics/circonus" "github.com/armon/go-metrics/datadog" "github.com/armon/go-metrics/prometheus" + "github.com/hashicorp/consul/lib/retry" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-multierror" ) // TelemetryConfig is embedded in config.RuntimeConfig and holds the @@ -153,6 +161,11 @@ type TelemetryConfig struct { // hcl: telemetry { dogstatsd_tags = []string } DogstatsdTags []string `json:"dogstatsd_tags,omitempty" mapstructure:"dogstatsd_tags"` + // RetryFailedConfiguration retries transient errors when setting up sinks (e.g. network errors when connecting to telemetry backends). + // + // hcl: telemetry { retry_failed_connection = (true|false) } + RetryFailedConfiguration bool `json:"retry_failed_connection,omitempty" mapstructure:"retry_failed_connection"` + // FilterDefault is the default for whether to allow a metric that's not // covered by the filter. // @@ -199,6 +212,27 @@ type TelemetryConfig struct { PrometheusOpts prometheus.PrometheusOpts } +// MetricsHandler provides an http.Handler for displaying metrics. +type MetricsHandler interface { + DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) + Stream(ctx context.Context, encoder metrics.Encoder) +} + +type MetricsConfig struct { + Handler MetricsHandler + mu sync.Mutex + cancelFn context.CancelFunc +} + +func (cfg *MetricsConfig) Cancel() { + cfg.mu.Lock() + defer cfg.mu.Unlock() + + if cfg.cancelFn != nil { + cfg.cancelFn() + } +} + func statsiteSink(cfg TelemetryConfig, hostname string) (metrics.MetricSink, error) { addr := cfg.StatsiteAddr if addr == "" { @@ -283,17 +317,7 @@ func circonusSink(cfg TelemetryConfig, hostname string) (metrics.MetricSink, err return sink, nil } -// InitTelemetry configures go-metrics based on map of telemetry config -// values as returned by Runtimecfg.Config(). -func InitTelemetry(cfg TelemetryConfig) (*metrics.InmemSink, error) { - if cfg.Disable { - return nil, nil - } - // Setup telemetry - // Aggregate on 10 second intervals for 1 minute. Expose the - // metrics over stderr when there is a SIGUSR1 received. - memSink := metrics.NewInmemSink(10*time.Second, time.Minute) - metrics.DefaultInmemSignal(memSink) +func configureSinks(cfg TelemetryConfig, hostName string, memSink metrics.MetricSink) (metrics.FanoutSink, error) { metricsConf := metrics.DefaultConfig(cfg.MetricsPrefix) metricsConf.EnableHostname = !cfg.DisableHostname metricsConf.FilterDefault = cfg.FilterDefault @@ -301,35 +325,24 @@ func InitTelemetry(cfg TelemetryConfig) (*metrics.InmemSink, error) { metricsConf.BlockedPrefixes = cfg.BlockedPrefixes var sinks metrics.FanoutSink - addSink := func(fn func(TelemetryConfig, string) (metrics.MetricSink, error)) error { + var errors error + addSink := func(fn func(TelemetryConfig, string) (metrics.MetricSink, error)) { s, err := fn(cfg, metricsConf.HostName) if err != nil { - return err + errors = multierror.Append(errors, err) + return } if s != nil { sinks = append(sinks, s) } - return nil } - if err := addSink(statsiteSink); err != nil { - return nil, err - } - if err := addSink(statsdSink); err != nil { - return nil, err - } - if err := addSink(dogstatdSink); err != nil { - return nil, err - } - if err := addSink(circonusSink); err != nil { - return nil, err - } - if err := addSink(circonusSink); err != nil { - return nil, err - } - if err := addSink(prometheusSink); err != nil { - return nil, err - } + addSink(statsiteSink) + addSink(statsdSink) + addSink(dogstatdSink) + addSink(circonusSink) + addSink(circonusSink) + addSink(prometheusSink) if len(sinks) > 0 { sinks = append(sinks, memSink) @@ -338,5 +351,67 @@ func InitTelemetry(cfg TelemetryConfig) (*metrics.InmemSink, error) { metricsConf.EnableHostname = false metrics.NewGlobal(metricsConf, memSink) } - return memSink, nil + return sinks, errors +} + +// InitTelemetry configures go-metrics based on map of telemetry config +// values as returned by Runtimecfg.Config(). +// InitTelemetry retries configurating the sinks in case error is retriable +// and retry_failed_connection is set to true. +func InitTelemetry(cfg TelemetryConfig, logger hclog.Logger) (*MetricsConfig, error) { + if cfg.Disable { + return nil, nil + } + + memSink := metrics.NewInmemSink(10*time.Second, time.Minute) + metrics.DefaultInmemSignal(memSink) + metricsConf := metrics.DefaultConfig(cfg.MetricsPrefix) + + metricsConfig := &MetricsConfig{ + Handler: memSink, + } + + var cancel context.CancelFunc + var ctx context.Context + retryWithBackoff := func() { + waiter := &retry.Waiter{ + MaxWait: 5 * time.Minute, + } + for { + logger.Warn("retrying configure metric sinks", "retries", waiter.Failures()) + _, err := configureSinks(cfg, metricsConf.HostName, memSink) + if err == nil { + logger.Info("successfully configured metrics sinks") + return + } + logger.Error("failed configure sinks", "error", multierror.Flatten(err)) + + if err := waiter.Wait(ctx); err != nil { + logger.Trace("stop retrying configure metrics sinks") + } + } + } + + if _, errs := configureSinks(cfg, metricsConf.HostName, memSink); errs != nil { + if isRetriableError(errs) && cfg.RetryFailedConfiguration { + logger.Warn("failed configure sinks", "error", multierror.Flatten(errs)) + ctx, cancel = context.WithCancel(context.Background()) + + metricsConfig.mu.Lock() + metricsConfig.cancelFn = cancel + metricsConfig.mu.Unlock() + go retryWithBackoff() + } else { + return nil, errs + } + } + return metricsConfig, nil +} + +func isRetriableError(errs error) bool { + var dnsError *net.DNSError + if errors.As(errs, &dnsError) && dnsError.IsNotFound { + return true + } + return false } diff --git a/lib/telemetry_test.go b/lib/telemetry_test.go new file mode 100644 index 000000000..8f0ec176d --- /dev/null +++ b/lib/telemetry_test.go @@ -0,0 +1,66 @@ +package lib + +import ( + "errors" + "net" + "os" + "testing" + + "github.com/hashicorp/consul/logging" + "github.com/hashicorp/go-multierror" + "github.com/stretchr/testify/require" +) + +func newCfg() TelemetryConfig { + return TelemetryConfig{ + StatsdAddr: "statsd.host:1234", + StatsiteAddr: "statsite.host:1234", + DogstatsdAddr: "mydog.host:8125", + } +} + +func TestConfigureSinks(t *testing.T) { + cfg := newCfg() + sinks, err := configureSinks(cfg, "hostname", nil) + require.Error(t, err) + // 3 sinks: statsd, statsite, inmem + require.Equal(t, 3, len(sinks)) + + cfg = TelemetryConfig{ + DogstatsdAddr: "", + } + _, err = configureSinks(cfg, "hostname", nil) + require.NoError(t, err) + +} + +func TestIsRetriableError(t *testing.T) { + var err error + err = multierror.Append(err, errors.New("an error")) + r := isRetriableError(err) + require.False(t, r) + + err = multierror.Append(err, &net.DNSError{ + IsNotFound: true, + }) + r = isRetriableError(err) + require.True(t, r) +} + +func TestInitTelemetryRetrySuccess(t *testing.T) { + logger, err := logging.Setup(logging.Config{ + LogLevel: "INFO", + }, os.Stdout) + require.NoError(t, err) + cfg := newCfg() + _, err = InitTelemetry(cfg, logger) + require.Error(t, err) + + cfg.RetryFailedConfiguration = true + metricsCfg, err := InitTelemetry(cfg, logger) + require.NoError(t, err) + // TODO: we couldn't extract the metrics sinks from the + // global metrics due to it's limitation + // fanoutSink := metrics.Default()} + metricsCfg.cancelFn() +}