// Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: MPL-2.0 package metricsutil import ( "context" "math/rand" "sort" "time" "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/vault/helper/timeutil" ) // GaugeLabelValues is one gauge in a set sharing a single key, that // are measured in a batch. type GaugeLabelValues struct { Labels []Label Value float32 } // GaugeCollector is a callback function that returns an unfiltered // set of label-value pairs. It may be cancelled if it takes too long. type GaugeCollector = func(context.Context) ([]GaugeLabelValues, error) // collectionBound is a hard limit on how long a collection process // may take, as a fraction of the current interval. const collectionBound = 0.02 // collectionTarget is a soft limit; if exceeded, the collection interval // will be doubled. const collectionTarget = 0.01 // A GaugeCollectionProcess is responsible for one particular gauge metric. // It handles a delay on initial startup; limiting the cardinality; and // exponential backoff on the requested interval. type GaugeCollectionProcess struct { stop chan struct{} stopped chan struct{} // gauge name key []string // labels to use when reporting labels []Label // callback function collector GaugeCollector // destination for metrics sink Metrics logger log.Logger // time between collections originalInterval time.Duration currentInterval time.Duration ticker *time.Ticker // used to help limit cardinality maxGaugeCardinality int // time source clock timeutil.Clock } // NewGaugeCollectionProcess creates a new collection process for the callback // function given as an argument, and starts it running. // A label should be provided for metrics *about* this collection process. // // The Run() method must be called to start the process. func NewGaugeCollectionProcess( key []string, id []Label, collector GaugeCollector, m metrics.MetricSink, gaugeInterval time.Duration, maxGaugeCardinality int, logger log.Logger, ) (*GaugeCollectionProcess, error) { return newGaugeCollectionProcessWithClock( key, id, collector, SinkWrapper{MetricSink: m}, gaugeInterval, maxGaugeCardinality, logger, timeutil.DefaultClock{}, ) } // NewGaugeCollectionProcess creates a new collection process for the callback // function given as an argument, and starts it running. // A label should be provided for metrics *about* this collection process. // // The Run() method must be called to start the process. func (m *ClusterMetricSink) NewGaugeCollectionProcess( key []string, id []Label, collector GaugeCollector, logger log.Logger, ) (*GaugeCollectionProcess, error) { return newGaugeCollectionProcessWithClock( key, id, collector, m, m.GaugeInterval, m.MaxGaugeCardinality, logger, timeutil.DefaultClock{}, ) } // test version allows an alternative clock implementation func newGaugeCollectionProcessWithClock( key []string, id []Label, collector GaugeCollector, sink Metrics, gaugeInterval time.Duration, maxGaugeCardinality int, logger log.Logger, clock timeutil.Clock, ) (*GaugeCollectionProcess, error) { process := &GaugeCollectionProcess{ stop: make(chan struct{}, 1), stopped: make(chan struct{}, 1), key: key, labels: id, collector: collector, sink: sink, originalInterval: gaugeInterval, currentInterval: gaugeInterval, maxGaugeCardinality: maxGaugeCardinality, logger: logger, clock: clock, } return process, nil } // delayStart randomly delays by up to one extra interval // so that collection processes do not all run at the time. // If we knew all the processes in advance, we could just schedule them // evenly, but a new one could be added per secret engine. func (p *GaugeCollectionProcess) delayStart() bool { randomDelay := time.Duration(rand.Int63n(int64(p.currentInterval))) // A Timer might be better, but then we'd have to simulate // one of those too? delayTick := p.clock.NewTicker(randomDelay) defer delayTick.Stop() select { case <-p.stop: return true case <-delayTick.C: break } return false } // resetTicker stops the old ticker and starts a new one at the current // interval setting. func (p *GaugeCollectionProcess) resetTicker() { if p.ticker != nil { p.ticker.Stop() } p.ticker = p.clock.NewTicker(p.currentInterval) } // collectAndFilterGauges executes the callback function, // limits the cardinality, and streams the results to the metrics sink. func (p *GaugeCollectionProcess) collectAndFilterGauges() { // Run for only an allotted amount of time. timeout := time.Duration(collectionBound * float64(p.currentInterval)) ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() p.sink.AddDurationWithLabels([]string{"metrics", "collection", "interval"}, p.currentInterval, p.labels) start := p.clock.Now() values, err := p.collector(ctx) end := p.clock.Now() duration := end.Sub(start) // Report how long it took to perform the operation. p.sink.AddDurationWithLabels([]string{"metrics", "collection"}, duration, p.labels) // If over threshold, back off by doubling the measurement interval. // Currently a restart is the only way to bring it back down. threshold := time.Duration(collectionTarget * float64(p.currentInterval)) if duration > threshold { p.logger.Warn("gauge collection time exceeded target", "target", threshold, "actual", duration, "id", p.labels) p.currentInterval *= 2 p.resetTicker() } if err != nil { p.logger.Error("error collecting gauge", "id", p.labels, "error", err) p.sink.IncrCounterWithLabels([]string{"metrics", "collection", "error"}, 1, p.labels) return } // Filter to top N. // This does not guarantee total cardinality is <= N, but it does slow things down // a little if the cardinality *is* too high and the gauge needs to be disabled. if len(values) > p.maxGaugeCardinality { sort.Slice(values, func(a, b int) bool { return values[a].Value > values[b].Value }) values = values[:p.maxGaugeCardinality] } p.streamGaugesToSink(values) } // batchSize is the number of metrics to be sent per tick duration. const batchSize = 25 func (p *GaugeCollectionProcess) streamGaugesToSink(values []GaugeLabelValues) { // Dumping 500 metrics in one big chunk is somewhat unfriendly to UDP-based // transport, and to the rest of the metrics trying to get through. // Let's smooth things out over the course of a second. // 1 second / 500 = 2 ms each, so we can send 25 (batchSize) per 50 milliseconds. // That should be one or two packets. sendTick := p.clock.NewTicker(50 * time.Millisecond) defer sendTick.Stop() for i, lv := range values { if i > 0 && i%batchSize == 0 { select { case <-p.stop: // because the channel is closed, // the main loop will successfully // read from p.stop too, and exit. return case <-sendTick.C: break } } p.sink.SetGaugeWithLabels(p.key, lv.Value, lv.Labels) } } // Run should be called as a goroutine. func (p *GaugeCollectionProcess) Run() { defer close(p.stopped) // Wait a random amount of time stopReceived := p.delayStart() if stopReceived { return } // Create a ticker to start each cycle p.resetTicker() // Loop until we get a signal to stop for { select { case <-p.ticker.C: p.collectAndFilterGauges() case <-p.stop: // Can't use defer because this might // not be the original ticker. p.ticker.Stop() return } } } // Stop the collection process func (p *GaugeCollectionProcess) Stop() { close(p.stop) }