open-vault/vault/core_metrics.go
Hamid Ghaf 27bb03bbc0
adding copyright header (#19555)
* adding copyright header

* fix fmt and a test
2023-03-15 09:00:52 -07:00

568 lines
16 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package vault
import (
"context"
"errors"
"os"
"strings"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/vault/helper/metricsutil"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/physical/raft"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/sdk/logical"
)
func (c *Core) metricsLoop(stopCh chan struct{}) {
emitTimer := time.Tick(time.Second)
stopOrHAState := func() (bool, consts.HAState) {
l := newLockGrabber(c.stateLock.RLock, c.stateLock.RUnlock, stopCh)
go l.grab()
if stopped := l.lockOrStop(); stopped {
return true, 0
}
defer c.stateLock.RUnlock()
return false, c.HAState()
}
identityCountTimer := time.Tick(time.Minute * 10)
// Only emit on active node of cluster that is not a DR secondary.
if stopped, haState := stopOrHAState(); stopped {
return
} else if haState == consts.Standby || c.IsDRSecondary() {
identityCountTimer = nil
}
writeTimer := time.Tick(time.Second * 30)
// Do not process the writeTimer on DR Secondary nodes
if c.IsDRSecondary() {
writeTimer = nil
}
// This loop covers
// vault.expire.num_leases
// vault.core.unsealed
// vault.identity.num_entities
// and the non-telemetry request counters shown in the UI.
for {
select {
case <-emitTimer:
stopped, haState := stopOrHAState()
if stopped {
return
}
if haState == consts.Active {
c.metricsMutex.Lock()
// Emit on active node only
if c.expiration != nil {
c.expiration.emitMetrics()
}
c.metricsMutex.Unlock()
}
// Refresh the sealed gauge, on all nodes
if c.Sealed() {
c.metricSink.SetGaugeWithLabels([]string{"core", "unsealed"}, 0, nil)
} else {
c.metricSink.SetGaugeWithLabels([]string{"core", "unsealed"}, 1, nil)
}
if c.UndoLogsEnabled() {
c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "write_undo_logs"}, 1, nil)
} else {
c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "write_undo_logs"}, 0, nil)
}
// Refresh the standby gauge, on all nodes
if haState != consts.Active {
c.metricSink.SetGaugeWithLabels([]string{"core", "active"}, 0, nil)
} else {
c.metricSink.SetGaugeWithLabels([]string{"core", "active"}, 1, nil)
}
if haState == consts.PerfStandby {
c.metricSink.SetGaugeWithLabels([]string{"core", "performance_standby"}, 1, nil)
} else {
c.metricSink.SetGaugeWithLabels([]string{"core", "performance_standby"}, 0, nil)
}
if c.ReplicationState().HasState(consts.ReplicationPerformancePrimary) {
c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "performance", "primary"}, 1, nil)
} else {
c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "performance", "primary"}, 0, nil)
}
if c.IsPerfSecondary() {
c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "performance", "secondary"}, 1, nil)
} else {
c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "performance", "secondary"}, 0, nil)
}
if c.ReplicationState().HasState(consts.ReplicationDRPrimary) {
c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "dr", "primary"}, 1, nil)
} else {
c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "dr", "primary"}, 0, nil)
}
if c.IsDRSecondary() {
c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "dr", "secondary"}, 1, nil)
} else {
c.metricSink.SetGaugeWithLabels([]string{"core", "replication", "dr", "secondary"}, 0, nil)
}
// If we're using a raft backend, emit raft metrics
if rb, ok := c.underlyingPhysical.(*raft.RaftBackend); ok {
rb.CollectMetrics(c.MetricSink())
}
// Capture the total number of in-flight requests
c.inFlightReqGaugeMetric()
// Refresh gauge metrics that are looped
c.cachedGaugeMetricsEmitter()
case <-writeTimer:
l := newLockGrabber(c.stateLock.RLock, c.stateLock.RUnlock, stopCh)
go l.grab()
if stopped := l.lockOrStop(); stopped {
return
}
// Ship barrier encryption counts if a perf standby or the active node
// on a performance secondary cluster
if c.perfStandby || c.IsPerfSecondary() { // already have lock here, do not re-acquire
err := syncBarrierEncryptionCounter(c)
if err != nil {
c.logger.Error("writing syncing encryption counters", "err", err)
}
}
c.stateLock.RUnlock()
case <-identityCountTimer:
// TODO: this can be replaced by the identity gauge counter; we need to
// sum across all namespaces.
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
entities, err := c.countActiveEntities(ctx)
if err != nil {
c.logger.Error("error counting identity entities", "err", err)
} else {
metrics.SetGauge([]string{"identity", "num_entities"}, float32(entities.Entities.Total))
}
}()
case <-stopCh:
return
}
}
}
// These wrappers are responsible for redirecting to the current instance of
// TokenStore; there is one per method because an additional level of abstraction
// seems confusing.
func (c *Core) tokenGaugeCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
if c.IsDRSecondary() {
// there is no expiration manager on DR Secondaries
return []metricsutil.GaugeLabelValues{}, nil
}
// stateLock or authLock protects the tokenStore pointer
c.stateLock.RLock()
ts := c.tokenStore
c.stateLock.RUnlock()
if ts == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("nil token store")
}
return ts.gaugeCollector(ctx)
}
func (c *Core) tokenGaugePolicyCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
if c.IsDRSecondary() {
// there is no expiration manager on DR Secondaries
return []metricsutil.GaugeLabelValues{}, nil
}
c.stateLock.RLock()
ts := c.tokenStore
c.stateLock.RUnlock()
if ts == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("nil token store")
}
return ts.gaugeCollectorByPolicy(ctx)
}
func (c *Core) leaseExpiryGaugeCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
c.stateLock.RLock()
e := c.expiration
metricsConsts := c.MetricSink().TelemetryConsts
c.stateLock.RUnlock()
if e == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("nil expiration manager")
}
return e.leaseAggregationMetrics(ctx, metricsConsts)
}
func (c *Core) tokenGaugeMethodCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
if c.IsDRSecondary() {
// there is no expiration manager on DR Secondaries
return []metricsutil.GaugeLabelValues{}, nil
}
c.stateLock.RLock()
ts := c.tokenStore
c.stateLock.RUnlock()
if ts == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("nil token store")
}
return ts.gaugeCollectorByMethod(ctx)
}
func (c *Core) tokenGaugeTtlCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
if c.IsDRSecondary() {
// there is no expiration manager on DR Secondaries
return []metricsutil.GaugeLabelValues{}, nil
}
c.stateLock.RLock()
ts := c.tokenStore
c.stateLock.RUnlock()
if ts == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("nil token store")
}
return ts.gaugeCollectorByTtl(ctx)
}
// emitMetricsActiveNode is used to start all the periodic metrics; all of them should
// be shut down when stopCh is closed. This code runs on the active node only.
func (c *Core) emitMetricsActiveNode(stopCh chan struct{}) {
// The gauge collection processes are started and stopped here
// because there's more than one TokenManager created during startup,
// but we only want one set of gauges.
metricsInit := []struct {
MetricName []string
MetadataLabel []metrics.Label
CollectorFunc metricsutil.GaugeCollector
DisableEnvVar string
}{
{
[]string{"token", "count"},
[]metrics.Label{{"gauge", "token_by_namespace"}},
c.tokenGaugeCollector,
"",
},
{
[]string{"token", "count", "by_policy"},
[]metrics.Label{{"gauge", "token_by_policy"}},
c.tokenGaugePolicyCollector,
"",
},
{
[]string{"expire", "leases", "by_expiration"},
[]metrics.Label{{"gauge", "leases_by_expiration"}},
c.leaseExpiryGaugeCollector,
"",
},
{
[]string{"token", "count", "by_auth"},
[]metrics.Label{{"gauge", "token_by_auth"}},
c.tokenGaugeMethodCollector,
"",
},
{
[]string{"token", "count", "by_ttl"},
[]metrics.Label{{"gauge", "token_by_ttl"}},
c.tokenGaugeTtlCollector,
"",
},
{
[]string{"secret", "kv", "count"},
[]metrics.Label{{"gauge", "kv_secrets_by_mountpoint"}},
c.kvSecretGaugeCollector,
"VAULT_DISABLE_KV_GAUGE",
},
{
[]string{"identity", "entity", "count"},
[]metrics.Label{{"gauge", "identity_by_namespace"}},
c.entityGaugeCollector,
"",
},
{
[]string{"identity", "entity", "alias", "count"},
[]metrics.Label{{"gauge", "identity_by_mountpoint"}},
c.entityGaugeCollectorByMount,
"",
},
{
[]string{"identity", "entity", "active", "partial_month"},
[]metrics.Label{{"gauge", "identity_active_month"}},
c.activeEntityGaugeCollector,
"",
},
}
// Disable collection if configured, or if we're a performance standby
// node or DR secondary cluster.
if c.MetricSink().GaugeInterval == time.Duration(0) {
c.logger.Info("usage gauge collection is disabled")
} else if standby, _ := c.Standby(); !standby && !c.IsDRSecondary() {
for _, init := range metricsInit {
if init.DisableEnvVar != "" {
if os.Getenv(init.DisableEnvVar) != "" {
c.logger.Info("usage gauge collection is disabled for",
"metric", init.MetricName)
continue
}
}
proc, err := c.MetricSink().NewGaugeCollectionProcess(
init.MetricName,
init.MetadataLabel,
init.CollectorFunc,
c.logger,
)
if err != nil {
c.logger.Error("failed to start collector", "metric", init.MetricName, "error", err)
} else {
go proc.Run()
defer proc.Stop()
}
}
}
// When this returns, all the defers set up above will fire.
c.metricsLoop(stopCh)
}
type kvMount struct {
Namespace *namespace.Namespace
MountPoint string
Version string
NumSecrets int
}
func (c *Core) findKvMounts() []*kvMount {
mounts := make([]*kvMount, 0)
c.mountsLock.RLock()
defer c.mountsLock.RUnlock()
// we don't grab the statelock, so this code might run during or after the seal process.
// Therefore, we need to check if c.mounts is nil. If we do not, this will panic when
// run after seal.
if c.mounts == nil {
return mounts
}
for _, entry := range c.mounts.Entries {
if entry.Type == "kv" || entry.Type == "generic" {
version, ok := entry.Options["version"]
if !ok {
version = "1"
}
mounts = append(mounts, &kvMount{
Namespace: entry.namespace,
MountPoint: entry.Path,
Version: version,
NumSecrets: 0,
})
}
}
return mounts
}
func (c *Core) kvCollectionErrorCount() {
c.MetricSink().IncrCounterWithLabels(
[]string{"metrics", "collection", "error"},
1,
[]metrics.Label{{"gauge", "kv_secrets_by_mountpoint"}},
)
}
func (c *Core) walkKvMountSecrets(ctx context.Context, m *kvMount) {
var subdirectories []string
if m.Version == "1" {
subdirectories = []string{m.Namespace.Path + m.MountPoint}
} else {
subdirectories = []string{m.Namespace.Path + m.MountPoint + "metadata/"}
}
for len(subdirectories) > 0 {
// Check for cancellation
select {
case <-ctx.Done():
return
default:
break
}
currentDirectory := subdirectories[0]
subdirectories = subdirectories[1:]
listRequest := &logical.Request{
Operation: logical.ListOperation,
Path: currentDirectory,
}
resp, err := c.router.Route(ctx, listRequest)
if err != nil {
c.kvCollectionErrorCount()
// ErrUnsupportedPath probably means that the mount is not there any more,
// don't log those cases.
if !strings.Contains(err.Error(), logical.ErrUnsupportedPath.Error()) {
c.logger.Error("failed to perform internal KV list", "mount_point", m.MountPoint, "error", err)
break
}
// Quit handling this mount point (but it'll still appear in the list)
return
}
if resp == nil {
continue
}
rawKeys, ok := resp.Data["keys"]
if !ok {
continue
}
keys, ok := rawKeys.([]string)
if !ok {
c.kvCollectionErrorCount()
c.logger.Error("KV list keys are not a []string", "mount_point", m.MountPoint, "rawKeys", rawKeys)
// Quit handling this mount point (but it'll still appear in the list)
return
}
for _, path := range keys {
if len(path) > 0 && path[len(path)-1] == '/' {
subdirectories = append(subdirectories, currentDirectory+path)
} else {
m.NumSecrets += 1
}
}
}
}
func (c *Core) kvSecretGaugeCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
// Find all KV mounts
mounts := c.findKvMounts()
results := make([]metricsutil.GaugeLabelValues, len(mounts))
// Use a root namespace, so include namespace path
// in any queries.
ctx = namespace.RootContext(ctx)
// Route list requests to all the identified mounts.
// (All of these will show up as activity in the vault.route metric.)
// Then we have to explore each subdirectory.
for i, m := range mounts {
// Check for cancellation, return empty array
select {
case <-ctx.Done():
return []metricsutil.GaugeLabelValues{}, nil
default:
break
}
results[i].Labels = []metrics.Label{
metricsutil.NamespaceLabel(m.Namespace),
{"mount_point", m.MountPoint},
}
c.walkKvMountSecrets(ctx, m)
results[i].Value = float32(m.NumSecrets)
}
return results, nil
}
func (c *Core) entityGaugeCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
// Protect against concurrent changes during seal
c.stateLock.RLock()
identityStore := c.identityStore
c.stateLock.RUnlock()
if identityStore == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("nil identity store")
}
byNamespace, err := identityStore.countEntitiesByNamespace(ctx)
if err != nil {
return []metricsutil.GaugeLabelValues{}, err
}
// No check for expiration here; the bulk of the work should be in
// counting the entities.
allNamespaces := c.collectNamespaces()
values := make([]metricsutil.GaugeLabelValues, len(allNamespaces))
for i := range values {
values[i].Labels = []metrics.Label{
metricsutil.NamespaceLabel(allNamespaces[i]),
}
values[i].Value = float32(byNamespace[allNamespaces[i].ID])
}
return values, nil
}
func (c *Core) entityGaugeCollectorByMount(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
c.stateLock.RLock()
identityStore := c.identityStore
c.stateLock.RUnlock()
if identityStore == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("nil identity store")
}
byAccessor, err := identityStore.countEntitiesByMountAccessor(ctx)
if err != nil {
return []metricsutil.GaugeLabelValues{}, err
}
values := make([]metricsutil.GaugeLabelValues, 0)
for accessor, count := range byAccessor {
// Terminate if taking too long to do the translation
select {
case <-ctx.Done():
return values, errors.New("context cancelled")
default:
break
}
c.stateLock.RLock()
mountEntry := c.router.MatchingMountByAccessor(accessor)
c.stateLock.RUnlock()
if mountEntry == nil {
continue
}
values = append(values, metricsutil.GaugeLabelValues{
Labels: []metrics.Label{
metricsutil.NamespaceLabel(mountEntry.namespace),
{"auth_method", mountEntry.Type},
{"mount_point", "auth/" + mountEntry.Path},
},
Value: float32(count),
})
}
return values, nil
}
func (c *Core) cachedGaugeMetricsEmitter() {
if c.metricsHelper == nil {
return
}
loopMetrics := &c.metricsHelper.LoopMetrics.Metrics
emit := func(key interface{}, value interface{}) bool {
metricValue := value.(metricsutil.GaugeMetric)
c.metricSink.SetGaugeWithLabels(metricValue.Key, metricValue.Value, metricValue.Labels)
return true
}
loopMetrics.Range(emit)
}
func (c *Core) inFlightReqGaugeMetric() {
totalInFlightReq := c.inFlightReqData.InFlightReqCount.Load()
// Adding a gauge metric to capture total number of inflight requests
c.metricSink.SetGaugeWithLabels([]string{"core", "in_flight_requests"}, float32(totalInFlightReq), nil)
}