Add more raft metrics, emit more metrics on non-perf standbys (#12166)

Add some metrics helpful for monitoring raft cluster state.

Furthermore, we weren't emitting bolt metrics on regular (non-performance) standbys, and there were other metrics
in metricsLoop that would make sense to include in OSS but weren't. There is now an active-node-only function,
emitMetricsActiveNode, which runs metricsLoop on the active node. Standbys and performance standbys run metricsLoop
from a goroutine managed by the runStandby rungroup, as sketched below.
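
For readers unfamiliar with the rungroup pattern mentioned above, here is a minimal, hypothetical sketch (not Vault's actual code) of how a periodic metricsLoop can be tied to an oklog/run group, which is the mechanism runStandby uses to manage its goroutines; the Core stand-in, ticker interval, and the second actor are invented for illustration.

```go
// A minimal sketch, assuming an oklog/run group like the one runStandby uses.
// Core and metricsLoop are simplified stand-ins, not Vault's real types.
package main

import (
	"log"
	"time"

	"github.com/oklog/run"
)

type Core struct{}

// metricsLoop emits metrics on a ticker until stopCh is closed.
func (c *Core) metricsLoop(stopCh chan struct{}) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			log.Println("emitting standby metrics")
		case <-stopCh:
			return
		}
	}
}

func main() {
	c := &Core{}
	var g run.Group

	// Actor 1: the metrics loop, interrupted by closing metricsStop.
	metricsStop := make(chan struct{})
	g.Add(func() error {
		c.metricsLoop(metricsStop)
		return nil
	}, func(error) {
		close(metricsStop)
		log.Println("shutting down periodic metrics")
	})

	// Actor 2: stands in for the rest of runStandby; when it returns, the
	// group interrupts every other actor, including the metrics loop.
	g.Add(func() error {
		time.Sleep(3 * time.Second)
		return nil
	}, func(error) {})

	if err := g.Run(); err != nil {
		log.Fatal(err)
	}
}
```

The key property this relies on: when any actor in the group returns, run.Group calls every actor's interrupt function, so closing metricsStop is enough to stop the metrics goroutine cleanly.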
Nick Cabatoff 2022-10-07 12:09:08 -04:00 committed by GitHub
parent ad1503ebcd
commit 39c7e7c191
9 changed files with 77 additions and 24 deletions

changelog/12166.txt (new file)

@@ -0,0 +1,3 @@
+```release-note:improvement
+storage/raft: add additional raft metrics relating to applied index and heartbeating; also ensure OSS standbys emit periodic metrics.
+```

@@ -136,6 +136,8 @@ func NewFSM(path string, localID string, logger log.Logger) (*FSM, error) {
    })
    dbPath := filepath.Join(path, databaseFilename)
+   f.l.Lock()
+   defer f.l.Unlock()
    if err := f.openDBFile(dbPath); err != nil {
        return nil, fmt.Errorf("failed to open bolt file: %w", err)
    }

@@ -581,9 +581,22 @@ func (b *RaftBackend) CollectMetrics(sink *metricsutil.ClusterMetricSink) {
    b.l.RLock()
    logstoreStats := b.stableStore.(*raftboltdb.BoltStore).Stats()
    fsmStats := b.fsm.db.Stats()
+   stats := b.raft.Stats()
    b.l.RUnlock()
    b.collectMetricsWithStats(logstoreStats, sink, "logstore")
    b.collectMetricsWithStats(fsmStats, sink, "fsm")
+   labels := []metrics.Label{
+       {
+           Name:  "peer_id",
+           Value: b.localID,
+       },
+   }
+   for _, key := range []string{"term", "commit_index", "applied_index", "fsm_pending"} {
+       n, err := strconv.ParseUint(stats[key], 10, 64)
+       if err == nil {
+           sink.SetGaugeWithLabels([]string{"raft_storage", "stats", key}, float32(n), labels)
+       }
+   }
}

func (b *RaftBackend) collectMetricsWithStats(stats bolt.Stats, sink *metricsutil.ClusterMetricSink, database string) {

@@ -540,11 +540,25 @@ func (b *RaftBackend) startFollowerHeartbeatTracker() {
    tickerCh := b.followerHeartbeatTicker.C
    b.l.RUnlock()
+   followerGauge := func(peerID string, suffix string, value float32) {
+       labels := []metrics.Label{
+           {
+               Name:  "peer_id",
+               Value: peerID,
+           },
+       }
+       metrics.SetGaugeWithLabels([]string{"raft_storage", "follower", suffix}, value, labels)
+   }
    for range tickerCh {
        b.l.RLock()
-       if b.autopilotConfig.CleanupDeadServers && b.autopilotConfig.DeadServerLastContactThreshold != 0 {
        b.followerStates.l.RLock()
-       for _, state := range b.followerStates.followers {
+       myAppliedIndex := b.raft.AppliedIndex()
+       for peerID, state := range b.followerStates.followers {
+           timeSinceLastHeartbeat := time.Now().Sub(state.LastHeartbeat) / time.Millisecond
+           followerGauge(peerID, "last_heartbeat_ms", float32(timeSinceLastHeartbeat))
+           followerGauge(peerID, "applied_index_delta", float32(myAppliedIndex-state.AppliedIndex))
+
+           if b.autopilotConfig.CleanupDeadServers && b.autopilotConfig.DeadServerLastContactThreshold != 0 {
            if state.LastHeartbeat.IsZero() || state.IsDead.Load() {
                continue
            }
@@ -553,8 +567,8 @@ func (b *RaftBackend) startFollowerHeartbeatTracker() {
                state.IsDead.Store(true)
            }
        }
-       b.followerStates.l.RUnlock()
    }
+       b.followerStates.l.RUnlock()
        b.l.RUnlock()
    }
}

@@ -2252,6 +2252,9 @@ func (s standardUnsealStrategy) unseal(ctx context.Context, logger log.Logger, c
        return err
    }
+   c.metricsCh = make(chan struct{})
+   go c.emitMetricsActiveNode(c.metricsCh)
+
    return nil
}
@@ -2310,9 +2313,6 @@ func (c *Core) postUnseal(ctx context.Context, ctxCancelFunc context.CancelFunc,
        seal.StartHealthCheck()
    }
-   c.metricsCh = make(chan struct{})
-   go c.emitMetrics(c.metricsCh)
-
    // This is intentionally the last block in this function. We want to allow
    // writes just before allowing client requests, to ensure everything has
    // been set up properly before any writes can have happened.

@@ -113,16 +113,16 @@ func (c *Core) metricsLoop(stopCh chan struct{}) {
                c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "dr", "secondary"}, 0, nil)
            }
+           // If we're using a raft backend, emit raft metrics
+           if rb, ok := c.underlyingPhysical.(*raft.RaftBackend); ok {
+               rb.CollectMetrics(c.MetricSink())
+           }
            // Capture the total number of in-flight requests
            c.inFlightReqGaugeMetric()
            // Refresh gauge metrics that are looped
            c.cachedGaugeMetricsEmitter()
-           // If we're using a raft backend, emit boltdb metrics
-           if rb, ok := c.underlyingPhysical.(*raft.RaftBackend); ok {
-               rb.CollectMetrics(c.MetricSink())
-           }
        case <-writeTimer:
            l := newLockGrabber(c.stateLock.RLock, c.stateLock.RUnlock, stopCh)
            go l.grab()
@@ -232,15 +232,12 @@ func (c *Core) tokenGaugeTtlCollector(ctx context.Context) ([]metricsutil.GaugeL
    return ts.gaugeCollectorByTtl(ctx)
}
-// emitMetrics is used to start all the periodc metrics; all of them should
-// be shut down when stopCh is closed.
-func (c *Core) emitMetrics(stopCh chan struct{}) {
+// emitMetricsActiveNode is used to start all the periodic metrics; all of them should
+// be shut down when stopCh is closed. This code runs on the active node only.
+func (c *Core) emitMetricsActiveNode(stopCh chan struct{}) {
    // The gauge collection processes are started and stopped here
    // because there's more than one TokenManager created during startup,
    // but we only want one set of gauges.
-   //
-   // Both active nodes and performance standby nodes call emitMetrics
-   // so we have to handle both.
    metricsInit := []struct {
        MetricName    []string
        MetadataLabel []metrics.Label
@@ -349,8 +346,8 @@ func (c *Core) findKvMounts() []*kvMount {
    c.mountsLock.RLock()
    defer c.mountsLock.RUnlock()
-   // emitMetrics doesn't grab the statelock, so this code might run during or after the seal process.
-   // Therefore, we need to check if c.mounts is nil. If we do not, emitMetrics will panic if this is
+   // we don't grab the statelock, so this code might run during or after the seal process.
+   // Therefore, we need to check if c.mounts is nil. If we do not, this will panic when
    // run after seal.
    if c.mounts == nil {
        return mounts

@@ -434,6 +434,17 @@ func (c *Core) runStandby(doneCh, manualStepDownCh, stopCh chan struct{}) {
            c.logger.Debug("shutting down periodic leader refresh")
        })
    }
+   {
+       metricsStop := make(chan struct{})
+
+       g.Add(func() error {
+           c.metricsLoop(metricsStop)
+           return nil
+       }, func(error) {
+           close(metricsStop)
+           c.logger.Debug("shutting down periodic metrics")
+       })
+   }
    {
        // Wait for leadership
        leaderStopCh := make(chan struct{})

@@ -8,6 +8,7 @@ import (
    "sync/atomic"
    "time"
+   "github.com/armon/go-metrics"
    "github.com/hashicorp/vault/helper/forwarding"
    "github.com/hashicorp/vault/physical/raft"
    "github.com/hashicorp/vault/sdk/helper/consts"
@@ -135,6 +136,9 @@ func (c *forwardingClient) startHeartbeat() {
        Mode:        "standby",
    }
    tick := func() {
+       labels := make([]metrics.Label, 0, 1)
+       defer metrics.MeasureSinceWithLabels([]string{"ha", "rpc", "client", "echo"}, time.Now(), labels)
+
        req := &EchoRequest{
            Message:     "ping",
            ClusterAddr: clusterAddr,
@@ -149,12 +153,14 @@ func (c *forwardingClient) startHeartbeat() {
            req.RaftDesiredSuffrage = raftBackend.DesiredSuffrage()
            req.RaftRedundancyZone = raftBackend.RedundancyZone()
            req.RaftUpgradeVersion = raftBackend.EffectiveVersion()
+           labels = append(labels, metrics.Label{Name: "peer_id", Value: raftBackend.NodeID()})
        }
        ctx, cancel := context.WithTimeout(c.echoContext, 2*time.Second)
        resp, err := c.RequestForwardingClient.Echo(ctx, req)
        cancel()
        if err != nil {
+           metrics.IncrCounter([]string{"ha", "rpc", "client", "echo", "errors"}, 1)
            c.core.logger.Debug("forwarding: error sending echo request to active node", "error", err)
            return
        }

@@ -223,9 +223,11 @@ These metrics relate to internal operations on Merkle Trees and Write Ahead Logs

These metrics are emitted on standbys when talking to the active node, and in some cases by performance standbys as well.

| Metric                               | Description                                                           | Unit   | Type    |
| :----------------------------------- | :-------------------------------------------------------------------- | :----- | :------ |
| `vault.ha.rpc.client.forward`        | Time taken to forward a request from a standby to the active node    | ms     | summary |
| `vault.ha.rpc.client.forward.errors` | Number of standby requests forwarding failures                        | errors | counter |
+| `vault.ha.rpc.client.echo`           | Time taken to send an echo request from a standby to the active node | ms     | summary |
+| `vault.ha.rpc.client.echo.errors`    | Number of standby echo request failures                               | errors | counter |
## Replication Metrics
@@ -474,6 +476,11 @@ These metrics relate to raft based [integrated storage][integrated-storage].
| `vault.raft_storage.bolt.spill.time`  | Time taken spilling.        | ms     | summary |
| `vault.raft_storage.bolt.write.count` | Number of writes performed. | writes | gauge   |
| `vault.raft_storage.bolt.write.time`  | Time taken writing to disk. | ms     | summary |
+| `vault.raft_storage.stats.commit_index` | Index of the last raft log committed to disk on this node. | sequence number | gauge |
+| `vault.raft_storage.stats.applied_index` | Highest index of the raft log either applied to the FSM or added to the fsm_pending queue. | sequence number | gauge |
+| `vault.raft_storage.stats.fsm_pending` | Number of raft logs this node has queued to be applied by the FSM. | logs | gauge |
+| `vault.raft_storage.follower.applied_index_delta` | Delta between the leader's applied index and the applied index each follower last reported via echo. | logs | gauge |
+| `vault.raft_storage.follower.last_heartbeat_ms` | Time since the last echo request was received from each follower. | ms | gauge |

## Integrated Storage (Raft) Autopilot
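
As an illustrative aside (not part of this commit), the sketch below shows how one of the follower gauges documented above carries its peer_id label when emitted through go-metrics and read back from an in-memory sink; the peer ID, gauge value, and sink configuration are invented for the example.

```go
// A minimal sketch, assuming go-metrics' in-memory sink; values are made up.
package main

import (
	"fmt"
	"time"

	metrics "github.com/armon/go-metrics"
)

func main() {
	// Collect into an in-memory sink so we can read the gauges back.
	sink := metrics.NewInmemSink(10*time.Second, time.Minute)
	cfg := metrics.DefaultConfig("vault")
	cfg.EnableHostname = false // keep metric names stable for the example
	if _, err := metrics.NewGlobal(cfg, sink); err != nil {
		panic(err)
	}

	// Emit a gauge the same way the raft backend does for each follower.
	labels := []metrics.Label{{Name: "peer_id", Value: "node-2"}}
	metrics.SetGaugeWithLabels([]string{"raft_storage", "follower", "last_heartbeat_ms"}, 250, labels)

	// Read back the current interval and print every gauge with its labels.
	for _, interval := range sink.Data() {
		for _, g := range interval.Gauges {
			fmt.Printf("%s=%v labels=%v\n", g.Name, g.Value, g.Labels)
		}
	}
}
```

Running this prints something like `vault.raft_storage.follower.last_heartbeat_ms=250 labels=[{peer_id node-2}]`, which is the shape telemetry sinks receive before applying their own naming conventions.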