Add Envoy extension metrics. (#16114)
This commit is contained in:
parent
177c466ee1
commit
5572f1584d
|
@ -5,6 +5,7 @@ import (
|
|||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -31,6 +32,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/xds/extensionruntime"
|
||||
"github.com/hashicorp/consul/agent/xds/xdscommon"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/version"
|
||||
)
|
||||
|
||||
var errOverwhelmed = status.Error(codes.ResourceExhausted, "this server has too many xDS streams open, please try another")
|
||||
|
@ -409,7 +411,20 @@ func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, cfg
|
|||
"partition", cfg.ServiceName.Partition,
|
||||
}
|
||||
|
||||
getMetricLabels := func(err error) []metrics.Label {
|
||||
return []metrics.Label{
|
||||
{Name: "extension", Value: cfg.EnvoyExtension.Name},
|
||||
{Name: "version", Value: "builtin/" + version.Version},
|
||||
{Name: "service", Value: cfgSnap.Service},
|
||||
{Name: "partition", Value: cfgSnap.ProxyID.PartitionOrDefault()},
|
||||
{Name: "namespace", Value: cfgSnap.ProxyID.NamespaceOrDefault()},
|
||||
{Name: "error", Value: strconv.FormatBool(err != nil)},
|
||||
}
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
extender, err := envoyextensions.ConstructExtension(cfg.EnvoyExtension)
|
||||
metrics.MeasureSinceWithLabels([]string{"envoy_extension", "validate_arguments"}, now, getMetricLabels(err))
|
||||
if err != nil {
|
||||
logFn("failed to construct extension", errorParams...)
|
||||
|
||||
|
@ -420,7 +435,9 @@ func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, cfg
|
|||
continue
|
||||
}
|
||||
|
||||
now = time.Now()
|
||||
err = extender.Validate(&cfg)
|
||||
metrics.MeasureSinceWithLabels([]string{"envoy_extension", "validate"}, now, getMetricLabels(err))
|
||||
if err != nil {
|
||||
errorParams = append(errorParams, "error", err)
|
||||
logFn("failed to validate extension arguments", errorParams...)
|
||||
|
@ -431,7 +448,9 @@ func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, cfg
|
|||
continue
|
||||
}
|
||||
|
||||
now = time.Now()
|
||||
resources, err = extender.Extend(resources, &cfg)
|
||||
metrics.MeasureSinceWithLabels([]string{"envoy_extension", "extend"}, now, getMetricLabels(err))
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -2,11 +2,13 @@ package xds
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
|
||||
"github.com/stretchr/testify/require"
|
||||
rpcstatus "google.golang.org/genproto/googleapis/rpc/status"
|
||||
|
@ -19,7 +21,9 @@ import (
|
|||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/xds/xdscommon"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/version"
|
||||
)
|
||||
|
||||
// NOTE: For these tests, prefer not using xDS protobuf "factory" methods if
|
||||
|
@ -43,7 +47,20 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
|||
var snap *proxycfg.ConfigSnapshot
|
||||
|
||||
testutil.RunStep(t, "initial setup", func(t *testing.T) {
|
||||
snap = newTestSnapshot(t, nil, "")
|
||||
snap = newTestSnapshot(t, nil, "", &structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
EnvoyExtensions: []structs.EnvoyExtension{
|
||||
{
|
||||
Name: api.BuiltinLuaExtension,
|
||||
Arguments: map[string]interface{}{
|
||||
"ProxyType": "connect-proxy",
|
||||
"Listener": "inbound",
|
||||
"Script": "x = 0",
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
// Send initial cluster discover. We'll assume we are testing a partial
|
||||
// reconnect and include some initial resource versions that will be
|
||||
|
@ -153,6 +170,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
|||
|
||||
// We are caught up, so there should be nothing queued to send.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
requireExtensionMetrics(t, scenario, api.BuiltinLuaExtension, sid, nil)
|
||||
})
|
||||
|
||||
deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, uid proxycfg.UpstreamID, targetID string) {
|
||||
|
@ -1524,3 +1543,39 @@ func mustMakeVersionMap(t *testing.T, resources ...proto.Message) map[string]str
|
|||
}
|
||||
return m
|
||||
}
|
||||
|
||||
func requireExtensionMetrics(
|
||||
t *testing.T,
|
||||
scenario *testServerScenario,
|
||||
extName string,
|
||||
sid structs.ServiceID,
|
||||
err error,
|
||||
) {
|
||||
data := scenario.sink.Data()
|
||||
require.Len(t, data, 1)
|
||||
item := data[0]
|
||||
|
||||
expectLabels := []metrics.Label{
|
||||
{Name: "extension", Value: extName},
|
||||
{Name: "version", Value: "builtin/" + version.Version},
|
||||
{Name: "service", Value: sid.ID},
|
||||
{Name: "partition", Value: sid.PartitionOrDefault()},
|
||||
{Name: "namespace", Value: sid.NamespaceOrDefault()},
|
||||
{Name: "error", Value: strconv.FormatBool(err != nil)},
|
||||
}
|
||||
|
||||
for _, s := range []string{
|
||||
"consul.xds.test.envoy_extension.validate_arguments;",
|
||||
"consul.xds.test.envoy_extension.validate;",
|
||||
"consul.xds.test.envoy_extension.extend;",
|
||||
} {
|
||||
foundLabel := false
|
||||
for k, v := range item.Samples {
|
||||
if strings.HasPrefix(k, s) {
|
||||
foundLabel = true
|
||||
require.ElementsMatch(t, expectLabels, v.Labels)
|
||||
}
|
||||
}
|
||||
require.True(t, foundLabel)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue