Reduce the frequency of metric exports to minutely (#18584) (#18601)

This commit is contained in:
Joshua Timmons 2023-08-28 16:44:14 -04:00 committed by GitHub
parent d0e61057c0
commit 5e98e86c3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 43 additions and 23 deletions

3
.changelog/18584.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
Reduce the frequency of metric exports from Consul to HCP from every 10s to every 1m
```

View File

@ -82,7 +82,7 @@ func sink(
return nil, fmt.Errorf("failed to init config provider: %w", err) return nil, fmt.Errorf("failed to init config provider: %w", err)
} }
reader := telemetry.NewOTELReader(metricsClient, cfgProvider, telemetry.DefaultExportInterval) reader := telemetry.NewOTELReader(metricsClient, cfgProvider)
sinkOpts := &telemetry.OTELSinkOpts{ sinkOpts := &telemetry.OTELSinkOpts{
Reader: reader, Reader: reader,
ConfigProvider: cfgProvider, ConfigProvider: cfgProvider,

View File

@ -24,17 +24,17 @@ type EndpointProvider interface {
GetEndpoint() *url.URL GetEndpoint() *url.URL
} }
// OTELExporter is a custom implementation of a OTEL Metrics SDK metrics.Exporter. // otelExporter is a custom implementation of a OTEL Metrics SDK metrics.Exporter.
// The exporter is used by a OTEL Metrics SDK PeriodicReader to export aggregated metrics. // The exporter is used by a OTEL Metrics SDK PeriodicReader to export aggregated metrics.
// This allows us to use a custom client - HCP authenticated MetricsClient. // This allows us to use a custom client - HCP authenticated MetricsClient.
type OTELExporter struct { type otelExporter struct {
client MetricsClient client MetricsClient
endpointProvider EndpointProvider endpointProvider EndpointProvider
} }
// NewOTELExporter returns a configured OTELExporter. // newOTELExporter returns a configured OTELExporter.
func NewOTELExporter(client MetricsClient, endpointProvider EndpointProvider) *OTELExporter { func newOTELExporter(client MetricsClient, endpointProvider EndpointProvider) *otelExporter {
return &OTELExporter{ return &otelExporter{
client: client, client: client,
endpointProvider: endpointProvider, endpointProvider: endpointProvider,
} }
@ -42,14 +42,14 @@ func NewOTELExporter(client MetricsClient, endpointProvider EndpointProvider) *O
// Temporality returns the Cumulative temporality for metrics aggregation. // Temporality returns the Cumulative temporality for metrics aggregation.
// Telemetry Gateway stores metrics in Prometheus format, so use Cummulative aggregation as default. // Telemetry Gateway stores metrics in Prometheus format, so use Cummulative aggregation as default.
func (e *OTELExporter) Temporality(_ metric.InstrumentKind) metricdata.Temporality { func (e *otelExporter) Temporality(_ metric.InstrumentKind) metricdata.Temporality {
return metricdata.CumulativeTemporality return metricdata.CumulativeTemporality
} }
// Aggregation returns the Aggregation to use for an instrument kind. // Aggregation returns the Aggregation to use for an instrument kind.
// The default implementation provided by the OTEL Metrics SDK library DefaultAggregationSelector panics. // The default implementation provided by the OTEL Metrics SDK library DefaultAggregationSelector panics.
// This custom version replicates that logic, but removes the panic. // This custom version replicates that logic, but removes the panic.
func (e *OTELExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggregation { func (e *otelExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggregation {
switch kind { switch kind {
case metric.InstrumentKindObservableGauge: case metric.InstrumentKindObservableGauge:
return aggregation.LastValue{} return aggregation.LastValue{}
@ -64,7 +64,7 @@ func (e *OTELExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggre
} }
// Export serializes and transmits metric data to a receiver. // Export serializes and transmits metric data to a receiver.
func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceMetrics) error { func (e *otelExporter) Export(ctx context.Context, metrics *metricdata.ResourceMetrics) error {
endpoint := e.endpointProvider.GetEndpoint() endpoint := e.endpointProvider.GetEndpoint()
if endpoint == nil { if endpoint == nil {
return nil return nil
@ -86,13 +86,13 @@ func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceM
} }
// ForceFlush is a no-op, as the MetricsClient client holds no state. // ForceFlush is a no-op, as the MetricsClient client holds no state.
func (e *OTELExporter) ForceFlush(ctx context.Context) error { func (e *otelExporter) ForceFlush(ctx context.Context) error {
goMetrics.IncrCounter(internalMetricExporterForceFlush, 1) goMetrics.IncrCounter(internalMetricExporterForceFlush, 1)
return ctx.Err() return ctx.Err()
} }
// Shutdown is a no-op, as the MetricsClient is a HTTP client that requires no graceful shutdown. // Shutdown is a no-op, as the MetricsClient is a HTTP client that requires no graceful shutdown.
func (e *OTELExporter) Shutdown(ctx context.Context) error { func (e *otelExporter) Shutdown(ctx context.Context) error {
goMetrics.IncrCounter(internalMetricExporterShutdown, 1) goMetrics.IncrCounter(internalMetricExporterShutdown, 1)
return ctx.Err() return ctx.Err()
} }

View File

@ -37,7 +37,7 @@ func (m *mockEndpointProvider) GetEndpoint() *url.URL { return m.endpoint }
func TestTemporality(t *testing.T) { func TestTemporality(t *testing.T) {
t.Parallel() t.Parallel()
exp := &OTELExporter{} exp := &otelExporter{}
require.Equal(t, metricdata.CumulativeTemporality, exp.Temporality(metric.InstrumentKindCounter)) require.Equal(t, metricdata.CumulativeTemporality, exp.Temporality(metric.InstrumentKindCounter))
} }
@ -63,7 +63,7 @@ func TestAggregation(t *testing.T) {
test := test test := test
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
t.Parallel() t.Parallel()
exp := &OTELExporter{} exp := &otelExporter{}
require.Equal(t, test.expAgg, exp.Aggregation(test.kind)) require.Equal(t, test.expAgg, exp.Aggregation(test.kind))
}) })
} }
@ -122,7 +122,7 @@ func TestExport(t *testing.T) {
} }
} }
exp := NewOTELExporter(test.client, provider) exp := newOTELExporter(test.client, provider)
err := exp.Export(context.Background(), test.metrics) err := exp.Export(context.Background(), test.metrics)
if test.wantErr != "" { if test.wantErr != "" {
@ -179,7 +179,7 @@ func TestExport_CustomMetrics(t *testing.T) {
u, err := url.Parse(testExportEndpoint) u, err := url.Parse(testExportEndpoint)
require.NoError(t, err) require.NoError(t, err)
exp := NewOTELExporter(tc.client, &mockEndpointProvider{ exp := newOTELExporter(tc.client, &mockEndpointProvider{
endpoint: u, endpoint: u,
}) })
@ -209,7 +209,7 @@ func TestExport_CustomMetrics(t *testing.T) {
func TestForceFlush(t *testing.T) { func TestForceFlush(t *testing.T) {
t.Parallel() t.Parallel()
exp := &OTELExporter{} exp := &otelExporter{}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
cancel() cancel()
@ -219,7 +219,7 @@ func TestForceFlush(t *testing.T) {
func TestShutdown(t *testing.T) { func TestShutdown(t *testing.T) {
t.Parallel() t.Parallel()
exp := &OTELExporter{} exp := &otelExporter{}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
cancel() cancel()

View File

@ -10,20 +10,34 @@ import (
"time" "time"
gometrics "github.com/armon/go-metrics" gometrics "github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric" otelmetric "go.opentelemetry.io/otel/metric"
otelsdk "go.opentelemetry.io/otel/sdk/metric" otelsdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/resource"
"github.com/hashicorp/go-hclog"
) )
// DefaultExportInterval is a default time interval between export of aggregated metrics. const (
const DefaultExportInterval = 10 * time.Second // defaultExportInterval is a default time interval between export of aggregated metrics.
// At the time of writing this is the same as the otelsdk.Reader's default export interval.
defaultExportInterval = 60 * time.Second
// defaultExportTimeout is the time the otelsdk.Reader waits on an export before cancelling it.
// At the time of writing this is the same as the otelsdk.Reader's default export timeout default.
//
// note: in practice we are more likely to hit the http.Client Timeout in telemetry.MetricsClient.
// That http.Client Timeout is 15 seconds (at the time of writing). The otelsdk.Reader will use
// defaultExportTimeout for the entire Export call, but since the http.Client's Timeout is 15s,
// we should hit that first before reaching the 30 second timeout set here.
defaultExportTimeout = 30 * time.Second
)
// ConfigProvider is required to provide custom metrics processing. // ConfigProvider is required to provide custom metrics processing.
type ConfigProvider interface { type ConfigProvider interface {
// GetLabels should return a set of OTEL attributes added by default all metrics. // GetLabels should return a set of OTEL attributes added by default all metrics.
GetLabels() map[string]string GetLabels() map[string]string
// GetFilters should return filtesr that are required to enable metric processing. // GetFilters should return filtesr that are required to enable metric processing.
// Filters act as an allowlist to collect only the required metrics. // Filters act as an allowlist to collect only the required metrics.
GetFilters() *regexp.Regexp GetFilters() *regexp.Regexp
@ -72,9 +86,12 @@ type OTELSink struct {
// NewOTELReader returns a configured OTEL PeriodicReader to export metrics every X seconds. // NewOTELReader returns a configured OTEL PeriodicReader to export metrics every X seconds.
// It configures the reader with a custom OTELExporter with a MetricsClient to transform and export // It configures the reader with a custom OTELExporter with a MetricsClient to transform and export
// metrics in OTLP format to an external url. // metrics in OTLP format to an external url.
func NewOTELReader(client MetricsClient, endpointProvider EndpointProvider, exportInterval time.Duration) otelsdk.Reader { func NewOTELReader(client MetricsClient, endpointProvider EndpointProvider) otelsdk.Reader {
exporter := NewOTELExporter(client, endpointProvider) return otelsdk.NewPeriodicReader(
return otelsdk.NewPeriodicReader(exporter, otelsdk.WithInterval(exportInterval)) newOTELExporter(client, endpointProvider),
otelsdk.WithInterval(defaultExportInterval),
otelsdk.WithTimeout(defaultExportTimeout),
)
} }
// NewOTELSink returns a sink which fits the Go Metrics MetricsSink interface. // NewOTELSink returns a sink which fits the Go Metrics MetricsSink interface.