HCP Telemetry Feature (#17460)

* Move hcp client to subpackage hcpclient (#16800)

* [HCP Observability] New MetricsClient (#17100)

* Client configured with TLS using HCP config and retry/throttle

* Add tests and godoc for metrics client

* close body after request

* run go mod tidy

* Remove one abstraction to use the config from deps

* Address PR feedback

* remove clone

* Extract CloudConfig and mock for future PR

* Switch to hclog.FromContext

* [HCP Observability] OTELExporter (#17128)

* Create new OTELExporter which uses the MetricsClient
Add transform because the conversion is in an /internal package

* Fix lint error

* early return when there are no metrics

* Add NewOTELExporter() function

* Downgrade to metrics SDK version: v1.15.0-rc.1

* Fix imports

* fix small nits with comments and url.URL

* Fix tests by asserting actual error for context cancellation, fix parallel, and make mock more versatile

* Cleanup error handling and clarify empty metrics case

* Fix input/expected naming in otel_transform_test.go

* add comment for metric tracking

* Add a general isEmpty method

* Add clear error types

* update to latest version 1.15.0 of OTEL

* [HCP Observability] OTELSink (#17159)

* Initialize OTELSink with sync.Map for all the instrument stores.

* Moved PeriodicReader init to NewOtelReader function. This allows us to use a ManualReader for tests.

* Switch to mutex instead of sync.Map to avoid type assertion

* Add gauge store

* Clarify comments

* return concrete sink type

* Fix lint errors

* Move gauge store to be within sink

* Use context.TODO, rebase and clean up opts handling

* Rebase onto OTEL exporter to downgrade metrics API to v1.15.0-rc.1

* Fix imports

* Update to latest stable version by rebasing on cc-4933, fix import, remove mutex init, fix opts error messages and use logger from ctx

* Add lots of documentation to the OTELSink

* Fix gauge store comment and check ok

* Add select and ctx.Done() check to gauge callback

* use require.Equal for attributes

* Fixed import naming

* Remove float64 calls and add a NewGaugeStore method

* Change name Store to Set in gaugeStore, add concurrency tests in both OTELSink and gauge store

* Generate 100 gauge operations

* Separate the labels into goroutines in sink test

* Generate kv store for the test case keys to avoid using uuid

* Added a race test with 300 samples for OTELSink

* Do not pass in waitgroup and use error channel instead.

* Using SHA 7dea2225a218872e86d2f580e82c089b321617b0 to avoid build failures in otel

* Fix nits

* [HCP Observability] Init OTELSink in Telemetry (#17162)

* Added telemetry agent to client and init sink in deps

* Fixed client

* Initialize sink in deps

* init sink in telemetry library

* Init deps before telemetry

* Use concrete telemetry.OtelSink type

* add /v1/metrics

* Avoid returning err for telemetry init

* move sink init within the IsCloudEnabled()

* Use HCPSinkOpts in deps instead

* update golden test for configuration file

* Switch to using extra sinks in the telemetry library

* keep name MetricsConfig

* fix log in verifyCCMRegistration

* Set logger in context

* pass around MetricSink in deps

* Fix imports

* Rebased onto otel sink pr

* Fix URL in test

* pass extraSinks as function param instead

* Add default interval as package export

* remove verifyCCM func

* Add clusterID

* Fix import and add t.Parallel() for missing tests

* Kick Vercel CI

* Remove scheme from endpoint path, and fix error logging

* return metrics.MetricSink for sink method

* Update SDK

* [HCP Observability] Metrics filtering and Labels in Go Metrics sink (#17184)

* Add node_id and __replica__ default labels

* add function for default labels and set x-hcp-resource-id

* Fix labels tests

* Commit suggestion for getDefaultLabels

Co-authored-by: Joshua Timmons <joshua.timmons1@gmail.com>

* Fixed server.id, and t.Parallel()

* Make defaultLabels a method on the TelemetryConfig object

* Rename FilterList to lowercase filterList

* Cleanup filter implementation by combining regex into a single one, and making the type lowercase

* Fix append

* use regex directly for filters

* Fix x-resource-id test to use mocked value

* Fix log.Error formats

* Forgot the len(opts.Label) optimization

* Use cfg.NodeID instead

---------

Co-authored-by: Joshua Timmons <joshua.timmons1@gmail.com>

* remove replica tag (#17484)

* [HCP Observability] Add custom metrics for OTEL sink, improve logging, upgrade modules and cleanup metrics client (#17455)

* Add custom metrics for Exporter and transform operations

* Improve deps logging

Run go mod tidy

* Upgrade SDK and OTEL

* Remove the partial success implementation and check for HTTP status code in metrics client

* Add x-channel

* cleanup logs in deps.go based on PR feedback

* Change to debug log and lowercase

* address test operation feedback

* use GetHumanVersion on version

* Fix error wrapping

* Fix metric names

* [HCP Observability] Turn off retries for now until dynamically configurable (#17496)

* Remove retries for now until dynamic configuration is possible

* Clarify comment

* Update changelog

* improve changelog

---------

Co-authored-by: Joshua Timmons <joshua.timmons1@gmail.com>
Commit 077a755fe8 (parent 6db3842ca6)
Ashvitha, 2023-05-29 16:11:08 -04:00, committed by GitHub
35 changed files with 2602 additions and 78 deletions

.changelog/17460.txt (new file, 3 additions)

@ -0,0 +1,3 @@
```release-note:feature
hcp: Add new metrics sink to collect, aggregate and export server metrics to HCP in OTEL format.
```
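
For reference, the sink this changelog entry describes is registered with HashiCorp's go-metrics library. The interface it has to satisfy is reproduced below from github.com/armon/go-metrics for orientation; it is not part of this diff:

```go
// MetricSink is the go-metrics interface the new OTELSink implements so it
// can be attached as an extra sink alongside the agent's existing sinks.
type MetricSink interface {
	SetGauge(key []string, val float32)
	SetGaugeWithLabels(key []string, val float32, labels []Label)
	EmitKey(key []string, val float32)
	IncrCounter(key []string, val float32)
	IncrCounterWithLabels(key []string, val float32, labels []Label)
	AddSample(key []string, val float32)
	AddSampleWithLabels(key []string, val float32, labels []Label)
}
```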

View File

@ -60,6 +60,7 @@ import (
agentgrpc "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/agent/grpc-internal/services/subscribe"
"github.com/hashicorp/consul/agent/hcp"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
logdrop "github.com/hashicorp/consul/agent/log-drop"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
@ -2027,7 +2028,7 @@ func (s *Server) trackLeaderChanges() {
// hcpServerStatus is the callback used by the HCP manager to emit status updates to the HashiCorp Cloud Platform when
// enabled.
func (s *Server) hcpServerStatus(deps Deps) hcp.StatusCallback {
return func(ctx context.Context) (status hcp.ServerStatus, err error) {
return func(ctx context.Context) (status hcpclient.ServerStatus, err error) {
status.Name = s.config.NodeName
status.ID = string(s.config.NodeID)
status.Version = cslversion.GetHumanVersion()

View File

@ -27,8 +27,6 @@ import (
"golang.org/x/time/rate"
"google.golang.org/grpc"
"github.com/hashicorp/consul/agent/hcp"
"github.com/hashicorp/consul-net-rpc/net/rpc"
"github.com/hashicorp/consul/agent/connect"
@ -36,6 +34,7 @@ import (
rpcRate "github.com/hashicorp/consul/agent/consul/rate"
external "github.com/hashicorp/consul/agent/grpc-external"
grpcmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/rpc/middleware"
"github.com/hashicorp/consul/agent/structs"
@ -2082,10 +2081,10 @@ func TestServer_hcpManager(t *testing.T) {
_, conf1 := testServerConfig(t)
conf1.BootstrapExpect = 1
conf1.RPCAdvertise = &net.TCPAddr{IP: []byte{127, 0, 0, 2}, Port: conf1.RPCAddr.Port}
hcp1 := hcp.NewMockClient(t)
hcp1.EXPECT().PushServerStatus(mock.Anything, mock.MatchedBy(func(status *hcp.ServerStatus) bool {
hcp1 := hcpclient.NewMockClient(t)
hcp1.EXPECT().PushServerStatus(mock.Anything, mock.MatchedBy(func(status *hcpclient.ServerStatus) bool {
return status.ID == string(conf1.NodeID)
})).Run(func(ctx context.Context, status *hcp.ServerStatus) {
})).Run(func(ctx context.Context, status *hcpclient.ServerStatus) {
require.Equal(t, status.LanAddress, "127.0.0.2")
}).Call.Return(nil)

View File

@ -23,7 +23,7 @@ import (
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/hcp"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/go-uuid"
@ -65,7 +65,7 @@ type RawBootstrapConfig struct {
// fetch from HCP servers if the local data is incomplete.
// It must be passed a (CLI) UI implementation so it can deliver progress
// updates to the user, for example if it is waiting to retry for a long period.
func LoadConfig(ctx context.Context, client hcp.Client, dataDir string, loader ConfigLoader, ui UI) (ConfigLoader, error) {
func LoadConfig(ctx context.Context, client hcpclient.Client, dataDir string, loader ConfigLoader, ui UI) (ConfigLoader, error) {
ui.Output("Loading configuration from HCP")
// See if we have existing config on disk
@ -181,14 +181,14 @@ func finalizeRuntimeConfig(rc *config.RuntimeConfig, cfg *RawBootstrapConfig) {
// fetchBootstrapConfig will fetch boostrap configuration from remote servers and persist it to disk.
// It will retry until successful or a terminal error condition is found (e.g. permission denied).
func fetchBootstrapConfig(ctx context.Context, client hcp.Client, dataDir string, ui UI) (*RawBootstrapConfig, error) {
func fetchBootstrapConfig(ctx context.Context, client hcpclient.Client, dataDir string, ui UI) (*RawBootstrapConfig, error) {
w := retry.Waiter{
MinWait: 1 * time.Second,
MaxWait: 5 * time.Minute,
Jitter: retry.NewJitter(50),
}
var bsCfg *hcp.BootstrapConfig
var bsCfg *hcpclient.BootstrapConfig
for {
// Note we don't want to shadow `ctx` here since we need that for the Wait
// below.
@ -225,7 +225,7 @@ func fetchBootstrapConfig(ctx context.Context, client hcp.Client, dataDir string
// persistAndProcessConfig is called when we receive data from CCM.
// We validate and persist everything that was received, then also update
// the JSON config as needed.
func persistAndProcessConfig(dataDir string, devMode bool, bsCfg *hcp.BootstrapConfig) (string, error) {
func persistAndProcessConfig(dataDir string, devMode bool, bsCfg *hcpclient.BootstrapConfig) (string, error) {
if devMode {
// Agent in dev mode, we still need somewhere to persist the certs
// temporarily though to be able to start up at all since we don't support

View File

@ -12,6 +12,7 @@ import (
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/hcp"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-uuid"
@ -157,7 +158,7 @@ func TestLoadConfig_Persistence(t *testing.T) {
// Override the client TLS config so that the test server can be trusted.
initial.RuntimeConfig.Cloud.WithTLSConfig(clientTLS)
client, err := hcp.NewClient(initial.RuntimeConfig.Cloud)
client, err := hcpclient.NewClient(initial.RuntimeConfig.Cloud)
require.NoError(t, err)
loader, err := LoadConfig(context.Background(), client, initial.RuntimeConfig.DataDir, baseLoader, ui)

View File

@ -1,7 +1,7 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package hcp
package client
import (
"context"
@ -11,6 +11,8 @@ import (
httptransport "github.com/go-openapi/runtime/client"
"github.com/go-openapi/strfmt"
hcptelemetry "github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service"
hcpgnm "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/client/global_network_manager_service"
gnmmod "github.com/hashicorp/hcp-sdk-go/clients/cloud-global-network-manager-service/preview/2022-02-15/models"
"github.com/hashicorp/hcp-sdk-go/httpclient"
@ -20,15 +22,34 @@ import (
"github.com/hashicorp/consul/version"
)
// metricsGatewayPath is the default path for metrics export requests on the Telemetry Gateway.
const metricsGatewayPath = "/v1/metrics"
// Client interface exposes HCP operations that can be invoked by Consul
//
//go:generate mockery --name Client --with-expecter --inpackage
type Client interface {
FetchBootstrap(ctx context.Context) (*BootstrapConfig, error)
FetchTelemetryConfig(ctx context.Context) (*TelemetryConfig, error)
PushServerStatus(ctx context.Context, status *ServerStatus) error
DiscoverServers(ctx context.Context) ([]string, error)
}
// MetricsConfig holds metrics specific configuration for the TelemetryConfig.
// The endpoint field overrides the TelemetryConfig endpoint.
type MetricsConfig struct {
Filters []string
Endpoint string
}
// TelemetryConfig contains configuration for telemetry data forwarded by Consul servers
// to the HCP Telemetry gateway.
type TelemetryConfig struct {
Endpoint string
Labels map[string]string
MetricsConfig *MetricsConfig
}
type BootstrapConfig struct {
Name string
BootstrapExpect int
@ -44,6 +65,7 @@ type hcpClient struct {
hc *httptransport.Runtime
cfg config.CloudConfig
gnm hcpgnm.ClientService
tgw hcptelemetry.ClientService
resource resource.Resource
}
@ -64,6 +86,8 @@ func NewClient(cfg config.CloudConfig) (Client, error) {
}
client.gnm = hcpgnm.New(client.hc, nil)
client.tgw = hcptelemetry.New(client.hc, nil)
return client, nil
}
@ -79,6 +103,29 @@ func httpClient(c config.CloudConfig) (*httptransport.Runtime, error) {
})
}
// FetchTelemetryConfig obtains telemetry configuration from the Telemetry Gateway.
func (c *hcpClient) FetchTelemetryConfig(ctx context.Context) (*TelemetryConfig, error) {
params := hcptelemetry.NewAgentTelemetryConfigParamsWithContext(ctx).
WithLocationOrganizationID(c.resource.Organization).
WithLocationProjectID(c.resource.Project).
WithClusterID(c.resource.ID)
resp, err := c.tgw.AgentTelemetryConfig(params, nil)
if err != nil {
return nil, err
}
payloadConfig := resp.Payload.TelemetryConfig
return &TelemetryConfig{
Endpoint: payloadConfig.Endpoint,
Labels: payloadConfig.Labels,
MetricsConfig: &MetricsConfig{
Filters: payloadConfig.Metrics.IncludeList,
Endpoint: payloadConfig.Metrics.Endpoint,
},
}, nil
}
func (c *hcpClient) FetchBootstrap(ctx context.Context) (*BootstrapConfig, error) {
version := version.GetHumanVersion()
params := hcpgnm.NewAgentBootstrapConfigParamsWithContext(ctx).
@ -233,3 +280,32 @@ func (c *hcpClient) DiscoverServers(ctx context.Context) ([]string, error) {
return servers, nil
}
// Enabled verifies if telemetry is enabled by ensuring a valid endpoint has been retrieved.
// It returns the full metrics endpoint and true if a valid endpoint was obtained.
func (t *TelemetryConfig) Enabled() (string, bool) {
endpoint := t.Endpoint
if override := t.MetricsConfig.Endpoint; override != "" {
endpoint = override
}
if endpoint == "" {
return "", false
}
// The endpoint from the Telemetry Gateway does not include the metrics path, so it must be appended here.
return endpoint + metricsGatewayPath, true
}
// DefaultLabels returns a set of <key, value> string pairs that must be added as attributes to all exported telemetry data.
func (t *TelemetryConfig) DefaultLabels(nodeID string) map[string]string {
labels := map[string]string{
"node_id": nodeID, // used to delineate Consul nodes in graphs
}
for k, v := range t.Labels {
labels[k] = v
}
return labels
}
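
An editor's sketch (not part of the commit) of how the two helpers above compose: the metrics-specific endpoint overrides the top-level one, the gateway path is appended, and node_id is merged with the gateway-supplied labels.

```go
package client

import "fmt"

// exampleTelemetryConfig demonstrates the expected behavior of Enabled and
// DefaultLabels when a metrics endpoint override is present.
func exampleTelemetryConfig() {
	cfg := &TelemetryConfig{
		Endpoint: "https://fallback.test",
		Labels:   map[string]string{"cluster": "c1"},
		MetricsConfig: &MetricsConfig{
			Endpoint: "https://override.test", // takes precedence over Endpoint
		},
	}
	if endpoint, ok := cfg.Enabled(); ok {
		fmt.Println(endpoint) // https://override.test/v1/metrics
	}
	fmt.Println(cfg.DefaultLabels("server-1234")) // map[cluster:c1 node_id:server-1234]
}
```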

View File

@ -0,0 +1,75 @@
package client
import (
"context"
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestFetchTelemetryConfig(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
metricsEndpoint string
expect func(*MockClient)
disabled bool
}{
"success": {
expect: func(mockClient *MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{
Endpoint: "https://test.com",
MetricsConfig: &MetricsConfig{
Endpoint: "",
},
}, nil)
},
metricsEndpoint: "https://test.com/v1/metrics",
},
"overrideMetricsEndpoint": {
expect: func(mockClient *MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{
Endpoint: "https://test.com",
MetricsConfig: &MetricsConfig{
Endpoint: "https://test.com",
},
}, nil)
},
metricsEndpoint: "https://test.com/v1/metrics",
},
"disabledWithEmptyEndpoint": {
expect: func(mockClient *MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{
Endpoint: "",
MetricsConfig: &MetricsConfig{
Endpoint: "",
},
}, nil)
},
disabled: true,
},
} {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
mock := NewMockClient(t)
test.expect(mock)
telemetryCfg, err := mock.FetchTelemetryConfig(context.Background())
require.NoError(t, err)
if test.disabled {
endpoint, ok := telemetryCfg.Enabled()
require.False(t, ok)
require.Empty(t, endpoint)
return
}
endpoint, ok := telemetryCfg.Enabled()
require.True(t, ok)
require.Equal(t, test.metricsEndpoint, endpoint)
})
}
}

View File

@ -0,0 +1,157 @@
package client
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"
"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-retryablehttp"
hcpcfg "github.com/hashicorp/hcp-sdk-go/config"
"github.com/hashicorp/hcp-sdk-go/resource"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
"golang.org/x/oauth2"
"google.golang.org/protobuf/proto"
"github.com/hashicorp/consul/version"
)
const (
// HTTP Client config
defaultStreamTimeout = 15 * time.Second
// Retry config
// TODO: Eventually, we'd like to configure these values dynamically.
defaultRetryWaitMin = 1 * time.Second
defaultRetryWaitMax = 15 * time.Second
// defaultRetryMax is set to 0 to turn off retry functionality, until dynamic configuration is possible.
// This is to circumvent any spikes in load that may cause or exacerbate server-side issues for now.
defaultRetryMax = 0
)
// MetricsClient exports Consul metrics in OTLP format to the HCP Telemetry Gateway.
type MetricsClient interface {
ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error
}
// CloudConfig represents cloud config for TLS, abstracted in an interface for easy testing.
type CloudConfig interface {
HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error)
Resource() (resource.Resource, error)
}
// otlpClient is an implementation of MetricsClient with a retryable http client for retries and to honor throttle.
// It also holds default HTTP headers to add to export requests.
type otlpClient struct {
client *retryablehttp.Client
header *http.Header
}
// NewMetricsClient returns a configured MetricsClient.
// The current implementation uses otlpClient to provide retry functionality.
func NewMetricsClient(cfg CloudConfig, ctx context.Context) (MetricsClient, error) {
if cfg == nil {
return nil, fmt.Errorf("failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)")
}
if ctx == nil {
return nil, fmt.Errorf("failed to init telemetry client: provide a valid context")
}
logger := hclog.FromContext(ctx)
c, err := newHTTPClient(cfg, logger)
if err != nil {
return nil, fmt.Errorf("failed to init telemetry client: %v", err)
}
r, err := cfg.Resource()
if err != nil {
return nil, fmt.Errorf("failed to init telemetry client: %v", err)
}
header := make(http.Header)
header.Set("content-type", "application/x-protobuf")
header.Set("x-hcp-resource-id", r.String())
header.Set("x-channel", fmt.Sprintf("consul/%s", version.GetHumanVersion()))
return &otlpClient{
client: c,
header: &header,
}, nil
}
// newHTTPClient configures the retryable HTTP client.
func newHTTPClient(cloudCfg CloudConfig, logger hclog.Logger) (*retryablehttp.Client, error) {
hcpCfg, err := cloudCfg.HCPConfig()
if err != nil {
return nil, err
}
tlsTransport := cleanhttp.DefaultPooledTransport()
tlsTransport.TLSClientConfig = hcpCfg.APITLSConfig()
var transport http.RoundTripper = &oauth2.Transport{
Base: tlsTransport,
Source: hcpCfg,
}
client := &http.Client{
Transport: transport,
Timeout: defaultStreamTimeout,
}
retryClient := &retryablehttp.Client{
HTTPClient: client,
Logger: logger.Named("hcp_telemetry_client"),
RetryWaitMin: defaultRetryWaitMin,
RetryWaitMax: defaultRetryWaitMax,
RetryMax: defaultRetryMax,
CheckRetry: retryablehttp.DefaultRetryPolicy,
Backoff: retryablehttp.DefaultBackoff,
}
return retryClient, nil
}
// ExportMetrics is the single method exposed by MetricsClient to export OTLP metrics to the desired HCP endpoint.
// The endpoint is configurable as the endpoint can change during periodic refresh of CCM telemetry config.
// By configuring the endpoint here, we can re-use the same client and override the endpoint when making a request.
func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error {
pbRequest := &colmetricpb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics},
}
body, err := proto.Marshal(pbRequest)
if err != nil {
return fmt.Errorf("failed to marshal the request: %w", err)
}
req, err := retryablehttp.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(body))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
req.Header = *o.header
resp, err := o.client.Do(req.WithContext(ctx))
if err != nil {
return fmt.Errorf("failed to post metrics: %w", err)
}
defer resp.Body.Close()
var respData bytes.Buffer
if _, err := io.Copy(&respData, resp.Body); err != nil {
return fmt.Errorf("failed to read body: %w", err)
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to export metrics: code %d: %s", resp.StatusCode, string(body))
}
return nil
}
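
A hypothetical wiring of the client above (editor's sketch; the real call site is the sink function in agent/hcp/deps.go later in this diff). MockCloudCfg is the test helper defined elsewhere in this package; a real caller passes the agent's *config.CloudConfig.

```go
package client

import (
	"context"

	"github.com/hashicorp/go-hclog"
	metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

// exampleExport sketches one export call against a fixed endpoint.
func exampleExport() error {
	// NewMetricsClient pulls its logger from the context via hclog.FromContext.
	ctx := hclog.WithContext(context.Background(), hclog.NewNullLogger())
	mc, err := NewMetricsClient(MockCloudCfg{}, ctx)
	if err != nil {
		return err
	}
	// Empty payload; real metrics come from the OTEL exporter's transform step.
	return mc.ExportMetrics(ctx, &metricpb.ResourceMetrics{}, "https://gateway.test/v1/metrics")
}
```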

View File

@ -0,0 +1,114 @@
package client
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/require"
colpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
"google.golang.org/protobuf/proto"
"github.com/hashicorp/consul/version"
)
func TestNewMetricsClient(t *testing.T) {
for name, test := range map[string]struct {
wantErr string
cfg CloudConfig
ctx context.Context
}{
"success": {
cfg: &MockCloudCfg{},
ctx: context.Background(),
},
"failsWithoutCloudCfg": {
wantErr: "failed to init telemetry client: provide valid cloudCfg (Cloud Configuration for TLS)",
cfg: nil,
ctx: context.Background(),
},
"failsWithoutContext": {
wantErr: "failed to init telemetry client: provide a valid context",
cfg: MockCloudCfg{},
ctx: nil,
},
"failsHCPConfig": {
wantErr: "failed to init telemetry client",
cfg: MockCloudCfg{
ConfigErr: fmt.Errorf("test bad hcp config"),
},
ctx: context.Background(),
},
"failsBadResource": {
wantErr: "failed to init telemetry client",
cfg: MockCloudCfg{
ResourceErr: fmt.Errorf("test bad resource"),
},
ctx: context.Background(),
},
} {
t.Run(name, func(t *testing.T) {
client, err := NewMetricsClient(test.cfg, test.ctx)
if test.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), test.wantErr)
return
}
require.Nil(t, err)
require.NotNil(t, client)
})
}
}
func TestExportMetrics(t *testing.T) {
for name, test := range map[string]struct {
wantErr string
status int
}{
"success": {
status: http.StatusOK,
},
"failsWithNonRetryableError": {
status: http.StatusBadRequest,
wantErr: "failed to export metrics: code 400",
},
} {
t.Run(name, func(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
require.Equal(t, r.Header.Get("content-type"), "application/x-protobuf")
require.Equal(t, r.Header.Get("x-hcp-resource-id"), testResourceID)
require.Equal(t, r.Header.Get("x-channel"), fmt.Sprintf("consul/%s", version.GetHumanVersion()))
require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token")
body := colpb.ExportMetricsServiceResponse{}
bytes, err := proto.Marshal(&body)
require.NoError(t, err)
w.Header().Set("Content-Type", "application/x-protobuf")
w.WriteHeader(test.status)
w.Write(bytes)
}))
defer srv.Close()
client, err := NewMetricsClient(MockCloudCfg{}, context.Background())
require.NoError(t, err)
ctx := context.Background()
metrics := &metricpb.ResourceMetrics{}
err = client.ExportMetrics(ctx, metrics, srv.URL)
if test.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), test.wantErr)
return
}
require.NoError(t, err)
})
}
}

View File

@ -1,6 +1,6 @@
// Code generated by mockery v2.15.0. DO NOT EDIT.
// Code generated by mockery v2.22.1. DO NOT EDIT.
package hcp
package client
import (
context "context"
@ -26,6 +26,10 @@ func (_m *MockClient) DiscoverServers(ctx context.Context) ([]string, error) {
ret := _m.Called(ctx)
var r0 []string
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) ([]string, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) []string); ok {
r0 = rf(ctx)
} else {
@ -34,7 +38,6 @@ func (_m *MockClient) DiscoverServers(ctx context.Context) ([]string, error) {
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
@ -67,11 +70,20 @@ func (_c *MockClient_DiscoverServers_Call) Return(_a0 []string, _a1 error) *Mock
return _c
}
func (_c *MockClient_DiscoverServers_Call) RunAndReturn(run func(context.Context) ([]string, error)) *MockClient_DiscoverServers_Call {
_c.Call.Return(run)
return _c
}
// FetchBootstrap provides a mock function with given fields: ctx
func (_m *MockClient) FetchBootstrap(ctx context.Context) (*BootstrapConfig, error) {
ret := _m.Called(ctx)
var r0 *BootstrapConfig
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) (*BootstrapConfig, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) *BootstrapConfig); ok {
r0 = rf(ctx)
} else {
@ -80,7 +92,6 @@ func (_m *MockClient) FetchBootstrap(ctx context.Context) (*BootstrapConfig, err
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
@ -113,6 +124,65 @@ func (_c *MockClient_FetchBootstrap_Call) Return(_a0 *BootstrapConfig, _a1 error
return _c
}
func (_c *MockClient_FetchBootstrap_Call) RunAndReturn(run func(context.Context) (*BootstrapConfig, error)) *MockClient_FetchBootstrap_Call {
_c.Call.Return(run)
return _c
}
// FetchTelemetryConfig provides a mock function with given fields: ctx
func (_m *MockClient) FetchTelemetryConfig(ctx context.Context) (*TelemetryConfig, error) {
ret := _m.Called(ctx)
var r0 *TelemetryConfig
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) (*TelemetryConfig, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) *TelemetryConfig); ok {
r0 = rf(ctx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*TelemetryConfig)
}
}
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockClient_FetchTelemetryConfig_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FetchTelemetryConfig'
type MockClient_FetchTelemetryConfig_Call struct {
*mock.Call
}
// FetchTelemetryConfig is a helper method to define mock.On call
// - ctx context.Context
func (_e *MockClient_Expecter) FetchTelemetryConfig(ctx interface{}) *MockClient_FetchTelemetryConfig_Call {
return &MockClient_FetchTelemetryConfig_Call{Call: _e.mock.On("FetchTelemetryConfig", ctx)}
}
func (_c *MockClient_FetchTelemetryConfig_Call) Run(run func(ctx context.Context)) *MockClient_FetchTelemetryConfig_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context))
})
return _c
}
func (_c *MockClient_FetchTelemetryConfig_Call) Return(_a0 *TelemetryConfig, _a1 error) *MockClient_FetchTelemetryConfig_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockClient_FetchTelemetryConfig_Call) RunAndReturn(run func(context.Context) (*TelemetryConfig, error)) *MockClient_FetchTelemetryConfig_Call {
_c.Call.Return(run)
return _c
}
// PushServerStatus provides a mock function with given fields: ctx, status
func (_m *MockClient) PushServerStatus(ctx context.Context, status *ServerStatus) error {
ret := _m.Called(ctx, status)
@ -151,6 +221,11 @@ func (_c *MockClient_PushServerStatus_Call) Return(_a0 error) *MockClient_PushSe
return _c
}
func (_c *MockClient_PushServerStatus_Call) RunAndReturn(run func(context.Context, *ServerStatus) error) *MockClient_PushServerStatus_Call {
_c.Call.Return(run)
return _c
}
type mockConstructorTestingTNewMockClient interface {
mock.TestingT
Cleanup(func())

View File

@ -0,0 +1,47 @@
package client
import (
"crypto/tls"
"net/url"
hcpcfg "github.com/hashicorp/hcp-sdk-go/config"
"github.com/hashicorp/hcp-sdk-go/profile"
"github.com/hashicorp/hcp-sdk-go/resource"
"golang.org/x/oauth2"
)
const testResourceID = "organization/test-org/project/test-project/test-type/test-id"
type mockHCPCfg struct{}
func (m *mockHCPCfg) Token() (*oauth2.Token, error) {
return &oauth2.Token{
AccessToken: "test-token",
}, nil
}
func (m *mockHCPCfg) APITLSConfig() *tls.Config { return nil }
func (m *mockHCPCfg) SCADAAddress() string { return "" }
func (m *mockHCPCfg) SCADATLSConfig() *tls.Config { return &tls.Config{} }
func (m *mockHCPCfg) APIAddress() string { return "" }
func (m *mockHCPCfg) PortalURL() *url.URL { return &url.URL{} }
func (m *mockHCPCfg) Profile() *profile.UserProfile { return nil }
type MockCloudCfg struct {
ConfigErr error
ResourceErr error
}
func (m MockCloudCfg) Resource() (resource.Resource, error) {
r := resource.Resource{
ID: "test-id",
Type: "test-type",
Organization: "test-org",
Project: "test-project",
}
return r, m.ResourceErr
}
func (m MockCloudCfg) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) {
return &mockHCPCfg{}, m.ConfigErr
}

View File

@ -7,6 +7,7 @@ import (
"crypto/tls"
hcpcfg "github.com/hashicorp/hcp-sdk-go/config"
"github.com/hashicorp/hcp-sdk-go/resource"
)
// CloudConfig defines configuration for connecting to HCP services
@ -30,6 +31,10 @@ func (c *CloudConfig) WithTLSConfig(cfg *tls.Config) {
c.TLSConfig = cfg
}
func (c *CloudConfig) Resource() (resource.Resource, error) {
return resource.FromString(c.ResourceID)
}
func (c *CloudConfig) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfig, error) {
if c.TLSConfig == nil {
c.TLSConfig = &tls.Config{}
@ -46,6 +51,6 @@ func (c *CloudConfig) HCPConfig(opts ...hcpcfg.HCPConfigOption) (hcpcfg.HCPConfi
if c.ScadaAddress != "" {
opts = append(opts, hcpcfg.WithSCADA(c.ScadaAddress, c.TLSConfig))
}
opts = append(opts, hcpcfg.FromEnv())
opts = append(opts, hcpcfg.FromEnv(), hcpcfg.WithoutBrowserLogin())
return hcpcfg.NewHCPConfig(opts...)
}
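
An observation, not part of the diff: with Resource added here, *config.CloudConfig now satisfies the CloudConfig interface declared in agent/hcp/client/metrics_client.go. A compile-time assertion (which would have to live in the client package to avoid an import cycle) makes that explicit:

```go
package client

import "github.com/hashicorp/consul/agent/hcp/config"

// Illustrative only: the agent's cloud config implements the CloudConfig
// interface (HCPConfig + Resource) consumed by NewMetricsClient.
var _ CloudConfig = (*config.CloudConfig)(nil)
```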

View File

@ -4,23 +4,94 @@
package hcp
import (
"context"
"fmt"
"net/url"
"time"
"github.com/armon/go-metrics"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/hcp/scada"
"github.com/hashicorp/consul/agent/hcp/telemetry"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-hclog"
)
// Deps contains the interfaces that the rest of Consul core depends on for HCP integration.
type Deps struct {
Client Client
Client hcpclient.Client
Provider scada.Provider
Sink metrics.MetricSink
}
func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (d Deps, err error) {
d.Client, err = NewClient(cfg)
func NewDeps(cfg config.CloudConfig, logger hclog.Logger, nodeID types.NodeID) (Deps, error) {
client, err := hcpclient.NewClient(cfg)
if err != nil {
return
return Deps{}, fmt.Errorf("failed to init client: %w:", err)
}
d.Provider, err = scada.New(cfg, logger.Named("hcp.scada"))
return
provider, err := scada.New(cfg, logger.Named("scada"))
if err != nil {
return Deps{}, fmt.Errorf("failed to init scada: %w", err)
}
sink := sink(client, &cfg, logger.Named("sink"), nodeID)
return Deps{
Client: client,
Provider: provider,
Sink: sink,
}, nil
}
// sink initializes an OTELSink which forwards Consul metrics to HCP.
// The sink is only initialized if the server is registered with the management plane (CCM).
// This step should not block server initialization, so errors are logged, but not returned.
func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Logger, nodeID types.NodeID) metrics.MetricSink {
ctx := context.Background()
ctx = hclog.WithContext(ctx, logger)
reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
telemetryCfg, err := hcpClient.FetchTelemetryConfig(reqCtx)
if err != nil {
logger.Error("failed to fetch telemetry config", "error", err)
return nil
}
endpoint, isEnabled := telemetryCfg.Enabled()
if !isEnabled {
return nil
}
u, err := url.Parse(endpoint)
if err != nil {
logger.Error("failed to parse url endpoint", "error", err)
return nil
}
metricsClient, err := hcpclient.NewMetricsClient(cfg, ctx)
if err != nil {
logger.Error("failed to init metrics client", "error", err)
return nil
}
sinkOpts := &telemetry.OTELSinkOpts{
Ctx: ctx,
Reader: telemetry.NewOTELReader(metricsClient, u, telemetry.DefaultExportInterval),
Labels: telemetryCfg.DefaultLabels(string(nodeID)),
Filters: telemetryCfg.MetricsConfig.Filters,
}
sink, err := telemetry.NewOTELSink(sinkOpts)
if err != nil {
logger.Error("failed to init OTEL sink", "error", err)
return nil
}
logger.Debug("initialized HCP metrics sink")
return sink
}
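
A sketch of the caller's side (editor's illustration; per the commit bullets, the real wiring passes extraSinks as a function param during agent setup):

```go
package hcp

import (
	"github.com/armon/go-metrics"
	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/agent/hcp/config"
	"github.com/hashicorp/consul/types"
)

// exampleInit shows why sink errors are only logged: deps are built before
// telemetry, and the sink is forwarded only when initialization succeeded.
func exampleInit(cfg config.CloudConfig, logger hclog.Logger, nodeID types.NodeID) ([]metrics.MetricSink, error) {
	deps, err := NewDeps(cfg, logger, nodeID)
	if err != nil {
		return nil, err
	}
	var extraSinks []metrics.MetricSink
	if deps.Sink != nil { // nil when HCP telemetry is disabled or its init failed
		extraSinks = append(extraSinks, deps.Sink)
	}
	return extraSinks, nil
}
```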

agent/hcp/deps_test.go (new file, 106 additions)

@ -0,0 +1,106 @@
package hcp
import (
"fmt"
"testing"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/types"
)
func TestSink(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
expect func(*client.MockClient)
mockCloudCfg client.CloudConfig
expectedSink bool
}{
"success": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
Endpoint: "https://test.com",
MetricsConfig: &client.MetricsConfig{
Endpoint: "https://test.com",
},
}, nil)
},
mockCloudCfg: client.MockCloudCfg{},
expectedSink: true,
},
"noSinkWhenServerNotRegisteredWithCCM": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
Endpoint: "",
MetricsConfig: &client.MetricsConfig{
Endpoint: "",
},
}, nil)
},
mockCloudCfg: client.MockCloudCfg{},
},
"noSinkWhenCCMVerificationFails": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("fetch failed"))
},
mockCloudCfg: client.MockCloudCfg{},
},
"noSinkWhenMetricsClientInitFails": {
mockCloudCfg: client.MockCloudCfg{
ConfigErr: fmt.Errorf("test bad hcp config"),
},
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
Endpoint: "https://test.com",
MetricsConfig: &client.MetricsConfig{
Endpoint: "",
},
}, nil)
},
},
"failsWithFetchTelemetryFailure": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("FetchTelemetryConfig error"))
},
},
"failsWithURLParseErr": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
// Minimum 2 chars for a domain to be valid.
Endpoint: "s",
MetricsConfig: &client.MetricsConfig{
// Invalid domain chars
Endpoint: " ",
},
}, nil)
},
},
"noErrWithEmptyEndpoint": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
Endpoint: "",
MetricsConfig: &client.MetricsConfig{
Endpoint: "",
},
}, nil)
},
},
} {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
c := client.NewMockClient(t)
l := hclog.NewNullLogger()
test.expect(c)
sinkOpts := sink(c, test.mockCloudCfg, l, types.NodeID("server1234"))
if !test.expectedSink {
require.Nil(t, sinkOpts)
return
}
require.NotNil(t, sinkOpts)
})
}
}

View File

@ -9,7 +9,7 @@ import (
"log"
"time"
"github.com/hashicorp/consul/agent/hcp"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/hcp/config"
)
@ -32,7 +32,7 @@ func (p *Provider) Addrs(args map[string]string, l *log.Logger) ([]string, error
return nil, err
}
client, err := hcp.NewClient(cfg.CloudConfig)
client, err := hcpclient.NewClient(cfg.CloudConfig)
if err != nil {
return nil, err
}

View File

@ -8,6 +8,7 @@ import (
"sync"
"time"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-hclog"
)
@ -18,7 +19,7 @@ var (
)
type ManagerConfig struct {
Client Client
Client hcpclient.Client
StatusFn StatusCallback
MinInterval time.Duration
@ -47,7 +48,7 @@ func (cfg *ManagerConfig) nextHeartbeat() time.Duration {
return min + lib.RandomStagger(max-min)
}
type StatusCallback func(context.Context) (ServerStatus, error)
type StatusCallback func(context.Context) (hcpclient.ServerStatus, error)
type Manager struct {
logger hclog.Logger

View File

@ -8,6 +8,7 @@ import (
"testing"
"time"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@ -15,12 +16,12 @@ import (
)
func TestManager_Run(t *testing.T) {
client := NewMockClient(t)
statusF := func(ctx context.Context) (ServerStatus, error) {
return ServerStatus{ID: t.Name()}, nil
client := hcpclient.NewMockClient(t)
statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) {
return hcpclient.ServerStatus{ID: t.Name()}, nil
}
updateCh := make(chan struct{}, 1)
client.EXPECT().PushServerStatus(mock.Anything, &ServerStatus{ID: t.Name()}).Return(nil).Once()
client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Once()
mgr := NewManager(ManagerConfig{
Client: client,
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
@ -43,14 +44,14 @@ func TestManager_Run(t *testing.T) {
}
func TestManager_SendUpdate(t *testing.T) {
client := NewMockClient(t)
statusF := func(ctx context.Context) (ServerStatus, error) {
return ServerStatus{ID: t.Name()}, nil
client := hcpclient.NewMockClient(t)
statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) {
return hcpclient.ServerStatus{ID: t.Name()}, nil
}
updateCh := make(chan struct{}, 1)
// Expect two calls, once during run startup and again when SendUpdate is called
client.EXPECT().PushServerStatus(mock.Anything, &ServerStatus{ID: t.Name()}).Return(nil).Twice()
client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Twice()
mgr := NewManager(ManagerConfig{
Client: client,
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
@ -73,14 +74,14 @@ func TestManager_SendUpdate(t *testing.T) {
}
func TestManager_SendUpdate_Periodic(t *testing.T) {
client := NewMockClient(t)
statusF := func(ctx context.Context) (ServerStatus, error) {
return ServerStatus{ID: t.Name()}, nil
client := hcpclient.NewMockClient(t)
statusF := func(ctx context.Context) (hcpclient.ServerStatus, error) {
return hcpclient.ServerStatus{ID: t.Name()}, nil
}
updateCh := make(chan struct{}, 1)
// Expect two calls, once during run startup and again when SendUpdate is called
client.EXPECT().PushServerStatus(mock.Anything, &ServerStatus{ID: t.Name()}).Return(nil).Twice()
client.EXPECT().PushServerStatus(mock.Anything, &hcpclient.ServerStatus{ID: t.Name()}).Return(nil).Twice()
mgr := NewManager(ManagerConfig{
Client: client,
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),

View File

@ -0,0 +1,14 @@
package telemetry
// Keys for custom Go Metrics metrics emitted only for the OTEL
// export (exporter.go) and transform (transform.go) failures and successes.
// These enable us to monitor OTEL operations.
var (
internalMetricTransformFailure []string = []string{"hcp", "otel", "transform", "failure"}
internalMetricExportSuccess []string = []string{"hcp", "otel", "exporter", "export", "success"}
internalMetricExportFailure []string = []string{"hcp", "otel", "exporter", "export", "failure"}
internalMetricExporterShutdown []string = []string{"hcp", "otel", "exporter", "shutdown"}
internalMetricExporterForceFlush []string = []string{"hcp", "otel", "exporter", "force_flush"}
)
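
These keys are emitted through the agent's global go-metrics instance; a short illustration (editor's sketch, assuming the exporter calls it on each successful export):

```go
package telemetry

import "github.com/armon/go-metrics"

// recordExportSuccess bumps the internal success counter; the exporter in
// this commit emits it after each successful export (sketch, not exact code).
func recordExportSuccess() {
	metrics.IncrCounter(internalMetricExportSuccess, 1)
}
```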

View File

@ -0,0 +1,12 @@
// Package telemetry implements functionality to collect, aggregate, convert and export
// telemetry data in OpenTelemetry Protocol (OTLP) format.
//
// The entrypoint is the OpenTelemetry (OTEL) go-metrics sink which:
// - Receives metric data.
// - Aggregates metric data using the OTEL Go Metrics SDK.
// - Exports metric data using a configurable OTEL exporter.
//
// The package also provides an OTEL exporter implementation to be used within the sink, which:
// - Transforms metric data from the Metrics SDK OTEL representation to OTLP format.
// - Exports OTLP metric data to an external endpoint using a configurable client.
package telemetry
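
A minimal sketch of how these pieces fit together from a consumer's point of view. Everything here (the package name, endpoint URL, labels, and the newSink helper) is illustrative and not part of this changeset; the wiring mirrors NewOTELReader and NewOTELSink defined later in this diff:

package example

import (
    "context"
    "net/url"

    "github.com/hashicorp/go-hclog"

    hcpclient "github.com/hashicorp/consul/agent/hcp/client"
    "github.com/hashicorp/consul/agent/hcp/telemetry"
)

// newSink wires the package's pieces together: a MetricsClient feeds an
// OTELExporter, a PeriodicReader drives the exporter, and the reader backs
// the OTELSink that go-metrics writes into.
func newSink(metricsClient hcpclient.MetricsClient) (*telemetry.OTELSink, error) {
    // NewOTELSink pulls its logger from the context via hclog.FromContext.
    ctx := hclog.WithContext(context.Background(), hclog.Default())

    // Illustrative endpoint; a real deployment would use the HCP Telemetry Gateway URL.
    endpoint, err := url.Parse("https://example.test/v1/metrics")
    if err != nil {
        return nil, err
    }

    reader := telemetry.NewOTELReader(metricsClient, endpoint, telemetry.DefaultExportInterval)
    return telemetry.NewOTELSink(&telemetry.OTELSinkOpts{
        Ctx:     ctx,
        Reader:  reader,
        Filters: []string{"raft", "autopilot"},
        Labels:  map[string]string{"node_id": "node-1"},
    })
}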

View File

@ -0,0 +1,37 @@
package telemetry
import (
"fmt"
"regexp"
"strings"
"github.com/hashicorp/go-multierror"
)
// newFilterRegex returns a compiled regex used to filter metrics.
// It fails if none of the given filters is a valid regex.
func newFilterRegex(filters []string) (*regexp.Regexp, error) {
var mErr error
validFilters := make([]string, 0, len(filters))
for _, filter := range filters {
_, err := regexp.Compile(filter)
if err != nil {
mErr = multierror.Append(mErr, fmt.Errorf("compilation of filter %q failed: %w", filter, err))
continue
}
validFilters = append(validFilters, filter)
}
if len(validFilters) == 0 {
return nil, multierror.Append(mErr, fmt.Errorf("no valid filters"))
}
// Combine the valid regex strings with an OR.
finalRegex := strings.Join(validFilters, "|")
composedRegex, err := regexp.Compile(finalRegex)
if err != nil {
return nil, fmt.Errorf("failed to compile regex: %w", err)
}
return composedRegex, nil
}
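
For illustration, two valid filters compose into a single alternation; the test file below covers this behavior exhaustively (the example function name is hypothetical):

package telemetry

import "fmt"

func exampleNewFilterRegex() {
    re, err := newFilterRegex([]string{"raft", "mem"})
    if err != nil {
        panic(err)
    }
    fmt.Println(re.String())                         // raft|mem
    fmt.Println(re.MatchString("consul.raft.apply")) // true
    fmt.Println(re.MatchString("consul.txn.apply"))  // false
}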

View File

@ -0,0 +1,58 @@
package telemetry
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestFilter(t *testing.T) {
t.Parallel()
for name, tc := range map[string]struct {
filters []string
expectedRegexString string
matches []string
wantErr string
wantMatch bool
}{
"badFilterRegex": {
filters: []string{"(*LF)"},
wantErr: "no valid filters",
},
"failsWithNoRegex": {
filters: []string{},
wantErr: "no valid filters",
},
"matchFound": {
filters: []string{"raft.*", "mem.*"},
expectedRegexString: "raft.*|mem.*",
matches: []string{"consul.raft.peers", "consul.mem.heap_size"},
wantMatch: true,
},
"matchNotFound": {
filters: []string{"mem.*"},
matches: []string{"consul.raft.peers", "consul.txn.apply"},
expectedRegexString: "mem.*",
wantMatch: false,
},
} {
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
f, err := newFilterRegex(tc.filters)
if tc.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.wantErr)
return
}
require.NoError(t, err)
require.Equal(t, tc.expectedRegexString, f.String())
for _, metric := range tc.matches {
m := f.MatchString(metric)
require.Equal(t, tc.wantMatch, m)
}
})
}
}

View File

@ -0,0 +1,77 @@
package telemetry
import (
"context"
"sync"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
// gaugeStore holds last seen Gauge values for a particular metric (<name,last_value>) in the store.
// OTEL does not currently have a synchronous Gauge instrument. Instead, it allows the registration of callbacks.
// The callbacks are called during export, where the Gauge value must be returned.
// This store is a workaround, which holds last seen Gauge values until the callback is called.
type gaugeStore struct {
store map[string]*gaugeValue
mutex sync.Mutex
}
// gaugeValue holds the last seen measurement for a Gauge metric: a float64 value and its labels.
type gaugeValue struct {
Value float64
Attributes []attribute.KeyValue
}
// NewGaugeStore returns an initialized empty gaugeStore.
func NewGaugeStore() *gaugeStore {
return &gaugeStore{
store: make(map[string]*gaugeValue, 0),
}
}
// LoadAndDelete reads a Gauge value and deletes it.
// Once registered for a metric name, a Gauge callback continues to execute every collection cycle.
// We must delete the value once it has been read, to avoid re-exporting stale values.
func (g *gaugeStore) LoadAndDelete(key string) (*gaugeValue, bool) {
g.mutex.Lock()
defer g.mutex.Unlock()
gauge, ok := g.store[key]
if !ok {
return nil, ok
}
delete(g.store, key)
return gauge, ok
}
// Set stores the last seen gaugeValue for the given key.
func (g *gaugeStore) Set(key string, value float64, labels []attribute.KeyValue) {
g.mutex.Lock()
defer g.mutex.Unlock()
gv := &gaugeValue{
Value: value,
Attributes: labels,
}
g.store[key] = gv
}
// gaugeCallback returns a callback which gets called when metrics are collected for export.
func (g *gaugeStore) gaugeCallback(key string) metric.Float64Callback {
// The returned closure captures the metric key, so each registered callback reads its own metric's last seen value.
return func(ctx context.Context, obs metric.Float64Observer) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
if gauge, ok := g.LoadAndDelete(key); ok {
obs.Observe(gauge.Value, metric.WithAttributes(gauge.Attributes...))
}
return nil
}
}
}
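
The callback is meant to be registered once per metric name on an OTEL ObservableGauge, which is exactly what OTELSink does later in this diff. A compressed in-package sketch, using a ManualReader so collection can be triggered by hand (the function name is illustrative):

package telemetry

import (
    "context"

    otelmetric "go.opentelemetry.io/otel/metric"
    otelsdk "go.opentelemetry.io/otel/sdk/metric"
    "go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func exampleGaugeCallback(ctx context.Context) error {
    reader := otelsdk.NewManualReader()
    meter := otelsdk.NewMeterProvider(otelsdk.WithReader(reader)).Meter("example")

    store := NewGaugeStore()
    store.Set("consul.raft.leader", 1, nil)

    // Register once; the callback reads (and deletes) the last seen value
    // on every collection cycle.
    _, err := meter.Float64ObservableGauge("consul.raft.leader",
        otelmetric.WithFloat64Callback(store.gaugeCallback("consul.raft.leader")))
    if err != nil {
        return err
    }

    // Collecting triggers the callback; rm then holds the gauge datapoint.
    var rm metricdata.ResourceMetrics
    return reader.Collect(ctx, &rm)
}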

View File

@ -0,0 +1,89 @@
package telemetry
import (
"context"
"fmt"
"sync"
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
)
func TestGaugeStore(t *testing.T) {
t.Parallel()
gaugeStore := NewGaugeStore()
attributes := []attribute.KeyValue{
{
Key: attribute.Key("test_key"),
Value: attribute.StringValue("test_value"),
},
}
gaugeStore.Set("test", 1.23, attributes)
// Should store a new gauge.
val, ok := gaugeStore.LoadAndDelete("test")
require.True(t, ok)
require.Equal(t, val.Value, 1.23)
require.Equal(t, val.Attributes, attributes)
// Gauge with key "test" has been deleted.
val, ok = gaugeStore.LoadAndDelete("test")
require.False(t, ok)
require.Nil(t, val)
gaugeStore.Set("duplicate", 1.5, nil)
gaugeStore.Set("duplicate", 6.7, nil)
// Gauge with key "duplicate" should hold the latest (last seen) value.
val, ok = gaugeStore.LoadAndDelete("duplicate")
require.True(t, ok)
require.Equal(t, val.Value, 6.7)
}
func TestGaugeCallback_Failure(t *testing.T) {
t.Parallel()
k := "consul.raft.apply"
gaugeStore := NewGaugeStore()
gaugeStore.Set(k, 1.23, nil)
cb := gaugeStore.gaugeCallback(k)
ctx, cancel := context.WithCancel(context.Background())
cancel()
err := cb(ctx, nil)
require.ErrorIs(t, err, context.Canceled)
}
// TestGaugeStore_Race induces a race condition. When run with go test -race,
// this test should pass if the implementation is concurrency safe.
func TestGaugeStore_Race(t *testing.T) {
t.Parallel()
gaugeStore := NewGaugeStore()
wg := &sync.WaitGroup{}
samples := 100
errCh := make(chan error, samples)
for i := 0; i < samples; i++ {
wg.Add(1)
key := fmt.Sprintf("consul.test.%d", i)
value := 12.34
go func() {
defer wg.Done()
gaugeStore.Set(key, value, nil)
gv, _ := gaugeStore.LoadAndDelete(key)
if gv.Value != value {
errCh <- fmt.Errorf("expected value: '%f', but got: '%f' for key: '%s'", value, gv.Value, key)
}
}()
}
wg.Wait()
require.Empty(t, errCh)
}

View File

@ -0,0 +1,81 @@
package telemetry
import (
"context"
"fmt"
"net/url"
goMetrics "github.com/armon/go-metrics"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
)
// OTELExporter is a custom implementation of an OTEL Metrics SDK metrics.Exporter.
// The exporter is used by an OTEL Metrics SDK PeriodicReader to export aggregated metrics.
// This allows us to use a custom client: the HCP-authenticated MetricsClient.
type OTELExporter struct {
client hcpclient.MetricsClient
endpoint *url.URL
}
// NewOTELExporter returns a configured OTELExporter
func NewOTELExporter(client hcpclient.MetricsClient, endpoint *url.URL) *OTELExporter {
return &OTELExporter{
client: client,
endpoint: endpoint,
}
}
// Temporality returns the Cumulative temporality for metrics aggregation.
// Telemetry Gateway stores metrics in Prometheus format, so use cumulative temporality as the default.
func (e *OTELExporter) Temporality(_ metric.InstrumentKind) metricdata.Temporality {
return metricdata.CumulativeTemporality
}
// Aggregation returns the Aggregation to use for an instrument kind.
// The default implementation provided by the OTEL Metrics SDK library DefaultAggregationSelector panics.
// This custom version replicates that logic, but removes the panic.
func (e *OTELExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggregation {
switch kind {
case metric.InstrumentKindObservableGauge:
return aggregation.LastValue{}
case metric.InstrumentKindHistogram:
return aggregation.ExplicitBucketHistogram{
Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
NoMinMax: false,
}
}
// for metric.InstrumentKindCounter and others, default to sum.
return aggregation.Sum{}
}
// Export serializes and transmits metric data to a receiver.
func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceMetrics) error {
otlpMetrics := transformOTLP(metrics)
if isEmpty(otlpMetrics) {
return nil
}
err := e.client.ExportMetrics(ctx, otlpMetrics, e.endpoint.String())
if err != nil {
goMetrics.IncrCounter(internalMetricExportFailure, 1)
return fmt.Errorf("failed to export metrics: %w", err)
}
goMetrics.IncrCounter(internalMetricExportSuccess, 1)
return nil
}
// ForceFlush is a no-op, as the MetricsClient holds no state.
func (e *OTELExporter) ForceFlush(ctx context.Context) error {
goMetrics.IncrCounter(internalMetricExporterForceFlush, 1)
return ctx.Err()
}
// Shutdown is a no-op, as the MetricsClient is an HTTP client that requires no graceful shutdown.
func (e *OTELExporter) Shutdown(ctx context.Context) error {
goMetrics.IncrCounter(internalMetricExporterShutdown, 1)
return ctx.Err()
}

View File

@ -0,0 +1,208 @@
package telemetry
import (
"context"
"fmt"
"net/url"
"strings"
"testing"
"time"
"github.com/armon/go-metrics"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
"github.com/hashicorp/consul/agent/hcp/client"
)
type mockMetricsClient struct {
exportErr error
}
func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error {
return m.exportErr
}
func TestTemporality(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
require.Equal(t, metricdata.CumulativeTemporality, exp.Temporality(metric.InstrumentKindCounter))
}
func TestAggregation(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
kind metric.InstrumentKind
expAgg aggregation.Aggregation
}{
"gauge": {
kind: metric.InstrumentKindObservableGauge,
expAgg: aggregation.LastValue{},
},
"counter": {
kind: metric.InstrumentKindCounter,
expAgg: aggregation.Sum{},
},
"histogram": {
kind: metric.InstrumentKindHistogram,
expAgg: aggregation.ExplicitBucketHistogram{Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, NoMinMax: false},
},
} {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
require.Equal(t, test.expAgg, exp.Aggregation(test.kind))
})
}
}
func TestExport(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
wantErr string
metrics *metricdata.ResourceMetrics
client client.MetricsClient
}{
"earlyReturnWithoutScopeMetrics": {
client: &mockMetricsClient{},
metrics: mutateMetrics(nil),
},
"earlyReturnWithoutMetrics": {
client: &mockMetricsClient{},
metrics: mutateMetrics([]metricdata.ScopeMetrics{
{Metrics: []metricdata.Metrics{}},
},
),
},
"errorWithExportFailure": {
client: &mockMetricsClient{
exportErr: fmt.Errorf("failed to export metrics"),
},
metrics: mutateMetrics([]metricdata.ScopeMetrics{
{
Metrics: []metricdata.Metrics{
{
Name: "consul.raft.commitTime",
Data: metricdata.Gauge[float64]{},
},
},
},
},
),
wantErr: "failed to export metrics",
},
} {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
exp := NewOTELExporter(test.client, &url.URL{})
err := exp.Export(context.Background(), test.metrics)
if test.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), test.wantErr)
return
}
require.NoError(t, err)
})
}
}
// TestExport_CustomMetrics tests that a custom metric (hcp.otel.exporter.*) is emitted
// for exporter operations. This test cannot be run in parallel as the metrics.NewGlobal()
// sets a shared global sink.
func TestExport_CustomMetrics(t *testing.T) {
for name, tc := range map[string]struct {
client client.MetricsClient
metricKey []string
operation string
}{
"exportSuccessEmitsCustomMetric": {
client: &mockMetricsClient{},
metricKey: internalMetricExportSuccess,
operation: "export",
},
"exportFailureEmitsCustomMetric": {
client: &mockMetricsClient{
exportErr: fmt.Errorf("client err"),
},
metricKey: internalMetricExportFailure,
operation: "export",
},
"shutdownEmitsCustomMetric": {
metricKey: internalMetricExporterShutdown,
operation: "shutdown",
},
"forceFlushEmitsCustomMetric": {
metricKey: internalMetricExporterForceFlush,
operation: "flush",
},
} {
t.Run(name, func(t *testing.T) {
// Init global sink.
serviceName := "test.exporter"
cfg := metrics.DefaultConfig(serviceName)
cfg.EnableHostname = false
sink := metrics.NewInmemSink(10*time.Second, 10*time.Second)
metrics.NewGlobal(cfg, sink)
// Perform operation that emits metric.
exp := NewOTELExporter(tc.client, &url.URL{})
ctx := context.Background()
switch tc.operation {
case "flush":
exp.ForceFlush(ctx)
case "shutdown":
exp.Shutdown(ctx)
default:
exp.Export(ctx, inputResourceMetrics)
}
// Collect sink metrics.
intervals := sink.Data()
require.Len(t, intervals, 1)
key := serviceName + "." + strings.Join(tc.metricKey, ".")
sv := intervals[0].Counters[key]
// Verify count for the emitted custom metric.
require.NotNil(t, sv)
require.NotNil(t, sv.AggregateSample)
require.Equal(t, 1, sv.AggregateSample.Count)
})
}
}
func TestForceFlush(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
ctx, cancel := context.WithCancel(context.Background())
cancel()
err := exp.ForceFlush(ctx)
require.ErrorIs(t, err, context.Canceled)
}
func TestShutdown(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
ctx, cancel := context.WithCancel(context.Background())
cancel()
err := exp.Shutdown(ctx)
require.ErrorIs(t, err, context.Canceled)
}
func mutateMetrics(m []metricdata.ScopeMetrics) *metricdata.ResourceMetrics {
return &metricdata.ResourceMetrics{
Resource: resource.Empty(),
ScopeMetrics: m,
}
}

View File

@ -0,0 +1,245 @@
package telemetry
import (
"bytes"
"context"
"fmt"
"net/url"
"regexp"
"strings"
"sync"
"time"
gometrics "github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
"go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
otelsdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"github.com/hashicorp/consul/agent/hcp/client"
)
// DefaultExportInterval is the default time interval between exports of aggregated metrics.
const DefaultExportInterval = 10 * time.Second
// OTELSinkOpts is used to provide configuration when initializing an OTELSink using NewOTELSink.
type OTELSinkOpts struct {
Reader otelsdk.Reader
Ctx context.Context
Filters []string
Labels map[string]string
}
// OTELSink captures and aggregates telemetry data as per the OpenTelemetry (OTEL) specification.
// Metric data is exported in OpenTelemetry Protocol (OTLP) wire format.
// This should be used as a Go Metrics backend, as it implements the MetricsSink interface.
type OTELSink struct {
// spaceReplacer cleans the flattened key by removing any spaces.
spaceReplacer *strings.Replacer
logger hclog.Logger
filters *regexp.Regexp
// meterProvider is an OTEL MeterProvider, the entrypoint to the OTEL Metrics SDK.
// It handles reading/export of aggregated metric data.
// It enables creation and usage of an OTEL Meter.
meterProvider *otelsdk.MeterProvider
// meter is an OTEL Meter, which enables the creation of OTEL instruments.
meter *otelmetric.Meter
// Instrument stores contain an OTEL Instrument per metric name (<name, instrument>)
// for each gauge, counter and histogram types.
// An instrument allows us to record a measurement for a particular metric, and continuously aggregates metrics.
// Instruments are created lazily when a metric is first seen, then reused to record measurements.
gaugeInstruments map[string]otelmetric.Float64ObservableGauge
counterInstruments map[string]otelmetric.Float64Counter
histogramInstruments map[string]otelmetric.Float64Histogram
// gaugeStore is required to hold last-seen values of gauges
// This is a workaround, as OTEL currently does not have synchronous gauge instruments.
// It only allows the registration of "callbacks", which obtain values when the callback is called.
// We must hold gauge values until the callback is called, when the measurement is exported, and can be removed.
gaugeStore *gaugeStore
mutex sync.Mutex
}
// NewOTELReader returns a configured OTEL PeriodicReader that exports metrics at the given interval.
// It configures the reader with a custom OTELExporter with a MetricsClient to transform and export
// metrics in OTLP format to an external URL.
func NewOTELReader(client client.MetricsClient, url *url.URL, exportInterval time.Duration) otelsdk.Reader {
exporter := NewOTELExporter(client, url)
return otelsdk.NewPeriodicReader(exporter, otelsdk.WithInterval(exportInterval))
}
// NewOTELSink returns a sink which fits the Go Metrics MetricsSink interface.
// It sets up a MeterProvider and Meter, key pieces of the OTEL Metrics SDK which
// enable us to create OTEL Instruments to record measurements.
func NewOTELSink(opts *OTELSinkOpts) (*OTELSink, error) {
if opts.Reader == nil {
return nil, fmt.Errorf("error: provide a valid reader")
}
if opts.Ctx == nil {
return nil, fmt.Errorf("error: provide a valid context")
}
logger := hclog.FromContext(opts.Ctx).Named("otel_sink")
filterList, err := newFilterRegex(opts.Filters)
if err != nil {
logger.Error("Failed to initialize all filters", "error", err)
}
attrs := make([]attribute.KeyValue, 0, len(opts.Labels))
for k, v := range opts.Labels {
kv := attribute.KeyValue{
Key: attribute.Key(k),
Value: attribute.StringValue(v),
}
attrs = append(attrs, kv)
}
// Setup OTEL Metrics SDK to aggregate, convert and export metrics periodically.
res := resource.NewWithAttributes("", attrs...)
meterProvider := otelsdk.NewMeterProvider(otelsdk.WithResource(res), otelsdk.WithReader(opts.Reader))
meter := meterProvider.Meter("github.com/hashicorp/consul/agent/hcp/telemetry")
return &OTELSink{
filters: filterList,
spaceReplacer: strings.NewReplacer(" ", "_"),
logger: logger,
meterProvider: meterProvider,
meter: &meter,
gaugeStore: NewGaugeStore(),
gaugeInstruments: make(map[string]otelmetric.Float64ObservableGauge, 0),
counterInstruments: make(map[string]otelmetric.Float64Counter, 0),
histogramInstruments: make(map[string]otelmetric.Float64Histogram, 0),
}, nil
}
// SetGauge emits a Consul gauge metric.
func (o *OTELSink) SetGauge(key []string, val float32) {
o.SetGaugeWithLabels(key, val, nil)
}
// AddSample emits a Consul histogram metric.
func (o *OTELSink) AddSample(key []string, val float32) {
o.AddSampleWithLabels(key, val, nil)
}
// IncrCounter emits a Consul counter metric.
func (o *OTELSink) IncrCounter(key []string, val float32) {
o.IncrCounterWithLabels(key, val, nil)
}
// SetGaugeWithLabels emits a Consul gauge metric that gets
// registered as an OpenTelemetry ObservableGauge instrument.
func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)
if !o.filters.MatchString(k) {
return
}
// Set value in global Gauge store.
o.gaugeStore.Set(k, float64(val), toAttributes(labels))
o.mutex.Lock()
defer o.mutex.Unlock()
// If instrument does not exist, create it and register callback to emit last value in global Gauge store.
if _, ok := o.gaugeInstruments[k]; !ok {
// The registration of a callback only needs to happen once, when the instrument is created.
// The callback will be triggered every export cycle for that metric.
// It must be explicitly de-registered to be removed (which we do not do), to ensure new gauge values are exported every cycle.
inst, err := (*o.meter).Float64ObservableGauge(k, otelmetric.WithFloat64Callback(o.gaugeStore.gaugeCallback(k)))
if err != nil {
o.logger.Error("Failed to create gauge instrument", "error", err)
return
}
o.gaugeInstruments[k] = inst
}
}
// AddSampleWithLabels emits a Consul sample metric that gets registered as an OpenTelemetry Histogram instrument.
func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)
if !o.filters.MatchString(k) {
return
}
o.mutex.Lock()
defer o.mutex.Unlock()
inst, ok := o.histogramInstruments[k]
if !ok {
histogram, err := (*o.meter).Float64Histogram(k)
if err != nil {
o.logger.Error("Failed create histogram instrument", "error", err)
return
}
inst = histogram
o.histogramInstruments[k] = inst
}
attrs := toAttributes(labels)
inst.Record(context.TODO(), float64(val), otelmetric.WithAttributes(attrs...))
}
// IncrCounterWithLabels emits a Consul counter metric that gets registered as an OpenTelemetry Counter instrument.
func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)
if !o.filters.MatchString(k) {
return
}
o.mutex.Lock()
defer o.mutex.Unlock()
inst, ok := o.counterInstruments[k]
if !ok {
counter, err := (*o.meter).Float64Counter(k)
if err != nil {
o.logger.Error("Failed to create counter instrument:", "error", err)
return
}
inst = counter
o.counterInstruments[k] = inst
}
attrs := toAttributes(labels)
inst.Add(context.TODO(), float64(val), otelmetric.WithAttributes(attrs...))
}
// EmitKey is unsupported.
func (o *OTELSink) EmitKey(key []string, val float32) {}
// flattenKey joins the key parts with "." and replaces any spaces with underscores.
func (o *OTELSink) flattenKey(parts []string) string {
buf := &bytes.Buffer{}
joined := strings.Join(parts, ".")
o.spaceReplacer.WriteString(buf, joined)
return buf.String()
}
// toAttributes converts go-metrics Labels into the OTEL format []attribute.KeyValue.
func toAttributes(labels []gometrics.Label) []attribute.KeyValue {
if len(labels) == 0 {
return nil
}
attrs := make([]attribute.KeyValue, len(labels))
for i, label := range labels {
attrs[i] = attribute.KeyValue{
Key: attribute.Key(label.Name),
Value: attribute.StringValue(label.Value),
}
}
return attrs
}
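
Taken together, a compressed in-package sketch of the sink's lifecycle; the tests below exercise the same flow in full (the function name is illustrative):

package telemetry

import (
    "context"

    otelsdk "go.opentelemetry.io/otel/sdk/metric"
    "go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func exampleOTELSink(ctx context.Context) (metricdata.ResourceMetrics, error) {
    var rm metricdata.ResourceMetrics

    // A ManualReader stands in for the PeriodicReader so collection can be
    // triggered explicitly rather than on an interval.
    reader := otelsdk.NewManualReader()
    sink, err := NewOTELSink(&OTELSinkOpts{
        Ctx:     ctx,
        Reader:  reader,
        Filters: []string{"raft"},
    })
    if err != nil {
        return rm, err
    }

    // Only keys matching a filter are recorded.
    sink.IncrCounter([]string{"consul", "raft", "apply"}, 1)
    sink.SetGauge([]string{"consul", "raft", "leader"}, 1)
    sink.AddSample([]string{"consul", "raft", "commitTime"}, 26.34)

    err = reader.Collect(ctx, &rm)
    return rm, err
}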

View File

@ -0,0 +1,409 @@
package telemetry
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"testing"
gometrics "github.com/armon/go-metrics"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
)
var (
expectedResource = resource.NewWithAttributes("", attribute.KeyValue{
Key: attribute.Key("node_id"),
Value: attribute.StringValue("test"),
})
attrs = attribute.NewSet(attribute.KeyValue{
Key: attribute.Key("metric.label"),
Value: attribute.StringValue("test"),
})
expectedSinkMetrics = map[string]metricdata.Metrics{
"consul.raft.leader": {
Name: "consul.raft.leader",
Description: "",
Unit: "",
Data: metricdata.Gauge[float64]{
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: *attribute.EmptySet(),
Value: float64(float32(0)),
},
},
},
},
"consul.autopilot.healthy": {
Name: "consul.autopilot.healthy",
Description: "",
Unit: "",
Data: metricdata.Gauge[float64]{
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: attrs,
Value: float64(float32(1.23)),
},
},
},
},
"consul.raft.state.leader": {
Name: "consul.raft.state.leader",
Description: "",
Unit: "",
Data: metricdata.Sum[float64]{
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: *attribute.EmptySet(),
Value: float64(float32(23.23)),
},
},
},
},
"consul.raft.apply": {
Name: "consul.raft.apply",
Description: "",
Unit: "",
Data: metricdata.Sum[float64]{
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: attrs,
Value: float64(float32(1.44)),
},
},
},
},
"consul.raft.leader.lastContact": {
Name: "consul.raft.leader.lastContact",
Description: "",
Unit: "",
Data: metricdata.Histogram[float64]{
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: *attribute.EmptySet(),
Count: 1,
Sum: float64(float32(45.32)),
Min: metricdata.NewExtrema(float64(float32(45.32))),
Max: metricdata.NewExtrema(float64(float32(45.32))),
},
},
},
},
"consul.raft.commitTime": {
Name: "consul.raft.commitTime",
Description: "",
Unit: "",
Data: metricdata.Histogram[float64]{
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: attrs,
Count: 1,
Sum: float64(float32(26.34)),
Min: metricdata.NewExtrema(float64(float32(26.34))),
Max: metricdata.NewExtrema(float64(float32(26.34))),
},
},
},
},
}
)
func TestNewOTELSink(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
wantErr string
opts *OTELSinkOpts
}{
"failsWithEmptyLogger": {
wantErr: "ferror: provide valid context",
opts: &OTELSinkOpts{
Reader: metric.NewManualReader(),
},
},
"failsWithEmptyReader": {
wantErr: "ferror: provide valid reader",
opts: &OTELSinkOpts{
Reader: nil,
Ctx: context.Background(),
},
},
"success": {
opts: &OTELSinkOpts{
Ctx: context.Background(),
Reader: metric.NewManualReader(),
Labels: map[string]string{
"server": "test",
},
Filters: []string{"raft"},
},
},
} {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
sink, err := NewOTELSink(test.opts)
if test.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), test.wantErr)
return
}
require.NotNil(t, sink)
})
}
}
func TestOTELSink(t *testing.T) {
t.Parallel()
// Manual reader outputs the aggregated metrics when reader.Collect is called.
reader := metric.NewManualReader()
ctx := context.Background()
opts := &OTELSinkOpts{
Reader: reader,
Ctx: ctx,
Filters: []string{"raft", "autopilot"},
Labels: map[string]string{
"node_id": "test",
},
}
sink, err := NewOTELSink(opts)
require.NoError(t, err)
labels := []gometrics.Label{
{
Name: "metric.label",
Value: "test",
},
}
sink.SetGauge([]string{"consul", "raft", "leader"}, float32(0))
sink.SetGaugeWithLabels([]string{"consul", "autopilot", "healthy"}, float32(1.23), labels)
sink.IncrCounter([]string{"consul", "raft", "state", "leader"}, float32(23.23))
sink.IncrCounterWithLabels([]string{"consul", "raft", "apply"}, float32(1.44), labels)
sink.AddSample([]string{"consul", "raft", "leader", "lastContact"}, float32(45.32))
sink.AddSampleWithLabels([]string{"consul", "raft", "commitTime"}, float32(26.34), labels)
var collected metricdata.ResourceMetrics
err = reader.Collect(ctx, &collected)
require.NoError(t, err)
isSame(t, expectedSinkMetrics, collected)
}
func TestOTELSink_Race(t *testing.T) {
reader := metric.NewManualReader()
ctx := context.Background()
opts := &OTELSinkOpts{
Ctx: ctx,
Reader: reader,
Labels: map[string]string{
"node_id": "test",
},
Filters: []string{"test"},
}
sink, err := NewOTELSink(opts)
require.NoError(t, err)
samples := 100
expectedMetrics := generateSamples(samples)
wg := &sync.WaitGroup{}
errCh := make(chan error, samples)
for k, v := range expectedMetrics {
wg.Add(1)
go func(k string, v metricdata.Metrics) {
defer wg.Done()
performSinkOperation(sink, k, v, errCh)
}(k, v)
}
wg.Wait()
require.Empty(t, errCh)
var collected metricdata.ResourceMetrics
err = reader.Collect(ctx, &collected)
require.NoError(t, err)
isSame(t, expectedMetrics, collected)
}
// generateSamples generates n of each gauges, counter and histogram measurements to use for test purposes.
func generateSamples(n int) map[string]metricdata.Metrics {
generated := make(map[string]metricdata.Metrics, 3*n)
for i := 0; i < n; i++ {
v := 12.3
k := fmt.Sprintf("consul.test.gauges.%d", i)
generated[k] = metricdata.Metrics{
Name: k,
Data: metricdata.Gauge[float64]{
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: *attribute.EmptySet(),
Value: float64(float32(v)),
},
},
},
}
}
for i := 0; i < n; i++ {
v := 22.23
k := fmt.Sprintf("consul.test.sum.%d", i)
generated[k] = metricdata.Metrics{
Name: k,
Data: metricdata.Sum[float64]{
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: *attribute.EmptySet(),
Value: float64(float32(v)),
},
},
},
}
}
for i := 0; i < n; i++ {
v := 13.24
k := fmt.Sprintf("consul.test.hist.%d", i)
generated[k] = metricdata.Metrics{
Name: k,
Data: metricdata.Histogram[float64]{
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: *attribute.EmptySet(),
Sum: float64(float32(v)),
Max: metricdata.NewExtrema(float64(float32(v))),
Min: metricdata.NewExtrema(float64(float32(v))),
Count: 1,
},
},
},
}
}
return generated
}
// performSinkOperation emits a measurement using the OTELSink and reports any failure on errCh.
func performSinkOperation(sink *OTELSink, k string, v metricdata.Metrics, errCh chan error) {
key := strings.Split(k, ".")
data := v.Data
switch data := data.(type) {
case metricdata.Gauge[float64]:
sink.SetGauge(key, float32(data.DataPoints[0].Value))
case metricdata.Sum[float64]:
sink.IncrCounter(key, float32(data.DataPoints[0].Value))
case metricdata.Histogram[float64]:
sink.AddSample(key, float32(data.DataPoints[0].Sum))
default:
errCh <- fmt.Errorf("unexpected metric type for key: %s", key)
}
}
func isSame(t *testing.T, expectedMap map[string]metricdata.Metrics, actual metricdata.ResourceMetrics) {
// Validate resource
require.Equal(t, expectedResource, actual.Resource)
// Validate Metrics
require.NotEmpty(t, actual.ScopeMetrics)
actualMetrics := actual.ScopeMetrics[0].Metrics
require.Equal(t, len(expectedMap), len(actualMetrics))
for _, actual := range actualMetrics {
name := actual.Name
expected, ok := expectedMap[actual.Name]
require.True(t, ok, "metric key %s should be in expectedMetrics map", name)
isSameMetrics(t, expected, actual)
}
}
// isSameMetrics verifies that two metricdata.Metrics objects are equal, ignoring the time component.
// Avoid duplicate datapoint values to ensure a predictable sort order.
func isSameMetrics(t *testing.T, expected metricdata.Metrics, actual metricdata.Metrics) {
require.Equal(t, expected.Name, actual.Name, "different .Name field")
require.Equal(t, expected.Description, actual.Description, "different .Description field")
require.Equal(t, expected.Unit, actual.Unit, "different .Unit field")
switch expectedData := expected.Data.(type) {
case metricdata.Gauge[float64]:
actualData, ok := actual.Data.(metricdata.Gauge[float64])
require.True(t, ok, "different metric types: expected metricdata.Gauge[float64]")
isSameDataPoint(t, expectedData.DataPoints, actualData.DataPoints)
case metricdata.Sum[float64]:
actualData, ok := actual.Data.(metricdata.Sum[float64])
require.True(t, ok, "different metric types: expected metricdata.Sum[float64]")
isSameDataPoint(t, expectedData.DataPoints, actualData.DataPoints)
case metricdata.Histogram[float64]:
actualData, ok := actual.Data.(metricdata.Histogram[float64])
require.True(t, ok, "different metric types: expected metricdata.Histogram")
isSameHistogramData(t, expectedData.DataPoints, actualData.DataPoints)
}
}
func isSameDataPoint(t *testing.T, expected []metricdata.DataPoint[float64], actual []metricdata.DataPoint[float64]) {
require.Equal(t, len(expected), len(actual), "different datapoints length")
// Sort for predictable data in order of lowest value.
sort.Slice(expected, func(i, j int) bool {
return expected[i].Value < expected[j].Value
})
sort.Slice(actual, func(i, j int) bool {
return actual[i].Value < actual[j].Value
})
// Only verify the value and attributes.
for i, dp := range expected {
currActual := actual[i]
require.Equal(t, dp.Value, currActual.Value, "different datapoint value")
require.Equal(t, dp.Attributes, currActual.Attributes, "different attributes")
}
}
func isSameHistogramData(t *testing.T, expected []metricdata.HistogramDataPoint[float64], actual []metricdata.HistogramDataPoint[float64]) {
require.Equal(t, len(expected), len(actual), "different histogram datapoint length")
// Sort for predictable data in order of lowest sum.
sort.Slice(expected, func(i, j int) bool {
return expected[i].Sum < expected[j].Sum
})
sort.Slice(actual, func(i, j int) bool {
return actual[i].Sum < actual[j].Sum
})
// Only verify the value and the attributes.
for i, dp := range expected {
currActual := actual[i]
require.Equal(t, dp.Sum, currActual.Sum, "different histogram datapoint .Sum value")
require.Equal(t, dp.Max, currActual.Max, "different histogram datapoint .Max value")
require.Equal(t, dp.Min, currActual.Min, "different histogram datapoint .Min value")
require.Equal(t, dp.Count, currActual.Count, "different histogram datapoint .Count value")
require.Equal(t, dp.Attributes, currActual.Attributes, "different attributes")
}
}

View File

@ -0,0 +1,186 @@
package telemetry
import (
"errors"
"fmt"
goMetrics "github.com/armon/go-metrics"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
cpb "go.opentelemetry.io/proto/otlp/common/v1"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
rpb "go.opentelemetry.io/proto/otlp/resource/v1"
)
var (
aggregationErr = errors.New("unsupported aggregation")
temporalityErr = errors.New("unsupported temporality")
)
// isEmpty reports whether the given OTLP protobuf ResourceMetrics contains no metric data.
// It returns true if no ScopeMetrics exist or all metrics within ScopeMetrics are empty.
func isEmpty(rm *mpb.ResourceMetrics) bool {
// No ScopeMetrics
if len(rm.ScopeMetrics) == 0 {
return true
}
// If any inner metrics contain data, return false.
for _, v := range rm.ScopeMetrics {
if len(v.Metrics) != 0 {
return false
}
}
// All inner metrics are empty.
return true
}
// transformOTLP converts OTEL Metrics SDK ResourceMetrics into OTLP ResourceMetrics.
// It never returns an error: ScopeMetrics that fail to convert are skipped
// (see metricsToPB), so the result may contain partial OTLP ScopeMetrics.
func transformOTLP(rm *metricdata.ResourceMetrics) *mpb.ResourceMetrics {
sms := scopeMetricsToPB(rm.ScopeMetrics)
return &mpb.ResourceMetrics{
Resource: &rpb.Resource{
Attributes: attributesToPB(rm.Resource.Iter()),
},
ScopeMetrics: sms,
}
}
// scopeMetricsToPB returns a slice of OTLP ScopeMetrics.
func scopeMetricsToPB(scopeMetrics []metricdata.ScopeMetrics) []*mpb.ScopeMetrics {
out := make([]*mpb.ScopeMetrics, 0, len(scopeMetrics))
for _, sm := range scopeMetrics {
ms := metricsToPB(sm.Metrics)
out = append(out, &mpb.ScopeMetrics{
Scope: &cpb.InstrumentationScope{
Name: sm.Scope.Name,
Version: sm.Scope.Version,
},
Metrics: ms,
})
}
return out
}
// metricsToPB returns a slice of OTLP Metrics generated from OTEL Metrics SDK ones. Metrics that fail to convert are counted and skipped.
func metricsToPB(metrics []metricdata.Metrics) []*mpb.Metric {
out := make([]*mpb.Metric, 0, len(metrics))
for _, m := range metrics {
o, err := metricTypeToPB(m)
if err != nil {
goMetrics.IncrCounter(internalMetricTransformFailure, 1)
continue
}
out = append(out, o)
}
return out
}
// metricTypeToPB identifies the instrument type and converts it to OTLP format.
// Only float64 values are accepted, since the go-metrics sink only receives float64 values.
func metricTypeToPB(m metricdata.Metrics) (*mpb.Metric, error) {
out := &mpb.Metric{
Name: m.Name,
Description: m.Description,
Unit: m.Unit,
}
switch a := m.Data.(type) {
case metricdata.Gauge[float64]:
out.Data = &mpb.Metric_Gauge{
Gauge: &mpb.Gauge{
DataPoints: dataPointsToPB(a.DataPoints),
},
}
case metricdata.Sum[float64]:
if a.Temporality != metricdata.CumulativeTemporality {
return out, fmt.Errorf("error: %w: %T", temporalityErr, a)
}
out.Data = &mpb.Metric_Sum{
Sum: &mpb.Sum{
AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
IsMonotonic: a.IsMonotonic,
DataPoints: dataPointsToPB(a.DataPoints),
},
}
case metricdata.Histogram[float64]:
if a.Temporality != metricdata.CumulativeTemporality {
return out, fmt.Errorf("error: %w: %T", temporalityErr, a)
}
out.Data = &mpb.Metric_Histogram{
Histogram: &mpb.Histogram{
AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
DataPoints: histogramDataPointsToPB(a.DataPoints),
},
}
default:
return out, fmt.Errorf("error: %w: %T", aggregationErr, a)
}
return out, nil
}
// dataPointsToPB returns a slice of OTLP NumberDataPoints generated from OTEL Metrics SDK ones.
func dataPointsToPB(dataPoints []metricdata.DataPoint[float64]) []*mpb.NumberDataPoint {
out := make([]*mpb.NumberDataPoint, 0, len(dataPoints))
for _, dp := range dataPoints {
ndp := &mpb.NumberDataPoint{
Attributes: attributesToPB(dp.Attributes.Iter()),
StartTimeUnixNano: uint64(dp.StartTime.UnixNano()),
TimeUnixNano: uint64(dp.Time.UnixNano()),
}
ndp.Value = &mpb.NumberDataPoint_AsDouble{
AsDouble: dp.Value,
}
out = append(out, ndp)
}
return out
}
// histogramDataPointsToPB returns a slice of OTLP HistogramDataPoints generated from OTEL Metrics SDK ones.
func histogramDataPointsToPB(dataPoints []metricdata.HistogramDataPoint[float64]) []*mpb.HistogramDataPoint {
out := make([]*mpb.HistogramDataPoint, 0, len(dataPoints))
for _, dp := range dataPoints {
sum := dp.Sum
hdp := &mpb.HistogramDataPoint{
Attributes: attributesToPB(dp.Attributes.Iter()),
StartTimeUnixNano: uint64(dp.StartTime.UnixNano()),
TimeUnixNano: uint64(dp.Time.UnixNano()),
Count: dp.Count,
Sum: &sum,
BucketCounts: dp.BucketCounts,
ExplicitBounds: dp.Bounds,
}
if v, ok := dp.Min.Value(); ok {
hdp.Min = &v
}
if v, ok := dp.Max.Value(); ok {
hdp.Max = &v
}
out = append(out, hdp)
}
return out
}
// attributesToPB transforms items of an attribute iterator into OTLP key-values.
// Currently, labels are only <string, string> key-value pairs.
func attributesToPB(iter attribute.Iterator) []*cpb.KeyValue {
l := iter.Len()
if l == 0 {
return nil
}
out := make([]*cpb.KeyValue, 0, l)
for iter.Next() {
kv := iter.Attribute()
av := &cpb.AnyValue{
Value: &cpb.AnyValue_StringValue{
StringValue: kv.Value.AsString(),
},
}
out = append(out, &cpb.KeyValue{Key: string(kv.Key), Value: av})
}
return out
}
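
Because transformOTLP never returns an error, invalid metrics simply vanish from the output, apart from the internal failure counter. An in-package sketch of that behavior (the function name is illustrative):

package telemetry

import (
    "fmt"

    "go.opentelemetry.io/otel/sdk/metric/metricdata"
    "go.opentelemetry.io/otel/sdk/resource"
)

func exampleTransformOTLP() {
    rm := &metricdata.ResourceMetrics{
        Resource: resource.Empty(),
        ScopeMetrics: []metricdata.ScopeMetrics{{
            Metrics: []metricdata.Metrics{{
                Name: "consul.raft.apply",
                // Delta temporality is rejected by metricTypeToPB.
                Data: metricdata.Sum[float64]{Temporality: metricdata.DeltaTemporality},
            }},
        }},
    }

    otlp := transformOTLP(rm)
    // The only metric was invalid and skipped, so the result is empty.
    fmt.Println(isEmpty(otlp)) // true
}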

View File

@ -0,0 +1,342 @@
package telemetry
import (
"strings"
"testing"
"time"
"github.com/armon/go-metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
cpb "go.opentelemetry.io/proto/otlp/common/v1"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
rpb "go.opentelemetry.io/proto/otlp/resource/v1"
)
var (
// Common attributes for test cases.
start = time.Date(2000, time.January, 01, 0, 0, 0, 0, time.FixedZone("GMT", 0))
end = start.Add(30 * time.Second)
alice = attribute.NewSet(attribute.String("user", "alice"))
bob = attribute.NewSet(attribute.String("user", "bob"))
pbAlice = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
Value: &cpb.AnyValue_StringValue{StringValue: "alice"},
}}
pbBob = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
Value: &cpb.AnyValue_StringValue{StringValue: "bob"},
}}
// DataPoint test case : Histogram Datapoints (Histogram)
minA, maxA, sumA = 2.0, 4.0, 90.0
minB, maxB, sumB = 4.0, 150.0, 234.0
inputHDP = []metricdata.HistogramDataPoint[float64]{{
Attributes: alice,
StartTime: start,
Time: end,
Count: 30,
Bounds: []float64{1, 5},
BucketCounts: []uint64{0, 30, 0},
Min: metricdata.NewExtrema(minA),
Max: metricdata.NewExtrema(maxA),
Sum: sumA,
}, {
Attributes: bob,
StartTime: start,
Time: end,
Count: 3,
Bounds: []float64{1, 5},
BucketCounts: []uint64{0, 1, 2},
Min: metricdata.NewExtrema(minB),
Max: metricdata.NewExtrema(maxB),
Sum: sumB,
}}
expectedHDP = []*mpb.HistogramDataPoint{{
Attributes: []*cpb.KeyValue{pbAlice},
StartTimeUnixNano: uint64(start.UnixNano()),
TimeUnixNano: uint64(end.UnixNano()),
Count: 30,
Sum: &sumA,
ExplicitBounds: []float64{1, 5},
BucketCounts: []uint64{0, 30, 0},
Min: &minA,
Max: &maxA,
}, {
Attributes: []*cpb.KeyValue{pbBob},
StartTimeUnixNano: uint64(start.UnixNano()),
TimeUnixNano: uint64(end.UnixNano()),
Count: 3,
Sum: &sumB,
ExplicitBounds: []float64{1, 5},
BucketCounts: []uint64{0, 1, 2},
Min: &minB,
Max: &maxB,
}}
// DataPoint test case : Number Datapoints (Gauge / Counter)
inputDP = []metricdata.DataPoint[float64]{
{Attributes: alice, StartTime: start, Time: end, Value: 1.0},
{Attributes: bob, StartTime: start, Time: end, Value: 2.0},
}
expectedDP = []*mpb.NumberDataPoint{
{
Attributes: []*cpb.KeyValue{pbAlice},
StartTimeUnixNano: uint64(start.UnixNano()),
TimeUnixNano: uint64(end.UnixNano()),
Value: &mpb.NumberDataPoint_AsDouble{AsDouble: 1.0},
},
{
Attributes: []*cpb.KeyValue{pbBob},
StartTimeUnixNano: uint64(start.UnixNano()),
TimeUnixNano: uint64(end.UnixNano()),
Value: &mpb.NumberDataPoint_AsDouble{AsDouble: 2.0},
},
}
invalidSumTemporality = metricdata.Metrics{
Name: "invalid-sum",
Description: "Sum with invalid temporality",
Unit: "1",
Data: metricdata.Sum[float64]{
Temporality: metricdata.DeltaTemporality,
IsMonotonic: false,
DataPoints: inputDP,
},
}
invalidSumAgg = metricdata.Metrics{
Name: "unknown",
Description: "Unknown aggregation",
Unit: "1",
Data: metricdata.Sum[int64]{},
}
invalidHistTemporality = metricdata.Metrics{
Name: "invalid-histogram",
Description: "Invalid histogram",
Unit: "1",
Data: metricdata.Histogram[float64]{
Temporality: metricdata.DeltaTemporality,
DataPoints: inputHDP,
},
}
validFloat64Gauge = metricdata.Metrics{
Name: "float64-gauge",
Description: "Gauge with float64 values",
Unit: "1",
Data: metricdata.Gauge[float64]{DataPoints: inputDP},
}
validFloat64Sum = metricdata.Metrics{
Name: "float64-sum",
Description: "Sum with float64 values",
Unit: "1",
Data: metricdata.Sum[float64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: false,
DataPoints: inputDP,
},
}
validFloat64Hist = metricdata.Metrics{
Name: "float64-histogram",
Description: "Histogram",
Unit: "1",
Data: metricdata.Histogram[float64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: inputHDP,
},
}
// Metrics Test Case
// - 3 invalid metrics and 3 valid metrics, to test filtering
// - 1 invalid metric type
// - 2 invalid temporalities (only cumulative temporality is supported)
// - 3 supported types (Gauge, Counter, and Histogram)
inputMetrics = []metricdata.Metrics{
validFloat64Gauge,
validFloat64Sum,
validFloat64Hist,
invalidSumTemporality,
invalidHistTemporality,
invalidSumAgg,
}
expectedMetrics = []*mpb.Metric{
{
Name: "float64-gauge",
Description: "Gauge with float64 values",
Unit: "1",
Data: &mpb.Metric_Gauge{Gauge: &mpb.Gauge{DataPoints: expectedDP}},
},
{
Name: "float64-sum",
Description: "Sum with float64 values",
Unit: "1",
Data: &mpb.Metric_Sum{Sum: &mpb.Sum{
AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
IsMonotonic: false,
DataPoints: expectedDP,
}},
},
{
Name: "float64-histogram",
Description: "Histogram",
Unit: "1",
Data: &mpb.Metric_Histogram{Histogram: &mpb.Histogram{
AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
DataPoints: expectedHDP,
}},
},
}
// ScopeMetrics Test Cases
inputScopeMetrics = []metricdata.ScopeMetrics{{
Scope: instrumentation.Scope{
Name: "test/code/path",
Version: "v0.1.0",
},
Metrics: inputMetrics,
}}
expectedScopeMetrics = []*mpb.ScopeMetrics{{
Scope: &cpb.InstrumentationScope{
Name: "test/code/path",
Version: "v0.1.0",
},
Metrics: expectedMetrics,
}}
// ResourceMetrics Test Cases
inputResourceMetrics = &metricdata.ResourceMetrics{
Resource: resource.NewSchemaless(
semconv.ServiceName("test server"),
semconv.ServiceVersion("v0.1.0"),
),
ScopeMetrics: inputScopeMetrics,
}
expectedResourceMetrics = &mpb.ResourceMetrics{
Resource: &rpb.Resource{
Attributes: []*cpb.KeyValue{
{
Key: "service.name",
Value: &cpb.AnyValue{
Value: &cpb.AnyValue_StringValue{StringValue: "test server"},
},
},
{
Key: "service.version",
Value: &cpb.AnyValue{
Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.0"},
},
},
},
},
ScopeMetrics: expectedScopeMetrics,
}
)
// TestTransformOTLP runs tests bottom-up over the metricdata data types.
func TestTransformOTLP(t *testing.T) {
t.Parallel()
// Histogram DataPoint Test Case (Histograms)
assert.Equal(t, expectedHDP, histogramDataPointsToPB(inputHDP))
// Number DataPoint Test Case (Counters / Gauges)
require.Equal(t, expectedDP, dataPointsToPB(inputDP))
// MetricType Error Test Cases
_, err := metricTypeToPB(invalidHistTemporality)
require.Error(t, err)
require.ErrorIs(t, err, temporalityErr)
_, err = metricTypeToPB(invalidSumTemporality)
require.Error(t, err)
require.ErrorIs(t, err, temporalityErr)
_, err = metricTypeToPB(invalidSumAgg)
require.Error(t, err)
require.ErrorIs(t, err, aggregationErr)
// Metrics Test Case
m := metricsToPB(inputMetrics)
require.Equal(t, expectedMetrics, m)
require.Equal(t, len(expectedMetrics), 3)
// Scope Metrics Test Case
sm := scopeMetricsToPB(inputScopeMetrics)
require.Equal(t, expectedScopeMetrics, sm)
// Resource Metrics Test Case
rm := transformOTLP(inputResourceMetrics)
require.Equal(t, expectedResourceMetrics, rm)
}
// TestTransformOTLP_CustomMetrics tests that a custom metric (hcp.otel.transform.failure) is emitted
// when transform fails. This test cannot be run in parallel as the metrics.NewGlobal()
// sets a shared global sink.
func TestTransformOTLP_CustomMetrics(t *testing.T) {
for name, tc := range map[string]struct {
inputRM *metricdata.ResourceMetrics
expectedMetricCount int
}{
"successNoMetric": {
inputRM: &metricdata.ResourceMetrics{
// 3 valid metrics.
ScopeMetrics: []metricdata.ScopeMetrics{
{
Metrics: []metricdata.Metrics{
validFloat64Gauge,
validFloat64Hist,
validFloat64Sum,
},
},
},
},
},
"failureEmitsMetric": {
// inputScopeMetrics contains 3 bad metrics.
inputRM: inputResourceMetrics,
expectedMetricCount: 3,
},
} {
tc := tc
t.Run(name, func(t *testing.T) {
// Init global sink.
serviceName := "test.transform"
cfg := metrics.DefaultConfig(serviceName)
cfg.EnableHostname = false
sink := metrics.NewInmemSink(10*time.Second, 10*time.Second)
metrics.NewGlobal(cfg, sink)
// Perform operation that emits metric.
transformOTLP(tc.inputRM)
// Collect sink metrics.
intervals := sink.Data()
require.Len(t, intervals, 1)
key := serviceName + "." + strings.Join(internalMetricTransformFailure, ".")
sv := intervals[0].Counters[key]
if tc.expectedMetricCount == 0 {
require.Empty(t, sv)
return
}
// Verify count for transform failure metric.
require.NotNil(t, sv)
require.NotNil(t, sv.AggregateSample)
require.Equal(t, 3, sv.AggregateSample.Count)
})
}
}

View File

@ -10,6 +10,7 @@ import (
"sync"
"time"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog"
wal "github.com/hashicorp/raft-wal"
@ -101,7 +102,18 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hcl
cfg.Telemetry.PrometheusOpts.CounterDefinitions = counters
cfg.Telemetry.PrometheusOpts.SummaryDefinitions = summaries
d.MetricsConfig, err = lib.InitTelemetry(cfg.Telemetry, d.Logger)
var extraSinks []metrics.MetricSink
if cfg.IsCloudEnabled() {
d.HCP, err = hcp.NewDeps(cfg.Cloud, d.Logger.Named("hcp"), cfg.NodeID)
if err != nil {
return d, err
}
if d.HCP.Sink != nil {
extraSinks = append(extraSinks, d.HCP.Sink)
}
}
d.MetricsConfig, err = lib.InitTelemetry(cfg.Telemetry, d.Logger, extraSinks...)
if err != nil {
return d, fmt.Errorf("failed to initialize telemetry: %w", err)
}
@ -192,12 +204,6 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hcl
d.EventPublisher = stream.NewEventPublisher(10 * time.Second)
d.XDSStreamLimiter = limiter.NewSessionLimiter()
if cfg.IsCloudEnabled() {
d.HCP, err = hcp.NewDeps(cfg.Cloud, d.Logger)
if err != nil {
return d, err
}
}
return d, nil
}

View File

@ -21,8 +21,8 @@ import (
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/hcp"
hcpbootstrap "github.com/hashicorp/consul/agent/hcp/bootstrap"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/command/cli"
"github.com/hashicorp/consul/command/flags"
"github.com/hashicorp/consul/lib"
@ -169,7 +169,7 @@ func (c *cmd) run(args []string) int {
return 1
}
if res.RuntimeConfig.IsCloudEnabled() {
client, err := hcp.NewClient(res.RuntimeConfig.Cloud)
client, err := hcpclient.NewClient(res.RuntimeConfig.Cloud)
if err != nil {
ui.Error("error building HCP HTTP client: " + err.Error())
return 1

go.mod
View File

@ -53,6 +53,7 @@ require (
github.com/hashicorp/go-memdb v1.3.4
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/go-raftchunking v0.7.0
github.com/hashicorp/go-retryablehttp v0.6.7
github.com/hashicorp/go-secure-stdlib/awsutil v0.1.6
github.com/hashicorp/go-sockaddr v1.0.2
github.com/hashicorp/go-syslog v1.0.0
@ -61,7 +62,7 @@ require (
github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/hcl v1.0.0
github.com/hashicorp/hcp-scada-provider v0.2.3
github.com/hashicorp/hcp-sdk-go v0.44.1-0.20230508124639-28da4c5b03f3
github.com/hashicorp/hcp-sdk-go v0.48.0
github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038
github.com/hashicorp/memberlist v0.5.0
github.com/hashicorp/raft v1.5.0
@ -93,14 +94,19 @@ require (
github.com/rboyer/safeio v0.2.1
github.com/ryanuber/columnize v2.1.2+incompatible
github.com/shirou/gopsutil/v3 v3.22.8
github.com/stretchr/testify v1.8.2
github.com/stretchr/testify v1.8.3
go.etcd.io/bbolt v1.3.6
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/metric v1.16.0
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/sdk/metric v0.39.0
go.opentelemetry.io/proto/otlp v0.19.0
go.uber.org/goleak v1.1.10
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d
golang.org/x/net v0.8.0
golang.org/x/oauth2 v0.6.0
golang.org/x/sync v0.1.0
golang.org/x/sys v0.6.0
golang.org/x/sys v0.8.0
golang.org/x/time v0.3.0
google.golang.org/genproto v0.0.0-20220921223823-23cae91e6737
google.golang.org/grpc v1.49.0
@ -147,7 +153,7 @@ require (
github.com/dimchansky/utfbom v1.1.0 // indirect
github.com/envoyproxy/protoc-gen-validate v0.1.0 // indirect
github.com/form3tech-oss/jwt-go v3.2.2+incompatible // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/analysis v0.21.4 // indirect
@ -167,11 +173,11 @@ require (
github.com/googleapis/gax-go/v2 v2.1.0 // indirect
github.com/googleapis/gnostic v0.2.0 // indirect
github.com/gophercloud/gophercloud v0.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-msgpack/v2 v2.0.0 // indirect
github.com/hashicorp/go-plugin v1.4.5 // indirect
github.com/hashicorp/go-retryablehttp v0.6.7 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/go-secure-stdlib/mlock v0.1.1 // indirect
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.6 // indirect
@ -225,9 +231,7 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.mongodb.org/mongo-driver v1.11.0 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel v1.11.1 // indirect
go.opentelemetry.io/otel/trace v1.11.1 // indirect
go.opentelemetry.io/proto/otlp v0.7.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect

go.sum
View File

@ -191,8 +191,10 @@ github.com/cloudflare/cloudflare-go v0.10.2/go.mod h1:qhVI5MKwBGhdNU89ZRz2plgYut
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1 h1:zH8ljVhhq7yC0MIeUL/IviMtY8hx2mK8cN9wEYb8ggw=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
@ -309,8 +311,8 @@ github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
@ -394,6 +396,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ=
github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
@ -515,6 +519,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 h1:BZHcxBETFHIdVyhyEfOvn/RdU/QGdLI4y34qQGjGWO0=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
github.com/hashicorp/consul-awsauth v0.0.0-20220713182709-05ac1c5c2706 h1:1ZEjnveDe20yFa6lSkfdQZm5BR/b271n0MsB5R2L3us=
@ -604,8 +610,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/hcp-scada-provider v0.2.3 h1:AarYR+/Pcv+cMvPdAlb92uOBmZfEH6ny4+DT+4NY2VQ=
github.com/hashicorp/hcp-scada-provider v0.2.3/go.mod h1:ZFTgGwkzNv99PLQjTsulzaCplCzOTBh0IUQsPKzrQFo=
github.com/hashicorp/hcp-sdk-go v0.44.1-0.20230508124639-28da4c5b03f3 h1:9QstZdsLIS6iPyYxQoyymRz8nBw9jMdEbGy29gtgzVQ=
github.com/hashicorp/hcp-sdk-go v0.44.1-0.20230508124639-28da4c5b03f3/go.mod h1:hZqky4HEzsKwvLOt4QJlZUrjeQmb4UCZUhDP2HyQFfc=
github.com/hashicorp/hcp-sdk-go v0.48.0 h1:LWpFR7YVDz4uG4C/ixcy2tRbg7/BgjMcTh1bRkKaeBQ=
github.com/hashicorp/hcp-sdk-go v0.48.0/go.mod h1:hZqky4HEzsKwvLOt4QJlZUrjeQmb4UCZUhDP2HyQFfc=
github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038 h1:n9J0rwVWXDpNd5iZnwY7w4WZyq53/rROeI7OVvLW8Ok=
github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038/go.mod h1:n2TSygSNwsLJ76m8qFXTSc7beTb+auJxYdqrnoqwZWE=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
@@ -1005,8 +1011,9 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
-github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
 github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY=
+github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
 github.com/tencentcloud/tencentcloud-sdk-go v1.0.162 h1:8fDzz4GuVg4skjY2B0nMN7h6uN61EDVkuLyI2+qGHhI=
 github.com/tencentcloud/tencentcloud-sdk-go v1.0.162/go.mod h1:asUz5BPXxgoPGaRgZaVm1iGcUAuHyYUo1nXqKa83cvI=
 github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
@@ -1073,13 +1080,19 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
 go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
 go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
 go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
-go.opentelemetry.io/otel v1.11.1 h1:4WLLAmcfkmDk2ukNXJyq3/kiz/3UzCaYq6PskJsaou4=
-go.opentelemetry.io/otel v1.11.1/go.mod h1:1nNhXBbWSD0nsL38H6btgnFN2k4i0sNLHNNMZMSbUGE=
-go.opentelemetry.io/otel/sdk v1.11.1 h1:F7KmQgoHljhUuJyA+9BiU+EkJfyX5nVVF4wyzWZpKxs=
-go.opentelemetry.io/otel/trace v1.11.1 h1:ofxdnzsNrGBYXbP7t7zpUK281+go5rF7dvdIZXF8gdQ=
-go.opentelemetry.io/otel/trace v1.11.1/go.mod h1:f/Q9G7vzk5u91PhbmKbg1Qn0rzH1LJ4vbPHFGkTPtOk=
-go.opentelemetry.io/proto/otlp v0.7.0 h1:rwOQPCuKAKmwGKq2aVNnYIibI6wnV7EvzgfTCzcdGg8=
+go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s=
+go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4=
+go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo=
+go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4=
+go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE=
+go.opentelemetry.io/otel/sdk v1.16.0/go.mod h1:tMsIuKXuuIWPBAOrH+eHtvhTL+SntFtXF9QD68aP6p4=
+go.opentelemetry.io/otel/sdk/metric v0.39.0 h1:Kun8i1eYf48kHH83RucG93ffz0zGV1sh46FAScOTuDI=
+go.opentelemetry.io/otel/sdk/metric v0.39.0/go.mod h1:piDIRgjcK7u0HCL5pCA4e74qpK/jk3NiUoAHATVAmiI=
+go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs=
+go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0=
 go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
+go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw=
+go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
 go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
@@ -1230,6 +1243,7 @@ golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ
 golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
+golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
 golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
 golang.org/x/oauth2 v0.6.0 h1:Lh8GPgSKBfWSwFvtuWOfeI3aAAnbXTSutYxJiOJFgIw=
 golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw=
@@ -1340,8 +1354,8 @@ golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
-golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
+golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
 golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw=
@@ -1537,6 +1551,7 @@ google.golang.org/genproto v0.0.0-20210828152312-66f60bf46e71/go.mod h1:eFjDcFEc
 google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
 google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
 google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
+google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
 google.golang.org/genproto v0.0.0-20220921223823-23cae91e6737 h1:K1zaaMdYBXRyX+cwFnxj7M6zwDyumLQMZ5xqwGvjreQ=
 google.golang.org/genproto v0.0.0-20220921223823-23cae91e6737/go.mod h1:2r/26NEF3bFmT3eC3aZreahSal0C3Shl8Gi6vyDYqOQ=
 google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
@@ -1570,6 +1585,7 @@ google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnD
 google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE=
 google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
 google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k=
+google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
 google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw=
 google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
 google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=

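Read together, the checksum churn above tells the dependency story for this change: OpenTelemetry moves from v1.11.1 to the stable v1.16.0 line (gaining the otel/metric API and the otel/sdk/metric SDK used by the OTELSink work), proto/otlp moves to v0.19.0 (which pulls in grpc-gateway/v2 and glog), hcp-sdk-go moves to v0.48.0, and testify to v1.8.3. As a hedged sketch, the corresponding go.mod require entries implied by these sums would look roughly like the fragment below; the go.mod diff itself is not shown here, and some of these may be indirect requirements rather than direct ones:

require (
	github.com/hashicorp/hcp-sdk-go v0.48.0
	github.com/stretchr/testify v1.8.3
	go.opentelemetry.io/otel v1.16.0
	go.opentelemetry.io/otel/metric v1.16.0
	go.opentelemetry.io/otel/sdk v1.16.0
	go.opentelemetry.io/otel/sdk/metric v0.39.0
	go.opentelemetry.io/proto/otlp v0.19.0
)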
View File

@@ -324,7 +324,7 @@ func circonusSink(cfg TelemetryConfig, _ string) (metrics.MetricSink, error) {
 	return sink, nil
 }
 
-func configureSinks(cfg TelemetryConfig, memSink metrics.MetricSink) (metrics.FanoutSink, error) {
+func configureSinks(cfg TelemetryConfig, memSink metrics.MetricSink, extraSinks []metrics.MetricSink) (metrics.FanoutSink, error) {
 	metricsConf := metrics.DefaultConfig(cfg.MetricsPrefix)
 	metricsConf.EnableHostname = !cfg.DisableHostname
 	metricsConf.FilterDefault = cfg.FilterDefault
@@ -349,6 +349,11 @@
 	addSink(dogstatdSink)
 	addSink(circonusSink)
 	addSink(prometheusSink)
+	for _, sink := range extraSinks {
+		if sink != nil {
+			sinks = append(sinks, sink)
+		}
+	}
 
 	if len(sinks) > 0 {
 		sinks = append(sinks, memSink)
@@ -364,7 +369,7 @@
 // values as returned by Runtimecfg.Config().
 // InitTelemetry retries configurating the sinks in case error is retriable
 // and retry_failed_connection is set to true.
-func InitTelemetry(cfg TelemetryConfig, logger hclog.Logger) (*MetricsConfig, error) {
+func InitTelemetry(cfg TelemetryConfig, logger hclog.Logger, extraSinks ...metrics.MetricSink) (*MetricsConfig, error) {
 	if cfg.Disable {
 		return nil, nil
 	}
@@ -384,7 +389,7 @@
 		}
 		for {
 			logger.Warn("retrying configure metric sinks", "retries", waiter.Failures())
-			_, err := configureSinks(cfg, memSink)
+			_, err := configureSinks(cfg, memSink, extraSinks)
 			if err == nil {
 				logger.Info("successfully configured metrics sinks")
 				return
@@ -397,7 +402,7 @@
 		}
 	}
 
-	if _, errs := configureSinks(cfg, memSink); errs != nil {
+	if _, errs := configureSinks(cfg, memSink, extraSinks); errs != nil {
 		if isRetriableError(errs) && cfg.RetryFailedConfiguration {
 			logger.Warn("failed configure sinks", "error", multierror.Flatten(errs))
 			ctx, cancel = context.WithCancel(context.Background())

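The hunks above make the sink list extensible: configureSinks now accepts extra sinks and appends only the non-nil ones to the fanout, and InitTelemetry threads them through both the retry loop and the initial attempt. A minimal, self-contained sketch of that same nil-skipping append follows; the helper name and main function are illustrative, not part of this diff:

package main

import (
	"fmt"

	"github.com/armon/go-metrics"
)

// appendExtraSinks mirrors the loop added to configureSinks: nil entries are
// skipped, so a caller can pass an optional sink (for example, one that only
// exists when HCP telemetry is enabled) without guarding the call site.
func appendExtraSinks(sinks metrics.FanoutSink, extra []metrics.MetricSink) metrics.FanoutSink {
	for _, sink := range extra {
		if sink != nil {
			sinks = append(sinks, sink)
		}
	}
	return sinks
}

func main() {
	var sinks metrics.FanoutSink
	sinks = appendExtraSinks(sinks, []metrics.MetricSink{&metrics.BlackholeSink{}, nil})
	fmt.Println(len(sinks)) // prints 1: the nil entry was dropped
}

Because metrics.FanoutSink is just a []metrics.MetricSink, the append works directly on the fanout; the nil check keeps a missing optional sink from ever entering it.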
View File

@@ -10,6 +10,8 @@ import (
 	"testing"
 
 	"github.com/hashicorp/consul/logging"
+	"github.com/armon/go-metrics"
+	"github.com/hashicorp/go-multierror"
 	"github.com/stretchr/testify/require"
 )
 
@@ -24,15 +26,16 @@
 
 func TestConfigureSinks(t *testing.T) {
 	cfg := newCfg()
-	sinks, err := configureSinks(cfg, nil)
+	extraSinks := []metrics.MetricSink{&metrics.BlackholeSink{}}
+	sinks, err := configureSinks(cfg, nil, extraSinks)
 	require.Error(t, err)
-	// 3 sinks: statsd, statsite, inmem
-	require.Equal(t, 3, len(sinks))
+	// 4 sinks: statsd, statsite, inmem, extra sink (blackhole)
+	require.Equal(t, 4, len(sinks))
 
 	cfg = TelemetryConfig{
 		DogstatsdAddr: "",
 	}
-	_, err = configureSinks(cfg, nil)
+	_, err = configureSinks(cfg, nil, nil)
 	require.NoError(t, err)
 }
 
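The updated test exercises the new parameter both ways: a blackhole sink is counted as the fourth sink in the fanout, and passing nil is a no-op. Because InitTelemetry takes the extra sinks variadically, existing call sites compile unchanged while new callers opt in. A usage sketch under assumptions: the test name and config value are invented, the signature comes from the diff above, and hclog is assumed to be imported as github.com/hashicorp/go-hclog alongside the imports already shown:

func TestInitTelemetryWithExtraSink(t *testing.T) {
	cfg := TelemetryConfig{MetricsPrefix: "consul"}
	// The extra sink is passed variadically; omitting it preserves the old behavior.
	_, err := InitTelemetry(cfg, hclog.NewNullLogger(), &metrics.BlackholeSink{})
	require.NoError(t, err)
}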