Backport of [HCP Telemetry] Periodic Refresh for Dynamic Telemetry Configuration into release/1.16.x (#18359)

[HCP Telemetry] Periodic Refresh for Dynamic Telemetry Configuration (#18168)

* OTElExporter now uses an EndpointProvider to discover the endpoint

* OTELSink uses a ConfigProvider to obtain filters and labels configuration

* improve tests for otel_sink

* Regex logic is moved into client for a method on the TelemetryConfig object

* Create a telemetry_config_provider and update deps to use it

* Fix conversion

* fix import newline

* Add logger to hcp client and move telemetry_config out of the client.go file

* Add a telemetry_config.go to refactor client.go

* Update deps

* update hcp deps test

* Modify telemetry_config_providers

* Check for nil filters

* PR review updates

* Fix comments and move around pieces

* Fix comments

* Remove context from client struct

* Moved ctx out of sink struct and fixed filters, added a test

* Remove named imports, use errors.New if not fformatting

* Remove HCP dependencies in telemetry package

* Add success metric and move lock only to grab the t.cfgHahs

* Update hash

* fix nits

* Create an equals method and add tests

* Improve telemetry_config_provider.go tests

* Add race test

* Add missing godoc

* Remove mock for MetricsClient

* Avoid goroutine test panics

* trying to kick CI lint issues by upgrading mod

* imprve test code and add hasher for testing

* Use structure logging for filters, fix error constants, and default to allow all regex

* removed hashin and modify logic to simplify

* Improve race test and fix PR feedback by removing hash equals and avoid testing the timer.Ticker logic, and instead unit test

* Ran make go-mod-tidy

* Use errtypes in the test

* Add changelog

* add safety check for exporter endpoint

* remove require.Contains by using error types, fix structure logging, and fix success metric typo in exporter

* Fixed race test to have changing config values

* Send success metric before modifying config

* Avoid the defer and move the success metric under
This commit is contained in:
Ashvitha 2023-08-02 13:37:39 -04:00 committed by GitHub
parent 10600efb88
commit e8c91d9d70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1582 additions and 524 deletions

3
.changelog/18168.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
hcp: Add dynamic configuration support for the export of server metrics to HCP.
```

View File

@ -35,21 +35,6 @@ type Client interface {
DiscoverServers(ctx context.Context) ([]string, error)
}
// MetricsConfig holds metrics specific configuration for the TelemetryConfig.
// The endpoint field overrides the TelemetryConfig endpoint.
type MetricsConfig struct {
Filters []string
Endpoint string
}
// TelemetryConfig contains configuration for telemetry data forwarded by Consul servers
// to the HCP Telemetry gateway.
type TelemetryConfig struct {
Endpoint string
Labels map[string]string
MetricsConfig *MetricsConfig
}
type BootstrapConfig struct {
Name string
BootstrapExpect int
@ -112,10 +97,14 @@ func (c *hcpClient) FetchTelemetryConfig(ctx context.Context) (*TelemetryConfig,
resp, err := c.tgw.AgentTelemetryConfig(params, nil)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to fetch from HCP: %w", err)
}
return convertTelemetryConfig(resp)
if err := validateAgentTelemetryConfigPayload(resp); err != nil {
return nil, fmt.Errorf("invalid response payload: %w", err)
}
return convertAgentTelemetryResponse(ctx, resp, c.cfg)
}
func (c *hcpClient) FetchBootstrap(ctx context.Context) (*BootstrapConfig, error) {
@ -272,60 +261,3 @@ func (c *hcpClient) DiscoverServers(ctx context.Context) ([]string, error) {
return servers, nil
}
// convertTelemetryConfig validates the AgentTelemetryConfig payload and converts it into a TelemetryConfig object.
func convertTelemetryConfig(resp *hcptelemetry.AgentTelemetryConfigOK) (*TelemetryConfig, error) {
if resp.Payload == nil {
return nil, fmt.Errorf("missing payload")
}
if resp.Payload.TelemetryConfig == nil {
return nil, fmt.Errorf("missing telemetry config")
}
payloadConfig := resp.Payload.TelemetryConfig
var metricsConfig MetricsConfig
if payloadConfig.Metrics != nil {
metricsConfig.Endpoint = payloadConfig.Metrics.Endpoint
metricsConfig.Filters = payloadConfig.Metrics.IncludeList
}
return &TelemetryConfig{
Endpoint: payloadConfig.Endpoint,
Labels: payloadConfig.Labels,
MetricsConfig: &metricsConfig,
}, nil
}
// Enabled verifies if telemetry is enabled by ensuring a valid endpoint has been retrieved.
// It returns full metrics endpoint and true if a valid endpoint was obtained.
func (t *TelemetryConfig) Enabled() (string, bool) {
endpoint := t.Endpoint
if override := t.MetricsConfig.Endpoint; override != "" {
endpoint = override
}
if endpoint == "" {
return "", false
}
// The endpoint from Telemetry Gateway is a domain without scheme, and without the metrics path, so they must be added.
return endpoint + metricsGatewayPath, true
}
// DefaultLabels returns a set of <key, value> string pairs that must be added as attributes to all exported telemetry data.
func (t *TelemetryConfig) DefaultLabels(cfg config.CloudConfig) map[string]string {
labels := make(map[string]string)
nodeID := string(cfg.NodeID)
if nodeID != "" {
labels["node_id"] = nodeID
}
if cfg.NodeName != "" {
labels["node_name"] = cfg.NodeName
}
for k, v := range t.Labels {
labels[k] = v
}
return labels
}

View File

@ -2,200 +2,122 @@ package client
import (
"context"
"fmt"
"net/url"
"regexp"
"testing"
"time"
"github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service"
"github.com/go-openapi/runtime"
hcptelemetry "github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service"
"github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/models"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
type mockTGW struct {
mockResponse *hcptelemetry.AgentTelemetryConfigOK
mockError error
}
func (m *mockTGW) AgentTelemetryConfig(params *hcptelemetry.AgentTelemetryConfigParams, authInfo runtime.ClientAuthInfoWriter, opts ...hcptelemetry.ClientOption) (*hcptelemetry.AgentTelemetryConfigOK, error) {
return m.mockResponse, m.mockError
}
func (m *mockTGW) GetLabelValues(params *hcptelemetry.GetLabelValuesParams, authInfo runtime.ClientAuthInfoWriter, opts ...hcptelemetry.ClientOption) (*hcptelemetry.GetLabelValuesOK, error) {
return hcptelemetry.NewGetLabelValuesOK(), nil
}
func (m *mockTGW) QueryRangeBatch(params *hcptelemetry.QueryRangeBatchParams, authInfo runtime.ClientAuthInfoWriter, opts ...hcptelemetry.ClientOption) (*hcptelemetry.QueryRangeBatchOK, error) {
return hcptelemetry.NewQueryRangeBatchOK(), nil
}
func (m *mockTGW) SetTransport(transport runtime.ClientTransport) {}
type expectedTelemetryCfg struct {
endpoint string
labels map[string]string
filters string
refreshInterval time.Duration
}
func TestFetchTelemetryConfig(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
metricsEndpoint string
expect func(*MockClient)
disabled bool
}{
"success": {
expect: func(mockClient *MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{
Endpoint: "https://test.com",
MetricsConfig: &MetricsConfig{
Endpoint: "",
},
}, nil)
},
metricsEndpoint: "https://test.com/v1/metrics",
},
"overrideMetricsEndpoint": {
expect: func(mockClient *MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{
Endpoint: "https://test.com",
MetricsConfig: &MetricsConfig{
Endpoint: "https://test.com",
},
}, nil)
},
metricsEndpoint: "https://test.com/v1/metrics",
},
"disabledWithEmptyEndpoint": {
expect: func(mockClient *MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{
Endpoint: "",
MetricsConfig: &MetricsConfig{
Endpoint: "",
},
}, nil)
},
disabled: true,
},
} {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
mock := NewMockClient(t)
test.expect(mock)
telemetryCfg, err := mock.FetchTelemetryConfig(context.Background())
require.NoError(t, err)
if test.disabled {
endpoint, ok := telemetryCfg.Enabled()
require.False(t, ok)
require.Empty(t, endpoint)
return
}
endpoint, ok := telemetryCfg.Enabled()
require.True(t, ok)
require.Equal(t, test.metricsEndpoint, endpoint)
})
}
}
func TestConvertTelemetryConfig(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
resp *consul_telemetry_service.AgentTelemetryConfigOK
expectedTelemetryCfg *TelemetryConfig
for name, tc := range map[string]struct {
mockResponse *hcptelemetry.AgentTelemetryConfigOK
mockError error
wantErr string
expected *expectedTelemetryCfg
}{
"success": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{
Endpoint: "https://test.com",
Labels: map[string]string{"test": "test"},
"errorsWithFetchFailure": {
mockError: fmt.Errorf("failed to fetch from HCP"),
mockResponse: nil,
wantErr: "failed to fetch from HCP",
},
},
},
expectedTelemetryCfg: &TelemetryConfig{
Endpoint: "https://test.com",
Labels: map[string]string{"test": "test"},
MetricsConfig: &MetricsConfig{},
},
},
"successWithMetricsConfig": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{
Endpoint: "https://test.com",
Labels: map[string]string{"test": "test"},
Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{
Endpoint: "https://metrics-test.com",
IncludeList: []string{"consul.raft.apply"},
},
},
},
},
expectedTelemetryCfg: &TelemetryConfig{
Endpoint: "https://test.com",
Labels: map[string]string{"test": "test"},
MetricsConfig: &MetricsConfig{
Endpoint: "https://metrics-test.com",
Filters: []string{"consul.raft.apply"},
},
},
},
"errorsWithNilPayload": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{},
wantErr: "missing payload",
},
"errorsWithNilTelemetryConfig": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
"errorsWithInvalidPayload": {
mockResponse: &hcptelemetry.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{},
},
wantErr: "missing telemetry config",
mockError: nil,
wantErr: "invalid response payload",
},
"success:": {
mockResponse: &hcptelemetry.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{
RefreshInterval: "1s",
},
TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{
Endpoint: "https://test.com",
Labels: map[string]string{"test": "123"},
Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{
IncludeList: []string{"consul", "test"},
},
},
},
},
expected: &expectedTelemetryCfg{
endpoint: "https://test.com/v1/metrics",
labels: map[string]string{"test": "123"},
filters: "consul|test",
refreshInterval: 1 * time.Second,
},
},
} {
test := test
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
telemetryCfg, err := convertTelemetryConfig(test.resp)
if test.wantErr != "" {
c := &hcpClient{
tgw: &mockTGW{
mockError: tc.mockError,
mockResponse: tc.mockResponse,
},
}
telemetryCfg, err := c.FetchTelemetryConfig(context.Background())
if tc.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.wantErr)
require.Nil(t, telemetryCfg)
require.Contains(t, err.Error(), test.wantErr)
return
}
urlEndpoint, err := url.Parse(tc.expected.endpoint)
require.NoError(t, err)
require.Equal(t, test.expectedTelemetryCfg, telemetryCfg)
})
}
}
func Test_DefaultLabels(t *testing.T) {
for name, tc := range map[string]struct {
cfg config.CloudConfig
expectedLabels map[string]string
}{
"Success": {
cfg: config.CloudConfig{
NodeID: types.NodeID("nodeyid"),
NodeName: "nodey",
},
expectedLabels: map[string]string{
"node_id": "nodeyid",
"node_name": "nodey",
},
},
"NoNodeID": {
cfg: config.CloudConfig{
NodeID: types.NodeID(""),
NodeName: "nodey",
},
expectedLabels: map[string]string{
"node_name": "nodey",
},
},
"NoNodeName": {
cfg: config.CloudConfig{
NodeID: types.NodeID("nodeyid"),
NodeName: "",
},
expectedLabels: map[string]string{
"node_id": "nodeyid",
},
},
"Empty": {
cfg: config.CloudConfig{
NodeID: "",
NodeName: "",
},
expectedLabels: map[string]string{},
},
} {
t.Run(name, func(t *testing.T) {
tCfg := &TelemetryConfig{}
labels := tCfg.DefaultLabels(tc.cfg)
require.Equal(t, labels, tc.expectedLabels)
regexFilters, err := regexp.Compile(tc.expected.filters)
require.NoError(t, err)
expectedCfg := &TelemetryConfig{
MetricsConfig: &MetricsConfig{
Endpoint: urlEndpoint,
Filters: regexFilters,
Labels: tc.expected.labels,
},
RefreshConfig: &RefreshConfig{
RefreshInterval: tc.expected.refreshInterval,
},
}
require.NoError(t, err)
require.Equal(t, expectedCfg, telemetryCfg)
})
}
}

View File

@ -18,6 +18,7 @@ import (
"golang.org/x/oauth2"
"google.golang.org/protobuf/proto"
"github.com/hashicorp/consul/agent/hcp/telemetry"
"github.com/hashicorp/consul/version"
)
@ -38,11 +39,6 @@ const (
defaultErrRespBodyLength = 100
)
// MetricsClient exports Consul metrics in OTLP format to the HCP Telemetry Gateway.
type MetricsClient interface {
ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error
}
// cloudConfig represents cloud config for TLS abstracted in an interface for easy testing.
type CloudConfig interface {
HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error)
@ -58,7 +54,7 @@ type otlpClient struct {
// NewMetricsClient returns a configured MetricsClient.
// The current implementation uses otlpClient to provide retry functionality.
func NewMetricsClient(ctx context.Context, cfg CloudConfig) (MetricsClient, error) {
func NewMetricsClient(ctx context.Context, cfg CloudConfig) (telemetry.MetricsClient, error) {
if cfg == nil {
return nil, fmt.Errorf("failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)")
}

View File

@ -1,5 +0,0 @@
package client
type MockMetricsClient struct {
MetricsClient
}

View File

@ -0,0 +1,176 @@
package client
import (
"context"
"errors"
"fmt"
"net/url"
"regexp"
"strings"
"time"
"github.com/hashicorp/go-hclog"
hcptelemetry "github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service"
"github.com/hashicorp/consul/agent/hcp/config"
)
var (
// defaultMetricFilters is a regex that matches all metric names.
defaultMetricFilters = regexp.MustCompile(".+")
// Validation errors for AgentTelemetryConfigOK response.
errMissingPayload = errors.New("missing payload")
errMissingTelemetryConfig = errors.New("missing telemetry config")
errMissingRefreshConfig = errors.New("missing refresh config")
errMissingMetricsConfig = errors.New("missing metrics config")
errInvalidRefreshInterval = errors.New("invalid refresh interval")
errInvalidEndpoint = errors.New("invalid metrics endpoint")
)
// TelemetryConfig contains configuration for telemetry data forwarded by Consul servers
// to the HCP Telemetry gateway.
type TelemetryConfig struct {
MetricsConfig *MetricsConfig
RefreshConfig *RefreshConfig
}
// MetricsConfig holds metrics specific configuration within TelemetryConfig.
type MetricsConfig struct {
Labels map[string]string
Filters *regexp.Regexp
Endpoint *url.URL
}
// RefreshConfig contains configuration for the periodic fetch of configuration from HCP.
type RefreshConfig struct {
RefreshInterval time.Duration
}
// MetricsEnabled returns true if metrics export is enabled, i.e. a valid metrics endpoint exists.
func (t *TelemetryConfig) MetricsEnabled() bool {
return t.MetricsConfig.Endpoint != nil
}
// validateAgentTelemetryConfigPayload ensures the returned payload from HCP is valid.
func validateAgentTelemetryConfigPayload(resp *hcptelemetry.AgentTelemetryConfigOK) error {
if resp.Payload == nil {
return errMissingPayload
}
if resp.Payload.TelemetryConfig == nil {
return errMissingTelemetryConfig
}
if resp.Payload.RefreshConfig == nil {
return errMissingRefreshConfig
}
if resp.Payload.TelemetryConfig.Metrics == nil {
return errMissingMetricsConfig
}
return nil
}
// convertAgentTelemetryResponse converts an AgentTelemetryConfig payload into a TelemetryConfig object.
func convertAgentTelemetryResponse(ctx context.Context, resp *hcptelemetry.AgentTelemetryConfigOK, cfg config.CloudConfig) (*TelemetryConfig, error) {
refreshInterval, err := time.ParseDuration(resp.Payload.RefreshConfig.RefreshInterval)
if err != nil {
return nil, fmt.Errorf("%w: %w", errInvalidRefreshInterval, err)
}
telemetryConfig := resp.Payload.TelemetryConfig
metricsEndpoint, err := convertMetricEndpoint(telemetryConfig.Endpoint, telemetryConfig.Metrics.Endpoint)
if err != nil {
return nil, errInvalidEndpoint
}
metricsFilters := convertMetricFilters(ctx, telemetryConfig.Metrics.IncludeList)
metricLabels := convertMetricLabels(telemetryConfig.Labels, cfg)
return &TelemetryConfig{
MetricsConfig: &MetricsConfig{
Endpoint: metricsEndpoint,
Labels: metricLabels,
Filters: metricsFilters,
},
RefreshConfig: &RefreshConfig{
RefreshInterval: refreshInterval,
},
}, nil
}
// convertMetricEndpoint returns a url for the export of metrics, if a valid endpoint was obtained.
// It returns no error, and no url, if an empty endpoint is retrieved (server not registered with CCM).
// It returns an error, and no url, if a bad endpoint is retrieved.
func convertMetricEndpoint(telemetryEndpoint string, metricsEndpoint string) (*url.URL, error) {
// Telemetry endpoint overriden by metrics specific endpoint, if given.
endpoint := telemetryEndpoint
if metricsEndpoint != "" {
endpoint = metricsEndpoint
}
// If endpoint is empty, server not registered with CCM, no error returned.
if endpoint == "" {
return nil, nil
}
// Endpoint from CTW has no metrics path, so it must be added.
rawUrl := endpoint + metricsGatewayPath
u, err := url.ParseRequestURI(rawUrl)
if err != nil {
return nil, fmt.Errorf("%w: %w", errInvalidEndpoint, err)
}
return u, nil
}
// convertMetricFilters returns a valid regex used to filter metrics.
// if invalid filters are given, a defaults regex that allow all metrics is returned.
func convertMetricFilters(ctx context.Context, payloadFilters []string) *regexp.Regexp {
logger := hclog.FromContext(ctx)
validFilters := make([]string, 0, len(payloadFilters))
for _, filter := range payloadFilters {
_, err := regexp.Compile(filter)
if err != nil {
logger.Error("invalid filter", "error", err)
continue
}
validFilters = append(validFilters, filter)
}
if len(validFilters) == 0 {
logger.Error("no valid filters")
return defaultMetricFilters
}
// Combine the valid regex strings with OR.
finalRegex := strings.Join(validFilters, "|")
composedRegex, err := regexp.Compile(finalRegex)
if err != nil {
logger.Error("failed to compile final regex", "error", err)
return defaultMetricFilters
}
return composedRegex
}
// convertMetricLabels returns a set of <key, value> string pairs that must be added as attributes to all exported telemetry data.
func convertMetricLabels(payloadLabels map[string]string, cfg config.CloudConfig) map[string]string {
labels := make(map[string]string)
nodeID := string(cfg.NodeID)
if nodeID != "" {
labels["node_id"] = nodeID
}
if cfg.NodeName != "" {
labels["node_name"] = cfg.NodeName
}
for k, v := range payloadLabels {
labels[k] = v
}
return labels
}

View File

@ -0,0 +1,377 @@
package client
import (
"context"
"net/url"
"regexp"
"testing"
"time"
"github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service"
"github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/models"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/types"
)
func TestValidateAgentTelemetryConfigPayload(t *testing.T) {
t.Parallel()
for name, tc := range map[string]struct {
resp *consul_telemetry_service.AgentTelemetryConfigOK
wantErr error
}{
"errorsWithNilPayload": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{},
wantErr: errMissingPayload,
},
"errorsWithNilTelemetryConfig": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{},
},
},
wantErr: errMissingTelemetryConfig,
},
"errorsWithNilRefreshConfig": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{},
},
},
wantErr: errMissingRefreshConfig,
},
"errorsWithNilMetricsConfig": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{},
RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{},
},
},
wantErr: errMissingMetricsConfig,
},
"success": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{
Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{},
},
RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{},
},
},
},
} {
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
err := validateAgentTelemetryConfigPayload(tc.resp)
if tc.wantErr != nil {
require.ErrorIs(t, err, tc.wantErr)
return
}
require.NoError(t, err)
})
}
}
func TestConvertAgentTelemetryResponse(t *testing.T) {
validTestURL, err := url.Parse("https://test.com/v1/metrics")
require.NoError(t, err)
validTestFilters, err := regexp.Compile("test|consul")
require.NoError(t, err)
for name, tc := range map[string]struct {
resp *consul_telemetry_service.AgentTelemetryConfigOK
expectedTelemetryCfg *TelemetryConfig
wantErr error
expectedEnabled bool
}{
"success": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{
Endpoint: "https://test.com",
Labels: map[string]string{"test": "test"},
Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{
IncludeList: []string{"test", "consul"},
},
},
RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{
RefreshInterval: "2s",
},
},
},
expectedTelemetryCfg: &TelemetryConfig{
MetricsConfig: &MetricsConfig{
Endpoint: validTestURL,
Labels: map[string]string{"test": "test"},
Filters: validTestFilters,
},
RefreshConfig: &RefreshConfig{
RefreshInterval: 2 * time.Second,
},
},
expectedEnabled: true,
},
"successNoEndpoint": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{
Endpoint: "",
Labels: map[string]string{"test": "test"},
Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{
IncludeList: []string{"test", "consul"},
},
},
RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{
RefreshInterval: "2s",
},
},
},
expectedTelemetryCfg: &TelemetryConfig{
MetricsConfig: &MetricsConfig{
Endpoint: nil,
Labels: map[string]string{"test": "test"},
Filters: validTestFilters,
},
RefreshConfig: &RefreshConfig{
RefreshInterval: 2 * time.Second,
},
},
expectedEnabled: false,
},
"successBadFilters": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{
Endpoint: "https://test.com",
Labels: map[string]string{"test": "test"},
Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{
IncludeList: []string{"[", "(*LF)"},
},
},
RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{
RefreshInterval: "2s",
},
},
},
expectedTelemetryCfg: &TelemetryConfig{
MetricsConfig: &MetricsConfig{
Endpoint: validTestURL,
Labels: map[string]string{"test": "test"},
Filters: defaultMetricFilters,
},
RefreshConfig: &RefreshConfig{
RefreshInterval: 2 * time.Second,
},
},
expectedEnabled: true,
},
"errorsWithInvalidRefreshInterval": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{
Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{},
},
RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{
RefreshInterval: "300ws",
},
},
},
wantErr: errInvalidRefreshInterval,
},
"errorsWithInvalidEndpoint": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{
Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{
Endpoint: " ",
},
},
RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{
RefreshInterval: "1s",
},
},
},
wantErr: errInvalidEndpoint,
},
} {
t.Run(name, func(t *testing.T) {
telemetryCfg, err := convertAgentTelemetryResponse(context.Background(), tc.resp, config.CloudConfig{})
if tc.wantErr != nil {
require.ErrorIs(t, err, tc.wantErr)
require.Nil(t, telemetryCfg)
return
}
require.NoError(t, err)
require.Equal(t, tc.expectedTelemetryCfg, telemetryCfg)
require.Equal(t, tc.expectedEnabled, telemetryCfg.MetricsEnabled())
})
}
}
func TestConvertMetricEndpoint(t *testing.T) {
t.Parallel()
for name, tc := range map[string]struct {
endpoint string
override string
expected string
wantErr error
}{
"success": {
endpoint: "https://test.com",
expected: "https://test.com/v1/metrics",
},
"successMetricsOverride": {
endpoint: "https://test.com",
override: "https://override.com",
expected: "https://override.com/v1/metrics",
},
"noErrorWithEmptyEndpoints": {
endpoint: "",
override: "",
expected: "",
},
"errorWithInvalidURL": {
endpoint: " ",
override: "",
wantErr: errInvalidEndpoint,
},
} {
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
u, err := convertMetricEndpoint(tc.endpoint, tc.override)
if tc.wantErr != nil {
require.ErrorIs(t, err, tc.wantErr)
require.Empty(t, u)
return
}
if tc.expected == "" {
require.Nil(t, u)
require.NoError(t, err)
return
}
require.NotNil(t, u)
require.NoError(t, err)
require.Equal(t, tc.expected, u.String())
})
}
}
func TestConvertMetricFilters(t *testing.T) {
t.Parallel()
for name, tc := range map[string]struct {
filters []string
expectedRegexString string
matches []string
wantErr string
wantMatch bool
}{
"badFilterRegex": {
filters: []string{"(*LF)"},
expectedRegexString: defaultMetricFilters.String(),
matches: []string{"consul.raft.peers", "consul.mem.heap_size"},
wantMatch: true,
},
"emptyRegex": {
filters: []string{},
expectedRegexString: defaultMetricFilters.String(),
matches: []string{"consul.raft.peers", "consul.mem.heap_size"},
wantMatch: true,
},
"matchFound": {
filters: []string{"raft.*", "mem.*"},
expectedRegexString: "raft.*|mem.*",
matches: []string{"consul.raft.peers", "consul.mem.heap_size"},
wantMatch: true,
},
"matchNotFound": {
filters: []string{"mem.*"},
matches: []string{"consul.raft.peers", "consul.txn.apply"},
expectedRegexString: "mem.*",
wantMatch: false,
},
} {
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
f := convertMetricFilters(context.Background(), tc.filters)
require.Equal(t, tc.expectedRegexString, f.String())
for _, metric := range tc.matches {
m := f.MatchString(metric)
require.Equal(t, tc.wantMatch, m)
}
})
}
}
func TestConvertMetricLabels(t *testing.T) {
t.Parallel()
for name, tc := range map[string]struct {
payloadLabels map[string]string
cfg config.CloudConfig
expectedLabels map[string]string
}{
"Success": {
payloadLabels: map[string]string{
"ctw_label": "test",
},
cfg: config.CloudConfig{
NodeID: types.NodeID("nodeyid"),
NodeName: "nodey",
},
expectedLabels: map[string]string{
"ctw_label": "test",
"node_id": "nodeyid",
"node_name": "nodey",
},
},
"NoNodeID": {
payloadLabels: map[string]string{
"ctw_label": "test",
},
cfg: config.CloudConfig{
NodeID: types.NodeID(""),
NodeName: "nodey",
},
expectedLabels: map[string]string{
"ctw_label": "test",
"node_name": "nodey",
},
},
"NoNodeName": {
payloadLabels: map[string]string{
"ctw_label": "test",
},
cfg: config.CloudConfig{
NodeID: types.NodeID("nodeyid"),
NodeName: "",
},
expectedLabels: map[string]string{
"ctw_label": "test",
"node_id": "nodeyid",
},
},
"Empty": {
cfg: config.CloudConfig{
NodeID: "",
NodeName: "",
},
expectedLabels: map[string]string{},
},
} {
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
labels := convertMetricLabels(tc.payloadLabels, tc.cfg)
require.Equal(t, labels, tc.expectedLabels)
})
}
}

View File

@ -6,7 +6,6 @@ package hcp
import (
"context"
"fmt"
"net/url"
"time"
"github.com/armon/go-metrics"
@ -44,7 +43,11 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) {
return Deps{}, fmt.Errorf("failed to init metrics client: %w", err)
}
sink := sink(ctx, client, metricsClient, cfg)
sink, err := sink(ctx, client, metricsClient)
if err != nil {
// Do not prevent server start if sink init fails, only log error.
logger.Error("failed to init sink", "error", err)
}
return Deps{
Client: client,
@ -53,50 +56,44 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) {
}, nil
}
// sink provides initializes an OTELSink which forwards Consul metrics to HCP.
// sink initializes an OTELSink which forwards Consul metrics to HCP.
// The sink is only initialized if the server is registered with the management plane (CCM).
// This step should not block server initialization, so errors are logged, but not returned.
// This step should not block server initialization, errors are returned, only to be logged.
func sink(
ctx context.Context,
hcpClient hcpclient.Client,
metricsClient hcpclient.MetricsClient,
cfg config.CloudConfig,
) metrics.MetricSink {
metricsClient telemetry.MetricsClient,
) (metrics.MetricSink, error) {
logger := hclog.FromContext(ctx).Named("sink")
reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
telemetryCfg, err := hcpClient.FetchTelemetryConfig(reqCtx)
if err != nil {
logger.Error("failed to fetch telemetry config", "error", err)
return nil
return nil, fmt.Errorf("failed to fetch telemetry config: %w", err)
}
endpoint, isEnabled := telemetryCfg.Enabled()
if !isEnabled {
return nil
if !telemetryCfg.MetricsEnabled() {
return nil, nil
}
u, err := url.Parse(endpoint)
cfgProvider, err := NewHCPProvider(ctx, hcpClient, telemetryCfg)
if err != nil {
logger.Error("failed to parse url endpoint", "error", err)
return nil
return nil, fmt.Errorf("failed to init config provider: %w", err)
}
reader := telemetry.NewOTELReader(metricsClient, cfgProvider, telemetry.DefaultExportInterval)
sinkOpts := &telemetry.OTELSinkOpts{
Ctx: ctx,
Reader: telemetry.NewOTELReader(metricsClient, u, telemetry.DefaultExportInterval),
Labels: telemetryCfg.DefaultLabels(cfg),
Filters: telemetryCfg.MetricsConfig.Filters,
Reader: reader,
ConfigProvider: cfgProvider,
}
sink, err := telemetry.NewOTELSink(sinkOpts)
sink, err := telemetry.NewOTELSink(ctx, sinkOpts)
if err != nil {
logger.Error("failed to init OTEL sink", "error", err)
return nil
return nil, fmt.Errorf("failed create OTELSink: %w", err)
}
logger.Debug("initialized HCP metrics sink")
return sink
return sink, nil
}

View File

@ -3,98 +3,97 @@ package hcp
import (
"context"
"fmt"
"net/url"
"regexp"
"testing"
"time"
"github.com/hashicorp/consul/agent/hcp/config"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/consul/agent/hcp/telemetry"
)
type mockMetricsClient struct {
telemetry.MetricsClient
}
func TestSink(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
expect func(*client.MockClient)
cloudCfg config.CloudConfig
wantErr string
expectedSink bool
}{
"success": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
Endpoint: "https://test.com",
MetricsConfig: &client.MetricsConfig{
Endpoint: "https://test.com",
},
}, nil)
},
cloudCfg: config.CloudConfig{
NodeID: types.NodeID("nodeyid"),
NodeName: "nodey",
u, _ := url.Parse("https://test.com/v1/metrics")
filters, _ := regexp.Compile("test")
mt := mockTelemetryConfig(1*time.Second, u, filters)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil)
},
expectedSink: true,
},
"noSinkWhenServerNotRegisteredWithCCM": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
Endpoint: "",
MetricsConfig: &client.MetricsConfig{
Endpoint: "",
},
}, nil)
},
cloudCfg: config.CloudConfig{},
},
"noSinkWhenCCMVerificationFails": {
"noSinkWhenFetchTelemetryConfigFails": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("fetch failed"))
},
cloudCfg: config.CloudConfig{},
wantErr: "failed to fetch telemetry config",
},
"failsWithFetchTelemetryFailure": {
"noSinkWhenServerNotRegisteredWithCCM": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("FetchTelemetryConfig error"))
mt := mockTelemetryConfig(1*time.Second, nil, nil)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil)
},
},
"failsWithURLParseErr": {
"noSinkWhenTelemetryConfigProviderInitFails": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
// Minimum 2 chars for a domain to be valid.
Endpoint: "s",
MetricsConfig: &client.MetricsConfig{
// Invalid domain chars
Endpoint: " ",
},
}, nil)
},
},
"noErrWithEmptyEndpoint": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
Endpoint: "",
MetricsConfig: &client.MetricsConfig{
Endpoint: "",
},
}, nil)
u, _ := url.Parse("https://test.com/v1/metrics")
// Bad refresh interval forces ConfigProvider creation failure.
mt := mockTelemetryConfig(0*time.Second, u, nil)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil)
},
wantErr: "failed to init config provider",
},
} {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
c := client.NewMockClient(t)
mc := client.MockMetricsClient{}
mc := mockMetricsClient{}
test.expect(c)
ctx := context.Background()
s := sink(ctx, c, mc, test.cloudCfg)
if !test.expectedSink {
s, err := sink(ctx, c, mc)
if test.wantErr != "" {
require.NotNil(t, err)
require.Contains(t, err.Error(), test.wantErr)
require.Nil(t, s)
return
}
if !test.expectedSink {
require.Nil(t, s)
require.Nil(t, err)
return
}
require.NotNil(t, s)
})
}
}
func mockTelemetryConfig(refreshInterval time.Duration, metricsEndpoint *url.URL, filters *regexp.Regexp) *client.TelemetryConfig {
return &client.TelemetryConfig{
MetricsConfig: &client.MetricsConfig{
Endpoint: metricsEndpoint,
Filters: filters,
},
RefreshConfig: &client.RefreshConfig{
RefreshInterval: refreshInterval,
},
}
}

View File

@ -58,7 +58,11 @@ func TestManager_SendUpdate(t *testing.T) {
StatusFn: statusF,
})
mgr.testUpdateSent = updateCh
go mgr.Run(context.Background())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go mgr.Run(ctx)
select {
case <-updateCh:
case <-time.After(time.Second):
@ -90,7 +94,11 @@ func TestManager_SendUpdate_Periodic(t *testing.T) {
MinInterval: 100 * time.Millisecond,
})
mgr.testUpdateSent = updateCh
go mgr.Run(context.Background())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go mgr.Run(ctx)
select {
case <-updateCh:
case <-time.After(time.Second):

View File

@ -6,7 +6,7 @@ package telemetry
var (
internalMetricTransformFailure []string = []string{"hcp", "otel", "transform", "failure"}
internalMetricExportSuccess []string = []string{"hcp", "otel", "exporter", "export", "sucess"}
internalMetricExportSuccess []string = []string{"hcp", "otel", "exporter", "export", "success"}
internalMetricExportFailure []string = []string{"hcp", "otel", "exporter", "export", "failure"}
internalMetricExporterShutdown []string = []string{"hcp", "otel", "exporter", "shutdown"}

View File

@ -1,37 +0,0 @@
package telemetry
import (
"fmt"
"regexp"
"strings"
"github.com/hashicorp/go-multierror"
)
// newFilterRegex returns a valid regex used to filter metrics.
// It will fail if there are 0 valid regex filters given.
func newFilterRegex(filters []string) (*regexp.Regexp, error) {
var mErr error
validFilters := make([]string, 0, len(filters))
for _, filter := range filters {
_, err := regexp.Compile(filter)
if err != nil {
mErr = multierror.Append(mErr, fmt.Errorf("compilation of filter %q failed: %w", filter, err))
continue
}
validFilters = append(validFilters, filter)
}
if len(validFilters) == 0 {
return nil, multierror.Append(mErr, fmt.Errorf("no valid filters"))
}
// Combine the valid regex strings with an OR.
finalRegex := strings.Join(validFilters, "|")
composedRegex, err := regexp.Compile(finalRegex)
if err != nil {
return nil, fmt.Errorf("failed to compile regex: %w", err)
}
return composedRegex, nil
}

View File

@ -1,58 +0,0 @@
package telemetry
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestFilter(t *testing.T) {
t.Parallel()
for name, tc := range map[string]struct {
filters []string
expectedRegexString string
matches []string
wantErr string
wantMatch bool
}{
"badFilterRegex": {
filters: []string{"(*LF)"},
wantErr: "no valid filters",
},
"failsWithNoRegex": {
filters: []string{},
wantErr: "no valid filters",
},
"matchFound": {
filters: []string{"raft.*", "mem.*"},
expectedRegexString: "raft.*|mem.*",
matches: []string{"consul.raft.peers", "consul.mem.heap_size"},
wantMatch: true,
},
"matchNotFound": {
filters: []string{"mem.*"},
matches: []string{"consul.raft.peers", "consul.txn.apply"},
expectedRegexString: "mem.*",
wantMatch: false,
},
} {
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
f, err := newFilterRegex(tc.filters)
if tc.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.wantErr)
return
}
require.NoError(t, err)
require.Equal(t, tc.expectedRegexString, f.String())
for _, metric := range tc.matches {
m := f.MatchString(metric)
require.Equal(t, tc.wantMatch, m)
}
})
}
}

View File

@ -9,23 +9,34 @@ import (
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
// MetricsClient exports Consul metrics in OTLP format to the desired endpoint.
type MetricsClient interface {
ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error
}
// EndpointProvider provides the endpoint where metrics are exported to by the OTELExporter.
// EndpointProvider exposes the GetEndpoint() interface method to fetch the endpoint.
// This abstraction layer offers flexibility, in particular for dynamic configuration or changes to the endpoint.
type EndpointProvider interface {
GetEndpoint() *url.URL
}
// OTELExporter is a custom implementation of a OTEL Metrics SDK metrics.Exporter.
// The exporter is used by a OTEL Metrics SDK PeriodicReader to export aggregated metrics.
// This allows us to use a custom client - HCP authenticated MetricsClient.
type OTELExporter struct {
client hcpclient.MetricsClient
endpoint *url.URL
client MetricsClient
endpointProvider EndpointProvider
}
// NewOTELExporter returns a configured OTELExporter
func NewOTELExporter(client hcpclient.MetricsClient, endpoint *url.URL) *OTELExporter {
// NewOTELExporter returns a configured OTELExporter.
func NewOTELExporter(client MetricsClient, endpointProvider EndpointProvider) *OTELExporter {
return &OTELExporter{
client: client,
endpoint: endpoint,
endpointProvider: endpointProvider,
}
}
@ -54,11 +65,17 @@ func (e *OTELExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggre
// Export serializes and transmits metric data to a receiver.
func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceMetrics) error {
endpoint := e.endpointProvider.GetEndpoint()
if endpoint == nil {
return nil
}
otlpMetrics := transformOTLP(metrics)
if isEmpty(otlpMetrics) {
return nil
}
err := e.client.ExportMetrics(ctx, otlpMetrics, e.endpoint.String())
err := e.client.ExportMetrics(ctx, otlpMetrics, endpoint.String())
if err != nil {
goMetrics.IncrCounter(internalMetricExportFailure, 1)
return fmt.Errorf("failed to export metrics: %w", err)

View File

@ -15,8 +15,10 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
"github.com/hashicorp/consul/agent/hcp/client"
const (
testExportEndpoint = "https://test.com/v1/metrics"
)
type mockMetricsClient struct {
@ -27,6 +29,12 @@ func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *met
return m.exportErr
}
type mockEndpointProvider struct {
endpoint *url.URL
}
func (m *mockEndpointProvider) GetEndpoint() *url.URL { return m.endpoint }
func TestTemporality(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
@ -66,8 +74,13 @@ func TestExport(t *testing.T) {
for name, test := range map[string]struct {
wantErr string
metrics *metricdata.ResourceMetrics
client client.MetricsClient
client MetricsClient
provider EndpointProvider
}{
"earlyReturnWithoutEndpoint": {
client: &mockMetricsClient{},
provider: &mockEndpointProvider{},
},
"earlyReturnWithoutScopeMetrics": {
client: &mockMetricsClient{},
metrics: mutateMetrics(nil),
@ -100,7 +113,16 @@ func TestExport(t *testing.T) {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
exp := NewOTELExporter(test.client, &url.URL{})
provider := test.provider
if provider == nil {
u, err := url.Parse(testExportEndpoint)
require.NoError(t, err)
provider = &mockEndpointProvider{
endpoint: u,
}
}
exp := NewOTELExporter(test.client, provider)
err := exp.Export(context.Background(), test.metrics)
if test.wantErr != "" {
@ -119,7 +141,7 @@ func TestExport(t *testing.T) {
// sets a shared global sink.
func TestExport_CustomMetrics(t *testing.T) {
for name, tc := range map[string]struct {
client client.MetricsClient
client MetricsClient
metricKey []string
operation string
}{
@ -154,7 +176,12 @@ func TestExport_CustomMetrics(t *testing.T) {
metrics.NewGlobal(cfg, sink)
// Perform operation that emits metric.
exp := NewOTELExporter(tc.client, &url.URL{})
u, err := url.Parse(testExportEndpoint)
require.NoError(t, err)
exp := NewOTELExporter(tc.client, &mockEndpointProvider{
endpoint: u,
})
ctx := context.Background()
switch tc.operation {

View File

@ -3,8 +3,7 @@ package telemetry
import (
"bytes"
"context"
"fmt"
"net/url"
"errors"
"regexp"
"strings"
"sync"
@ -16,19 +15,24 @@ import (
otelmetric "go.opentelemetry.io/otel/metric"
otelsdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"github.com/hashicorp/consul/agent/hcp/client"
)
// DefaultExportInterval is a default time interval between export of aggregated metrics.
const DefaultExportInterval = 10 * time.Second
// ConfigProvider is required to provide custom metrics processing.
type ConfigProvider interface {
// GetLabels should return a set of OTEL attributes added by default all metrics.
GetLabels() map[string]string
// GetFilters should return filtesr that are required to enable metric processing.
// Filters act as an allowlist to collect only the required metrics.
GetFilters() *regexp.Regexp
}
// OTELSinkOpts is used to provide configuration when initializing an OTELSink using NewOTELSink.
type OTELSinkOpts struct {
Reader otelsdk.Reader
Ctx context.Context
Filters []string
Labels map[string]string
ConfigProvider ConfigProvider
}
// OTELSink captures and aggregates telemetry data as per the OpenTelemetry (OTEL) specification.
@ -38,7 +42,7 @@ type OTELSink struct {
// spaceReplacer cleans the flattened key by removing any spaces.
spaceReplacer *strings.Replacer
logger hclog.Logger
filters *regexp.Regexp
cfgProvider ConfigProvider
// meterProvider is an OTEL MeterProvider, the entrypoint to the OTEL Metrics SDK.
// It handles reading/export of aggregated metric data.
@ -68,45 +72,32 @@ type OTELSink struct {
// NewOTELReader returns a configured OTEL PeriodicReader to export metrics every X seconds.
// It configures the reader with a custom OTELExporter with a MetricsClient to transform and export
// metrics in OTLP format to an external url.
func NewOTELReader(client client.MetricsClient, url *url.URL, exportInterval time.Duration) otelsdk.Reader {
exporter := NewOTELExporter(client, url)
func NewOTELReader(client MetricsClient, endpointProvider EndpointProvider, exportInterval time.Duration) otelsdk.Reader {
exporter := NewOTELExporter(client, endpointProvider)
return otelsdk.NewPeriodicReader(exporter, otelsdk.WithInterval(exportInterval))
}
// NewOTELSink returns a sink which fits the Go Metrics MetricsSink interface.
// It sets up a MeterProvider and Meter, key pieces of the OTEL Metrics SDK which
// enable us to create OTEL Instruments to record measurements.
func NewOTELSink(opts *OTELSinkOpts) (*OTELSink, error) {
func NewOTELSink(ctx context.Context, opts *OTELSinkOpts) (*OTELSink, error) {
if opts.Reader == nil {
return nil, fmt.Errorf("ferror: provide valid reader")
return nil, errors.New("ferror: provide valid reader")
}
if opts.Ctx == nil {
return nil, fmt.Errorf("ferror: provide valid context")
if opts.ConfigProvider == nil {
return nil, errors.New("ferror: provide valid config provider")
}
logger := hclog.FromContext(opts.Ctx).Named("otel_sink")
logger := hclog.FromContext(ctx).Named("otel_sink")
filterList, err := newFilterRegex(opts.Filters)
if err != nil {
logger.Error("Failed to initialize all filters", "error", err)
}
attrs := make([]attribute.KeyValue, 0, len(opts.Labels))
for k, v := range opts.Labels {
kv := attribute.KeyValue{
Key: attribute.Key(k),
Value: attribute.StringValue(v),
}
attrs = append(attrs, kv)
}
// Setup OTEL Metrics SDK to aggregate, convert and export metrics periodically.
res := resource.NewWithAttributes("", attrs...)
res := resource.NewSchemaless()
meterProvider := otelsdk.NewMeterProvider(otelsdk.WithResource(res), otelsdk.WithReader(opts.Reader))
meter := meterProvider.Meter("github.com/hashicorp/consul/agent/hcp/telemetry")
return &OTELSink{
filters: filterList,
cfgProvider: opts.ConfigProvider,
spaceReplacer: strings.NewReplacer(" ", "_"),
logger: logger,
meterProvider: meterProvider,
@ -138,12 +129,12 @@ func (o *OTELSink) IncrCounter(key []string, val float32) {
func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)
if !o.filters.MatchString(k) {
if !o.allowedMetric(k) {
return
}
// Set value in global Gauge store.
o.gaugeStore.Set(k, float64(val), toAttributes(labels))
o.gaugeStore.Set(k, float64(val), o.labelsToAttributes(labels))
o.mutex.Lock()
defer o.mutex.Unlock()
@ -166,7 +157,7 @@ func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometr
func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)
if !o.filters.MatchString(k) {
if !o.allowedMetric(k) {
return
}
@ -184,7 +175,7 @@ func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gomet
o.histogramInstruments[k] = inst
}
attrs := toAttributes(labels)
attrs := o.labelsToAttributes(labels)
inst.Record(context.TODO(), float64(val), otelmetric.WithAttributes(attrs...))
}
@ -192,7 +183,7 @@ func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gomet
func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)
if !o.filters.MatchString(k) {
if !o.allowedMetric(k) {
return
}
@ -211,7 +202,7 @@ func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gom
o.counterInstruments[k] = inst
}
attrs := toAttributes(labels)
attrs := o.labelsToAttributes(labels)
inst.Add(context.TODO(), float64(val), otelmetric.WithAttributes(attrs...))
}
@ -228,17 +219,39 @@ func (o *OTELSink) flattenKey(parts []string) string {
return buf.String()
}
// toAttributes converts go metrics Labels into OTEL format []attributes.KeyValue
func toAttributes(labels []gometrics.Label) []attribute.KeyValue {
if len(labels) == 0 {
return nil
// filter checks the filter allowlist, if it exists, to verify if this metric should be recorded.
func (o *OTELSink) allowedMetric(key string) bool {
if filters := o.cfgProvider.GetFilters(); filters != nil {
return filters.MatchString(key)
}
attrs := make([]attribute.KeyValue, len(labels))
for i, label := range labels {
attrs[i] = attribute.KeyValue{
return true
}
// labelsToAttributes converts go metrics and provider labels into OTEL format []attributes.KeyValue
func (o *OTELSink) labelsToAttributes(goMetricsLabels []gometrics.Label) []attribute.KeyValue {
providerLabels := o.cfgProvider.GetLabels()
length := len(goMetricsLabels) + len(providerLabels)
if length == 0 {
return []attribute.KeyValue{}
}
attrs := make([]attribute.KeyValue, 0, length)
// Convert provider labels to OTEL attributes.
for _, label := range goMetricsLabels {
attrs = append(attrs, attribute.KeyValue{
Key: attribute.Key(label.Name),
Value: attribute.StringValue(label.Value),
})
}
// Convert provider labels to OTEL attributes.
for k, v := range providerLabels {
attrs = append(attrs, attribute.KeyValue{
Key: attribute.Key(k),
Value: attribute.StringValue(v),
})
}
return attrs

View File

@ -3,6 +3,7 @@ package telemetry
import (
"context"
"fmt"
"regexp"
"sort"
"strings"
"sync"
@ -16,15 +17,32 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
)
type mockConfigProvider struct {
filter *regexp.Regexp
labels map[string]string
}
func (m *mockConfigProvider) GetLabels() map[string]string {
return m.labels
}
func (m *mockConfigProvider) GetFilters() *regexp.Regexp {
return m.filter
}
var (
expectedResource = resource.NewWithAttributes("", attribute.KeyValue{
expectedResource = resource.NewSchemaless()
attrs = attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("node_id"),
Value: attribute.StringValue("test"),
})
attrs = attribute.NewSet(attribute.KeyValue{
attrsWithMetricLabel = attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("metric.label"),
Value: attribute.StringValue("test"),
}, attribute.KeyValue{
Key: attribute.Key("node_id"),
Value: attribute.StringValue("test"),
})
expectedSinkMetrics = map[string]metricdata.Metrics{
@ -35,7 +53,7 @@ var (
Data: metricdata.Gauge[float64]{
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: *attribute.EmptySet(),
Attributes: attrs,
Value: float64(float32(0)),
},
},
@ -48,7 +66,7 @@ var (
Data: metricdata.Gauge[float64]{
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: attrs,
Attributes: attrsWithMetricLabel,
Value: float64(float32(1.23)),
},
},
@ -61,7 +79,7 @@ var (
Data: metricdata.Sum[float64]{
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: *attribute.EmptySet(),
Attributes: attrs,
Value: float64(float32(23.23)),
},
},
@ -74,7 +92,7 @@ var (
Data: metricdata.Sum[float64]{
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: attrs,
Attributes: attrsWithMetricLabel,
Value: float64(float32(1.44)),
},
},
@ -87,7 +105,7 @@ var (
Data: metricdata.Histogram[float64]{
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: *attribute.EmptySet(),
Attributes: attrs,
Count: 1,
Sum: float64(float32(45.32)),
Min: metricdata.NewExtrema(float64(float32(45.32))),
@ -103,7 +121,7 @@ var (
Data: metricdata.Histogram[float64]{
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: attrs,
Attributes: attrsWithMetricLabel,
Count: 1,
Sum: float64(float32(26.34)),
Min: metricdata.NewExtrema(float64(float32(26.34))),
@ -121,34 +139,30 @@ func TestNewOTELSink(t *testing.T) {
wantErr string
opts *OTELSinkOpts
}{
"failsWithEmptyLogger": {
wantErr: "ferror: provide valid context",
opts: &OTELSinkOpts{
Reader: metric.NewManualReader(),
},
},
"failsWithEmptyReader": {
wantErr: "ferror: provide valid reader",
opts: &OTELSinkOpts{
Reader: nil,
Ctx: context.Background(),
ConfigProvider: &mockConfigProvider{},
},
},
"failsWithEmptyConfigProvider": {
wantErr: "ferror: provide valid config provider",
opts: &OTELSinkOpts{
Reader: metric.NewManualReader(),
},
},
"success": {
opts: &OTELSinkOpts{
Ctx: context.Background(),
Reader: metric.NewManualReader(),
Labels: map[string]string{
"server": "test",
},
Filters: []string{"raft"},
ConfigProvider: &mockConfigProvider{},
},
},
} {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
sink, err := NewOTELSink(test.opts)
sink, err := NewOTELSink(context.Background(), test.opts)
if test.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), test.wantErr)
@ -169,14 +183,15 @@ func TestOTELSink(t *testing.T) {
ctx := context.Background()
opts := &OTELSinkOpts{
Reader: reader,
Ctx: ctx,
Filters: []string{"raft", "autopilot"},
Labels: map[string]string{
ConfigProvider: &mockConfigProvider{
filter: regexp.MustCompile("raft|autopilot"),
labels: map[string]string{
"node_id": "test",
},
},
}
sink, err := NewOTELSink(opts)
sink, err := NewOTELSink(ctx, opts)
require.NoError(t, err)
labels := []gometrics.Label{
@ -186,12 +201,15 @@ func TestOTELSink(t *testing.T) {
},
}
sink.SetGauge([]string{"test", "bad_filter", "gauge"}, float32(0))
sink.SetGauge([]string{"consul", "raft", "leader"}, float32(0))
sink.SetGaugeWithLabels([]string{"consul", "autopilot", "healthy"}, float32(1.23), labels)
sink.IncrCounter([]string{"test", "bad_filter", "counter"}, float32(23.23))
sink.IncrCounter([]string{"consul", "raft", "state", "leader"}, float32(23.23))
sink.IncrCounterWithLabels([]string{"consul", "raft", "apply"}, float32(1.44), labels)
sink.AddSample([]string{"test", "bad_filter", "sample"}, float32(45.32))
sink.AddSample([]string{"consul", "raft", "leader", "lastContact"}, float32(45.32))
sink.AddSampleWithLabels([]string{"consul", "raft", "commitTime"}, float32(26.34), labels)
@ -202,23 +220,147 @@ func TestOTELSink(t *testing.T) {
isSame(t, expectedSinkMetrics, collected)
}
func TestLabelsToAttributes(t *testing.T) {
for name, test := range map[string]struct {
providerLabels map[string]string
goMetricsLabels []gometrics.Label
expectedOTELAttributes []attribute.KeyValue
}{
"emptyLabels": {
expectedOTELAttributes: []attribute.KeyValue{},
},
"emptyGoMetricsLabels": {
providerLabels: map[string]string{
"node_id": "test",
},
expectedOTELAttributes: []attribute.KeyValue{
{
Key: attribute.Key("node_id"),
Value: attribute.StringValue("test"),
},
},
},
"emptyProviderLabels": {
goMetricsLabels: []gometrics.Label{
{
Name: "server_type",
Value: "internal",
},
},
expectedOTELAttributes: []attribute.KeyValue{
{
Key: attribute.Key("server_type"),
Value: attribute.StringValue("internal"),
},
},
},
"combinedLabels": {
goMetricsLabels: []gometrics.Label{
{
Name: "server_type",
Value: "internal",
},
{
Name: "method",
Value: "get",
},
},
providerLabels: map[string]string{
"node_id": "test",
"node_name": "labels_test",
},
expectedOTELAttributes: []attribute.KeyValue{
{
Key: attribute.Key("server_type"),
Value: attribute.StringValue("internal"),
},
{
Key: attribute.Key("method"),
Value: attribute.StringValue("get"),
},
{
Key: attribute.Key("node_id"),
Value: attribute.StringValue("test"),
},
{
Key: attribute.Key("node_name"),
Value: attribute.StringValue("labels_test"),
},
},
},
} {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
ctx := context.Background()
opts := &OTELSinkOpts{
Reader: metric.NewManualReader(),
ConfigProvider: &mockConfigProvider{
filter: regexp.MustCompile("raft|autopilot"),
labels: test.providerLabels,
},
}
sink, err := NewOTELSink(ctx, opts)
require.NoError(t, err)
require.Equal(t, test.expectedOTELAttributes, sink.labelsToAttributes(test.goMetricsLabels))
})
}
}
func TestOTELSinkFilters(t *testing.T) {
t.Parallel()
for name, tc := range map[string]struct {
cfgProvider ConfigProvider
expected bool
}{
"emptyMatch": {
cfgProvider: &mockConfigProvider{},
expected: true,
},
"matchingFilter": {
cfgProvider: &mockConfigProvider{
filter: regexp.MustCompile("raft"),
},
expected: true,
},
"mismatchFilter": {cfgProvider: &mockConfigProvider{
filter: regexp.MustCompile("test"),
}},
} {
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
testMetricKey := "consul.raft"
s, err := NewOTELSink(context.Background(), &OTELSinkOpts{
ConfigProvider: tc.cfgProvider,
Reader: metric.NewManualReader(),
})
require.NoError(t, err)
require.Equal(t, tc.expected, s.allowedMetric(testMetricKey))
})
}
}
func TestOTELSink_Race(t *testing.T) {
reader := metric.NewManualReader()
ctx := context.Background()
opts := &OTELSinkOpts{
Ctx: ctx,
Reader: reader,
Labels: map[string]string{
defaultLabels := map[string]string{
"node_id": "test",
}
opts := &OTELSinkOpts{
Reader: reader,
ConfigProvider: &mockConfigProvider{
filter: regexp.MustCompile("test"),
labels: defaultLabels,
},
Filters: []string{"test"},
}
sink, err := NewOTELSink(opts)
sink, err := NewOTELSink(context.Background(), opts)
require.NoError(t, err)
samples := 100
expectedMetrics := generateSamples(samples)
expectedMetrics := generateSamples(samples, defaultLabels)
wg := &sync.WaitGroup{}
errCh := make(chan error, samples)
for k, v := range expectedMetrics {
@ -240,8 +382,17 @@ func TestOTELSink_Race(t *testing.T) {
}
// generateSamples generates n of each gauges, counter and histogram measurements to use for test purposes.
func generateSamples(n int) map[string]metricdata.Metrics {
func generateSamples(n int, labels map[string]string) map[string]metricdata.Metrics {
generated := make(map[string]metricdata.Metrics, 3*n)
attrs := *attribute.EmptySet()
kvs := make([]attribute.KeyValue, 0, len(labels))
for k, v := range labels {
kvs = append(kvs, attribute.KeyValue{Key: attribute.Key(k), Value: attribute.StringValue(v)})
}
if len(kvs) > 0 {
attrs = attribute.NewSet(kvs...)
}
for i := 0; i < n; i++ {
v := 12.3
@ -251,7 +402,7 @@ func generateSamples(n int) map[string]metricdata.Metrics {
Data: metricdata.Gauge[float64]{
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: *attribute.EmptySet(),
Attributes: attrs,
Value: float64(float32(v)),
},
},
@ -267,7 +418,7 @@ func generateSamples(n int) map[string]metricdata.Metrics {
Data: metricdata.Sum[float64]{
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: *attribute.EmptySet(),
Attributes: attrs,
Value: float64(float32(v)),
},
},
@ -284,7 +435,7 @@ func generateSamples(n int) map[string]metricdata.Metrics {
Data: metricdata.Histogram[float64]{
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: *attribute.EmptySet(),
Attributes: attrs,
Sum: float64(float32(v)),
Max: metricdata.NewExtrema(float64(float32(v))),
Min: metricdata.NewExtrema(float64(float32(v))),

View File

@ -0,0 +1,156 @@
package hcp
import (
"context"
"fmt"
"net/url"
"regexp"
"sync"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/hcp/telemetry"
)
var (
// internalMetricRefreshFailure is a metric to monitor refresh failures.
internalMetricRefreshFailure []string = []string{"hcp", "telemetry_config_provider", "refresh", "failure"}
// internalMetricRefreshSuccess is a metric to monitor refresh successes.
internalMetricRefreshSuccess []string = []string{"hcp", "telemetry_config_provider", "refresh", "success"}
)
// Ensure hcpProviderImpl implements telemetry provider interfaces.
var _ telemetry.ConfigProvider = &hcpProviderImpl{}
var _ telemetry.EndpointProvider = &hcpProviderImpl{}
// hcpProviderImpl holds telemetry configuration and settings for continuous fetch of new config from HCP.
// it updates configuration, if changes are detected.
type hcpProviderImpl struct {
// cfg holds configuration that can be dynamically updated.
cfg *dynamicConfig
// A reader-writer mutex is used as the provider is read heavy.
// OTEL components access telemetryConfig during metrics collection and export (read).
// Meanwhile, config is only updated when there are changes (write).
rw sync.RWMutex
// hcpClient is an authenticated client used to make HTTP requests to HCP.
hcpClient client.Client
}
// dynamicConfig is a set of configurable settings for metrics collection, processing and export.
// fields MUST be exported to compute hash for equals method.
type dynamicConfig struct {
Endpoint *url.URL
Labels map[string]string
Filters *regexp.Regexp
// refreshInterval controls the interval at which configuration is fetched from HCP to refresh config.
RefreshInterval time.Duration
}
// NewHCPProvider initializes and starts a HCP Telemetry provider with provided params.
func NewHCPProvider(ctx context.Context, hcpClient client.Client, telemetryCfg *client.TelemetryConfig) (*hcpProviderImpl, error) {
refreshInterval := telemetryCfg.RefreshConfig.RefreshInterval
// refreshInterval must be greater than 0, otherwise time.Ticker panics.
if refreshInterval <= 0 {
return nil, fmt.Errorf("invalid refresh interval: %d", refreshInterval)
}
cfg := &dynamicConfig{
Endpoint: telemetryCfg.MetricsConfig.Endpoint,
Labels: telemetryCfg.MetricsConfig.Labels,
Filters: telemetryCfg.MetricsConfig.Filters,
RefreshInterval: refreshInterval,
}
t := &hcpProviderImpl{
cfg: cfg,
hcpClient: hcpClient,
}
go t.run(ctx, refreshInterval)
return t, nil
}
// run continously checks for updates to the telemetry configuration by making a request to HCP.
func (h *hcpProviderImpl) run(ctx context.Context, refreshInterval time.Duration) {
ticker := time.NewTicker(refreshInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if newCfg := h.getUpdate(ctx); newCfg != nil {
ticker.Reset(newCfg.RefreshInterval)
}
case <-ctx.Done():
return
}
}
}
// getUpdate makes a HTTP request to HCP to return a new metrics configuration
// and updates the hcpProviderImpl.
func (h *hcpProviderImpl) getUpdate(ctx context.Context) *dynamicConfig {
logger := hclog.FromContext(ctx).Named("telemetry_config_provider")
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
telemetryCfg, err := h.hcpClient.FetchTelemetryConfig(ctx)
if err != nil {
logger.Error("failed to fetch telemetry config from HCP", "error", err)
metrics.IncrCounter(internalMetricRefreshFailure, 1)
return nil
}
// newRefreshInterval of 0 or less can cause ticker Reset() panic.
newRefreshInterval := telemetryCfg.RefreshConfig.RefreshInterval
if newRefreshInterval <= 0 {
logger.Error("invalid refresh interval duration", "refreshInterval", newRefreshInterval)
metrics.IncrCounter(internalMetricRefreshFailure, 1)
return nil
}
newDynamicConfig := &dynamicConfig{
Filters: telemetryCfg.MetricsConfig.Filters,
Endpoint: telemetryCfg.MetricsConfig.Endpoint,
Labels: telemetryCfg.MetricsConfig.Labels,
RefreshInterval: newRefreshInterval,
}
// Acquire write lock to update new configuration.
h.rw.Lock()
h.cfg = newDynamicConfig
h.rw.Unlock()
metrics.IncrCounter(internalMetricRefreshSuccess, 1)
return newDynamicConfig
}
// GetEndpoint acquires a read lock to return endpoint configuration for consumers.
func (h *hcpProviderImpl) GetEndpoint() *url.URL {
h.rw.RLock()
defer h.rw.RUnlock()
return h.cfg.Endpoint
}
// GetFilters acquires a read lock to return filters configuration for consumers.
func (h *hcpProviderImpl) GetFilters() *regexp.Regexp {
h.rw.RLock()
defer h.rw.RUnlock()
return h.cfg.Filters
}
// GetLabels acquires a read lock to return labels configuration for consumers.
func (h *hcpProviderImpl) GetLabels() map[string]string {
h.rw.RLock()
defer h.rw.RUnlock()
return h.cfg.Labels
}

View File

@ -0,0 +1,384 @@
package hcp
import (
"context"
"fmt"
"net/url"
"regexp"
"strings"
"sync"
"testing"
"time"
"github.com/armon/go-metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/hcp/client"
)
const (
testRefreshInterval = 100 * time.Millisecond
testSinkServiceName = "test.telemetry_config_provider"
testRaceWriteSampleCount = 100
testRaceReadSampleCount = 5000
)
var (
// Test constants to verify inmem sink metrics.
testMetricKeyFailure = testSinkServiceName + "." + strings.Join(internalMetricRefreshFailure, ".")
testMetricKeySuccess = testSinkServiceName + "." + strings.Join(internalMetricRefreshSuccess, ".")
)
type testConfig struct {
filters string
endpoint string
labels map[string]string
refreshInterval time.Duration
}
func TestNewTelemetryConfigProvider(t *testing.T) {
t.Parallel()
for name, tc := range map[string]struct {
testInputs *testConfig
wantErr string
}{
"success": {
testInputs: &testConfig{
refreshInterval: 1 * time.Second,
},
},
"failsWithInvalidRefreshInterval": {
testInputs: &testConfig{
refreshInterval: 0 * time.Second,
},
wantErr: "invalid refresh interval",
},
} {
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testCfg, err := testTelemetryCfg(tc.testInputs)
require.NoError(t, err)
cfgProvider, err := NewHCPProvider(ctx, client.NewMockClient(t), testCfg)
if tc.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.wantErr)
require.Nil(t, cfgProvider)
return
}
require.NotNil(t, cfgProvider)
})
}
}
func TestTelemetryConfigProviderGetUpdate(t *testing.T) {
for name, tc := range map[string]struct {
mockExpect func(*client.MockClient)
metricKey string
optsInputs *testConfig
expected *testConfig
}{
"noChanges": {
optsInputs: &testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
},
mockExpect: func(m *client.MockClient) {
mockCfg, _ := testTelemetryCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
})
m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mockCfg, nil)
},
expected: &testConfig{
endpoint: "http://test.com/v1/metrics",
labels: map[string]string{
"test_label": "123",
},
filters: "test",
refreshInterval: testRefreshInterval,
},
metricKey: testMetricKeySuccess,
},
"newConfig": {
optsInputs: &testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: 2 * time.Second,
},
mockExpect: func(m *client.MockClient) {
mockCfg, _ := testTelemetryCfg(&testConfig{
endpoint: "http://newendpoint/v1/metrics",
filters: "consul",
labels: map[string]string{
"new_label": "1234",
},
refreshInterval: 2 * time.Second,
})
m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mockCfg, nil)
},
expected: &testConfig{
endpoint: "http://newendpoint/v1/metrics",
filters: "consul",
labels: map[string]string{
"new_label": "1234",
},
refreshInterval: 2 * time.Second,
},
metricKey: testMetricKeySuccess,
},
"sameConfigInvalidRefreshInterval": {
optsInputs: &testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
},
mockExpect: func(m *client.MockClient) {
mockCfg, _ := testTelemetryCfg(&testConfig{
refreshInterval: 0 * time.Second,
})
m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mockCfg, nil)
},
expected: &testConfig{
endpoint: "http://test.com/v1/metrics",
labels: map[string]string{
"test_label": "123",
},
filters: "test",
refreshInterval: testRefreshInterval,
},
metricKey: testMetricKeyFailure,
},
"sameConfigHCPClientFailure": {
optsInputs: &testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
},
mockExpect: func(m *client.MockClient) {
m.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("failure"))
},
expected: &testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
"test_label": "123",
},
refreshInterval: testRefreshInterval,
},
metricKey: testMetricKeyFailure,
},
} {
t.Run(name, func(t *testing.T) {
sink := initGlobalSink()
mockClient := client.NewMockClient(t)
tc.mockExpect(mockClient)
dynamicCfg, err := testDynamicCfg(tc.optsInputs)
require.NoError(t, err)
provider := &hcpProviderImpl{
hcpClient: mockClient,
cfg: dynamicCfg,
}
provider.getUpdate(context.Background())
// Verify endpoint provider returns correct config values.
require.Equal(t, tc.expected.endpoint, provider.GetEndpoint().String())
require.Equal(t, tc.expected.filters, provider.GetFilters().String())
require.Equal(t, tc.expected.labels, provider.GetLabels())
// Verify count for transform success metric.
interval := sink.Data()[0]
require.NotNil(t, interval, 1)
sv := interval.Counters[tc.metricKey]
assert.NotNil(t, sv.AggregateSample)
require.Equal(t, sv.AggregateSample.Count, 1)
})
}
}
// mockRaceClient is a mock HCP client that fetches TelemetryConfig.
// The mock TelemetryConfig returned can be manually updated at any time.
// It manages concurrent read/write access to config with a sync.RWMutex.
type mockRaceClient struct {
client.Client
cfg *client.TelemetryConfig
rw sync.RWMutex
}
// updateCfg acquires a write lock and updates client config to a new value givent a count.
func (m *mockRaceClient) updateCfg(count int) (*client.TelemetryConfig, error) {
m.rw.Lock()
defer m.rw.Unlock()
labels := map[string]string{fmt.Sprintf("label_%d", count): fmt.Sprintf("value_%d", count)}
filters, err := regexp.Compile(fmt.Sprintf("consul_filter_%d", count))
if err != nil {
return nil, err
}
endpoint, err := url.Parse(fmt.Sprintf("http://consul-endpoint-%d.com", count))
if err != nil {
return nil, err
}
cfg := &client.TelemetryConfig{
MetricsConfig: &client.MetricsConfig{
Filters: filters,
Endpoint: endpoint,
Labels: labels,
},
RefreshConfig: &client.RefreshConfig{
RefreshInterval: testRefreshInterval,
},
}
m.cfg = cfg
return cfg, nil
}
// FetchTelemetryConfig returns the current config held by the mockRaceClient.
func (m *mockRaceClient) FetchTelemetryConfig(ctx context.Context) (*client.TelemetryConfig, error) {
m.rw.RLock()
defer m.rw.RUnlock()
return m.cfg, nil
}
func TestTelemetryConfigProvider_Race(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
initCfg, err := testTelemetryCfg(&testConfig{
endpoint: "test.com",
filters: "test",
labels: map[string]string{"test_label": "test_value"},
refreshInterval: testRefreshInterval,
})
require.NoError(t, err)
m := &mockRaceClient{
cfg: initCfg,
}
// Start the provider goroutine, which fetches client TelemetryConfig every RefreshInterval.
provider, err := NewHCPProvider(ctx, m, m.cfg)
require.NoError(t, err)
for count := 0; count < testRaceWriteSampleCount; count++ {
// Force a TelemetryConfig value change in the mockRaceClient.
newCfg, err := m.updateCfg(count)
require.NoError(t, err)
// Force provider to obtain new client TelemetryConfig immediately.
// This call is necessary to guarantee TelemetryConfig changes to assert on expected values below.
provider.getUpdate(context.Background())
// Start goroutines to access label configuration.
wg := &sync.WaitGroup{}
kickOff(wg, testRaceReadSampleCount, provider, func(provider *hcpProviderImpl) {
require.Equal(t, provider.GetLabels(), newCfg.MetricsConfig.Labels)
})
// Start goroutines to access endpoint configuration.
kickOff(wg, testRaceReadSampleCount, provider, func(provider *hcpProviderImpl) {
require.Equal(t, provider.GetFilters(), newCfg.MetricsConfig.Filters)
})
// Start goroutines to access filter configuration.
kickOff(wg, testRaceReadSampleCount, provider, func(provider *hcpProviderImpl) {
require.Equal(t, provider.GetEndpoint(), newCfg.MetricsConfig.Endpoint)
})
wg.Wait()
}
}
func kickOff(wg *sync.WaitGroup, count int, provider *hcpProviderImpl, check func(cfgProvider *hcpProviderImpl)) {
for i := 0; i < count; i++ {
wg.Add(1)
go func() {
defer wg.Done()
check(provider)
}()
}
}
// initGlobalSink is a helper function to initialize a Go metrics inmemsink.
func initGlobalSink() *metrics.InmemSink {
cfg := metrics.DefaultConfig(testSinkServiceName)
cfg.EnableHostname = false
sink := metrics.NewInmemSink(10*time.Second, 10*time.Second)
metrics.NewGlobal(cfg, sink)
return sink
}
// testDynamicCfg converts testConfig inputs to a dynamicConfig to be used in tests.
func testDynamicCfg(testCfg *testConfig) (*dynamicConfig, error) {
filters, err := regexp.Compile(testCfg.filters)
if err != nil {
return nil, err
}
endpoint, err := url.Parse(testCfg.endpoint)
if err != nil {
return nil, err
}
return &dynamicConfig{
Endpoint: endpoint,
Filters: filters,
Labels: testCfg.labels,
RefreshInterval: testCfg.refreshInterval,
}, nil
}
// testTelemetryCfg converts testConfig inputs to a TelemetryConfig to be used in tests.
func testTelemetryCfg(testCfg *testConfig) (*client.TelemetryConfig, error) {
filters, err := regexp.Compile(testCfg.filters)
if err != nil {
return nil, err
}
endpoint, err := url.Parse(testCfg.endpoint)
if err != nil {
return nil, err
}
return &client.TelemetryConfig{
MetricsConfig: &client.MetricsConfig{
Endpoint: endpoint,
Filters: filters,
Labels: testCfg.labels,
},
RefreshConfig: &client.RefreshConfig{
RefreshInterval: testCfg.refreshInterval,
},
}, nil
}

2
go.mod
View File

@ -62,7 +62,7 @@ require (
github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/hcl v1.0.0
github.com/hashicorp/hcp-scada-provider v0.2.3
github.com/hashicorp/hcp-sdk-go v0.48.0
github.com/hashicorp/hcp-sdk-go v0.55.0
github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038
github.com/hashicorp/memberlist v0.5.0
github.com/hashicorp/raft v1.5.0

4
go.sum
View File

@ -554,8 +554,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/hcp-scada-provider v0.2.3 h1:AarYR+/Pcv+cMvPdAlb92uOBmZfEH6ny4+DT+4NY2VQ=
github.com/hashicorp/hcp-scada-provider v0.2.3/go.mod h1:ZFTgGwkzNv99PLQjTsulzaCplCzOTBh0IUQsPKzrQFo=
github.com/hashicorp/hcp-sdk-go v0.48.0 h1:LWpFR7YVDz4uG4C/ixcy2tRbg7/BgjMcTh1bRkKaeBQ=
github.com/hashicorp/hcp-sdk-go v0.48.0/go.mod h1:hZqky4HEzsKwvLOt4QJlZUrjeQmb4UCZUhDP2HyQFfc=
github.com/hashicorp/hcp-sdk-go v0.55.0 h1:T4sQtgQfQJOD0uucT4hS+GZI1FmoHAQMADj277W++xw=
github.com/hashicorp/hcp-sdk-go v0.55.0/go.mod h1:hZqky4HEzsKwvLOt4QJlZUrjeQmb4UCZUhDP2HyQFfc=
github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038 h1:n9J0rwVWXDpNd5iZnwY7w4WZyq53/rROeI7OVvLW8Ok=
github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038/go.mod h1:n2TSygSNwsLJ76m8qFXTSc7beTb+auJxYdqrnoqwZWE=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=