27bb03bbc0
* adding copyright header * fix fmt and a test
299 lines
7.8 KiB
Go
299 lines
7.8 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package metricsutil
|
|
|
|
import (
|
|
"context"
|
|
"math/rand"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/armon/go-metrics"
|
|
log "github.com/hashicorp/go-hclog"
|
|
)
|
|
|
|
// This interface allows unit tests to substitute in a simulated clock.
|
|
type clock interface {
|
|
Now() time.Time
|
|
NewTicker(time.Duration) *time.Ticker
|
|
}
|
|
|
|
type defaultClock struct{}
|
|
|
|
func (_ defaultClock) Now() time.Time {
|
|
return time.Now()
|
|
}
|
|
|
|
func (_ defaultClock) NewTicker(d time.Duration) *time.Ticker {
|
|
return time.NewTicker(d)
|
|
}
|
|
|
|
// GaugeLabelValues is one gauge in a set sharing a single key, that
|
|
// are measured in a batch.
|
|
type GaugeLabelValues struct {
|
|
Labels []Label
|
|
Value float32
|
|
}
|
|
|
|
// GaugeCollector is a callback function that returns an unfiltered
|
|
// set of label-value pairs. It may be cancelled if it takes too long.
|
|
type GaugeCollector = func(context.Context) ([]GaugeLabelValues, error)
|
|
|
|
// collectionBound is a hard limit on how long a collection process
|
|
// may take, as a fraction of the current interval.
|
|
const collectionBound = 0.02
|
|
|
|
// collectionTarget is a soft limit; if exceeded, the collection interval
|
|
// will be doubled.
|
|
const collectionTarget = 0.01
|
|
|
|
// A GaugeCollectionProcess is responsible for one particular gauge metric.
|
|
// It handles a delay on initial startup; limiting the cardinality; and
|
|
// exponential backoff on the requested interval.
|
|
type GaugeCollectionProcess struct {
|
|
stop chan struct{}
|
|
stopped chan struct{}
|
|
|
|
// gauge name
|
|
key []string
|
|
// labels to use when reporting
|
|
labels []Label
|
|
|
|
// callback function
|
|
collector GaugeCollector
|
|
|
|
// destination for metrics
|
|
sink Metrics
|
|
logger log.Logger
|
|
|
|
// time between collections
|
|
originalInterval time.Duration
|
|
currentInterval time.Duration
|
|
ticker *time.Ticker
|
|
|
|
// used to help limit cardinality
|
|
maxGaugeCardinality int
|
|
|
|
// time source
|
|
clock clock
|
|
}
|
|
|
|
// NewGaugeCollectionProcess creates a new collection process for the callback
|
|
// function given as an argument, and starts it running.
|
|
// A label should be provided for metrics *about* this collection process.
|
|
//
|
|
// The Run() method must be called to start the process.
|
|
func NewGaugeCollectionProcess(
|
|
key []string,
|
|
id []Label,
|
|
collector GaugeCollector,
|
|
m metrics.MetricSink,
|
|
gaugeInterval time.Duration,
|
|
maxGaugeCardinality int,
|
|
logger log.Logger,
|
|
) (*GaugeCollectionProcess, error) {
|
|
return newGaugeCollectionProcessWithClock(
|
|
key,
|
|
id,
|
|
collector,
|
|
SinkWrapper{MetricSink: m},
|
|
gaugeInterval,
|
|
maxGaugeCardinality,
|
|
logger,
|
|
defaultClock{},
|
|
)
|
|
}
|
|
|
|
// NewGaugeCollectionProcess creates a new collection process for the callback
|
|
// function given as an argument, and starts it running.
|
|
// A label should be provided for metrics *about* this collection process.
|
|
//
|
|
// The Run() method must be called to start the process.
|
|
func (m *ClusterMetricSink) NewGaugeCollectionProcess(
|
|
key []string,
|
|
id []Label,
|
|
collector GaugeCollector,
|
|
logger log.Logger,
|
|
) (*GaugeCollectionProcess, error) {
|
|
return newGaugeCollectionProcessWithClock(
|
|
key,
|
|
id,
|
|
collector,
|
|
m,
|
|
m.GaugeInterval,
|
|
m.MaxGaugeCardinality,
|
|
logger,
|
|
defaultClock{},
|
|
)
|
|
}
|
|
|
|
// test version allows an alternative clock implementation
|
|
func newGaugeCollectionProcessWithClock(
|
|
key []string,
|
|
id []Label,
|
|
collector GaugeCollector,
|
|
sink Metrics,
|
|
gaugeInterval time.Duration,
|
|
maxGaugeCardinality int,
|
|
logger log.Logger,
|
|
clock clock,
|
|
) (*GaugeCollectionProcess, error) {
|
|
process := &GaugeCollectionProcess{
|
|
stop: make(chan struct{}, 1),
|
|
stopped: make(chan struct{}, 1),
|
|
key: key,
|
|
labels: id,
|
|
collector: collector,
|
|
sink: sink,
|
|
originalInterval: gaugeInterval,
|
|
currentInterval: gaugeInterval,
|
|
maxGaugeCardinality: maxGaugeCardinality,
|
|
logger: logger,
|
|
clock: clock,
|
|
}
|
|
return process, nil
|
|
}
|
|
|
|
// delayStart randomly delays by up to one extra interval
|
|
// so that collection processes do not all run at the time.
|
|
// If we knew all the processes in advance, we could just schedule them
|
|
// evenly, but a new one could be added per secret engine.
|
|
func (p *GaugeCollectionProcess) delayStart() bool {
|
|
randomDelay := time.Duration(rand.Int63n(int64(p.currentInterval)))
|
|
// A Timer might be better, but then we'd have to simulate
|
|
// one of those too?
|
|
delayTick := p.clock.NewTicker(randomDelay)
|
|
defer delayTick.Stop()
|
|
|
|
select {
|
|
case <-p.stop:
|
|
return true
|
|
case <-delayTick.C:
|
|
break
|
|
}
|
|
return false
|
|
}
|
|
|
|
// resetTicker stops the old ticker and starts a new one at the current
|
|
// interval setting.
|
|
func (p *GaugeCollectionProcess) resetTicker() {
|
|
if p.ticker != nil {
|
|
p.ticker.Stop()
|
|
}
|
|
p.ticker = p.clock.NewTicker(p.currentInterval)
|
|
}
|
|
|
|
// collectAndFilterGauges executes the callback function,
|
|
// limits the cardinality, and streams the results to the metrics sink.
|
|
func (p *GaugeCollectionProcess) collectAndFilterGauges() {
|
|
// Run for only an allotted amount of time.
|
|
timeout := time.Duration(collectionBound * float64(p.currentInterval))
|
|
ctx, cancel := context.WithTimeout(context.Background(),
|
|
timeout)
|
|
defer cancel()
|
|
|
|
p.sink.AddDurationWithLabels([]string{"metrics", "collection", "interval"},
|
|
p.currentInterval,
|
|
p.labels)
|
|
|
|
start := p.clock.Now()
|
|
values, err := p.collector(ctx)
|
|
end := p.clock.Now()
|
|
duration := end.Sub(start)
|
|
|
|
// Report how long it took to perform the operation.
|
|
p.sink.AddDurationWithLabels([]string{"metrics", "collection"},
|
|
duration,
|
|
p.labels)
|
|
|
|
// If over threshold, back off by doubling the measurement interval.
|
|
// Currently a restart is the only way to bring it back down.
|
|
threshold := time.Duration(collectionTarget * float64(p.currentInterval))
|
|
if duration > threshold {
|
|
p.logger.Warn("gauge collection time exceeded target", "target", threshold, "actual", duration, "id", p.labels)
|
|
p.currentInterval *= 2
|
|
p.resetTicker()
|
|
}
|
|
|
|
if err != nil {
|
|
p.logger.Error("error collecting gauge", "id", p.labels, "error", err)
|
|
p.sink.IncrCounterWithLabels([]string{"metrics", "collection", "error"},
|
|
1,
|
|
p.labels)
|
|
return
|
|
}
|
|
|
|
// Filter to top N.
|
|
// This does not guarantee total cardinality is <= N, but it does slow things down
|
|
// a little if the cardinality *is* too high and the gauge needs to be disabled.
|
|
if len(values) > p.maxGaugeCardinality {
|
|
sort.Slice(values, func(a, b int) bool {
|
|
return values[a].Value > values[b].Value
|
|
})
|
|
values = values[:p.maxGaugeCardinality]
|
|
}
|
|
|
|
p.streamGaugesToSink(values)
|
|
}
|
|
|
|
// batchSize is the number of metrics to be sent per tick duration.
|
|
const batchSize = 25
|
|
|
|
func (p *GaugeCollectionProcess) streamGaugesToSink(values []GaugeLabelValues) {
|
|
// Dumping 500 metrics in one big chunk is somewhat unfriendly to UDP-based
|
|
// transport, and to the rest of the metrics trying to get through.
|
|
// Let's smooth things out over the course of a second.
|
|
// 1 second / 500 = 2 ms each, so we can send 25 (batchSize) per 50 milliseconds.
|
|
// That should be one or two packets.
|
|
sendTick := p.clock.NewTicker(50 * time.Millisecond)
|
|
defer sendTick.Stop()
|
|
|
|
for i, lv := range values {
|
|
if i > 0 && i%batchSize == 0 {
|
|
select {
|
|
case <-p.stop:
|
|
// because the channel is closed,
|
|
// the main loop will successfully
|
|
// read from p.stop too, and exit.
|
|
return
|
|
case <-sendTick.C:
|
|
break
|
|
}
|
|
}
|
|
p.sink.SetGaugeWithLabels(p.key, lv.Value, lv.Labels)
|
|
}
|
|
}
|
|
|
|
// Run should be called as a goroutine.
|
|
func (p *GaugeCollectionProcess) Run() {
|
|
defer close(p.stopped)
|
|
|
|
// Wait a random amount of time
|
|
stopReceived := p.delayStart()
|
|
if stopReceived {
|
|
return
|
|
}
|
|
|
|
// Create a ticker to start each cycle
|
|
p.resetTicker()
|
|
|
|
// Loop until we get a signal to stop
|
|
for {
|
|
select {
|
|
case <-p.ticker.C:
|
|
p.collectAndFilterGauges()
|
|
case <-p.stop:
|
|
// Can't use defer because this might
|
|
// not be the original ticker.
|
|
p.ticker.Stop()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop the collection process
|
|
func (p *GaugeCollectionProcess) Stop() {
|
|
close(p.stop)
|
|
}
|