From 39c7e7c19139c2ee0425687fec2a8174b3587071 Mon Sep 17 00:00:00 2001 From: Nick Cabatoff Date: Fri, 7 Oct 2022 12:09:08 -0400 Subject: [PATCH] Add more raft metrics, emit more metrics on non-perf standbys (#12166) Add some metrics helpful for monitoring raft cluster state. Furthermore, we weren't emitting bolt metrics on regular (non-perf) standbys, and there were other metrics in metricsLoop that would make sense to include in OSS but weren't. We now have an active-node-only func, emitMetricsActiveNode. This runs metricsLoop on the active node. Standbys and perf-standbys run metricsLoop from a goroutine managed by the runStandby rungroup. --- changelog/12166.txt | 3 +++ physical/raft/fsm.go | 2 ++ physical/raft/raft.go | 13 +++++++++++ physical/raft/raft_autopilot.go | 22 +++++++++++++++---- vault/core.go | 6 ++--- vault/core_metrics.go | 23 +++++++++----------- vault/ha.go | 11 ++++++++++ vault/request_forwarding_rpc.go | 6 +++++ website/content/docs/internals/telemetry.mdx | 15 +++++++++---- 9 files changed, 77 insertions(+), 24 deletions(-) create mode 100644 changelog/12166.txt diff --git a/changelog/12166.txt b/changelog/12166.txt new file mode 100644 index 000000000..9cec76cba --- /dev/null +++ b/changelog/12166.txt @@ -0,0 +1,3 @@ +```release-note:improvement +storage/raft: add additional raft metrics relating to applied index and heartbeating; also ensure OSS standbys emit periodic metrics. +``` diff --git a/physical/raft/fsm.go b/physical/raft/fsm.go index 8d5b5524d..f8ca9c654 100644 --- a/physical/raft/fsm.go +++ b/physical/raft/fsm.go @@ -136,6 +136,8 @@ func NewFSM(path string, localID string, logger log.Logger) (*FSM, error) { }) dbPath := filepath.Join(path, databaseFilename) + f.l.Lock() + defer f.l.Unlock() if err := f.openDBFile(dbPath); err != nil { return nil, fmt.Errorf("failed to open bolt file: %w", err) } diff --git a/physical/raft/raft.go b/physical/raft/raft.go index 98d51c05d..7400794f1 100644 --- a/physical/raft/raft.go +++ b/physical/raft/raft.go @@ -581,9 +581,22 @@ func (b *RaftBackend) CollectMetrics(sink *metricsutil.ClusterMetricSink) { b.l.RLock() logstoreStats := b.stableStore.(*raftboltdb.BoltStore).Stats() fsmStats := b.fsm.db.Stats() + stats := b.raft.Stats() b.l.RUnlock() b.collectMetricsWithStats(logstoreStats, sink, "logstore") b.collectMetricsWithStats(fsmStats, sink, "fsm") + labels := []metrics.Label{ + { + Name: "peer_id", + Value: b.localID, + }, + } + for _, key := range []string{"term", "commit_index", "applied_index", "fsm_pending"} { + n, err := strconv.ParseUint(stats[key], 10, 64) + if err == nil { + sink.SetGaugeWithLabels([]string{"raft_storage", "stats", key}, float32(n), labels) + } + } } func (b *RaftBackend) collectMetricsWithStats(stats bolt.Stats, sink *metricsutil.ClusterMetricSink, database string) { diff --git a/physical/raft/raft_autopilot.go b/physical/raft/raft_autopilot.go index eaa75dfa1..5596bbf42 100644 --- a/physical/raft/raft_autopilot.go +++ b/physical/raft/raft_autopilot.go @@ -540,11 +540,25 @@ func (b *RaftBackend) startFollowerHeartbeatTracker() { tickerCh := b.followerHeartbeatTicker.C b.l.RUnlock() + followerGauge := func(peerID string, suffix string, value float32) { + labels := []metrics.Label{ + { + Name: "peer_id", + Value: peerID, + }, + } + metrics.SetGaugeWithLabels([]string{"raft_storage", "follower", suffix}, value, labels) + } for range tickerCh { b.l.RLock() - if b.autopilotConfig.CleanupDeadServers && b.autopilotConfig.DeadServerLastContactThreshold != 0 { - b.followerStates.l.RLock() - for _, state := range b.followerStates.followers { + b.followerStates.l.RLock() + myAppliedIndex := b.raft.AppliedIndex() + for peerID, state := range b.followerStates.followers { + timeSinceLastHeartbeat := time.Now().Sub(state.LastHeartbeat) / time.Millisecond + followerGauge(peerID, "last_heartbeat_ms", float32(timeSinceLastHeartbeat)) + followerGauge(peerID, "applied_index_delta", float32(myAppliedIndex-state.AppliedIndex)) + + if b.autopilotConfig.CleanupDeadServers && b.autopilotConfig.DeadServerLastContactThreshold != 0 { if state.LastHeartbeat.IsZero() || state.IsDead.Load() { continue } @@ -553,8 +567,8 @@ func (b *RaftBackend) startFollowerHeartbeatTracker() { state.IsDead.Store(true) } } - b.followerStates.l.RUnlock() } + b.followerStates.l.RUnlock() b.l.RUnlock() } } diff --git a/vault/core.go b/vault/core.go index 7a4fd8a8f..cbe526407 100644 --- a/vault/core.go +++ b/vault/core.go @@ -2252,6 +2252,9 @@ func (s standardUnsealStrategy) unseal(ctx context.Context, logger log.Logger, c return err } + c.metricsCh = make(chan struct{}) + go c.emitMetricsActiveNode(c.metricsCh) + return nil } @@ -2310,9 +2313,6 @@ func (c *Core) postUnseal(ctx context.Context, ctxCancelFunc context.CancelFunc, seal.StartHealthCheck() } - c.metricsCh = make(chan struct{}) - go c.emitMetrics(c.metricsCh) - // This is intentionally the last block in this function. We want to allow // writes just before allowing client requests, to ensure everything has // been set up properly before any writes can have happened. diff --git a/vault/core_metrics.go b/vault/core_metrics.go index cd570eff3..c6e719fc1 100644 --- a/vault/core_metrics.go +++ b/vault/core_metrics.go @@ -113,16 +113,16 @@ func (c *Core) metricsLoop(stopCh chan struct{}) { c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "dr", "secondary"}, 0, nil) } + // If we're using a raft backend, emit raft metrics + if rb, ok := c.underlyingPhysical.(*raft.RaftBackend); ok { + rb.CollectMetrics(c.MetricSink()) + } + // Capture the total number of in-flight requests c.inFlightReqGaugeMetric() // Refresh gauge metrics that are looped c.cachedGaugeMetricsEmitter() - - // If we're using a raft backend, emit boltdb metrics - if rb, ok := c.underlyingPhysical.(*raft.RaftBackend); ok { - rb.CollectMetrics(c.MetricSink()) - } case <-writeTimer: l := newLockGrabber(c.stateLock.RLock, c.stateLock.RUnlock, stopCh) go l.grab() @@ -232,15 +232,12 @@ func (c *Core) tokenGaugeTtlCollector(ctx context.Context) ([]metricsutil.GaugeL return ts.gaugeCollectorByTtl(ctx) } -// emitMetrics is used to start all the periodc metrics; all of them should -// be shut down when stopCh is closed. -func (c *Core) emitMetrics(stopCh chan struct{}) { +// emitMetricsActiveNode is used to start all the periodic metrics; all of them should +// be shut down when stopCh is closed. This code runs on the active node only. +func (c *Core) emitMetricsActiveNode(stopCh chan struct{}) { // The gauge collection processes are started and stopped here // because there's more than one TokenManager created during startup, // but we only want one set of gauges. - // - // Both active nodes and performance standby nodes call emitMetrics - // so we have to handle both. metricsInit := []struct { MetricName []string MetadataLabel []metrics.Label @@ -349,8 +346,8 @@ func (c *Core) findKvMounts() []*kvMount { c.mountsLock.RLock() defer c.mountsLock.RUnlock() - // emitMetrics doesn't grab the statelock, so this code might run during or after the seal process. - // Therefore, we need to check if c.mounts is nil. If we do not, emitMetrics will panic if this is + // we don't grab the statelock, so this code might run during or after the seal process. + // Therefore, we need to check if c.mounts is nil. If we do not, this will panic when // run after seal. if c.mounts == nil { return mounts diff --git a/vault/ha.go b/vault/ha.go index 4f674dde9..17b6e590d 100644 --- a/vault/ha.go +++ b/vault/ha.go @@ -434,6 +434,17 @@ func (c *Core) runStandby(doneCh, manualStepDownCh, stopCh chan struct{}) { c.logger.Debug("shutting down periodic leader refresh") }) } + { + metricsStop := make(chan struct{}) + + g.Add(func() error { + c.metricsLoop(metricsStop) + return nil + }, func(error) { + close(metricsStop) + c.logger.Debug("shutting down periodic metrics") + }) + } { // Wait for leadership leaderStopCh := make(chan struct{}) diff --git a/vault/request_forwarding_rpc.go b/vault/request_forwarding_rpc.go index 6ae4cf56b..281d9192b 100644 --- a/vault/request_forwarding_rpc.go +++ b/vault/request_forwarding_rpc.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "time" + "github.com/armon/go-metrics" "github.com/hashicorp/vault/helper/forwarding" "github.com/hashicorp/vault/physical/raft" "github.com/hashicorp/vault/sdk/helper/consts" @@ -135,6 +136,9 @@ func (c *forwardingClient) startHeartbeat() { Mode: "standby", } tick := func() { + labels := make([]metrics.Label, 0, 1) + defer metrics.MeasureSinceWithLabels([]string{"ha", "rpc", "client", "echo"}, time.Now(), labels) + req := &EchoRequest{ Message: "ping", ClusterAddr: clusterAddr, @@ -149,12 +153,14 @@ func (c *forwardingClient) startHeartbeat() { req.RaftDesiredSuffrage = raftBackend.DesiredSuffrage() req.RaftRedundancyZone = raftBackend.RedundancyZone() req.RaftUpgradeVersion = raftBackend.EffectiveVersion() + labels = append(labels, metrics.Label{Name: "peer_id", Value: raftBackend.NodeID()}) } ctx, cancel := context.WithTimeout(c.echoContext, 2*time.Second) resp, err := c.RequestForwardingClient.Echo(ctx, req) cancel() if err != nil { + metrics.IncrCounter([]string{"ha", "rpc", "client", "echo", "errors"}, 1) c.core.logger.Debug("forwarding: error sending echo request to active node", "error", err) return } diff --git a/website/content/docs/internals/telemetry.mdx b/website/content/docs/internals/telemetry.mdx index 6633141ce..2785a4281 100644 --- a/website/content/docs/internals/telemetry.mdx +++ b/website/content/docs/internals/telemetry.mdx @@ -222,10 +222,12 @@ These metrics relate to internal operations on Merkle Trees and Write Ahead Logs These metrics are emitted on standbys when talking to the active node, and in some cases by performance standbys as well. -| Metric | Description | Unit | Type | -| :----------------------------------- | :---------------------------------------------------------------- | :----- | :------ | -| `vault.ha.rpc.client.forward` | Time taken to forward a request from a standby to the active node | ms | summary | -| `vault.ha.rpc.client.forward.errors` | Number of standby requests forwarding failures | errors | counter | +| Metric | Description | Unit | Type | +| :----------------------------------- | :------------------------------------------------------------------- | :----- | :------ | +| `vault.ha.rpc.client.forward` | Time taken to forward a request from a standby to the active node | ms | summary | +| `vault.ha.rpc.client.forward.errors` | Number of standby requests forwarding failures | errors | counter | +| `vault.ha.rpc.client.echo` | Time taken to send an echo request from a standby to the active node | ms | summary | +| `vault.ha.rpc.client.echo.errors` | Number of standby echo request failures | errors | counter | ## Replication Metrics @@ -474,6 +476,11 @@ These metrics relate to raft based [integrated storage][integrated-storage]. | `vault.raft_storage.bolt.spill.time` | Time taken spilling. | ms | summary | | `vault.raft_storage.bolt.write.count` | Number of writes performed. | writes | gauge | | `vault.raft_storage.bolt.write.time` | Time taken writing to disk. | ms | summary | +| `vault.raft_storage.stats.commit_index` | Index of last raft log committed to disk on this node. | sequence number | gauge | +| `vault.raft_storage.stats.applied_index` | Highest index of raft log either applied to the FSM or added to fsm_pending queue. | sequence number | gauge | +| `vault.raft_storage.stats.fsm_pending` | Number of raft logs this node has queued to be applied by the FSM. | logs | gauge | +| `vault.raft_storage.follower.applied_index_delta` | Delta between leader applied index and each follower's applied index reported by echoes. | logs | gauge | +| `vault.raft_storage.follower.last_heartbeat_ms` | Time since last echo request received by each follower. | ms | gauge | ## Integrated Storage (Raft) Autopilot