Add consul.xds.server.streamStart metric (#14957)
This adds a new consul.xds.server.streamStart metric to measure the time taken to first generate xDS resources after an xDS stream is opened.
This commit is contained in:
parent
28b7dea973
commit
be1a4438a9
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:improvement
|
||||||
|
telemetry: Added a `consul.xds.server.streamStart` metric to measure time taken to first generate xDS resources for an xDS stream.
|
||||||
|
```
|
|
@ -202,8 +202,7 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getPrometheusDefs reaches into every slice of prometheus defs we've defined in each part of the agent, and appends
|
// getPrometheusDefs reaches into every slice of prometheus defs we've defined in each part of the agent, and appends
|
||||||
//
|
// all of our slices into one nice slice of definitions per metric type for the Consul agent to pass to go-metrics.
|
||||||
// all of our slices into one nice slice of definitions per metric type for the Consul agent to pass to go-metrics.
|
|
||||||
func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.GaugeDefinition, []prometheus.CounterDefinition, []prometheus.SummaryDefinition) {
|
func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.GaugeDefinition, []prometheus.CounterDefinition, []prometheus.SummaryDefinition) {
|
||||||
// TODO: "raft..." metrics come from the raft lib and we should migrate these to a telemetry
|
// TODO: "raft..." metrics come from the raft lib and we should migrate these to a telemetry
|
||||||
// package within. In the mean time, we're going to define a few here because they're key to monitoring Consul.
|
// package within. In the mean time, we're going to define a few here because they're key to monitoring Consul.
|
||||||
|
@ -345,6 +344,7 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau
|
||||||
fsm.CommandsSummaries,
|
fsm.CommandsSummaries,
|
||||||
fsm.SnapshotSummaries,
|
fsm.SnapshotSummaries,
|
||||||
raftSummaries,
|
raftSummaries,
|
||||||
|
xds.StatsSummaries,
|
||||||
}
|
}
|
||||||
// Flatten definitions
|
// Flatten definitions
|
||||||
// NOTE(kit): Do we actually want to create a set here so we can ensure definition names are unique?
|
// NOTE(kit): Do we actually want to create a set here so we can ensure definition names are unique?
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -104,6 +105,9 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
|
||||||
proxyID structs.ServiceID
|
proxyID structs.ServiceID
|
||||||
nonce uint64 // xDS requires a unique nonce to correlate response/request pairs
|
nonce uint64 // xDS requires a unique nonce to correlate response/request pairs
|
||||||
ready bool // set to true after the first snapshot arrives
|
ready bool // set to true after the first snapshot arrives
|
||||||
|
|
||||||
|
streamStartTime = time.Now()
|
||||||
|
streamStartOnce sync.Once
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -360,6 +364,10 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
|
||||||
|
|
||||||
generator.Logger.Trace("Invoking all xDS resource handlers and sending changed data if there are any")
|
generator.Logger.Trace("Invoking all xDS resource handlers and sending changed data if there are any")
|
||||||
|
|
||||||
|
streamStartOnce.Do(func() {
|
||||||
|
metrics.MeasureSince([]string{"xds", "server", "streamStart"}, streamStartTime)
|
||||||
|
})
|
||||||
|
|
||||||
for _, op := range xDSUpdateOrder {
|
for _, op := range xDSUpdateOrder {
|
||||||
err, sent := handlers[op.TypeUrl].SendIfNew(
|
err, sent := handlers[op.TypeUrl].SendIfNew(
|
||||||
cfgSnap.Kind,
|
cfgSnap.Kind,
|
||||||
|
|
|
@ -1448,7 +1448,7 @@ func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
testutil.RunStep(t, "check drain counter incremeted", func(t *testing.T) {
|
testutil.RunStep(t, "check drain counter incremented", func(t *testing.T) {
|
||||||
data := scenario.sink.Data()
|
data := scenario.sink.Data()
|
||||||
require.Len(t, data, 1)
|
require.Len(t, data, 1)
|
||||||
|
|
||||||
|
@ -1459,6 +1459,19 @@ func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) {
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
require.Equal(t, 1, val.Count)
|
require.Equal(t, 1, val.Count)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
testutil.RunStep(t, "check streamStart metric recorded", func(t *testing.T) {
|
||||||
|
data := scenario.sink.Data()
|
||||||
|
require.Len(t, data, 1)
|
||||||
|
|
||||||
|
item := data[0]
|
||||||
|
require.Len(t, item.Counters, 1)
|
||||||
|
|
||||||
|
val, ok := item.Samples["consul.xds.test.xds.server.streamStart"]
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, 1, val.Count)
|
||||||
|
})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type testLimiter struct {
|
type testLimiter struct {
|
||||||
|
|
|
@ -37,6 +37,13 @@ var StatsCounters = []prometheus.CounterDefinition{
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var StatsSummaries = []prometheus.SummaryDefinition{
|
||||||
|
{
|
||||||
|
Name: []string{"xds", "server", "streamStart"},
|
||||||
|
Help: "Measures the time in milliseconds after an xDS stream is opened until xDS resources are first generated for the stream.",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
// ADSStream is a shorter way of referring to this thing...
|
// ADSStream is a shorter way of referring to this thing...
|
||||||
type ADSStream = envoy_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer
|
type ADSStream = envoy_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer
|
||||||
|
|
||||||
|
|
|
@ -542,6 +542,7 @@ These metrics are used to monitor the health of the Consul servers.
|
||||||
| `consul.xds.server.streams` | Measures the number of active xDS streams handled by the server split by protocol version. | streams | gauge |
|
| `consul.xds.server.streams` | Measures the number of active xDS streams handled by the server split by protocol version. | streams | gauge |
|
||||||
| `consul.xds.server.idealStreamsMax` | The maximum number of xDS streams per server, chosen to achieve a roughly even spread of load across servers. | streams | gauge |
|
| `consul.xds.server.idealStreamsMax` | The maximum number of xDS streams per server, chosen to achieve a roughly even spread of load across servers. | streams | gauge |
|
||||||
| `consul.xds.server.streamDrained` | Counts the number of xDS streams that are drained when rebalancing the load between servers. | streams | counter |
|
| `consul.xds.server.streamDrained` | Counts the number of xDS streams that are drained when rebalancing the load between servers. | streams | counter |
|
||||||
|
| `consul.xds.server.streamStart` | Measures the time taken to first generate xDS resources after an xDS stream is opened. | ms | timer |
|
||||||
|
|
||||||
|
|
||||||
## Server Workload
|
## Server Workload
|
||||||
|
|
Loading…
Reference in New Issue