Token gauge metrics implementation. (#9239)

* Token gauge metrics implementation.
* Enable gauges only when interval is nonzero.
* Added count by TTL
* Handle "in restore mode" error specifically.
* Refactored initialization code for gauge collection processes.
* Fixed for multiple namespaces.
* Ability to disable individual gauges with environment variable.
* changelog++
Mark Gritter 2020-06-23 18:36:24 -05:00 committed by GitHub
parent 101b0d3054
commit 97d415d024
8 changed files with 549 additions and 64 deletions


@@ -19,6 +19,7 @@ IMPROVEMENTS:
* auth/kubernetes: Allow disabling `iss` validation [[GH-91](https://github.com/hashicorp/vault-plugin-auth-kubernetes/pull/91)]
* core: Add the Go version used to build a Vault binary to the server message output. [[GH-9078](https://github.com/hashicorp/vault/pull/9078)]
* core: Added Password Policies for user-configurable password generation [[GH-8637](https://github.com/hashicorp/vault/pull/8637)]
* core: New telemetry metrics covering token counts, token creation, KV secret counts, lease creation. [[GH-9239](https://github.com/hashicorp/vault/pull/9239)] [[GH-9250](https://github.com/hashicorp/vault/pull/9250)] [[GH-9244](https://github.com/hashicorp/vault/pull/9244)] [[GH-9052](https://github.com/hashicorp/vault/pull/9052)]
* cli: Support reading TLS parameters from file for the `vault operator raft join` command. [[GH-9060](https://github.com/hashicorp/vault/pull/9060)]
* plugin: Add SDK method, `Sys.ReloadPlugin`, and CLI command, `vault plugin reload`,
for reloading plugins. [[GH-8777](https://github.com/hashicorp/vault/pull/8777)]


@@ -20,7 +20,7 @@ var bucketBoundaries = []struct {
{30 * 24 * time.Hour, "30d"},
}
const overflowBucket = "+Inf"
const OverflowBucket = "+Inf"
// TTLBucket computes the label to apply for a token TTL.
func TTLBucket(ttl time.Duration) string {
@@ -31,7 +31,7 @@ func TTLBucket(ttl time.Duration) string {
},
)
if upperBound >= len(bucketBoundaries) {
return overflowBucket
return OverflowBucket
} else {
return bucketBoundaries[upperBound].Label
}
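For reference, a minimal sketch of how a caller might use TTLBucket together with the newly exported OverflowBucket; the bucket boundaries are only partially visible in the hunk above, so the "1h" result is an assumption:

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/vault/helper/metricsutil"
)

func main() {
	// A finite TTL maps to the label of the smallest bucket boundary that contains it.
	fmt.Println(metricsutil.TTLBucket(45 * time.Minute)) // "1h", assuming a one-hour boundary exists
	// TTLs past the last boundary (30d above) land in the exported overflow bucket.
	fmt.Println(metricsutil.TTLBucket(365 * 24 * time.Hour)) // "+Inf"
	fmt.Println(metricsutil.OverflowBucket)                  // "+Inf"
}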


@@ -2086,61 +2086,6 @@ func stopReplicationImpl(c *Core) error {
return nil
}
// emitMetrics is used to periodically expose metrics while running
func (c *Core) emitMetrics(stopCh chan struct{}) {
emitTimer := time.Tick(time.Second)
writeTimer := time.Tick(c.counters.syncInterval)
identityCountTimer := time.Tick(time.Minute * 10)
for {
select {
case <-emitTimer:
c.metricsMutex.Lock()
if c.expiration != nil {
c.expiration.emitMetrics()
}
//Refresh the sealed gauge
if c.Sealed() {
c.metricSink.SetGaugeWithLabels([]string{"core", "unsealed"}, 0, nil)
} else {
c.metricSink.SetGaugeWithLabels([]string{"core", "unsealed"}, 1, nil)
}
c.metricsMutex.Unlock()
case <-writeTimer:
if stopped := grabLockOrStop(c.stateLock.RLock, c.stateLock.RUnlock, stopCh); stopped {
// Go through the loop again, this time the stop channel case
// should trigger
continue
}
if c.perfStandby {
syncCounter(c)
} else {
err := c.saveCurrentRequestCounters(context.Background(), time.Now())
if err != nil {
c.logger.Error("writing request counters to barrier", "err", err)
}
}
c.stateLock.RUnlock()
case <-identityCountTimer:
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
entities, err := c.countActiveEntities(ctx)
if err != nil {
c.logger.Error("error counting identity entities", "err", err)
} else {
metrics.SetGauge([]string{"identity", "num_entities"}, float32(entities.Entities.Total))
}
}()
case <-stopCh:
return
}
}
}
func (c *Core) ReplicationState() consts.ReplicationState {
return consts.ReplicationState(atomic.LoadUint32(c.replicationState))
}


@@ -2,7 +2,10 @@ package vault
import (
"context"
"errors"
"os"
"strings"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/vault/helper/metricsutil"
@@ -10,7 +13,185 @@
"github.com/hashicorp/vault/sdk/logical"
)
// TODO: move emitMetrics into this file.
func (c *Core) metricsLoop(stopCh chan struct{}) {
emitTimer := time.Tick(time.Second)
writeTimer := time.Tick(c.counters.syncInterval)
identityCountTimer := time.Tick(time.Minute * 10)
// This loop covers
// vault.expire.num_leases
// vault.core.unsealed
// vault.identity.num_entities
// and the non-telemetry request counters shown in the UI.
for {
select {
case <-emitTimer:
c.metricsMutex.Lock()
if c.expiration != nil {
c.expiration.emitMetrics()
}
// Refresh the sealed gauge
if c.Sealed() {
c.metricSink.SetGaugeWithLabels([]string{"core", "unsealed"}, 0, nil)
} else {
c.metricSink.SetGaugeWithLabels([]string{"core", "unsealed"}, 1, nil)
}
c.metricsMutex.Unlock()
case <-writeTimer:
if stopped := grabLockOrStop(c.stateLock.RLock, c.stateLock.RUnlock, stopCh); stopped {
// Go through the loop again, this time the stop channel case
// should trigger
continue
}
if c.perfStandby {
syncCounter(c)
} else {
err := c.saveCurrentRequestCounters(context.Background(), time.Now())
if err != nil {
c.logger.Error("writing request counters to barrier", "err", err)
}
}
c.stateLock.RUnlock()
case <-identityCountTimer:
// TODO: this can be replaced by the identity gauge counter; we need to
// sum across all namespaces.
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
entities, err := c.countActiveEntities(ctx)
if err != nil {
c.logger.Error("error counting identity entities", "err", err)
} else {
metrics.SetGauge([]string{"identity", "num_entities"}, float32(entities.Entities.Total))
}
}()
case <-stopCh:
return
}
}
}
// These wrappers are responsible for redirecting to the current instance of
// TokenStore; there is one per method because an additional level of abstraction
// seems confusing.
func (c *Core) tokenGaugeCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
// stateLock or authLock protects the tokenStore pointer
c.stateLock.RLock()
ts := c.tokenStore
c.stateLock.RUnlock()
if ts == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("nil token store")
}
return ts.gaugeCollector(ctx)
}
func (c *Core) tokenGaugePolicyCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
c.stateLock.RLock()
ts := c.tokenStore
c.stateLock.RUnlock()
if ts == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("nil token store")
}
return ts.gaugeCollectorByPolicy(ctx)
}
func (c *Core) tokenGaugeMethodCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
c.stateLock.RLock()
ts := c.tokenStore
c.stateLock.RUnlock()
if ts == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("nil token store")
}
return ts.gaugeCollectorByMethod(ctx)
}
func (c *Core) tokenGaugeTtlCollector(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
c.stateLock.RLock()
ts := c.tokenStore
c.stateLock.RUnlock()
if ts == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("nil token store")
}
return ts.gaugeCollectorByTtl(ctx)
}
// emitMetrics is used to start all the periodic metrics; all of them should
// be shut down when stopCh is closed.
func (c *Core) emitMetrics(stopCh chan struct{}) {
// The gauge collection processes are started and stopped here
// because there's more than one TokenManager created during startup,
// but we only want one set of gauges.
metricsInit := []struct {
MetricName []string
MetadataLabel []metrics.Label
CollectorFunc metricsutil.GaugeCollector
DisableEnvVar string
}{
{
[]string{"token", "count"},
[]metrics.Label{{"gauge", "token_by_namespace"}},
c.tokenGaugeCollector,
"",
},
{
[]string{"token", "count", "by_policy"},
[]metrics.Label{{"gauge", "token_by_policy"}},
c.tokenGaugePolicyCollector,
"",
},
{
[]string{"token", "count", "by_auth"},
[]metrics.Label{{"gauge", "token_by_auth"}},
c.tokenGaugeMethodCollector,
"",
},
{
[]string{"token", "count", "by_ttl"},
[]metrics.Label{{"gauge", "token_by_ttl"}},
c.tokenGaugeTtlCollector,
"",
},
{
[]string{"secret", "kv", "count"},
[]metrics.Label{{"gauge", "kv_secrets_by_mountpoint"}},
c.kvSecretGaugeCollector,
"VAULT_DISABLE_KV_GAUGE",
},
}
if c.MetricSink().GaugeInterval == time.Duration(0) {
c.logger.Info("usage gauge collection is disabled")
} else {
for _, init := range metricsInit {
if init.DisableEnvVar != "" {
if os.Getenv(init.DisableEnvVar) != "" {
c.logger.Info("usage gauge collection is disabled for",
"metric", init.MetricName)
continue
}
}
proc, err := c.MetricSink().NewGaugeCollectionProcess(
init.MetricName,
init.MetadataLabel,
init.CollectorFunc,
c.logger,
)
if err != nil {
c.logger.Error("failed to start collector", "metric", init.MetricName, "error", err)
} else {
go proc.Run()
defer proc.Stop()
}
}
}
// When this returns, all the defers set up above will fire.
c.metricsLoop(stopCh)
}
type kvMount struct {
Namespace *namespace.Namespace
@@ -53,9 +234,9 @@ func (c *Core) kvCollectionErrorCount() {
func (c *Core) walkKvMountSecrets(ctx context.Context, m *kvMount) {
var subdirectories []string
if m.Version == "1" {
subdirectories = []string{m.MountPoint}
subdirectories = []string{m.Namespace.Path + m.MountPoint}
} else {
subdirectories = []string{m.MountPoint + "metadata/"}
subdirectories = []string{m.Namespace.Path + m.MountPoint + "metadata/"}
}
for len(subdirectories) > 0 {
@@ -115,7 +296,8 @@ func (c *Core) kvSecretGaugeCollector(ctx context.Context) ([]metricsutil.GaugeL
mounts := c.findKvMounts()
results := make([]metricsutil.GaugeLabelValues, len(mounts))
// Context must have root namespace
// Use a root namespace, so include namespace path
// in any queries.
ctx = namespace.RootContext(ctx)
// Route list requests to all the identified mounts.
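To make the collector contract used by emitMetrics above concrete, here is a rough sketch of registering one additional gauge; startExampleGauge and its metric name are hypothetical, while MetricSink, NewGaugeCollectionProcess, GaugeCollector, and GaugeLabelValues come from the diff:

// startExampleGauge is a hypothetical helper illustrating the pattern:
// a collector returns one value per label set, and the collection process
// in metricsutil handles scheduling and emission on the configured interval.
func (c *Core) startExampleGauge(stopCh chan struct{}) {
	collector := func(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
		return []metricsutil.GaugeLabelValues{
			{
				Labels: []metrics.Label{{Name: "namespace", Value: "root"}},
				Value:  42,
			},
		}, nil
	}
	proc, err := c.MetricSink().NewGaugeCollectionProcess(
		[]string{"example", "count"},
		[]metrics.Label{{Name: "gauge", Value: "example"}},
		collector,
		c.logger,
	)
	if err != nil {
		c.logger.Error("failed to start example collector", "error", err)
		return
	}
	go proc.Run()
	go func() {
		// Stop the collection process when the caller shuts down.
		<-stopCh
		proc.Stop()
	}()
}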


@@ -1901,10 +1901,17 @@ func (m *ExpirationManager) emitMetrics() {
// type (though most likely we would only call this from within the "vault" core package.)
type ExpirationWalkFunction = func(leaseID string, auth *logical.Auth, path string) bool
var (
ErrInRestoreMode = errors.New("expiration manager in restore mode")
)
// WalkTokens extracts the Auth structure from leases corresponding to tokens.
// Returning false from the walk function terminates the iteration.
// TODO: signal if reload hasn't finished yet?
func (m *ExpirationManager) WalkTokens(walkFn ExpirationWalkFunction) {
func (m *ExpirationManager) WalkTokens(walkFn ExpirationWalkFunction) error {
if m.inRestoreMode() {
return ErrInRestoreMode
}
callback := func(key, value interface{}) bool {
p := value.(pendingInfo)
if p.cachedLeaseInfo == nil {
@@ -1919,6 +1926,8 @@ func (m *ExpirationManager) WalkTokens(walkFn ExpirationWalkFunction) {
m.pending.Range(callback)
m.nonexpiring.Range(callback)
return nil
}
// leaseEntry is used to structure the values the expiration
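For callers of the changed WalkTokens signature, the new sentinel error can be checked directly; a minimal sketch (countTokens is hypothetical — the token store collectors below match the error by substring via suppressRestoreModeError instead):

// countTokens is a hypothetical caller of the new WalkTokens signature.
func countTokens(exp *ExpirationManager) (int, error) {
	n := 0
	err := exp.WalkTokens(func(leaseID string, auth *logical.Auth, path string) bool {
		n++
		return true // keep iterating
	})
	if err == ErrInRestoreMode {
		// Restore has not finished yet; report "no data" so a periodic caller
		// can simply retry on its next cycle instead of logging an error.
		return 0, nil
	}
	return n, err
}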


@@ -2092,6 +2092,8 @@ func TestExpiration_WalkTokens(t *testing.T) {
sampleToken(t, exp, "auth/github/login", false, "root"),
}
waitForRestore(t, exp)
for true {
// Count before and after each revocation
t.Logf("Counting %d tokens.", len(tokenEntries))
@@ -2133,6 +2135,22 @@ func TestExpiration_WalkTokens(t *testing.T) {
}
func waitForRestore(t *testing.T, exp *ExpirationManager) {
t.Helper()
timeout := time.After(200 * time.Millisecond)
ticker := time.Tick(5 * time.Millisecond)
for exp.inRestoreMode() {
select {
case <-timeout:
t.Fatalf("Timeout waiting for expiration manager to recover.")
case <-ticker:
continue
}
}
}
func TestExpiration_CachedPolicyIsShared(t *testing.T) {
exp := mockExpiration(t)
@@ -2144,6 +2162,7 @@ func TestExpiration_CachedPolicyIsShared(t *testing.T) {
var policies [][]string
waitForRestore(t, exp)
exp.WalkTokens(func(leaseId string, auth *logical.Auth, path string) bool {
policies = append(policies, auth.Policies)
return true


@@ -14,6 +14,7 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/armon/go-radix"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/errwrap"
log "github.com/hashicorp/go-hclog"
@@ -2719,6 +2720,7 @@ func (ts *TokenStore) handleCreateCommon(ctx context.Context, req *logical.Reque
// Count the successful token creation.
ttl_label := metricsutil.TTLBucket(te.TTL)
ts.core.metricSink.IncrCounterWithLabels(
[]string{"token", "creation"},
1,
@@ -3397,6 +3399,15 @@ func (ts *TokenStore) tokenStoreRoleCreateUpdate(ctx context.Context, req *logic
return resp, nil
}
func suppressRestoreModeError(err error) error {
if err != nil {
if strings.Contains(err.Error(), ErrInRestoreMode.Error()) {
return nil
}
}
return err
}
// gaugeCollector is responsible for counting the number of tokens by
// namespace. Separate versions cover the other counts; this is somewhat
// less efficient than doing just one pass over the tokens and can
@@ -3419,7 +3430,7 @@ func (ts *TokenStore) gaugeCollector(ctx context.Context) ([]metricsutil.GaugeLa
namespacePosition[ns.ID] = i
}
ts.expiration.WalkTokens(func(leaseID string, auth *logical.Auth, path string) bool {
err := ts.expiration.WalkTokens(func(leaseID string, auth *logical.Auth, path string) bool {
select {
// Abort and return empty collection if it's taking too much time, nonblocking check.
case <-ctx.Done():
@@ -3439,10 +3450,16 @@ func (ts *TokenStore) gaugeCollector(ctx context.Context) ([]metricsutil.GaugeLa
}
})
if err != nil {
return []metricsutil.GaugeLabelValues{}, suppressRestoreModeError(err)
}
// If collection was cancelled, return an empty array.
select {
case <-ctx.Done():
return []metricsutil.GaugeLabelValues{}, nil
default:
break
}
for i := range values {
@@ -3452,6 +3469,233 @@ func (ts *TokenStore) gaugeCollector(ctx context.Context) ([]metricsutil.GaugeLa
}
func (ts *TokenStore) gaugeCollectorByPolicy(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
if ts.expiration == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("expiration manager is nil")
}
allNamespaces := ts.core.collectNamespaces()
byNsAndPolicy := make(map[string]map[string]int)
err := ts.expiration.WalkTokens(func(leaseID string, auth *logical.Auth, path string) bool {
select {
// Abort and return empty collection if it's taking too much time, nonblocking check.
case <-ctx.Done():
return false
default:
_, nsID := namespace.SplitIDFromString(leaseID)
if nsID == "" {
nsID = namespace.RootNamespaceID
}
policyMap, ok := byNsAndPolicy[nsID]
if !ok {
policyMap = make(map[string]int)
byNsAndPolicy[nsID] = policyMap
}
for _, policy := range auth.Policies {
policyMap[policy] = policyMap[policy] + 1
}
return true
}
})
if err != nil {
return []metricsutil.GaugeLabelValues{}, suppressRestoreModeError(err)
}
// If collection was cancelled, return an empty array.
select {
case <-ctx.Done():
return []metricsutil.GaugeLabelValues{}, nil
default:
break
}
// TODO: can we estimate the needed size?
flattenedResults := make([]metricsutil.GaugeLabelValues, 0)
for _, ns := range allNamespaces {
for policy, count := range byNsAndPolicy[ns.ID] {
flattenedResults = append(flattenedResults,
metricsutil.GaugeLabelValues{
Labels: []metrics.Label{
metricsutil.NamespaceLabel(ns),
{"policy", policy},
},
Value: float32(count),
})
}
}
return flattenedResults, nil
}
func (ts *TokenStore) gaugeCollectorByTtl(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
if ts.expiration == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("expiration manager is nil")
}
allNamespaces := ts.core.collectNamespaces()
byNsAndBucket := make(map[string]map[string]int)
err := ts.expiration.WalkTokens(func(leaseID string, auth *logical.Auth, path string) bool {
select {
// Abort and return empty collection if it's taking too much time, nonblocking check.
case <-ctx.Done():
return false
default:
if auth == nil {
return true
}
_, nsID := namespace.SplitIDFromString(leaseID)
if nsID == "" {
nsID = namespace.RootNamespaceID
}
bucketMap, ok := byNsAndBucket[nsID]
if !ok {
bucketMap = make(map[string]int)
byNsAndBucket[nsID] = bucketMap
}
bucket := metricsutil.TTLBucket(auth.TTL)
// Zero is a special value in this context
if auth.TTL == time.Duration(0) {
bucket = metricsutil.OverflowBucket
}
bucketMap[bucket] = bucketMap[bucket] + 1
return true
}
})
if err != nil {
return []metricsutil.GaugeLabelValues{}, suppressRestoreModeError(err)
}
// If collection was cancelled, return an empty array.
select {
case <-ctx.Done():
return []metricsutil.GaugeLabelValues{}, nil
default:
break
}
// 10 different time buckets, at the moment, though many should
// be unused.
flattenedResults := make([]metricsutil.GaugeLabelValues, 0, len(allNamespaces)*10)
for _, ns := range allNamespaces {
for bucket, count := range byNsAndBucket[ns.ID] {
flattenedResults = append(flattenedResults,
metricsutil.GaugeLabelValues{
Labels: []metrics.Label{
metricsutil.NamespaceLabel(ns),
{"creation_ttl", bucket},
},
Value: float32(count),
})
}
}
return flattenedResults, nil
}
func (ts *TokenStore) gaugeCollectorByMethod(ctx context.Context) ([]metricsutil.GaugeLabelValues, error) {
if ts.expiration == nil {
return []metricsutil.GaugeLabelValues{}, errors.New("expiration manager is nil")
}
rootContext := namespace.RootContext(ctx)
allNamespaces := ts.core.collectNamespaces()
byNsAndMethod := make(map[string]map[string]int)
// Cache the prefixes that we find locally rather than
// hitting the shared mount table every time
prefixTree := radix.New()
pathToPrefix := func(nsID string, path string) string {
ns, err := NamespaceByID(rootContext, nsID, ts.core)
if ns == nil || err != nil {
return "unknown"
}
ctx := namespace.ContextWithNamespace(rootContext, ns)
key := ns.Path + path
_, method, ok := prefixTree.LongestPrefix(key)
if ok {
return method.(string)
}
// Look up the path from the lease within the correct namespace
// Need to hold stateLock while accessing the router.
ts.core.stateLock.RLock()
defer ts.core.stateLock.RUnlock()
mountEntry := ts.core.router.MatchingMountEntry(ctx, path)
if mountEntry == nil {
return "unknown"
}
// mountEntry.Path lacks the "auth/" prefix; perhaps we should
// refactor router to provide a method that returns both the matching
// path *and* the mount entry?
// Or we could just always add "auth/"?
matchingMount := ts.core.router.MatchingMount(ctx, path)
if matchingMount == "" {
// Shouldn't happen, but a race is possible?
return mountEntry.Type
}
key = ns.Path + matchingMount
prefixTree.Insert(key, mountEntry.Type)
return mountEntry.Type
}
err := ts.expiration.WalkTokens(func(leaseID string, auth *logical.Auth, path string) bool {
select {
// Abort and return empty collection if it's taking too much time, nonblocking check.
case <-ctx.Done():
return false
default:
_, nsID := namespace.SplitIDFromString(leaseID)
if nsID == "" {
nsID = namespace.RootNamespaceID
}
methodMap, ok := byNsAndMethod[nsID]
if !ok {
methodMap = make(map[string]int)
byNsAndMethod[nsID] = methodMap
}
method := pathToPrefix(nsID, path)
methodMap[method] = methodMap[method] + 1
return true
}
})
if err != nil {
return []metricsutil.GaugeLabelValues{}, suppressRestoreModeError(err)
}
// If collection was cancelled, return an empty array.
select {
case <-ctx.Done():
return []metricsutil.GaugeLabelValues{}, nil
default:
break
}
// TODO: how can we estimate the needed size?
flattenedResults := make([]metricsutil.GaugeLabelValues, 0)
for _, ns := range allNamespaces {
for method, count := range byNsAndMethod[ns.ID] {
flattenedResults = append(flattenedResults,
metricsutil.GaugeLabelValues{
Labels: []metrics.Label{
metricsutil.NamespaceLabel(ns),
{"auth_method", method},
},
Value: float32(count),
})
}
}
return flattenedResults, nil
}
const (
tokenTidyHelp = `
This endpoint performs cleanup tasks that can be run if certain error
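The mount-type cache inside gaugeCollectorByMethod relies on go-radix longest-prefix matching; a standalone sketch of that lookup pattern (the keys and values here are made up for illustration):

package main

import (
	"fmt"

	"github.com/armon/go-radix"
)

func main() {
	// Keys are "<namespace path><auth mount path>", values are the auth method type.
	tree := radix.New()
	tree.Insert("ns1/auth/userpass/", "userpass")

	// Any lease path under the mount resolves through the cached prefix,
	// avoiding a router lookup under stateLock.
	if _, method, ok := tree.LongestPrefix("ns1/auth/userpass/login"); ok {
		fmt.Println(method) // "userpass"
	}
}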


@@ -5741,3 +5741,88 @@ func TestTokenStore_TokenID(t *testing.T) {
}
})
}
func expectInGaugeCollection(t *testing.T, expectedLabels map[string]string, expectedValue float32, actual []metricsutil.GaugeLabelValues) {
t.Helper()
for _, glv := range actual {
actualLabels := make(map[string]string)
for _, l := range glv.Labels {
actualLabels[l.Name] = l.Value
}
if labelsMatch(actualLabels, expectedLabels) {
if expectedValue != glv.Value {
t.Errorf("expeced %v for %v, got %v", expectedValue, expectedLabels, glv.Value)
}
return
}
}
t.Errorf("didn't find labels %v", expectedLabels)
}
func TestTokenStore_Collectors(t *testing.T) {
ctx := namespace.RootContext(nil)
exp := mockExpiration(t)
ts := exp.tokenStore
// This helper is defined in expiration.go
sampleToken(t, exp, "auth/userpass/login", true, "default")
sampleToken(t, exp, "auth/userpass/login", true, "policy23457")
sampleToken(t, exp, "auth/token/create", false, "root")
sampleToken(t, exp, "auth/github/login", true, "root")
sampleToken(t, exp, "auth/github/login", false, "root")
waitForRestore(t, exp)
// By namespace:
values, err := ts.gaugeCollector(ctx)
if err != nil {
t.Fatalf("bad collector run: %v", err)
}
if len(values) != 1 {
t.Errorf("got %v values, expected 1", len(values))
}
expectInGaugeCollection(t,
map[string]string{"namespace": "root"},
5.0,
values)
values, err = ts.gaugeCollectorByPolicy(ctx)
if err != nil {
t.Fatalf("bad collector run: %v", err)
}
if len(values) != 3 {
t.Errorf("got %v values, expected 3", len(values))
}
expectInGaugeCollection(t,
map[string]string{"namespace": "root", "policy": "root"},
3.0,
values)
expectInGaugeCollection(t,
map[string]string{"namespace": "root", "policy": "default"},
1.0,
values)
expectInGaugeCollection(t,
map[string]string{"namespace": "root", "policy": "policy23457"},
1.0,
values)
values, err = ts.gaugeCollectorByTtl(ctx)
if err != nil {
t.Fatalf("bad collector run: %v", err)
}
if len(values) != 2 {
t.Errorf("got %v values, expected 2", len(values))
}
expectInGaugeCollection(t,
map[string]string{"namespace": "root", "creation_ttl": "1h"},
3.0,
values)
expectInGaugeCollection(t,
map[string]string{"namespace": "root", "creation_ttl": "+Inf"},
2.0,
values)
// Need to set up router for this to work, TODO
// ts.gaugeCollectorByMethod( ctx )
}
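Assuming the usual repository layout, the new collector tests can be exercised in isolation with something like go test ./vault -run 'TestTokenStore_Collectors|TestExpiration_WalkTokens'.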