open-vault/vendor/github.com/google/go-metrics-stackdriver/stackdriver.go
2019-08-22 11:07:25 -04:00

517 lines
13 KiB
Go

// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package stackdriver
import (
"bytes"
"context"
"fmt"
"log"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
"cloud.google.com/go/compute/metadata"
monitoring "cloud.google.com/go/monitoring/apiv3"
metrics "github.com/armon/go-metrics"
googlepb "github.com/golang/protobuf/ptypes/timestamp"
distributionpb "google.golang.org/genproto/googleapis/api/distribution"
"google.golang.org/genproto/googleapis/api/metric"
metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)
// Sink conforms to the metrics.MetricSink interface and is used to transmit
// metrics information to stackdriver.
//
// Sink performs in-process aggregation of metrics to limit calls to
// stackdriver.
type Sink struct {
	client     *monitoring.MetricClient // nil disables publishing (report returns early)
	interval   time.Duration            // flush period; floored to 60s by NewSink
	firstTime  time.Time                // start of the current aggregation window, set by reset
	gauges     map[string]*gauge        // staged gauges, keyed by series.key
	counters   map[string]*counter      // staged counters, keyed by series.key
	histograms map[string]*histogram    // staged histograms, keyed by series.key
	bucketer   BucketFn                 // maps a metric name to histogram bucket bounds
	taskInfo   *taskInfo                // resolved generic_task resource labels
	mu         sync.Mutex               // guards firstTime, gauges, counters, histograms
}
// Config options for the stackdriver Sink.
type Config struct {
	// The Google Cloud Project ID to publish metrics to.
	// Optional. GCP instance metadata is used to determine the ProjectID if
	// not set.
	ProjectID string

	// The bucketer is used to determine histogram bucket boundaries
	// for the sampled metrics.
	// Optional. Defaults to DefaultBucketer.
	Bucketer BucketFn

	// The interval between sampled metric points. Must be > 1 minute.
	// https://cloud.google.com/monitoring/custom-metrics/creating-metrics#writing-ts
	// Optional. Defaults to 1 minute.
	ReportingInterval time.Duration

	// The location of the running task. See:
	// https://cloud.google.com/monitoring/api/resources#tag_generic_task
	// Optional. GCP instance metadata is used to determine the location,
	// otherwise it defaults to 'global'.
	Location string

	// The namespace for the running task. See:
	// https://cloud.google.com/monitoring/api/resources#tag_generic_task
	// Optional. Defaults to 'default'.
	Namespace string

	// The job name for the running task. See:
	// https://cloud.google.com/monitoring/api/resources#tag_generic_task
	// Optional. Defaults to the running program name.
	Job string

	// The task ID for the running task. See:
	// https://cloud.google.com/monitoring/api/resources#tag_generic_task
	// Optional. Defaults to a combination of hostname+pid.
	TaskID string
}
// taskInfo holds the resolved labels for the "generic_task" monitored
// resource that every time series is reported against. See:
// https://cloud.google.com/monitoring/api/resources#tag_generic_task
type taskInfo struct {
	ProjectID string
	Location  string
	Namespace string
	Job       string
	TaskID    string
}
// BucketFn returns the histogram bucket thresholds based on the given metric
// name.
type BucketFn func(string) []float64
// DefaultBucketer is the default BucketFn used to determine bucketing
// values for metrics. It returns the same fixed set of upper bounds for
// every metric name; the name argument is ignored.
func DefaultBucketer(name string) []float64 {
	bounds := []float64{
		10.0, 25.0, 50.0, 100.0, 150.0, 200.0, 250.0, 300.0,
		500.0, 1000.0, 1500.0, 2000.0, 3000.0, 4000.0, 5000.0,
	}
	return bounds
}
// NewSink creates a Sink that flushes metrics to stackdriver on a fixed
// interval. The interval should be greater than 1 minute; shorter values
// (including zero) are raised to 1 minute. Missing task identification is
// filled in from GCP instance metadata where possible, falling back to
// process-local defaults. A background goroutine is started to perform
// the periodic flushing.
func NewSink(client *monitoring.MetricClient, config *Config) *Sink {
	info := &taskInfo{
		ProjectID: config.ProjectID,
		Location:  config.Location,
		Namespace: config.Namespace,
		Job:       config.Job,
		TaskID:    config.TaskID,
	}
	s := &Sink{
		client:   client,
		bucketer: config.Bucketer,
		interval: config.ReportingInterval,
		taskInfo: info,
	}

	// Fill in defaults for anything left unset.
	if s.bucketer == nil {
		s.bucketer = DefaultBucketer
	}
	if s.interval < time.Minute {
		s.interval = time.Minute
	}
	if info.ProjectID == "" {
		id, err := metadata.ProjectID()
		if err != nil {
			log.Printf("could not configure go-metrics stackdriver ProjectID: %s", err)
		}
		info.ProjectID = id
	}
	if info.Location == "" {
		// Attempt to detect the zone from instance metadata.
		zone, err := metadata.Zone()
		if err != nil {
			log.Printf("could not configure go-metric stackdriver location: %s", err)
			zone = "global"
		}
		info.Location = zone
	}
	if info.Namespace == "" {
		info.Namespace = "default"
	}
	if info.Job == "" {
		info.Job = path.Base(os.Args[0])
	}
	if info.TaskID == "" {
		host, err := os.Hostname()
		if err != nil {
			host = "localhost"
		}
		info.TaskID = "go-" + strconv.Itoa(os.Getpid()) + "@" + host
	}

	s.reset()
	// Run a cancelable goroutine that reports on the interval.
	go s.flushMetrics(context.Background())
	return s
}
// flushMetrics publishes the aggregated metrics on every tick of the
// reporting interval until ctx is canceled. It returns immediately when
// the interval is zero.
func (s *Sink) flushMetrics(ctx context.Context) {
	if s.interval == 0 {
		return
	}
	ticker := time.NewTicker(s.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			s.report(ctx)
		case <-ctx.Done():
			log.Println("stopped flushing metrics")
			return
		}
	}
}
// reset begins a fresh aggregation window, discarding every staged
// gauge, counter, and histogram.
func (s *Sink) reset() {
	s.mu.Lock()
	s.firstTime = time.Now()
	s.gauges = map[string]*gauge{}
	s.counters = map[string]*counter{}
	s.histograms = map[string]*histogram{}
	s.mu.Unlock()
}
// deep takes a consistent snapshot of the staged metrics, returning the
// snapshot time together with deep copies of the gauge, counter, and
// histogram maps so report can serialize them without holding s.mu.
func (s *Sink) deep() (time.Time, map[string]*gauge, map[string]*counter, map[string]*histogram) {
	s.mu.Lock()
	defer s.mu.Unlock()
	end := time.Now()
	// Allocate inside the critical section: reading len(s.gauges) etc.
	// before taking the lock raced with reset(), which replaces these
	// maps under s.mu.
	rGauges := make(map[string]*gauge, len(s.gauges))
	rCounters := make(map[string]*counter, len(s.counters))
	rHistograms := make(map[string]*histogram, len(s.histograms))
	for k, v := range s.gauges {
		rGauges[k] = &gauge{
			name:  v.name,
			value: v.value,
		}
	}
	for k, v := range s.counters {
		rCounters[k] = &counter{
			name:  v.name,
			value: v.value,
		}
	}
	for k, v := range s.histograms {
		r := &histogram{
			name:    v.name,
			buckets: v.buckets,
			// counts is mutated by addSample, so it must be copied;
			// buckets is never written after creation and may be shared.
			counts: make([]int64, len(v.counts)),
		}
		copy(r.counts, v.counts)
		rHistograms[k] = r
	}
	return end, rGauges, rCounters, rHistograms
}
// report serializes a snapshot of the staged metrics into Stackdriver
// TimeSeries protos and writes them via CreateTimeSeries in batches of at
// most 200 series per request. Counters and gauges are sent as GAUGE
// points of the accumulated/last value; histograms are sent as CUMULATIVE
// distributions spanning the current aggregation window.
func (s *Sink) report(ctx context.Context) {
	end, rGauges, rCounters, rHistograms := s.deep()

	// Snapshot the window start under the lock: reset() writes
	// s.firstTime concurrently, so an unlocked read here was a race.
	s.mu.Lock()
	start := s.firstTime
	s.mu.Unlock()

	// https://cloud.google.com/monitoring/api/resources
	resource := &monitoredrespb.MonitoredResource{
		Type: "generic_task",
		Labels: map[string]string{
			"project_id": s.taskInfo.ProjectID,
			"location":   s.taskInfo.Location,
			"namespace":  s.taskInfo.Namespace,
			"job":        s.taskInfo.Job,
			"task_id":    s.taskInfo.TaskID,
		},
	}

	ts := []*monitoringpb.TimeSeries{}
	for _, v := range rCounters {
		ts = append(ts, &monitoringpb.TimeSeries{
			Metric: &metricpb.Metric{
				Type:   path.Join("custom.googleapis.com", "go-metrics", v.name.name),
				Labels: v.name.labelMap(),
			},
			MetricKind: metric.MetricDescriptor_GAUGE,
			Resource:   resource,
			Points: []*monitoringpb.Point{{
				Interval: &monitoringpb.TimeInterval{
					EndTime: &googlepb.Timestamp{
						Seconds: end.Unix(),
					},
				},
				Value: &monitoringpb.TypedValue{
					Value: &monitoringpb.TypedValue_DoubleValue{
						DoubleValue: v.value,
					},
				},
			}},
		})
	}
	for _, v := range rGauges {
		ts = append(ts, &monitoringpb.TimeSeries{
			Metric: &metricpb.Metric{
				Type:   path.Join("custom.googleapis.com", "go-metrics", v.name.name),
				Labels: v.name.labelMap(),
			},
			MetricKind: metric.MetricDescriptor_GAUGE,
			Resource:   resource,
			Points: []*monitoringpb.Point{{
				Interval: &monitoringpb.TimeInterval{
					EndTime: &googlepb.Timestamp{
						Seconds: end.Unix(),
					},
				},
				Value: &monitoringpb.TypedValue{
					Value: &monitoringpb.TypedValue_DoubleValue{
						DoubleValue: float64(v.value),
					},
				},
			}},
		})
	}
	for _, v := range rHistograms {
		// Total number of samples across all buckets.
		var count int64
		for _, c := range v.counts {
			count += c
		}
		ts = append(ts, &monitoringpb.TimeSeries{
			Metric: &metricpb.Metric{
				Type:   path.Join("custom.googleapis.com", "go-metrics", v.name.name),
				Labels: v.name.labelMap(),
			},
			MetricKind: metric.MetricDescriptor_CUMULATIVE,
			Resource:   resource,
			Points: []*monitoringpb.Point{{
				Interval: &monitoringpb.TimeInterval{
					StartTime: &googlepb.Timestamp{
						Seconds: start.Unix(),
					},
					EndTime: &googlepb.Timestamp{
						Seconds: end.Unix(),
					},
				},
				Value: &monitoringpb.TypedValue{
					Value: &monitoringpb.TypedValue_DistributionValue{
						DistributionValue: &distributionpb.Distribution{
							BucketOptions: &distributionpb.Distribution_BucketOptions{
								Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{
									ExplicitBuckets: &distributionpb.Distribution_BucketOptions_Explicit{
										Bounds: v.buckets,
									},
								},
							},
							BucketCounts: v.counts,
							Count:        count,
						},
					},
				},
			}},
		})
	}

	// A nil client disables publishing (useful for tests).
	if s.client == nil {
		return
	}
	// Write in chunks of 200 series per request.
	for i := 0; i < len(ts); i += 200 {
		// Renamed from "end" to avoid shadowing the snapshot time above.
		batchEnd := i + 200
		if batchEnd > len(ts) {
			batchEnd = len(ts)
		}
		err := s.client.CreateTimeSeries(ctx, &monitoringpb.CreateTimeSeriesRequest{
			Name:       fmt.Sprintf("projects/%s", s.taskInfo.ProjectID),
			TimeSeries: ts[i:batchEnd],
		})
		if err != nil {
			log.Printf("Failed to write time series data: %v", err)
		}
	}
}
// SetGauge retains the last value it is set to. It is equivalent to
// SetGaugeWithLabels with no labels.
func (s *Sink) SetGauge(key []string, val float32) {
	s.SetGaugeWithLabels(key, val, nil)
}
// SetGaugeWithLabels retains the last value the labeled gauge is set to,
// overwriting any value staged earlier in the current window.
func (s *Sink) SetGaugeWithLabels(key []string, val float32, labels []metrics.Label) {
	series := newSeries(key, labels)
	s.mu.Lock()
	s.gauges[series.key] = &gauge{name: series, value: val}
	s.mu.Unlock()
}
// EmitKey should emit a Key/Value pair for each call. It is a deliberate
// no-op here: key/value emission is not supported by this sink.
func (s *Sink) EmitKey(key []string, val float32) {
	// EmitKey is not implemented for stackdriver
}
// IncrCounter accumulates val into the named counter. It is equivalent to
// IncrCounterWithLabels with no labels.
func (s *Sink) IncrCounter(key []string, val float32) {
	s.IncrCounterWithLabels(key, val, nil)
}
// IncrCounterWithLabels accumulates val into the labeled counter,
// creating the counter on first use within the current window.
func (s *Sink) IncrCounterWithLabels(key []string, val float32, labels []metrics.Label) {
	n := newSeries(key, labels)
	s.mu.Lock()
	defer s.mu.Unlock()
	if c, ok := s.counters[n.key]; ok {
		c.value += float64(val)
		return
	}
	s.counters[n.key] = &counter{name: n, value: float64(val)}
}
// AddSample records a timing sample into the named histogram. It is
// equivalent to AddSampleWithLabels with no labels.
func (s *Sink) AddSample(key []string, val float32) {
	s.AddSampleWithLabels(key, val, nil)
}
// AddSampleWithLabels records a timing sample into the labeled histogram,
// creating the histogram (with bounds from s.bucketer) on first use.
func (s *Sink) AddSampleWithLabels(key []string, val float32, labels []metrics.Label) {
	n := newSeries(key, labels)
	s.mu.Lock()
	defer s.mu.Unlock()
	h, ok := s.histograms[n.key]
	if !ok {
		h = newHistogram(n, s.bucketer)
		s.histograms[n.key] = h
	}
	h.addSample(val)
}
var _ metrics.MetricSink = (*Sink)(nil)
// series holds the naming for a timeseries metric.
type series struct {
	key    string          // unique aggregation key: name plus ";label=value" suffixes (see newSeries)
	name   string          // dot-joined metric name without labels; becomes the metric type suffix
	labels []metrics.Label // original labels, exported as metric labels by labelMap
}
// newSeries builds the naming for a metric timeseries: the key parts are
// joined with '.' (spaces replaced by '_') to form the metric name, and
// the labels are appended as ";name=value" pairs to form the unique
// aggregation key, so series that differ only in labels stay separate.
func newSeries(key []string, labels []metrics.Label) *series {
	buf := &bytes.Buffer{}
	replacer := strings.NewReplacer(" ", "_")
	// Guard the whole join: the previous code checked len(key) > 0 for
	// key[0] but then sliced key[1:] unconditionally, which panics for an
	// empty (or nil) key slice.
	if len(key) > 0 {
		replacer.WriteString(buf, key[0])
		for _, k := range key[1:] {
			replacer.WriteString(buf, ".")
			replacer.WriteString(buf, k)
		}
	}
	name := buf.String()
	for _, label := range labels {
		replacer.WriteString(buf, fmt.Sprintf(";%s=%s", label.Name, label.Value))
	}
	return &series{
		key:    buf.String(),
		name:   name,
		labels: labels,
	}
}
// labelMap converts the series labels into the map form required by the
// Metric proto. Duplicate label names collapse to the last value seen.
func (s *series) labelMap() map[string]string {
	out := make(map[string]string, len(s.labels))
	for i := range s.labels {
		out[s.labels[i].Name] = s.labels[i].Value
	}
	return out
}
// gauge stages the most recent value for a gauge series.
// https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries#point
type gauge struct {
	name  *series
	value float32 // last value set; overwritten by SetGaugeWithLabels
}
// counter stages the accumulated value for a counter series.
// https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries#point
type counter struct {
	name  *series
	value float64 // running sum of increments within the current window
}
// histogram stages bucketed sample counts for a distribution series.
// https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries#distribution
type histogram struct {
	name    *series
	buckets []float64 // explicit upper bounds, from the sink's BucketFn
	counts  []int64   // per-bucket sample counts; len(counts) == len(buckets)
}
// newHistogram builds an empty histogram whose bucket bounds are chosen
// by bucketer from the series name. counts is sized to match buckets;
// NOTE(review): samples above the last bound share the final bucket (see
// addSample) rather than getting a separate overflow bucket — confirm
// that is the intended distribution semantics.
func newHistogram(name *series, bucketer BucketFn) *histogram {
	bounds := bucketer(name.name)
	return &histogram{
		name:    name,
		buckets: bounds,
		counts:  make([]int64, len(bounds)),
	}
}
// addSample increments the count of the first bucket whose upper bound is
// >= val. Values greater than every bound are folded into the last
// bucket, since no separate overflow bucket is allocated.
func (h *histogram) addSample(val float32) {
	v := float64(val)
	idx := len(h.counts) - 1 // overflow lands in the final bucket
	for i, bound := range h.buckets {
		if v <= bound {
			idx = i
			break
		}
	}
	h.counts[idx]++
}