xds: emit a labeled gauge of connected xDS streams by version (#10243)
Fixes #10099
This commit is contained in:
parent
b90877b440
commit
7c9763d027
|
@ -0,0 +1,3 @@
|
|||
```release-note:feature
|
||||
xds: emit a labeled gauge of connected xDS streams by version
|
||||
```
|
|
@ -26,6 +26,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/submatview"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/agent/xds"
|
||||
"github.com/hashicorp/consul/ipaddr"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
|
@ -195,6 +196,7 @@ func getPrometheusDefs(cfg lib.TelemetryConfig) ([]prometheus.GaugeDefinition, [
|
|||
consul.RPCGauges,
|
||||
consul.SessionGauges,
|
||||
grpc.StatsGauges,
|
||||
xds.StatsGauges,
|
||||
usagemetrics.Gauges,
|
||||
consul.ReplicationGauges,
|
||||
Gauges,
|
||||
|
|
|
@ -31,6 +31,8 @@ type ADSDeltaStream = envoy_discovery_v3.AggregatedDiscoveryService_DeltaAggrega
|
|||
|
||||
// DeltaAggregatedResources implements envoy_discovery_v3.AggregatedDiscoveryServiceServer
|
||||
func (s *Server) DeltaAggregatedResources(stream ADSDeltaStream) error {
|
||||
defer s.activeStreams.Increment("v3")()
|
||||
|
||||
// a channel for receiving incoming requests
|
||||
reqCh := make(chan *envoy_discovery_v3.DeltaDiscoveryRequest)
|
||||
reqStop := int32(0)
|
||||
|
|
|
@ -59,6 +59,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
|||
// Check no response sent yet
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
requireProtocolVersionGauge(t, scenario, "v3", 1)
|
||||
|
||||
// Deliver a new snapshot (tcp with one tcp upstream)
|
||||
mgr.DeliverConfig(t, sid, snap)
|
||||
|
||||
|
|
|
@ -11,6 +11,8 @@ import (
|
|||
envoy_discovery_v2 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
|
||||
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/prometheus"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
|
@ -25,6 +27,13 @@ import (
|
|||
"github.com/hashicorp/consul/tlsutil"
|
||||
)
|
||||
|
||||
// StatsGauges lists the Prometheus gauge definitions for the gauges emitted
// by this package. The Name here is the same key that
// activeStreamCounters.Increment passes to metrics.SetGaugeWithLabels.
var StatsGauges = []prometheus.GaugeDefinition{
|
||||
{
|
||||
Name: []string{"xds", "server", "streams"},
|
||||
Help: "Measures the number of active xDS streams handled by the server split by protocol version.",
|
||||
},
|
||||
}
|
||||
|
||||
// ADSStream is a shorter way of referring to this thing...
|
||||
type ADSStream = envoy_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer
|
||||
type ADSStream_v2 = envoy_discovery_v2.AggregatedDiscoveryService_StreamAggregatedResourcesServer
|
||||
|
@ -141,6 +150,36 @@ type Server struct {
|
|||
AuthCheckFrequency time.Duration
|
||||
|
||||
DisableV2Protocol bool
|
||||
|
||||
activeStreams activeStreamCounters
|
||||
}
|
||||
|
||||
// activeStreamCounters simply encapsulates two counters accessed atomically to
|
||||
// ensure alignment is correct.
//
// Both fields are only read and written through sync/atomic in Increment.
//
// NOTE(review): sync/atomic only guarantees 64-bit alignment for the first
// word of an allocated struct. This type is embedded in Server as a non-first
// field, so confirm the counters are 64-bit aligned on 32-bit platforms.
|
||||
type activeStreamCounters struct {
|
||||
// xDSv3 is the number of currently active streams counted under "v3".
xDSv3 uint64
|
||||
// xDSv2 is the number of currently active streams counted under "v2".
xDSv2 uint64
|
||||
}
|
||||
|
||||
// Increment records the start of a stream for the given xDS protocol version
// and returns a function that records its end.
//
// For "v3" or "v2" it atomically adds one to the matching counter and
// publishes the new count on the "xds.server.streams" gauge, labeled with
// version=xdsVersion. The returned function atomically subtracts one from the
// same counter and publishes the gauge again. It is meant to be used as
//
//	defer c.Increment("v3")()
//
// For any other xdsVersion no counter is touched, no gauge is emitted, and
// the returned function does nothing.
//
// NOTE(review): the atomic add and the gauge write are separate steps, so
// concurrent callers may publish gauge values out of order. Confirm that a
// briefly stale gauge is acceptable.
func (c *activeStreamCounters) Increment(xdsVersion string) func() {
|
||||
var counter *uint64
|
||||
switch xdsVersion {
|
||||
case "v3":
|
||||
counter = &c.xDSv3
|
||||
case "v2":
|
||||
counter = &c.xDSv2
|
||||
default:
|
||||
// Unknown version: nothing is counted, so hand back a no-op.
return func() {}
|
||||
}
|
||||
|
||||
labels := []metrics.Label{{Name: "version", Value: xdsVersion}}
|
||||
|
||||
count := atomic.AddUint64(counter, 1)
|
||||
metrics.SetGaugeWithLabels([]string{"xds", "server", "streams"}, float32(count), labels)
|
||||
return func() {
|
||||
// Adding ^uint64(0) (all bits set) wraps around, which is the sync/atomic
// idiom for decrementing an unsigned counter by one.
count := atomic.AddUint64(counter, ^uint64(0))
|
||||
metrics.SetGaugeWithLabels([]string{"xds", "server", "streams"}, float32(count), labels)
|
||||
}
|
||||
}
|
||||
|
||||
func NewServer(
|
||||
|
@ -171,6 +210,8 @@ func (s *Server) StreamAggregatedResources(stream ADSStream) error {
|
|||
|
||||
// Deprecated: remove when xDS v2 is no longer supported
|
||||
func (s *Server) streamAggregatedResources(stream ADSStream) error {
|
||||
defer s.activeStreams.Increment("v2")()
|
||||
|
||||
// Note: despite dealing entirely in v3 protobufs, this function is
|
||||
// exclusively used from the xDS v2 shim RPC handler, so the logging below
|
||||
// will refer to it as "v2".
|
||||
|
|
|
@ -43,6 +43,8 @@ func TestServer_StreamAggregatedResources_v2_BasicProtocol_TCP(t *testing.T) {
|
|||
// Check no response sent yet
|
||||
assertChanBlocked(t, envoy.stream.sendCh)
|
||||
|
||||
requireProtocolVersionGauge(t, scenario, "v2", 1)
|
||||
|
||||
// Deliver a new snapshot
|
||||
snap := newTestSnapshot(t, nil, "")
|
||||
mgr.DeliverConfig(t, sid, snap)
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
|
||||
envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/golang/protobuf/ptypes/wrappers"
|
||||
|
@ -118,6 +119,7 @@ type testServerScenario struct {
|
|||
server *Server
|
||||
mgr *testManager
|
||||
envoy *TestEnvoy
|
||||
sink *metrics.InmemSink
|
||||
errCh <-chan error
|
||||
}
|
||||
|
||||
|
@ -155,6 +157,17 @@ func newTestServerScenarioInner(
|
|||
envoy.Close()
|
||||
})
|
||||
|
||||
sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute)
|
||||
cfg := metrics.DefaultConfig("consul.xds.test")
|
||||
cfg.EnableHostname = false
|
||||
cfg.EnableRuntimeMetrics = false
|
||||
metrics.NewGlobal(cfg, sink)
|
||||
|
||||
t.Cleanup(func() {
|
||||
sink := &metrics.BlackholeSink{}
|
||||
metrics.NewGlobal(cfg, sink)
|
||||
})
|
||||
|
||||
s := NewServer(
|
||||
testutil.Logger(t),
|
||||
mgr,
|
||||
|
@ -178,6 +191,7 @@ func newTestServerScenarioInner(
|
|||
server: s,
|
||||
mgr: mgr,
|
||||
envoy: envoy,
|
||||
sink: sink,
|
||||
errCh: errCh,
|
||||
}
|
||||
}
|
||||
|
@ -647,3 +661,23 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
|||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
// requireProtocolVersionGauge asserts that the scenario's in-memory metrics
// sink holds exactly one data item containing exactly one gauge, and that
// this gauge is the xDS stream gauge for xdsVersion with the expected value.
//
// The gauge is looked up under the key
// "consul.xds.test.xds.server.streams;version=<xdsVersion>". Its name, its
// value (converted to int) and its single "version" label are all checked.
// Any mismatch fails the test through require.
func requireProtocolVersionGauge(
|
||||
t *testing.T,
|
||||
scenario *testServerScenario,
|
||||
xdsVersion string,
|
||||
expected int,
|
||||
) {
|
||||
data := scenario.sink.Data()
|
||||
require.Len(t, data, 1)
|
||||
|
||||
item := data[0]
|
||||
require.Len(t, item.Gauges, 1)
|
||||
|
||||
// Gauges is keyed by the metric name followed by ";label=value".
val, ok := item.Gauges["consul.xds.test.xds.server.streams;version="+xdsVersion]
|
||||
require.True(t, ok)
|
||||
|
||||
require.Equal(t, "consul.xds.test.xds.server.streams", val.Name)
|
||||
require.Equal(t, expected, int(val.Value))
|
||||
|
||||
require.Equal(t, []metrics.Label{{Name: "version", Value: xdsVersion}}, val.Labels)
|
||||
}
|
||||
|
|
|
@ -417,7 +417,8 @@ These metrics are used to monitor the health of the Consul servers.
|
|||
| `consul.grpc.server.connection.count` | Counts the number of new gRPC connections received by the server. | connections | counter |
|
||||
| `consul.grpc.server.connections` | Measures the number of active gRPC connections open on the server. | connections | gauge |
|
||||
| `consul.grpc.server.stream.count` | Counts the number of new gRPC streams received by the server. | streams | counter |
|
||||
| `consul.grpc.server.streams` | Measures the number of active gRPC streams handled by the server. | streams | guage |
|
||||
| `consul.grpc.server.streams` | Measures the number of active gRPC streams handled by the server. | streams | gauge |
|
||||
| `consul.xds.server.streams` | Measures the number of active xDS streams handled by the server split by protocol version. | streams | gauge |
|
||||
|
||||
## Cluster Health
|
||||
|
||||
|
|
Loading…
Reference in New Issue