Retry on bad dogstatsd connection (#13091)
- Introduce a new configurable telemetry parameter, `retry_failed_connection`. Setting it to `true` lets the Consul agent continue its startup process even when the connection to the Datadog server fails; when set to `false`, the agent stops on a failed start. The default is `true`.

Co-authored-by: Dan Upton <daniel@floppy.co>
Co-authored-by: Evan Culver <eculver@users.noreply.github.com>
parent 9389b8b6fa
commit df27fa0c84
@@ -0,0 +1,5 @@
+```release-note:improvement
+config: introduce `telemetry.retry_failed_connection` in agent configuration to
+retry on failed connection to any telemetry backend. This prevents the agent from
+exiting if the given DogStatsD DNS name is unresolvable, for example.
+```
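For a sense of how the new option reads in practice, here is a minimal agent configuration sketch; the DogStatsD address is a made-up placeholder, not something from this commit:

```hcl
telemetry {
  # Placeholder endpoint. If this hostname is unresolvable at startup, the
  # agent now keeps booting and retries sink setup in the background.
  dogstatsd_addr = "dogstatsd.example.internal:8125"

  # Defaults to true; set to false to restore the old fail-fast behavior.
  retry_failed_connection = true
}
```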
@@ -71,7 +71,9 @@ func NewTestACLAgent(t *testing.T, name string, hcl string, resolveAuthz authzRe
 		Output:     logBuffer,
 		TimeFormat: "04:05.000",
 	})
-	bd.MetricsHandler = metrics.NewInmemSink(1*time.Second, time.Minute)
+	bd.MetricsConfig = &lib.MetricsConfig{
+		Handler: metrics.NewInmemSink(1*time.Second, time.Minute),
+	}
 
 	agent, err := New(bd)
 	require.NoError(t, err)
@@ -1429,6 +1429,7 @@ func (a *Agent) ShutdownAgent() error {
 	// this would be cancelled anyways (by the closing of the shutdown ch) but
 	// this should help them to be stopped more quickly
 	a.baseDeps.AutoConfig.Stop()
+	a.baseDeps.MetricsConfig.Cancel()
 
 	a.stateLock.Lock()
 	defer a.stateLock.Unlock()
@@ -173,7 +173,7 @@ func (s *HTTPHandlers) AgentMetrics(resp http.ResponseWriter, req *http.Request)
 		handler.ServeHTTP(resp, req)
 		return nil, nil
 	}
-	return s.agent.baseDeps.MetricsHandler.DisplayMetrics(resp, req)
+	return s.agent.baseDeps.MetricsConfig.Handler.DisplayMetrics(resp, req)
 }
 
 func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
@@ -210,7 +210,7 @@ func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Re
 		flusher: flusher,
 	}
 	enc.encoder.SetIndent("", " ")
-	s.agent.baseDeps.MetricsHandler.Stream(req.Context(), enc)
+	s.agent.baseDeps.MetricsConfig.Handler.Stream(req.Context(), enc)
 	return nil, nil
 }
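Both endpoints now reach the sink via `MetricsConfig.Handler` instead of the removed `BaseDeps.MetricsHandler` field. For intuition about what `Stream` drives, here is a sketch of an encoder in the shape go-metrics expects (`Encode(interface{}) error`); the flushing wrapper is illustrative, not the commit's exact type:

```go
package agentmetrics

import (
	"encoding/json"
	"net/http"
)

// streamEncoder mirrors the handler's wrapper: encode each interval
// summary as JSON and flush it to the client immediately.
type streamEncoder struct {
	encoder *json.Encoder
	flusher http.Flusher
}

func (s streamEncoder) Encode(v interface{}) error {
	if err := s.encoder.Encode(v); err != nil {
		return err
	}
	// Push the chunk out right away so the client sees a live stream.
	s.flusher.Flush()
	return nil
}
```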
@@ -39,6 +39,7 @@ import (
 	tokenStore "github.com/hashicorp/consul/agent/token"
 	"github.com/hashicorp/consul/agent/xds/proxysupport"
 	"github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/consul/sdk/testutil"
 	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/hashicorp/consul/testrpc"
@@ -1563,7 +1564,9 @@ func TestHTTPHandlers_AgentMetricsStream_ACLDeny(t *testing.T) {
 	bd := BaseDeps{}
 	bd.Tokens = new(tokenStore.Store)
 	sink := metrics.NewInmemSink(30*time.Millisecond, time.Second)
-	bd.MetricsHandler = sink
+	bd.MetricsConfig = &lib.MetricsConfig{
+		Handler: sink,
+	}
 	d := fakeResolveTokenDelegate{authorizer: acl.DenyAll()}
 	agent := &Agent{
 		baseDeps: bd,
@@ -1590,7 +1593,9 @@ func TestHTTPHandlers_AgentMetricsStream(t *testing.T) {
 	bd := BaseDeps{}
 	bd.Tokens = new(tokenStore.Store)
 	sink := metrics.NewInmemSink(20*time.Millisecond, time.Second)
-	bd.MetricsHandler = sink
+	bd.MetricsConfig = &lib.MetricsConfig{
+		Handler: sink,
+	}
 	d := fakeResolveTokenDelegate{authorizer: acl.ManageAll()}
 	agent := &Agent{
 		baseDeps: bd,
@@ -917,6 +917,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
 		DisableHostname:          boolVal(c.Telemetry.DisableHostname),
 		DogstatsdAddr:            stringVal(c.Telemetry.DogstatsdAddr),
 		DogstatsdTags:            c.Telemetry.DogstatsdTags,
+		RetryFailedConfiguration: boolVal(c.Telemetry.RetryFailedConfiguration),
 		FilterDefault:            boolVal(c.Telemetry.FilterDefault),
 		AllowedPrefixes:          telemetryAllowedPrefixes,
 		BlockedPrefixes:          telemetryBlockedPrefixes,
@@ -674,6 +674,7 @@ type Telemetry struct {
 	DisableHostname          *bool    `mapstructure:"disable_hostname"`
 	DogstatsdAddr            *string  `mapstructure:"dogstatsd_addr"`
 	DogstatsdTags            []string `mapstructure:"dogstatsd_tags"`
+	RetryFailedConfiguration *bool    `mapstructure:"retry_failed_connection"`
 	FilterDefault            *bool    `mapstructure:"filter_default"`
 	PrefixFilter             []string `mapstructure:"prefix_filter"`
 	MetricsPrefix            *string  `mapstructure:"metrics_prefix"`
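The field is a `*bool` so the builder can tell "key absent" (nil, which falls back to the default of `true`) apart from an explicit `false`. A standalone sketch of that decode step, using github.com/mitchellh/mapstructure directly with a stripped-down stand-in struct (not Consul's):

```go
package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

type telemetry struct {
	// Pointer-typed so an absent key stays nil and a default can apply later.
	RetryFailedConnection *bool `mapstructure:"retry_failed_connection"`
}

func main() {
	var cfg telemetry
	raw := map[string]interface{}{"retry_failed_connection": false}
	if err := mapstructure.Decode(raw, &cfg); err != nil {
		panic(err)
	}
	// Non-nil and false: explicitly disabled, as opposed to nil,
	// which would mean "unset, fall back to the default (true)".
	fmt.Println(cfg.RetryFailedConnection != nil, *cfg.RetryFailedConnection)
}
```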
@@ -128,6 +128,7 @@ func DefaultSource() Source {
 			metrics_prefix = "consul"
 			filter_default = true
 			prefix_filter = []
+			retry_failed_connection = true
 		}
 		raft_snapshot_threshold = ` + strconv.Itoa(int(cfg.RaftConfig.SnapshotThreshold)) + `
 		raft_snapshot_interval = "` + cfg.RaftConfig.SnapshotInterval.String() + `"
@@ -6306,6 +6306,7 @@ func TestLoad_FullConfig(t *testing.T) {
 			DisableHostname:          true,
 			DogstatsdAddr:            "0wSndumK",
 			DogstatsdTags:            []string{"3N81zSUB", "Xtj8AnXZ"},
+			RetryFailedConfiguration: true,
 			FilterDefault:            true,
 			AllowedPrefixes:          []string{"oJotS8XJ"},
 			BlockedPrefixes:          []string{"cazlEhGn", "ftO6DySn.rpc.server.call"},
@@ -418,6 +418,7 @@
     "DisableHostname": false,
     "DogstatsdAddr": "",
     "DogstatsdTags": [],
+    "RetryFailedConfiguration": false,
     "FilterDefault": false,
     "MetricsPrefix": "",
     "PrometheusOpts": {
@@ -647,6 +647,7 @@ telemetry {
 	disable_hostname = true
 	dogstatsd_addr = "0wSndumK"
 	dogstatsd_tags = [ "3N81zSUB","Xtj8AnXZ" ]
+	retry_failed_connection = true
 	filter_default = true
 	prefix_filter = [ "+oJotS8XJ","-cazlEhGn" ]
 	metrics_prefix = "ftO6DySn"
@@ -644,6 +644,7 @@
     "disable_hostname": true,
     "dogstatsd_addr": "0wSndumK",
     "dogstatsd_tags": [ "3N81zSUB","Xtj8AnXZ" ],
+    "retry_failed_connection": true,
     "filter_default": true,
     "prefix_filter": [ "+oJotS8XJ","-cazlEhGn" ],
     "metrics_prefix": "ftO6DySn",
@@ -1,15 +1,12 @@
 package agent
 
 import (
-	"context"
 	"fmt"
 	"io"
 	"net"
-	"net/http"
 	"sync"
 	"time"
 
-	"github.com/armon/go-metrics"
 	"github.com/armon/go-metrics/prometheus"
 	"github.com/hashicorp/go-hclog"
 	"google.golang.org/grpc/grpclog"
@@ -41,18 +38,12 @@ import (
 type BaseDeps struct {
 	consul.Deps // TODO: un-embed
 
-	RuntimeConfig  *config.RuntimeConfig
-	MetricsHandler MetricsHandler
-	AutoConfig     *autoconf.AutoConfig // TODO: use an interface
-	Cache          *cache.Cache
-	ViewStore      *submatview.Store
-	WatchedFiles   []string
-}
-
-// MetricsHandler provides an http.Handler for displaying metrics.
-type MetricsHandler interface {
-	DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error)
-	Stream(ctx context.Context, encoder metrics.Encoder)
+	RuntimeConfig *config.RuntimeConfig
+	MetricsConfig *lib.MetricsConfig
+	AutoConfig    *autoconf.AutoConfig // TODO: use an interface
+	Cache         *cache.Cache
+	ViewStore     *submatview.Store
+	WatchedFiles  []string
 }
 
 type ConfigLoader func(source config.Source) (config.LoadResult, error)
@@ -90,7 +81,8 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
 	cfg.Telemetry.PrometheusOpts.GaugeDefinitions = gauges
 	cfg.Telemetry.PrometheusOpts.CounterDefinitions = counters
 	cfg.Telemetry.PrometheusOpts.SummaryDefinitions = summaries
-	d.MetricsHandler, err = lib.InitTelemetry(cfg.Telemetry)
+
+	d.MetricsConfig, err = lib.InitTelemetry(cfg.Telemetry, d.Logger)
 	if err != nil {
 		return d, fmt.Errorf("failed to initialize telemetry: %w", err)
 	}
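Taken together with the `ShutdownAgent` hunk above, the lifecycle contract is: initialize once, keep the returned `*lib.MetricsConfig`, and cancel any background retry on shutdown. A condensed sketch under those assumptions (`runAgentTelemetry` is a hypothetical helper, not agent code):

```go
package agenttelemetry

import (
	"fmt"

	"github.com/hashicorp/consul/lib"
	"github.com/hashicorp/go-hclog"
)

func runAgentTelemetry(cfg lib.TelemetryConfig, logger hclog.Logger) error {
	metricsCfg, err := lib.InitTelemetry(cfg, logger)
	if err != nil {
		return fmt.Errorf("failed to initialize telemetry: %w", err)
	}
	// InitTelemetry returns nil when telemetry is disabled, so guard the
	// cleanup; Cancel stops the background sink-retry goroutine, mirroring
	// what ShutdownAgent now does.
	if metricsCfg != nil {
		defer metricsCfg.Cancel()
	}

	// ... agent runs; HTTP endpoints serve metrics via metricsCfg.Handler ...
	return nil
}
```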
@@ -25,6 +25,7 @@ import (
 	"github.com/hashicorp/consul/agent/consul"
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/consul/sdk/freeport"
 	"github.com/hashicorp/consul/sdk/testutil"
 	"github.com/hashicorp/consul/sdk/testutil/retry"
@@ -216,7 +217,9 @@ func (a *TestAgent) Start(t *testing.T) error {
 	bd.Logger = logger
 	// if we are not testing telemetry things, let's use a "mock" sink for metrics
 	if bd.RuntimeConfig.Telemetry.Disable {
-		bd.MetricsHandler = metrics.NewInmemSink(1*time.Second, time.Minute)
+		bd.MetricsConfig = &lib.MetricsConfig{
+			Handler: metrics.NewInmemSink(1*time.Second, time.Minute),
+		}
 	}
 
 	if a.Config != nil && bd.RuntimeConfig.AutoReloadConfigCoalesceInterval == 0 {
@@ -57,7 +57,7 @@ func (p *Proxy) Serve() error {
 			// Setup telemetry if configured
 			// NOTE(kit): As far as I can tell, all of the metrics in the proxy are generated at runtime, so we
 			// don't have any static metrics we initialize at start.
-			_, err := lib.InitTelemetry(newCfg.Telemetry)
+			_, err := lib.InitTelemetry(newCfg.Telemetry, p.logger)
 			if err != nil {
 				p.logger.Error("proxy telemetry config error", "error", err)
 			}
lib/telemetry.go (141 changed lines)
@@ -1,12 +1,20 @@
 package lib
 
 import (
+	"context"
+	"errors"
+	"net"
+	"net/http"
+	"sync"
 	"time"
 
 	"github.com/armon/go-metrics"
 	"github.com/armon/go-metrics/circonus"
 	"github.com/armon/go-metrics/datadog"
 	"github.com/armon/go-metrics/prometheus"
+	"github.com/hashicorp/consul/lib/retry"
+	"github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/go-multierror"
 )
 
 // TelemetryConfig is embedded in config.RuntimeConfig and holds the
@@ -153,6 +161,11 @@ type TelemetryConfig struct {
 	// hcl: telemetry { dogstatsd_tags = []string }
 	DogstatsdTags []string `json:"dogstatsd_tags,omitempty" mapstructure:"dogstatsd_tags"`
 
+	// RetryFailedConfiguration retries transient errors when setting up sinks (e.g. network errors when connecting to telemetry backends).
+	//
+	// hcl: telemetry { retry_failed_connection = (true|false) }
+	RetryFailedConfiguration bool `json:"retry_failed_connection,omitempty" mapstructure:"retry_failed_connection"`
+
 	// FilterDefault is the default for whether to allow a metric that's not
 	// covered by the filter.
 	//
@@ -199,6 +212,27 @@ type TelemetryConfig struct {
 	PrometheusOpts prometheus.PrometheusOpts
 }
 
+// MetricsHandler provides an http.Handler for displaying metrics.
+type MetricsHandler interface {
+	DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error)
+	Stream(ctx context.Context, encoder metrics.Encoder)
+}
+
+type MetricsConfig struct {
+	Handler  MetricsHandler
+	mu       sync.Mutex
+	cancelFn context.CancelFunc
+}
+
+func (cfg *MetricsConfig) Cancel() {
+	cfg.mu.Lock()
+	defer cfg.mu.Unlock()
+
+	if cfg.cancelFn != nil {
+		cfg.cancelFn()
+	}
+}
+
 func statsiteSink(cfg TelemetryConfig, hostname string) (metrics.MetricSink, error) {
 	addr := cfg.StatsiteAddr
 	if addr == "" {
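The `Cancel` method above pairs a mutex with a lazily-stored `context.CancelFunc`: the init path may store the cancel function from one goroutine while shutdown calls `Cancel` from another. A generic, self-contained sketch of the same pattern (all names here are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// stoppable guards a lazily-set cancel function, like lib.MetricsConfig does.
type stoppable struct {
	mu       sync.Mutex
	cancelFn context.CancelFunc
}

func (s *stoppable) start(work func(ctx context.Context)) {
	ctx, cancel := context.WithCancel(context.Background())
	s.mu.Lock()
	s.cancelFn = cancel // store under the lock; stop may run concurrently
	s.mu.Unlock()
	go work(ctx)
}

func (s *stoppable) stop() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.cancelFn != nil { // no-op if background work was never started
		s.cancelFn()
	}
}

func main() {
	var s stoppable
	s.start(func(ctx context.Context) {
		<-ctx.Done()
		fmt.Println("background retry loop stopped")
	})
	s.stop()
	time.Sleep(50 * time.Millisecond) // let the goroutine print before exit
}
```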
@@ -283,17 +317,7 @@ func circonusSink(cfg TelemetryConfig, hostname string) (metrics.MetricSink, err
 	return sink, nil
 }
 
-// InitTelemetry configures go-metrics based on map of telemetry config
-// values as returned by Runtimecfg.Config().
-func InitTelemetry(cfg TelemetryConfig) (*metrics.InmemSink, error) {
-	if cfg.Disable {
-		return nil, nil
-	}
-	// Setup telemetry
-	// Aggregate on 10 second intervals for 1 minute. Expose the
-	// metrics over stderr when there is a SIGUSR1 received.
-	memSink := metrics.NewInmemSink(10*time.Second, time.Minute)
-	metrics.DefaultInmemSignal(memSink)
+func configureSinks(cfg TelemetryConfig, hostName string, memSink metrics.MetricSink) (metrics.FanoutSink, error) {
 	metricsConf := metrics.DefaultConfig(cfg.MetricsPrefix)
 	metricsConf.EnableHostname = !cfg.DisableHostname
 	metricsConf.FilterDefault = cfg.FilterDefault
@@ -301,35 +325,24 @@ func InitTelemetry(cfg TelemetryConfig) (*metrics.InmemSink, error) {
 	metricsConf.BlockedPrefixes = cfg.BlockedPrefixes
 
 	var sinks metrics.FanoutSink
-	addSink := func(fn func(TelemetryConfig, string) (metrics.MetricSink, error)) error {
+	var errors error
+	addSink := func(fn func(TelemetryConfig, string) (metrics.MetricSink, error)) {
 		s, err := fn(cfg, metricsConf.HostName)
 		if err != nil {
-			return err
+			errors = multierror.Append(errors, err)
+			return
 		}
 		if s != nil {
 			sinks = append(sinks, s)
 		}
-		return nil
 	}
 
-	if err := addSink(statsiteSink); err != nil {
-		return nil, err
-	}
-	if err := addSink(statsdSink); err != nil {
-		return nil, err
-	}
-	if err := addSink(dogstatdSink); err != nil {
-		return nil, err
-	}
-	if err := addSink(circonusSink); err != nil {
-		return nil, err
-	}
-	if err := addSink(circonusSink); err != nil {
-		return nil, err
-	}
-	if err := addSink(prometheusSink); err != nil {
-		return nil, err
-	}
+	addSink(statsiteSink)
+	addSink(statsdSink)
+	addSink(dogstatdSink)
+	addSink(circonusSink)
+	addSink(circonusSink)
+	addSink(prometheusSink)
 
 	if len(sinks) > 0 {
 		sinks = append(sinks, memSink)
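The rewritten `addSink` is fail-slow rather than fail-fast: every sink constructor runs, failures accumulate into one combined error via go-multierror, and the caller decides whether the combination is fatal. A minimal standalone sketch of that accumulation pattern (the sink names and fake constructor are illustrative):

```go
package main

import (
	"fmt"

	"github.com/hashicorp/go-multierror"
)

func main() {
	build := func(name string, fail bool) (string, error) {
		if fail {
			return "", fmt.Errorf("%s: connection failed", name)
		}
		return name, nil
	}

	var errs error
	var sinks []string
	add := func(name string, fail bool) {
		s, err := build(name, fail)
		if err != nil {
			// Record the failure and keep going instead of returning early.
			errs = multierror.Append(errs, err)
			return
		}
		sinks = append(sinks, s)
	}

	add("statsite", false)
	add("dogstatsd", true) // fails, but does not block the other sinks
	add("prometheus", false)

	fmt.Println(sinks) // healthy sinks remain usable: [statsite prometheus]
	fmt.Println(errs)  // combined report listing the dogstatsd failure
}
```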
@@ -338,5 +351,67 @@ func InitTelemetry(cfg TelemetryConfig) (*metrics.InmemSink, error) {
 		metricsConf.EnableHostname = false
 		metrics.NewGlobal(metricsConf, memSink)
 	}
-	return memSink, nil
+	return sinks, errors
+}
+
+// InitTelemetry configures go-metrics based on a map of telemetry config
+// values as returned by Runtimecfg.Config().
+// InitTelemetry retries configuring the sinks when the error is retriable
+// and retry_failed_connection is set to true.
+func InitTelemetry(cfg TelemetryConfig, logger hclog.Logger) (*MetricsConfig, error) {
+	if cfg.Disable {
+		return nil, nil
+	}
+
+	memSink := metrics.NewInmemSink(10*time.Second, time.Minute)
+	metrics.DefaultInmemSignal(memSink)
+	metricsConf := metrics.DefaultConfig(cfg.MetricsPrefix)
+
+	metricsConfig := &MetricsConfig{
+		Handler: memSink,
+	}
+
+	var cancel context.CancelFunc
+	var ctx context.Context
+	retryWithBackoff := func() {
+		waiter := &retry.Waiter{
+			MaxWait: 5 * time.Minute,
+		}
+		for {
+			logger.Warn("retrying configure metric sinks", "retries", waiter.Failures())
+			_, err := configureSinks(cfg, metricsConf.HostName, memSink)
+			if err == nil {
+				logger.Info("successfully configured metrics sinks")
+				return
+			}
+			logger.Error("failed configure sinks", "error", multierror.Flatten(err))
+
+			if err := waiter.Wait(ctx); err != nil {
+				logger.Trace("stop retrying configure metrics sinks")
+			}
+		}
+	}
+
+	if _, errs := configureSinks(cfg, metricsConf.HostName, memSink); errs != nil {
+		if isRetriableError(errs) && cfg.RetryFailedConfiguration {
+			logger.Warn("failed configure sinks", "error", multierror.Flatten(errs))
+			ctx, cancel = context.WithCancel(context.Background())
+
+			metricsConfig.mu.Lock()
+			metricsConfig.cancelFn = cancel
+			metricsConfig.mu.Unlock()
+			go retryWithBackoff()
+		} else {
+			return nil, errs
+		}
+	}
+	return metricsConfig, nil
+}
+
+func isRetriableError(errs error) bool {
+	var dnsError *net.DNSError
+	if errors.As(errs, &dnsError) && dnsError.IsNotFound {
+		return true
+	}
+	return false
+}
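A subtlety in `isRetriableError`: `errors.As` has to see through the combined error built by `configureSinks`, which works because go-multierror participates in the standard unwrap chain. A small sketch demonstrating the check against an explicitly constructed DNS not-found error (the hostname is a placeholder):

```go
package main

import (
	"errors"
	"fmt"
	"net"

	"github.com/hashicorp/go-multierror"
)

func main() {
	var errs error
	errs = multierror.Append(errs, errors.New("some other sink failed"))
	errs = multierror.Append(errs, &net.DNSError{
		Err:        "no such host",
		Name:       "dogstatsd.bad.example",
		IsNotFound: true, // the condition the agent treats as retriable
	})

	var dnsError *net.DNSError
	// errors.As walks the multierror's wrapped errors until one matches.
	fmt.Println(errors.As(errs, &dnsError) && dnsError.IsNotFound) // true
}
```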
@@ -0,0 +1,66 @@
+package lib
+
+import (
+	"errors"
+	"net"
+	"os"
+	"testing"
+
+	"github.com/hashicorp/consul/logging"
+	"github.com/hashicorp/go-multierror"
+	"github.com/stretchr/testify/require"
+)
+
+func newCfg() TelemetryConfig {
+	return TelemetryConfig{
+		StatsdAddr:    "statsd.host:1234",
+		StatsiteAddr:  "statsite.host:1234",
+		DogstatsdAddr: "mydog.host:8125",
+	}
+}
+
+func TestConfigureSinks(t *testing.T) {
+	cfg := newCfg()
+	sinks, err := configureSinks(cfg, "hostname", nil)
+	require.Error(t, err)
+	// 3 sinks: statsd, statsite, inmem
+	require.Equal(t, 3, len(sinks))
+
+	cfg = TelemetryConfig{
+		DogstatsdAddr: "",
+	}
+	_, err = configureSinks(cfg, "hostname", nil)
+	require.NoError(t, err)
+
+}
+
+func TestIsRetriableError(t *testing.T) {
+	var err error
+	err = multierror.Append(err, errors.New("an error"))
+	r := isRetriableError(err)
+	require.False(t, r)
+
+	err = multierror.Append(err, &net.DNSError{
+		IsNotFound: true,
+	})
+	r = isRetriableError(err)
+	require.True(t, r)
+}
+
+func TestInitTelemetryRetrySuccess(t *testing.T) {
+	logger, err := logging.Setup(logging.Config{
+		LogLevel: "INFO",
+	}, os.Stdout)
+	require.NoError(t, err)
+	cfg := newCfg()
+	_, err = InitTelemetry(cfg, logger)
+	require.Error(t, err)
+
+	cfg.RetryFailedConfiguration = true
+	metricsCfg, err := InitTelemetry(cfg, logger)
+	require.NoError(t, err)
+	// TODO: we couldn't extract the metrics sinks from the
+	// global metrics due to its limitations
+	// fanoutSink := metrics.Default()
+	metricsCfg.cancelFn()
+}