2020-07-23 18:37:33 +00:00
|
|
|
|
// +build go1.9
|
2019-03-26 21:50:42 +00:00
|
|
|
|
|
2018-04-05 16:21:32 +00:00
|
|
|
|
package prometheus
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"fmt"
|
2020-07-23 18:37:33 +00:00
|
|
|
|
"log"
|
2020-08-11 16:17:43 +00:00
|
|
|
|
"regexp"
|
2018-04-05 16:21:32 +00:00
|
|
|
|
"strings"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/armon/go-metrics"
|
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
2020-07-23 18:37:33 +00:00
|
|
|
|
"github.com/prometheus/client_golang/prometheus/push"
|
2018-04-05 16:21:32 +00:00
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var (
|
|
|
|
|
// DefaultPrometheusOpts is the default set of options used when creating a
|
|
|
|
|
// PrometheusSink.
|
|
|
|
|
DefaultPrometheusOpts = PrometheusOpts{
|
|
|
|
|
Expiration: 60 * time.Second,
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// PrometheusOpts is used to configure the Prometheus Sink
|
|
|
|
|
type PrometheusOpts struct {
|
|
|
|
|
// Expiration is the duration a metric is valid for, after which it will be
|
|
|
|
|
// untracked. If the value is zero, a metric is never expired.
|
|
|
|
|
Expiration time.Duration
|
2020-09-14 18:53:31 +00:00
|
|
|
|
Registerer prometheus.Registerer
|
2020-11-05 19:51:58 +00:00
|
|
|
|
|
2021-05-04 14:36:53 +00:00
|
|
|
|
// Gauges, Summaries, and Counters allow us to pre-declare metrics by giving
|
|
|
|
|
// their Name, Help, and ConstLabels to the PrometheusSink when it is created.
|
|
|
|
|
// Metrics declared in this way will be initialized at zero and will not be
|
|
|
|
|
// deleted or altered when their expiry is reached.
|
|
|
|
|
//
|
|
|
|
|
// Ex: PrometheusOpts{
|
2020-11-05 19:51:58 +00:00
|
|
|
|
// Expiration: 10 * time.Second,
|
|
|
|
|
// Gauges: []GaugeDefinition{
|
|
|
|
|
// {
|
2021-05-04 14:36:53 +00:00
|
|
|
|
// Name: []string{ "application", "component", "measurement"},
|
2020-11-05 19:51:58 +00:00
|
|
|
|
// Help: "application_component_measurement provides an example of how to declare static metrics",
|
|
|
|
|
// ConstLabels: []metrics.Label{ { Name: "my_label", Value: "does_not_change" }, },
|
|
|
|
|
// },
|
|
|
|
|
// },
|
|
|
|
|
// }
|
|
|
|
|
GaugeDefinitions []GaugeDefinition
|
|
|
|
|
SummaryDefinitions []SummaryDefinition
|
|
|
|
|
CounterDefinitions []CounterDefinition
|
2018-04-05 16:21:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// PrometheusSink stores the gauges, summaries and counters emitted through
// it and hands them to Prometheus through its Describe and Collect methods.
type PrometheusSink struct {
	// The three maps are keyed by the hash string produced by flattenKey and
	// hold *gauge, *summary and *counter values respectively. They are held
	// by value: if a sink is ever copied, these should be converted to
	// *sync.Map values and initialized appropriately.
	gauges    sync.Map
	summaries sync.Map
	counters  sync.Map

	// expiration is how long a runtime-created metric may go without an
	// update before collection drops it. Zero disables expiry.
	expiration time.Duration

	// help maps "<kind>.<flattened name>" (kind being gauge, summary or
	// counter) to the help text supplied for pre-declared metrics.
	help map[string]string
}
|
|
|
|
|
|
2020-11-05 19:51:58 +00:00
|
|
|
|
// GaugeDefinition can be provided to PrometheusOpts to declare a constant gauge that is not deleted on expiry.
|
|
|
|
|
type GaugeDefinition struct {
|
2021-01-06 12:32:50 +00:00
|
|
|
|
Name []string
|
2020-11-05 19:51:58 +00:00
|
|
|
|
ConstLabels []metrics.Label
|
2021-01-06 12:32:50 +00:00
|
|
|
|
Help string
|
2020-11-05 19:51:58 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type gauge struct {
|
2020-08-11 16:17:43 +00:00
|
|
|
|
prometheus.Gauge
|
|
|
|
|
updatedAt time.Time
|
2020-11-05 19:51:58 +00:00
|
|
|
|
// canDelete is set if the metric is created during runtime so we know it's ephemeral and can delete it on expiry.
|
|
|
|
|
canDelete bool
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// SummaryDefinition can be provided to PrometheusOpts to declare a constant summary that is not deleted on expiry.
|
|
|
|
|
type SummaryDefinition struct {
|
2021-01-06 12:32:50 +00:00
|
|
|
|
Name []string
|
2020-11-05 19:51:58 +00:00
|
|
|
|
ConstLabels []metrics.Label
|
2021-01-06 12:32:50 +00:00
|
|
|
|
Help string
|
2020-08-11 16:17:43 +00:00
|
|
|
|
}
|
|
|
|
|
|
2020-11-05 19:51:58 +00:00
|
|
|
|
type summary struct {
|
2020-08-11 16:17:43 +00:00
|
|
|
|
prometheus.Summary
|
|
|
|
|
updatedAt time.Time
|
2020-11-05 19:51:58 +00:00
|
|
|
|
canDelete bool
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// CounterDefinition can be provided to PrometheusOpts to declare a constant counter that is not deleted on expiry.
|
|
|
|
|
type CounterDefinition struct {
|
2021-01-06 12:32:50 +00:00
|
|
|
|
Name []string
|
2020-11-05 19:51:58 +00:00
|
|
|
|
ConstLabels []metrics.Label
|
2021-01-06 12:32:50 +00:00
|
|
|
|
Help string
|
2020-08-11 16:17:43 +00:00
|
|
|
|
}
|
|
|
|
|
|
2020-11-05 19:51:58 +00:00
|
|
|
|
type counter struct {
|
2020-08-11 16:17:43 +00:00
|
|
|
|
prometheus.Counter
|
|
|
|
|
updatedAt time.Time
|
2020-11-05 19:51:58 +00:00
|
|
|
|
canDelete bool
|
2020-08-11 16:17:43 +00:00
|
|
|
|
}
|
|
|
|
|
|
2018-04-05 16:21:32 +00:00
|
|
|
|
// NewPrometheusSink creates a new PrometheusSink using the default options.
|
|
|
|
|
func NewPrometheusSink() (*PrometheusSink, error) {
|
|
|
|
|
return NewPrometheusSinkFrom(DefaultPrometheusOpts)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewPrometheusSinkFrom creates a new PrometheusSink using the passed options.
|
|
|
|
|
func NewPrometheusSinkFrom(opts PrometheusOpts) (*PrometheusSink, error) {
|
|
|
|
|
sink := &PrometheusSink{
|
2020-07-23 18:37:33 +00:00
|
|
|
|
gauges: sync.Map{},
|
|
|
|
|
summaries: sync.Map{},
|
|
|
|
|
counters: sync.Map{},
|
2018-04-05 16:21:32 +00:00
|
|
|
|
expiration: opts.Expiration,
|
2021-01-06 12:32:50 +00:00
|
|
|
|
help: make(map[string]string),
|
2018-04-05 16:21:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
2021-01-06 12:32:50 +00:00
|
|
|
|
initGauges(&sink.gauges, opts.GaugeDefinitions, sink.help)
|
|
|
|
|
initSummaries(&sink.summaries, opts.SummaryDefinitions, sink.help)
|
|
|
|
|
initCounters(&sink.counters, opts.CounterDefinitions, sink.help)
|
2020-11-05 19:51:58 +00:00
|
|
|
|
|
2020-09-14 18:53:31 +00:00
|
|
|
|
reg := opts.Registerer
|
|
|
|
|
if reg == nil {
|
|
|
|
|
reg = prometheus.DefaultRegisterer
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return sink, reg.Register(sink)
|
2018-04-05 16:21:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Describe is needed to meet the Collector interface.
|
|
|
|
|
func (p *PrometheusSink) Describe(c chan<- *prometheus.Desc) {
|
|
|
|
|
// We must emit some description otherwise an error is returned. This
|
|
|
|
|
// description isn't shown to the user!
|
|
|
|
|
prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(c)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Collect meets the collection interface and allows us to enforce our expiration
|
|
|
|
|
// logic to clean up ephemeral metrics if their value haven't been set for a
|
|
|
|
|
// duration exceeding our allowed expiration time.
|
|
|
|
|
func (p *PrometheusSink) Collect(c chan<- prometheus.Metric) {
|
2021-05-04 14:36:53 +00:00
|
|
|
|
p.collectAtTime(c, time.Now())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// collectAtTime allows internal testing of the expiry based logic here without
|
|
|
|
|
// mocking clocks or making tests timing sensitive.
|
|
|
|
|
func (p *PrometheusSink) collectAtTime(c chan<- prometheus.Metric, t time.Time) {
|
2018-04-05 16:21:32 +00:00
|
|
|
|
expire := p.expiration != 0
|
2020-07-23 18:37:33 +00:00
|
|
|
|
p.gauges.Range(func(k, v interface{}) bool {
|
2020-11-05 19:51:58 +00:00
|
|
|
|
if v == nil {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
g := v.(*gauge)
|
|
|
|
|
lastUpdate := g.updatedAt
|
2021-05-04 14:36:53 +00:00
|
|
|
|
if expire && lastUpdate.Add(p.expiration).Before(t) {
|
2020-11-05 19:51:58 +00:00
|
|
|
|
if g.canDelete {
|
2020-08-11 16:17:43 +00:00
|
|
|
|
p.gauges.Delete(k)
|
2020-11-05 19:51:58 +00:00
|
|
|
|
return true
|
2020-08-11 16:17:43 +00:00
|
|
|
|
}
|
2018-04-05 16:21:32 +00:00
|
|
|
|
}
|
2020-11-05 19:51:58 +00:00
|
|
|
|
g.Collect(c)
|
2020-07-23 18:37:33 +00:00
|
|
|
|
return true
|
|
|
|
|
})
|
|
|
|
|
p.summaries.Range(func(k, v interface{}) bool {
|
2020-11-05 19:51:58 +00:00
|
|
|
|
if v == nil {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
s := v.(*summary)
|
|
|
|
|
lastUpdate := s.updatedAt
|
2021-05-04 14:36:53 +00:00
|
|
|
|
if expire && lastUpdate.Add(p.expiration).Before(t) {
|
2020-11-05 19:51:58 +00:00
|
|
|
|
if s.canDelete {
|
2020-08-11 16:17:43 +00:00
|
|
|
|
p.summaries.Delete(k)
|
2020-11-05 19:51:58 +00:00
|
|
|
|
return true
|
2020-08-11 16:17:43 +00:00
|
|
|
|
}
|
2018-04-05 16:21:32 +00:00
|
|
|
|
}
|
2020-11-05 19:51:58 +00:00
|
|
|
|
s.Collect(c)
|
2020-07-23 18:37:33 +00:00
|
|
|
|
return true
|
|
|
|
|
})
|
|
|
|
|
p.counters.Range(func(k, v interface{}) bool {
|
2020-11-05 19:51:58 +00:00
|
|
|
|
if v == nil {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
count := v.(*counter)
|
|
|
|
|
lastUpdate := count.updatedAt
|
2021-05-04 14:36:53 +00:00
|
|
|
|
if expire && lastUpdate.Add(p.expiration).Before(t) {
|
2020-11-05 19:51:58 +00:00
|
|
|
|
if count.canDelete {
|
2020-08-11 16:17:43 +00:00
|
|
|
|
p.counters.Delete(k)
|
2020-11-05 19:51:58 +00:00
|
|
|
|
return true
|
2020-08-11 16:17:43 +00:00
|
|
|
|
}
|
2018-04-05 16:21:32 +00:00
|
|
|
|
}
|
2020-11-05 19:51:58 +00:00
|
|
|
|
count.Collect(c)
|
2020-07-23 18:37:33 +00:00
|
|
|
|
return true
|
|
|
|
|
})
|
2018-04-05 16:21:32 +00:00
|
|
|
|
}
|
|
|
|
|
|
2021-01-06 12:32:50 +00:00
|
|
|
|
func initGauges(m *sync.Map, gauges []GaugeDefinition, help map[string]string) {
|
2020-11-05 19:51:58 +00:00
|
|
|
|
for _, g := range gauges {
|
|
|
|
|
key, hash := flattenKey(g.Name, g.ConstLabels)
|
2021-01-06 12:32:50 +00:00
|
|
|
|
help[fmt.Sprintf("gauge.%s", key)] = g.Help
|
2020-11-05 19:51:58 +00:00
|
|
|
|
pG := prometheus.NewGauge(prometheus.GaugeOpts{
|
|
|
|
|
Name: key,
|
|
|
|
|
Help: g.Help,
|
|
|
|
|
ConstLabels: prometheusLabels(g.ConstLabels),
|
|
|
|
|
})
|
2021-01-06 12:32:50 +00:00
|
|
|
|
m.Store(hash, &gauge{Gauge: pG})
|
2020-11-05 19:51:58 +00:00
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2021-01-06 12:32:50 +00:00
|
|
|
|
func initSummaries(m *sync.Map, summaries []SummaryDefinition, help map[string]string) {
|
2020-11-05 19:51:58 +00:00
|
|
|
|
for _, s := range summaries {
|
|
|
|
|
key, hash := flattenKey(s.Name, s.ConstLabels)
|
2021-01-06 12:32:50 +00:00
|
|
|
|
help[fmt.Sprintf("summary.%s", key)] = s.Help
|
2020-11-05 19:51:58 +00:00
|
|
|
|
pS := prometheus.NewSummary(prometheus.SummaryOpts{
|
|
|
|
|
Name: key,
|
|
|
|
|
Help: s.Help,
|
|
|
|
|
MaxAge: 10 * time.Second,
|
|
|
|
|
ConstLabels: prometheusLabels(s.ConstLabels),
|
|
|
|
|
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
|
|
|
|
|
})
|
2021-01-06 12:32:50 +00:00
|
|
|
|
m.Store(hash, &summary{Summary: pS})
|
2020-11-05 19:51:58 +00:00
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2021-01-06 12:32:50 +00:00
|
|
|
|
func initCounters(m *sync.Map, counters []CounterDefinition, help map[string]string) {
|
2020-11-05 19:51:58 +00:00
|
|
|
|
for _, c := range counters {
|
|
|
|
|
key, hash := flattenKey(c.Name, c.ConstLabels)
|
2021-01-06 12:32:50 +00:00
|
|
|
|
help[fmt.Sprintf("counter.%s", key)] = c.Help
|
2020-11-05 19:51:58 +00:00
|
|
|
|
pC := prometheus.NewCounter(prometheus.CounterOpts{
|
|
|
|
|
Name: key,
|
|
|
|
|
Help: c.Help,
|
|
|
|
|
ConstLabels: prometheusLabels(c.ConstLabels),
|
|
|
|
|
})
|
2021-01-06 12:32:50 +00:00
|
|
|
|
m.Store(hash, &counter{Counter: pC})
|
2020-11-05 19:51:58 +00:00
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2019-03-26 21:50:42 +00:00
|
|
|
|
// forbiddenChars matches the characters flattenKey replaces with "_" in a
// metric name: space, '.', '=', '-' and '/'.
var forbiddenChars = regexp.MustCompile(`[ .=\-/]`)
|
2018-04-05 16:21:32 +00:00
|
|
|
|
|
2020-11-05 19:51:58 +00:00
|
|
|
|
func flattenKey(parts []string, labels []metrics.Label) (string, string) {
|
2018-04-05 16:21:32 +00:00
|
|
|
|
key := strings.Join(parts, "_")
|
|
|
|
|
key = forbiddenChars.ReplaceAllString(key, "_")
|
|
|
|
|
|
|
|
|
|
hash := key
|
|
|
|
|
for _, label := range labels {
|
|
|
|
|
hash += fmt.Sprintf(";%s=%s", label.Name, label.Value)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return key, hash
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func prometheusLabels(labels []metrics.Label) prometheus.Labels {
|
|
|
|
|
l := make(prometheus.Labels)
|
|
|
|
|
for _, label := range labels {
|
|
|
|
|
l[label.Name] = label.Value
|
|
|
|
|
}
|
|
|
|
|
return l
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *PrometheusSink) SetGauge(parts []string, val float32) {
|
|
|
|
|
p.SetGaugeWithLabels(parts, val, nil)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *PrometheusSink) SetGaugeWithLabels(parts []string, val float32, labels []metrics.Label) {
|
2020-11-05 19:51:58 +00:00
|
|
|
|
key, hash := flattenKey(parts, labels)
|
2020-08-11 16:17:43 +00:00
|
|
|
|
pg, ok := p.gauges.Load(hash)
|
|
|
|
|
|
|
|
|
|
// The sync.Map underlying gauges stores pointers to our structs. If we need to make updates,
|
|
|
|
|
// rather than modifying the underlying value directly, which would be racy, we make a local
|
|
|
|
|
// copy by dereferencing the pointer we get back, making the appropriate changes, and then
|
|
|
|
|
// storing a pointer to our local copy. The underlying Prometheus types are threadsafe,
|
|
|
|
|
// so there's no issues there. It's possible for racy updates to occur to the updatedAt
|
|
|
|
|
// value, but since we're always setting it to time.Now(), it doesn't really matter.
|
|
|
|
|
if ok {
|
2020-11-05 19:51:58 +00:00
|
|
|
|
localGauge := *pg.(*gauge)
|
2020-08-11 16:17:43 +00:00
|
|
|
|
localGauge.Set(float64(val))
|
|
|
|
|
localGauge.updatedAt = time.Now()
|
|
|
|
|
p.gauges.Store(hash, &localGauge)
|
2020-11-05 19:51:58 +00:00
|
|
|
|
|
2021-01-06 12:32:50 +00:00
|
|
|
|
// The gauge does not exist, create the gauge and allow it to be deleted
|
2020-08-11 16:17:43 +00:00
|
|
|
|
} else {
|
2021-01-06 12:32:50 +00:00
|
|
|
|
help := key
|
|
|
|
|
existingHelp, ok := p.help[fmt.Sprintf("gauge.%s", key)]
|
|
|
|
|
if ok {
|
|
|
|
|
help = existingHelp
|
|
|
|
|
}
|
2020-08-11 16:17:43 +00:00
|
|
|
|
g := prometheus.NewGauge(prometheus.GaugeOpts{
|
2018-04-05 16:21:32 +00:00
|
|
|
|
Name: key,
|
2021-01-06 12:32:50 +00:00
|
|
|
|
Help: help,
|
2018-04-05 16:21:32 +00:00
|
|
|
|
ConstLabels: prometheusLabels(labels),
|
|
|
|
|
})
|
2020-08-11 16:17:43 +00:00
|
|
|
|
g.Set(float64(val))
|
2020-11-05 19:51:58 +00:00
|
|
|
|
pg = &gauge{
|
2021-01-06 12:32:50 +00:00
|
|
|
|
Gauge: g,
|
2020-11-05 19:51:58 +00:00
|
|
|
|
updatedAt: time.Now(),
|
|
|
|
|
canDelete: true,
|
2020-08-11 16:17:43 +00:00
|
|
|
|
}
|
|
|
|
|
p.gauges.Store(hash, pg)
|
2018-04-05 16:21:32 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *PrometheusSink) AddSample(parts []string, val float32) {
|
|
|
|
|
p.AddSampleWithLabels(parts, val, nil)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *PrometheusSink) AddSampleWithLabels(parts []string, val float32, labels []metrics.Label) {
|
2020-11-05 19:51:58 +00:00
|
|
|
|
key, hash := flattenKey(parts, labels)
|
2020-08-11 16:17:43 +00:00
|
|
|
|
ps, ok := p.summaries.Load(hash)
|
|
|
|
|
|
2020-11-05 19:51:58 +00:00
|
|
|
|
// Does the summary already exist for this sample type?
|
2020-08-11 16:17:43 +00:00
|
|
|
|
if ok {
|
2020-11-05 19:51:58 +00:00
|
|
|
|
localSummary := *ps.(*summary)
|
2020-08-11 16:17:43 +00:00
|
|
|
|
localSummary.Observe(float64(val))
|
|
|
|
|
localSummary.updatedAt = time.Now()
|
|
|
|
|
p.summaries.Store(hash, &localSummary)
|
2020-11-05 19:51:58 +00:00
|
|
|
|
|
2021-01-06 12:32:50 +00:00
|
|
|
|
// The summary does not exist, create the Summary and allow it to be deleted
|
2020-08-11 16:17:43 +00:00
|
|
|
|
} else {
|
2021-01-06 12:32:50 +00:00
|
|
|
|
help := key
|
|
|
|
|
existingHelp, ok := p.help[fmt.Sprintf("summary.%s", key)]
|
|
|
|
|
if ok {
|
|
|
|
|
help = existingHelp
|
|
|
|
|
}
|
2020-08-11 16:17:43 +00:00
|
|
|
|
s := prometheus.NewSummary(prometheus.SummaryOpts{
|
2018-04-05 16:21:32 +00:00
|
|
|
|
Name: key,
|
2021-01-06 12:32:50 +00:00
|
|
|
|
Help: help,
|
2018-04-05 16:21:32 +00:00
|
|
|
|
MaxAge: 10 * time.Second,
|
|
|
|
|
ConstLabels: prometheusLabels(labels),
|
2020-07-23 18:37:33 +00:00
|
|
|
|
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
|
2018-04-05 16:21:32 +00:00
|
|
|
|
})
|
2020-08-11 16:17:43 +00:00
|
|
|
|
s.Observe(float64(val))
|
2020-11-05 19:51:58 +00:00
|
|
|
|
ps = &summary{
|
2021-01-06 12:32:50 +00:00
|
|
|
|
Summary: s,
|
2020-11-05 19:51:58 +00:00
|
|
|
|
updatedAt: time.Now(),
|
|
|
|
|
canDelete: true,
|
2020-08-11 16:17:43 +00:00
|
|
|
|
}
|
|
|
|
|
p.summaries.Store(hash, ps)
|
2018-04-05 16:21:32 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// EmitKey is not implemented. Prometheus doesn’t offer a type for which an
|
|
|
|
|
// arbitrary number of values is retained, as Prometheus works with a pull
|
|
|
|
|
// model, rather than a push model.
|
|
|
|
|
func (p *PrometheusSink) EmitKey(key []string, val float32) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *PrometheusSink) IncrCounter(parts []string, val float32) {
|
|
|
|
|
p.IncrCounterWithLabels(parts, val, nil)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *PrometheusSink) IncrCounterWithLabels(parts []string, val float32, labels []metrics.Label) {
|
2020-11-05 19:51:58 +00:00
|
|
|
|
key, hash := flattenKey(parts, labels)
|
2020-08-11 16:17:43 +00:00
|
|
|
|
pc, ok := p.counters.Load(hash)
|
|
|
|
|
|
2020-11-05 19:51:58 +00:00
|
|
|
|
// Does the counter exist?
|
2020-08-11 16:17:43 +00:00
|
|
|
|
if ok {
|
2020-11-05 19:51:58 +00:00
|
|
|
|
localCounter := *pc.(*counter)
|
2020-08-11 16:17:43 +00:00
|
|
|
|
localCounter.Add(float64(val))
|
|
|
|
|
localCounter.updatedAt = time.Now()
|
|
|
|
|
p.counters.Store(hash, &localCounter)
|
2020-11-05 19:51:58 +00:00
|
|
|
|
|
2021-01-06 12:32:50 +00:00
|
|
|
|
// The counter does not exist yet, create it and allow it to be deleted
|
2020-08-11 16:17:43 +00:00
|
|
|
|
} else {
|
2021-01-06 12:32:50 +00:00
|
|
|
|
help := key
|
|
|
|
|
existingHelp, ok := p.help[fmt.Sprintf("counter.%s", key)]
|
|
|
|
|
if ok {
|
|
|
|
|
help = existingHelp
|
|
|
|
|
}
|
2020-08-11 16:17:43 +00:00
|
|
|
|
c := prometheus.NewCounter(prometheus.CounterOpts{
|
2018-04-05 16:21:32 +00:00
|
|
|
|
Name: key,
|
2021-01-06 12:32:50 +00:00
|
|
|
|
Help: help,
|
2018-04-05 16:21:32 +00:00
|
|
|
|
ConstLabels: prometheusLabels(labels),
|
|
|
|
|
})
|
2020-08-11 16:17:43 +00:00
|
|
|
|
c.Add(float64(val))
|
2020-11-05 19:51:58 +00:00
|
|
|
|
pc = &counter{
|
2021-01-06 12:32:50 +00:00
|
|
|
|
Counter: c,
|
2020-11-05 19:51:58 +00:00
|
|
|
|
updatedAt: time.Now(),
|
|
|
|
|
canDelete: true,
|
2020-08-11 16:17:43 +00:00
|
|
|
|
}
|
|
|
|
|
p.counters.Store(hash, pc)
|
2020-07-23 18:37:33 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2020-11-05 19:51:58 +00:00
|
|
|
|
// PrometheusPushSink wraps a normal prometheus sink and provides an address and facilities to export it to an address
|
|
|
|
|
// on an interval.
|
2020-07-23 18:37:33 +00:00
|
|
|
|
type PrometheusPushSink struct {
|
|
|
|
|
*PrometheusSink
|
|
|
|
|
pusher *push.Pusher
|
|
|
|
|
address string
|
|
|
|
|
pushInterval time.Duration
|
|
|
|
|
stopChan chan struct{}
|
|
|
|
|
}
|
|
|
|
|
|
2020-11-05 19:51:58 +00:00
|
|
|
|
// NewPrometheusPushSink creates a PrometheusPushSink by taking an address, interval, and destination name.
|
|
|
|
|
func NewPrometheusPushSink(address string, pushInterval time.Duration, name string) (*PrometheusPushSink, error) {
|
2020-07-23 18:37:33 +00:00
|
|
|
|
promSink := &PrometheusSink{
|
|
|
|
|
gauges: sync.Map{},
|
|
|
|
|
summaries: sync.Map{},
|
|
|
|
|
counters: sync.Map{},
|
|
|
|
|
expiration: 60 * time.Second,
|
2018-04-05 16:21:32 +00:00
|
|
|
|
}
|
2020-07-23 18:37:33 +00:00
|
|
|
|
|
|
|
|
|
pusher := push.New(address, name).Collector(promSink)
|
|
|
|
|
|
|
|
|
|
sink := &PrometheusPushSink{
|
|
|
|
|
promSink,
|
|
|
|
|
pusher,
|
|
|
|
|
address,
|
2020-11-05 19:51:58 +00:00
|
|
|
|
pushInterval,
|
2020-07-23 18:37:33 +00:00
|
|
|
|
make(chan struct{}),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sink.flushMetrics()
|
|
|
|
|
return sink, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *PrometheusPushSink) flushMetrics() {
|
|
|
|
|
ticker := time.NewTicker(s.pushInterval)
|
|
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ticker.C:
|
|
|
|
|
err := s.pusher.Push()
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Printf("[ERR] Error pushing to Prometheus! Err: %s", err)
|
|
|
|
|
}
|
|
|
|
|
case <-s.stopChan:
|
|
|
|
|
ticker.Stop()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *PrometheusPushSink) Shutdown() {
|
|
|
|
|
close(s.stopChan)
|
2018-04-05 16:21:32 +00:00
|
|
|
|
}
|