Merge pull request #3147 from hashicorp/b-tagged-metrics

Add support for tagged metrics
This commit is contained in:
Chelsea Komlo 2017-09-05 15:55:39 -04:00 committed by GitHub
commit e3973a4603
28 changed files with 1232 additions and 317 deletions

View File

@ -1,3 +1,8 @@
## 0.7 (Unreleased)
IMPROVEMENTS:
* telemetry: Add support for tagged metrics for Nomad clients [GH-3147]
## 0.6.3 (Unreleased)
BUG FIXES:

View File

@ -13,7 +13,7 @@ import (
"sync"
"time"
"github.com/armon/go-metrics"
metrics "github.com/armon/go-metrics"
"github.com/boltdb/bolt"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
@ -153,6 +153,10 @@ type Client struct {
// clientACLResolver holds the ACL resolution state
clientACLResolver
// baseLabels are used when emitting tagged metrics. All client metrics will
// have these tags, and optionally more.
baseLabels []metrics.Label
}
var (
@ -1843,6 +1847,10 @@ DISCOLOOP:
// emitStats collects host resource usage stats periodically
func (c *Client) emitStats() {
// Assign labels directly before emitting stats so the information expected
// is ready
c.baseLabels = []metrics.Label{{Name: "node_id", Value: c.Node().ID}, {Name: "datacenter", Value: c.Node().Datacenter}}
// Start collecting host stats right away and then keep collecting every
// collection interval
next := time.NewTimer(0)
@ -1859,7 +1867,7 @@ func (c *Client) emitStats() {
// Publish Node metrics if operator has opted in
if c.config.PublishNodeMetrics {
c.emitHostStats(c.hostStatsCollector.Stats())
c.emitHostStats()
}
c.emitClientMetrics()
@ -1869,33 +1877,69 @@ func (c *Client) emitStats() {
}
}
// emitHostStats pushes host resource usage stats to remote metrics collection sinks
func (c *Client) emitHostStats(hStats *stats.HostStats) {
nodeID := c.Node().ID
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "total"}, float32(hStats.Memory.Total))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "available"}, float32(hStats.Memory.Available))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "used"}, float32(hStats.Memory.Used))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "free"}, float32(hStats.Memory.Free))
// setGaugeForMemoryStats proxies metrics for memory specific statistics
func (c *Client) setGaugeForMemoryStats(nodeID string, hStats *stats.HostStats) {
if !c.config.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"client", "host", "memory", "total"}, float32(hStats.Memory.Total), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "host", "memory", "available"}, float32(hStats.Memory.Available), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "host", "memory", "used"}, float32(hStats.Memory.Used), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "host", "memory", "free"}, float32(hStats.Memory.Free), c.baseLabels)
}
metrics.SetGauge([]string{"uptime"}, float32(hStats.Uptime))
if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "total"}, float32(hStats.Memory.Total))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "available"}, float32(hStats.Memory.Available))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "used"}, float32(hStats.Memory.Used))
metrics.SetGauge([]string{"client", "host", "memory", nodeID, "free"}, float32(hStats.Memory.Free))
}
}
// setGaugeForCPUStats proxies metrics for CPU specific statistics
func (c *Client) setGaugeForCPUStats(nodeID string, hStats *stats.HostStats) {
for _, cpu := range hStats.CPU {
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "total"}, float32(cpu.Total))
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "user"}, float32(cpu.User))
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "idle"}, float32(cpu.Idle))
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "system"}, float32(cpu.System))
}
if !c.config.DisableTaggedMetrics {
labels := append(c.baseLabels, metrics.Label{"cpu", cpu.CPU})
metrics.SetGaugeWithLabels([]string{"client", "host", "cpu", "total"}, float32(cpu.Total), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "cpu", "user"}, float32(cpu.User), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "cpu", "idle"}, float32(cpu.Idle), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "cpu", "system"}, float32(cpu.System), labels)
}
if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "total"}, float32(cpu.Total))
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "user"}, float32(cpu.User))
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "idle"}, float32(cpu.Idle))
metrics.SetGauge([]string{"client", "host", "cpu", nodeID, cpu.CPU, "system"}, float32(cpu.System))
}
}
}
// setGaugeForDiskStats proxies metrics for disk specific statistics
func (c *Client) setGaugeForDiskStats(nodeID string, hStats *stats.HostStats) {
for _, disk := range hStats.DiskStats {
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "size"}, float32(disk.Size))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "used"}, float32(disk.Used))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "available"}, float32(disk.Available))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "used_percent"}, float32(disk.UsedPercent))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "inodes_percent"}, float32(disk.InodesUsedPercent))
}
if !c.config.DisableTaggedMetrics {
labels := append(c.baseLabels, metrics.Label{"disk", disk.Device})
// Get all the resources for the node
c.configLock.RLock()
metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "size"}, float32(disk.Size), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "used"}, float32(disk.Used), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "available"}, float32(disk.Available), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "used_percent"}, float32(disk.UsedPercent), labels)
metrics.SetGaugeWithLabels([]string{"client", "host", "disk", "inodes_percent"}, float32(disk.InodesUsedPercent), labels)
}
if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "size"}, float32(disk.Size))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "used"}, float32(disk.Used))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "available"}, float32(disk.Available))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "used_percent"}, float32(disk.UsedPercent))
metrics.SetGauge([]string{"client", "host", "disk", nodeID, disk.Device, "inodes_percent"}, float32(disk.InodesUsedPercent))
}
}
}
// setGaugeForAllocationStats proxies metrics for allocation specific statistics
func (c *Client) setGaugeForAllocationStats(nodeID string) {
node := c.configCopy.Node
c.configLock.RUnlock()
total := node.Resources
@ -1903,13 +1947,29 @@ func (c *Client) emitHostStats(hStats *stats.HostStats) {
allocated := c.getAllocatedResources(node)
// Emit allocated
metrics.SetGauge([]string{"client", "allocated", "memory", nodeID}, float32(allocated.MemoryMB))
metrics.SetGauge([]string{"client", "allocated", "disk", nodeID}, float32(allocated.DiskMB))
metrics.SetGauge([]string{"client", "allocated", "cpu", nodeID}, float32(allocated.CPU))
metrics.SetGauge([]string{"client", "allocated", "iops", nodeID}, float32(allocated.IOPS))
if !c.config.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"client", "allocated", "memory"}, float32(allocated.MemoryMB), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocated", "disk"}, float32(allocated.DiskMB), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocated", "cpu"}, float32(allocated.CPU), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocated", "iops"}, float32(allocated.IOPS), c.baseLabels)
}
if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "allocated", "memory", nodeID}, float32(allocated.MemoryMB))
metrics.SetGauge([]string{"client", "allocated", "disk", nodeID}, float32(allocated.DiskMB))
metrics.SetGauge([]string{"client", "allocated", "cpu", nodeID}, float32(allocated.CPU))
metrics.SetGauge([]string{"client", "allocated", "iops", nodeID}, float32(allocated.IOPS))
}
for _, n := range allocated.Networks {
metrics.SetGauge([]string{"client", "allocated", "network", n.Device, nodeID}, float32(n.MBits))
if !c.config.DisableTaggedMetrics {
labels := append(c.baseLabels, metrics.Label{"device", n.Device})
metrics.SetGaugeWithLabels([]string{"client", "allocated", "network"}, float32(n.MBits), labels)
}
if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "allocated", "network", n.Device, nodeID}, float32(n.MBits))
}
}
// Emit unallocated
@ -1917,10 +1977,20 @@ func (c *Client) emitHostStats(hStats *stats.HostStats) {
unallocatedDisk := total.DiskMB - res.DiskMB - allocated.DiskMB
unallocatedCpu := total.CPU - res.CPU - allocated.CPU
unallocatedIops := total.IOPS - res.IOPS - allocated.IOPS
metrics.SetGauge([]string{"client", "unallocated", "memory", nodeID}, float32(unallocatedMem))
metrics.SetGauge([]string{"client", "unallocated", "disk", nodeID}, float32(unallocatedDisk))
metrics.SetGauge([]string{"client", "unallocated", "cpu", nodeID}, float32(unallocatedCpu))
metrics.SetGauge([]string{"client", "unallocated", "iops", nodeID}, float32(unallocatedIops))
if !c.config.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"client", "unallocated", "memory"}, float32(unallocatedMem), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "unallocated", "disk"}, float32(unallocatedDisk), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "unallocated", "cpu"}, float32(unallocatedCpu), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "unallocated", "iops"}, float32(unallocatedIops), c.baseLabels)
}
if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "unallocated", "memory", nodeID}, float32(unallocatedMem))
metrics.SetGauge([]string{"client", "unallocated", "disk", nodeID}, float32(unallocatedDisk))
metrics.SetGauge([]string{"client", "unallocated", "cpu", nodeID}, float32(unallocatedCpu))
metrics.SetGauge([]string{"client", "unallocated", "iops", nodeID}, float32(unallocatedIops))
}
for _, n := range allocated.Networks {
totalMbits := 0
@ -1932,10 +2002,42 @@ func (c *Client) emitHostStats(hStats *stats.HostStats) {
}
unallocatedMbits := totalMbits - n.MBits
metrics.SetGauge([]string{"client", "unallocated", "network", n.Device, nodeID}, float32(unallocatedMbits))
if !c.config.DisableTaggedMetrics {
labels := append(c.baseLabels, metrics.Label{"device", n.Device})
metrics.SetGaugeWithLabels([]string{"client", "unallocated", "network"}, float32(unallocatedMbits), labels)
}
if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "unallocated", "network", n.Device, nodeID}, float32(unallocatedMbits))
}
}
}
// No lables are required so we emit with only a key/value syntax
func (c *Client) setGaugeForUptime(hStats *stats.HostStats) {
if !c.config.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"uptime"}, float32(hStats.Uptime), c.baseLabels)
}
if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"uptime"}, float32(hStats.Uptime))
}
}
// emitHostStats pushes host resource usage stats to remote metrics collection sinks
func (c *Client) emitHostStats() {
nodeID := c.Node().ID
hStats := c.hostStatsCollector.Stats()
c.setGaugeForMemoryStats(nodeID, hStats)
c.setGaugeForUptime(hStats)
c.setGaugeForCPUStats(nodeID, hStats)
c.setGaugeForDiskStats(nodeID, hStats)
// TODO: This should be moved to emitClientMetrics
c.setGaugeForAllocationStats(nodeID)
}
// emitClientMetrics emits lower volume client metrics
func (c *Client) emitClientMetrics() {
nodeID := c.Node().ID
@ -1960,11 +2062,21 @@ func (c *Client) emitClientMetrics() {
}
}
metrics.SetGauge([]string{"client", "allocations", "migrating", nodeID}, float32(migrating))
metrics.SetGauge([]string{"client", "allocations", "blocked", nodeID}, float32(blocked))
metrics.SetGauge([]string{"client", "allocations", "pending", nodeID}, float32(pending))
metrics.SetGauge([]string{"client", "allocations", "running", nodeID}, float32(running))
metrics.SetGauge([]string{"client", "allocations", "terminal", nodeID}, float32(terminal))
if !c.config.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"client", "allocations", "migrating"}, float32(migrating), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocations", "blocked"}, float32(blocked), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocations", "pending"}, float32(pending), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocations", "running"}, float32(running), c.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocations", "terminal"}, float32(terminal), c.baseLabels)
}
if c.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "allocations", "migrating", nodeID}, float32(migrating))
metrics.SetGauge([]string{"client", "allocations", "blocked", nodeID}, float32(blocked))
metrics.SetGauge([]string{"client", "allocations", "pending", nodeID}, float32(pending))
metrics.SetGauge([]string{"client", "allocations", "running", nodeID}, float32(running))
metrics.SetGauge([]string{"client", "allocations", "terminal", nodeID}, float32(terminal))
}
}
func (c *Client) getAllocatedResources(selfNode *structs.Node) *structs.Resources {

View File

@ -22,6 +22,7 @@ import (
nconfig "github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/hashstructure"
"github.com/stretchr/testify/assert"
ctestutil "github.com/hashicorp/nomad/client/testutil"
)
@ -136,6 +137,32 @@ func TestClient_StartStop(t *testing.T) {
}
}
// Certain labels for metrics are dependant on client intial setup. This tests
// that the client has properly initialized before we assign values to labels
func TestClient_BaseLabels(t *testing.T) {
t.Parallel()
assert := assert.New(t)
client := testClient(t, nil)
if err := client.Shutdown(); err != nil {
t.Fatalf("err: %v", err)
}
// directly invoke this function, as otherwise this will fail on a CI build
// due to a race condition
client.emitStats()
baseLabels := client.baseLabels
assert.NotEqual(0, len(baseLabels))
nodeID := client.Node().ID
for _, e := range baseLabels {
if e.Name == "node_id" {
assert.Equal(nodeID, e.Value)
}
}
}
func TestClient_RPC(t *testing.T) {
t.Parallel()
s1, addr := testServer(t, nil)

View File

@ -188,6 +188,14 @@ type Config struct {
// ACLPolicyTTL is how long we cache policy values for
ACLPolicyTTL time.Duration
// DisableTaggedMetrics determines whether metrics will be displayed via a
// key/value/tag format, or simply a key/value format
DisableTaggedMetrics bool
// BackwardsCompatibleMetrics determines whether to show methods of
// displaying metrics for older verions, or to only show the new format
BackwardsCompatibleMetrics bool
}
func (c *Config) Copy() *Config {
@ -205,20 +213,22 @@ func (c *Config) Copy() *Config {
// DefaultConfig returns the default configuration
func DefaultConfig() *Config {
return &Config{
Version: version.GetVersion(),
VaultConfig: config.DefaultVaultConfig(),
ConsulConfig: config.DefaultConsulConfig(),
LogOutput: os.Stderr,
Region: "global",
StatsCollectionInterval: 1 * time.Second,
TLSConfig: &config.TLSConfig{},
LogLevel: "DEBUG",
GCInterval: 1 * time.Minute,
GCParallelDestroys: 2,
GCDiskUsageThreshold: 80,
GCInodeUsageThreshold: 70,
GCMaxAllocs: 50,
NoHostUUID: true,
Version: version.GetVersion(),
VaultConfig: config.DefaultVaultConfig(),
ConsulConfig: config.DefaultConsulConfig(),
LogOutput: os.Stderr,
Region: "global",
StatsCollectionInterval: 1 * time.Second,
TLSConfig: &config.TLSConfig{},
LogLevel: "DEBUG",
GCInterval: 1 * time.Minute,
GCParallelDestroys: 2,
GCDiskUsageThreshold: 80,
GCInodeUsageThreshold: 70,
GCMaxAllocs: 50,
NoHostUUID: true,
DisableTaggedMetrics: false,
BackwardsCompatibleMetrics: false,
}
}

View File

@ -14,7 +14,7 @@ import (
"sync"
"time"
"github.com/armon/go-metrics"
metrics "github.com/armon/go-metrics"
"github.com/boltdb/bolt"
"github.com/golang/snappy"
"github.com/hashicorp/consul-template/signals"
@ -161,6 +161,10 @@ type TaskRunner struct {
// persistedHash is the hash of the last persisted snapshot. It is used to
// detect if a new snapshot has to be written to disk.
persistedHash []byte
// baseLabels are used when emitting tagged metrics. All task runner metrics
// will have these tags, and optionally more.
baseLabels []metrics.Label
}
// taskRunnerState is used to snapshot the state of the task runner
@ -247,6 +251,8 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
signalCh: make(chan SignalEvent),
}
tc.baseLabels = []metrics.Label{{"job", tc.alloc.Job.Name}, {"task_group", tc.alloc.TaskGroup}, {"alloc_id", tc.alloc.ID}, {"task", tc.task.Name}}
return tc
}
@ -1786,10 +1792,25 @@ func (r *TaskRunner) setCreatedResources(cr *driver.CreatedResources) {
r.createdResourcesLock.Unlock()
}
// emitStats emits resource usage stats of tasks to remote metrics collector
// sinks
func (r *TaskRunner) emitStats(ru *cstructs.TaskResourceUsage) {
if ru.ResourceUsage.MemoryStats != nil && r.config.PublishAllocationMetrics {
func (r *TaskRunner) setGaugeForMemory(ru *cstructs.TaskResourceUsage) {
if !r.config.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "rss"},
float32(ru.ResourceUsage.MemoryStats.RSS), r.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "rss"},
float32(ru.ResourceUsage.MemoryStats.RSS), r.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "cache"},
float32(ru.ResourceUsage.MemoryStats.Cache), r.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "swap"},
float32(ru.ResourceUsage.MemoryStats.Swap), r.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "max_usage"},
float32(ru.ResourceUsage.MemoryStats.MaxUsage), r.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "kernel_usage"},
float32(ru.ResourceUsage.MemoryStats.KernelUsage), r.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "memory", "kernel_max_usage"},
float32(ru.ResourceUsage.MemoryStats.KernelMaxUsage), r.baseLabels)
}
if r.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, r.alloc.ID, r.task.Name, "memory", "rss"}, float32(ru.ResourceUsage.MemoryStats.RSS))
metrics.SetGauge([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, r.alloc.ID, r.task.Name, "memory", "cache"}, float32(ru.ResourceUsage.MemoryStats.Cache))
metrics.SetGauge([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, r.alloc.ID, r.task.Name, "memory", "swap"}, float32(ru.ResourceUsage.MemoryStats.Swap))
@ -1797,8 +1818,25 @@ func (r *TaskRunner) emitStats(ru *cstructs.TaskResourceUsage) {
metrics.SetGauge([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, r.alloc.ID, r.task.Name, "memory", "kernel_usage"}, float32(ru.ResourceUsage.MemoryStats.KernelUsage))
metrics.SetGauge([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, r.alloc.ID, r.task.Name, "memory", "kernel_max_usage"}, float32(ru.ResourceUsage.MemoryStats.KernelMaxUsage))
}
}
if ru.ResourceUsage.CpuStats != nil && r.config.PublishAllocationMetrics {
func (r *TaskRunner) setGaugeForCPU(ru *cstructs.TaskResourceUsage) {
if !r.config.DisableTaggedMetrics {
metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "total_percent"},
float32(ru.ResourceUsage.CpuStats.Percent), r.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "system"},
float32(ru.ResourceUsage.CpuStats.SystemMode), r.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "user"},
float32(ru.ResourceUsage.CpuStats.UserMode), r.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "throttled_time"},
float32(ru.ResourceUsage.CpuStats.ThrottledTime), r.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "throttled_periods"},
float32(ru.ResourceUsage.CpuStats.ThrottledPeriods), r.baseLabels)
metrics.SetGaugeWithLabels([]string{"client", "allocs", "cpu", "total_ticks"},
float32(ru.ResourceUsage.CpuStats.TotalTicks), r.baseLabels)
}
if r.config.BackwardsCompatibleMetrics {
metrics.SetGauge([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, r.alloc.ID, r.task.Name, "cpu", "total_percent"}, float32(ru.ResourceUsage.CpuStats.Percent))
metrics.SetGauge([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, r.alloc.ID, r.task.Name, "cpu", "system"}, float32(ru.ResourceUsage.CpuStats.SystemMode))
metrics.SetGauge([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, r.alloc.ID, r.task.Name, "cpu", "user"}, float32(ru.ResourceUsage.CpuStats.UserMode))
@ -1807,3 +1845,19 @@ func (r *TaskRunner) emitStats(ru *cstructs.TaskResourceUsage) {
metrics.SetGauge([]string{"client", "allocs", r.alloc.Job.Name, r.alloc.TaskGroup, r.alloc.ID, r.task.Name, "cpu", "total_ticks"}, float32(ru.ResourceUsage.CpuStats.TotalTicks))
}
}
// emitStats emits resource usage stats of tasks to remote metrics collector
// sinks
func (r *TaskRunner) emitStats(ru *cstructs.TaskResourceUsage) {
if !r.config.PublishAllocationMetrics {
return
}
if ru.ResourceUsage.MemoryStats != nil {
r.setGaugeForMemory(ru)
}
if ru.ResourceUsage.CpuStats != nil {
r.setGaugeForCPU(ru)
}
}

View File

@ -2,7 +2,6 @@ package agent
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
@ -126,7 +125,6 @@ func TestAgent_ServerConfig(t *testing.T) {
t.Fatalf("error normalizing config: %v", err)
}
out, err = a.serverConfig()
fmt.Println(conf.Addresses.RPC)
if err != nil {
t.Fatalf("err: %s", err)
}

View File

@ -60,6 +60,8 @@ client {
gc_inode_usage_threshold = 91
gc_max_allocs = 50
no_host_uuid = false
disable_tagged_metrics = true
backwards_compatible_metrics = true
}
server {
enabled = true

View File

@ -229,6 +229,14 @@ type ClientConfig struct {
// NoHostUUID disables using the host's UUID and will force generation of a
// random UUID.
NoHostUUID *bool `mapstructure:"no_host_uuid"`
// DisableTaggedMetrics disables a new version of generating metrics which
// uses tags
DisableTaggedMetrics bool `mapstructure:"disable_tagged_metrics"`
// BackwardsCompatibleMetrics allows for generating metrics in a simple
// key/value structure as done in older versions of Nomad
BackwardsCompatibleMetrics bool `mapstructure:"backwards_compatible_metrics"`
}
// ACLConfig is configuration specific to the ACL system
@ -1097,6 +1105,14 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
result.NoHostUUID = b.NoHostUUID
}
if b.DisableTaggedMetrics {
result.DisableTaggedMetrics = b.DisableTaggedMetrics
}
if b.BackwardsCompatibleMetrics {
result.BackwardsCompatibleMetrics = b.BackwardsCompatibleMetrics
}
// Add the servers
result.Servers = append(result.Servers, b.Servers...)

View File

@ -357,6 +357,8 @@ func parseClient(result **ClientConfig, list *ast.ObjectList) error {
"gc_parallel_destroys",
"gc_max_allocs",
"no_host_uuid",
"disable_tagged_metrics",
"backwards_compatible_metrics",
}
if err := checkHCLKeys(listVal, valid); err != nil {
return err

View File

@ -75,12 +75,14 @@ func TestConfig_Parse(t *testing.T) {
ReservedPorts: "1,100,10-12",
ParsedReservedPorts: []int{1, 10, 11, 12, 100},
},
GCInterval: 6 * time.Second,
GCParallelDestroys: 6,
GCDiskUsageThreshold: 82,
GCInodeUsageThreshold: 91,
GCMaxAllocs: 50,
NoHostUUID: helper.BoolToPtr(false),
GCInterval: 6 * time.Second,
GCParallelDestroys: 6,
GCDiskUsageThreshold: 82,
GCInodeUsageThreshold: 91,
GCMaxAllocs: 50,
NoHostUUID: helper.BoolToPtr(false),
DisableTaggedMetrics: true,
BackwardsCompatibleMetrics: true,
},
Server: &ServerConfig{
Enabled: true,

View File

@ -89,6 +89,8 @@ func TestConfig_Merge(t *testing.T) {
ReservedPorts: "1,10-30,55",
ParsedReservedPorts: []int{1, 2, 4},
},
DisableTaggedMetrics: true,
BackwardsCompatibleMetrics: true,
},
Server: &ServerConfig{
Enabled: false,
@ -224,10 +226,12 @@ func TestConfig_Merge(t *testing.T) {
ReservedPorts: "2,10-30,55",
ParsedReservedPorts: []int{1, 2, 3},
},
GCInterval: 6 * time.Second,
GCParallelDestroys: 6,
GCDiskUsageThreshold: 71,
GCInodeUsageThreshold: 86,
GCInterval: 6 * time.Second,
GCParallelDestroys: 6,
GCDiskUsageThreshold: 71,
GCInodeUsageThreshold: 86,
DisableTaggedMetrics: true,
BackwardsCompatibleMetrics: true,
},
Server: &ServerConfig{
Enabled: true,

View File

@ -33,6 +33,18 @@ err = c.Count("request.count_total", 2, nil, 1)
DogStatsD accepts packets with multiple statsd payloads in them. Using the BufferingClient via `NewBufferingClient` will buffer up commands and send them when the buffer is reached or after 100msec.
## Unix Domain Sockets Client
DogStatsD version 6 accepts packets through a Unix Socket datagram connection. You can use this protocol by giving a
`unix:///path/to/dsd.socket` addr argument to the `New` or `NewBufferingClient`.
With this protocol, writes can become blocking if the server's receiving buffer is full. Our default behaviour is to
timeout and drop the packet after 1 ms. You can set a custom timeout duration via the `SetWriteTimeout` method.
The default mode is to pass write errors from the socket to the caller. This includes write errors the library will
automatically recover from (DogStatsD server not ready yet or is restarting). You can drop these errors and emulate
the UDP behaviour by setting the `SkipErrors` property to `true`. Please note that packets will be dropped in both modes.
## Development
Run the tests with:

View File

@ -27,8 +27,8 @@ import (
"bytes"
"errors"
"fmt"
"io"
"math/rand"
"net"
"strconv"
"strings"
"sync"
@ -54,35 +54,71 @@ any number greater than that will see frames being cut out.
*/
const MaxUDPPayloadSize = 65467
// A Client is a handle for sending udp messages to dogstatsd. It is safe to
/*
UnixAddressPrefix holds the prefix to use to enable Unix Domain Socket
traffic instead of UDP.
*/
const UnixAddressPrefix = "unix://"
/*
Stat suffixes
*/
var (
gaugeSuffix = []byte("|g")
countSuffix = []byte("|c")
histogramSuffix = []byte("|h")
decrSuffix = []byte("-1|c")
incrSuffix = []byte("1|c")
setSuffix = []byte("|s")
timingSuffix = []byte("|ms")
)
// A statsdWriter offers a standard interface regardless of the underlying
// protocol. For now UDS and UPD writers are available.
type statsdWriter interface {
Write(data []byte) error
SetWriteTimeout(time.Duration) error
Close() error
}
// A Client is a handle for sending messages to dogstatsd. It is safe to
// use one Client from multiple goroutines simultaneously.
type Client struct {
conn net.Conn
// Writer handles the underlying networking protocol
writer statsdWriter
// Namespace to prepend to all statsd calls
Namespace string
// Tags are global tags to be added to every statsd call
Tags []string
// skipErrors turns off error passing and allows UDS to emulate UDP behaviour
SkipErrors bool
// BufferLength is the length of the buffer in commands.
bufferLength int
flushTime time.Duration
commands []string
buffer bytes.Buffer
stop bool
stop chan struct{}
sync.Mutex
}
// New returns a pointer to a new Client given an addr in the format "hostname:port".
// New returns a pointer to a new Client given an addr in the format "hostname:port" or
// "unix:///path/to/socket".
func New(addr string) (*Client, error) {
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
if strings.HasPrefix(addr, UnixAddressPrefix) {
w, err := newUdsWriter(addr[len(UnixAddressPrefix)-1:])
if err != nil {
return nil, err
}
client := &Client{writer: w}
return client, nil
} else {
w, err := newUdpWriter(addr)
if err != nil {
return nil, err
}
client := &Client{writer: w, SkipErrors: false}
return client, nil
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return nil, err
}
client := &Client{conn: conn}
return client, nil
}
// NewBuffered returns a Client that buffers its output and sends it in chunks.
@ -95,56 +131,73 @@ func NewBuffered(addr string, buflen int) (*Client, error) {
client.bufferLength = buflen
client.commands = make([]string, 0, buflen)
client.flushTime = time.Millisecond * 100
client.stop = make(chan struct{}, 1)
go client.watch()
return client, nil
}
// format a message from its name, value, tags and rate. Also adds global
// namespace and tags.
func (c *Client) format(name, value string, tags []string, rate float64) string {
func (c *Client) format(name string, value interface{}, suffix []byte, tags []string, rate float64) string {
var buf bytes.Buffer
if c.Namespace != "" {
buf.WriteString(c.Namespace)
}
buf.WriteString(name)
buf.WriteString(":")
buf.WriteString(value)
switch val := value.(type) {
case float64:
buf.Write(strconv.AppendFloat([]byte{}, val, 'f', 6, 64))
case int64:
buf.Write(strconv.AppendInt([]byte{}, val, 10))
case string:
buf.WriteString(val)
default:
// do nothing
}
buf.Write(suffix)
if rate < 1 {
buf.WriteString(`|@`)
buf.WriteString(strconv.FormatFloat(rate, 'f', -1, 64))
}
// do not append to c.Tags directly, because it's shared
// across all invocations of this function
tagCopy := make([]string, len(c.Tags), len(c.Tags)+len(tags))
copy(tagCopy, c.Tags)
tags = append(tagCopy, tags...)
if len(tags) > 0 {
buf.WriteString("|#")
buf.WriteString(tags[0])
for _, tag := range tags[1:] {
buf.WriteString(",")
buf.WriteString(tag)
}
}
writeTagString(&buf, c.Tags, tags)
return buf.String()
}
// SetWriteTimeout allows the user to set a custom UDS write timeout. Not supported for UDP.
func (c *Client) SetWriteTimeout(d time.Duration) error {
return c.writer.SetWriteTimeout(d)
}
func (c *Client) watch() {
for _ = range time.Tick(c.flushTime) {
if c.stop {
ticker := time.NewTicker(c.flushTime)
for {
select {
case <-ticker.C:
c.Lock()
if len(c.commands) > 0 {
// FIXME: eating error here
c.flush()
}
c.Unlock()
case <-c.stop:
ticker.Stop()
return
}
c.Lock()
if len(c.commands) > 0 {
// FIXME: eating error here
c.flush()
}
c.Unlock()
}
}
func (c *Client) append(cmd string) error {
c.Lock()
defer c.Unlock()
c.commands = append(c.commands, cmd)
// if we should flush, lets do it
if len(c.commands) == c.bufferLength {
@ -208,7 +261,7 @@ func (c *Client) flush() error {
var err error
cmdsFlushed := 0
for i, data := range frames {
_, e := c.conn.Write(data)
e := c.writer.Write(data)
if e != nil {
err = e
break
@ -228,64 +281,65 @@ func (c *Client) flush() error {
}
func (c *Client) sendMsg(msg string) error {
// return an error if message is bigger than MaxUDPPayloadSize
if len(msg) > MaxUDPPayloadSize {
return errors.New("message size exceeds MaxUDPPayloadSize")
}
// if this client is buffered, then we'll just append this
c.Lock()
defer c.Unlock()
if c.bufferLength > 0 {
// return an error if message is bigger than OptimalPayloadSize
if len(msg) > MaxUDPPayloadSize {
return errors.New("message size exceeds MaxUDPPayloadSize")
}
return c.append(msg)
}
_, err := c.conn.Write([]byte(msg))
return err
err := c.writer.Write([]byte(msg))
if c.SkipErrors {
return nil
} else {
return err
}
}
// send handles sampling and sends the message over UDP. It also adds global namespace prefixes and tags.
func (c *Client) send(name, value string, tags []string, rate float64) error {
func (c *Client) send(name string, value interface{}, suffix []byte, tags []string, rate float64) error {
if c == nil {
return nil
}
if rate < 1 && rand.Float64() > rate {
return nil
}
data := c.format(name, value, tags, rate)
data := c.format(name, value, suffix, tags, rate)
return c.sendMsg(data)
}
// Gauge measures the value of a metric at a particular time.
func (c *Client) Gauge(name string, value float64, tags []string, rate float64) error {
stat := fmt.Sprintf("%f|g", value)
return c.send(name, stat, tags, rate)
return c.send(name, value, gaugeSuffix, tags, rate)
}
// Count tracks how many times something happened per second.
func (c *Client) Count(name string, value int64, tags []string, rate float64) error {
stat := fmt.Sprintf("%d|c", value)
return c.send(name, stat, tags, rate)
return c.send(name, value, countSuffix, tags, rate)
}
// Histogram tracks the statistical distribution of a set of values.
func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error {
stat := fmt.Sprintf("%f|h", value)
return c.send(name, stat, tags, rate)
return c.send(name, value, histogramSuffix, tags, rate)
}
// Decr is just Count of 1
// Decr is just Count of -1
func (c *Client) Decr(name string, tags []string, rate float64) error {
return c.send(name, "-1|c", tags, rate)
return c.send(name, nil, decrSuffix, tags, rate)
}
// Incr is just Count of 1
func (c *Client) Incr(name string, tags []string, rate float64) error {
return c.send(name, "1|c", tags, rate)
return c.send(name, nil, incrSuffix, tags, rate)
}
// Set counts the number of unique elements in a group.
func (c *Client) Set(name string, value string, tags []string, rate float64) error {
stat := fmt.Sprintf("%s|s", value)
return c.send(name, stat, tags, rate)
return c.send(name, value, setSuffix, tags, rate)
}
// Timing sends timing information, it is an alias for TimeInMilliseconds
@ -296,12 +350,14 @@ func (c *Client) Timing(name string, value time.Duration, tags []string, rate fl
// TimeInMilliseconds sends timing information in milliseconds.
// It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing)
func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, rate float64) error {
stat := fmt.Sprintf("%f|ms", value)
return c.send(name, stat, tags, rate)
return c.send(name, value, timingSuffix, tags, rate)
}
// Event sends the provided Event.
func (c *Client) Event(e *Event) error {
if c == nil {
return nil
}
stat, err := e.Encode(c.Tags...)
if err != nil {
return err
@ -325,7 +381,7 @@ func (c *Client) ServiceCheck(sc *ServiceCheck) error {
}
// SimpleServiceCheck sends an serviceCheck with the provided name and status.
func (c *Client) SimpleServiceCheck(name string, status serviceCheckStatus) error {
func (c *Client) SimpleServiceCheck(name string, status ServiceCheckStatus) error {
sc := NewServiceCheck(name, status)
return c.ServiceCheck(sc)
}
@ -335,8 +391,11 @@ func (c *Client) Close() error {
if c == nil {
return nil
}
c.stop = true
return c.conn.Close()
select {
case c.stop <- struct{}{}:
default:
}
return c.writer.Close()
}
// Events support
@ -458,34 +517,24 @@ func (e Event) Encode(tags ...string) (string, error) {
buffer.WriteString(string(e.AlertType))
}
if len(tags)+len(e.Tags) > 0 {
all := make([]string, 0, len(tags)+len(e.Tags))
all = append(all, tags...)
all = append(all, e.Tags...)
buffer.WriteString("|#")
buffer.WriteString(all[0])
for _, tag := range all[1:] {
buffer.WriteString(",")
buffer.WriteString(tag)
}
}
writeTagString(&buffer, tags, e.Tags)
return buffer.String(), nil
}
// ServiceCheck support
type serviceCheckStatus byte
type ServiceCheckStatus byte
const (
// Ok is the "ok" ServiceCheck status
Ok serviceCheckStatus = 0
Ok ServiceCheckStatus = 0
// Warn is the "warning" ServiceCheck status
Warn serviceCheckStatus = 1
Warn ServiceCheckStatus = 1
// Critical is the "critical" ServiceCheck status
Critical serviceCheckStatus = 2
Critical ServiceCheckStatus = 2
// Unknown is the "unknown" ServiceCheck status
Unknown serviceCheckStatus = 3
Unknown ServiceCheckStatus = 3
)
// An ServiceCheck is an object that contains status of DataDog service check.
@ -493,7 +542,7 @@ type ServiceCheck struct {
// Name of the service check. Required.
Name string
// Status of service check. Required.
Status serviceCheckStatus
Status ServiceCheckStatus
// Timestamp is a timestamp for the serviceCheck. If not provided, the dogstatsd
// server will set this to the current time.
Timestamp time.Time
@ -507,7 +556,7 @@ type ServiceCheck struct {
// NewServiceCheck creates a new serviceCheck with the given name and status. Error checking
// against these values is done at send-time, or upon running sc.Check.
func NewServiceCheck(name string, status serviceCheckStatus) *ServiceCheck {
func NewServiceCheck(name string, status ServiceCheckStatus) *ServiceCheck {
return &ServiceCheck{
Name: name,
Status: status,
@ -551,17 +600,7 @@ func (sc ServiceCheck) Encode(tags ...string) (string, error) {
buffer.WriteString(sc.Hostname)
}
if len(tags)+len(sc.Tags) > 0 {
all := make([]string, 0, len(tags)+len(sc.Tags))
all = append(all, tags...)
all = append(all, sc.Tags...)
buffer.WriteString("|#")
buffer.WriteString(all[0])
for _, tag := range all[1:] {
buffer.WriteString(",")
buffer.WriteString(tag)
}
}
writeTagString(&buffer, tags, sc.Tags)
if len(message) != 0 {
buffer.WriteString("|m:")
@ -579,3 +618,27 @@ func (sc ServiceCheck) escapedMessage() string {
msg := strings.Replace(sc.Message, "\n", "\\n", -1)
return strings.Replace(msg, "m:", `m\:`, -1)
}
func removeNewlines(str string) string {
return strings.Replace(str, "\n", "", -1)
}
func writeTagString(w io.Writer, tagList1, tagList2 []string) {
// the tag lists may be shared with other callers, so we cannot modify
// them in any way (which means we cannot append to them either)
// therefore we must make an entirely separate copy just for this call
totalLen := len(tagList1) + len(tagList2)
if totalLen == 0 {
return
}
tags := make([]string, 0, totalLen)
tags = append(tags, tagList1...)
tags = append(tags, tagList2...)
io.WriteString(w, "|#")
io.WriteString(w, removeNewlines(tags[0]))
for _, tag := range tags[1:] {
io.WriteString(w, ",")
io.WriteString(w, removeNewlines(tag))
}
}

41
vendor/github.com/DataDog/datadog-go/statsd/udp.go generated vendored Normal file
View File

@ -0,0 +1,41 @@
package statsd
import (
"errors"
"net"
"time"
)
// udpWriter is an internal class wrapping around management of UDP connection
type udpWriter struct {
conn net.Conn
}
// New returns a pointer to a new udpWriter given an addr in the format "hostname:port".
func newUdpWriter(addr string) (*udpWriter, error) {
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return nil, err
}
writer := &udpWriter{conn: conn}
return writer, nil
}
// SetWriteTimeout is not needed for UDP, returns error
func (w *udpWriter) SetWriteTimeout(d time.Duration) error {
return errors.New("SetWriteTimeout: not supported for UDP connections")
}
// Write data to the UDP connection with no error handling
func (w *udpWriter) Write(data []byte) error {
_, e := w.conn.Write(data)
return e
}
func (w *udpWriter) Close() error {
return w.conn.Close()
}

66
vendor/github.com/DataDog/datadog-go/statsd/uds.go generated vendored Normal file
View File

@ -0,0 +1,66 @@
package statsd
import (
"net"
"strings"
"time"
)
/*
UDSTimeout holds the default timeout for UDS socket writes, as they can get
blocking when the receiving buffer is full.
*/
const defaultUDSTimeout = 1 * time.Millisecond
// udsWriter is an internal class wrapping around management of UDS connection
type udsWriter struct {
// Address to send metrics to, needed to allow reconnection on error
addr net.Addr
// Established connection object, or nil if not connected yet
conn net.Conn
// write timeout
writeTimeout time.Duration
}
// New returns a pointer to a new udsWriter given a socket file path as addr.
func newUdsWriter(addr string) (*udsWriter, error) {
udsAddr, err := net.ResolveUnixAddr("unixgram", addr)
if err != nil {
return nil, err
}
// Defer connection to first Write
writer := &udsWriter{addr: udsAddr, conn: nil, writeTimeout: defaultUDSTimeout}
return writer, nil
}
// SetWriteTimeout allows the user to set a custom write timeout
func (w *udsWriter) SetWriteTimeout(d time.Duration) error {
w.writeTimeout = d
return nil
}
// Write data to the UDS connection with write timeout and minimal error handling:
// create the connection if nil, and destroy it if the statsd server has disconnected
func (w *udsWriter) Write(data []byte) error {
// Try connecting (first packet or connection lost)
if w.conn == nil {
conn, err := net.Dial(w.addr.Network(), w.addr.String())
if err != nil {
return err
} else {
w.conn = conn
}
}
w.conn.SetWriteDeadline(time.Now().Add(w.writeTimeout))
_, e := w.conn.Write(data)
if e != nil && strings.Contains(e.Error(), "transport endpoint is not connected") {
// Statsd server disconnected, retry connecting at next packet
w.conn = nil
return e
}
return e
}
func (w *udsWriter) Close() error {
return w.conn.Close()
}

View File

@ -1,22 +0,0 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe

View File

@ -28,39 +28,42 @@ Examples
Here is an example of using the package:
func SlowMethod() {
// Profiling the runtime of a method
defer metrics.MeasureSince([]string{"SlowMethod"}, time.Now())
}
```go
func SlowMethod() {
// Profiling the runtime of a method
defer metrics.MeasureSince([]string{"SlowMethod"}, time.Now())
}
// Configure a statsite sink as the global metrics sink
sink, _ := metrics.NewStatsiteSink("statsite:8125")
metrics.NewGlobal(metrics.DefaultConfig("service-name"), sink)
// Configure a statsite sink as the global metrics sink
sink, _ := metrics.NewStatsiteSink("statsite:8125")
metrics.NewGlobal(metrics.DefaultConfig("service-name"), sink)
// Emit a Key/Value pair
metrics.EmitKey([]string{"questions", "meaning of life"}, 42)
// Emit a Key/Value pair
metrics.EmitKey([]string{"questions", "meaning of life"}, 42)
```
Here is an example of setting up a signal handler:
Here is an example of setting up an signal handler:
```go
// Setup the inmem sink and signal handler
inm := metrics.NewInmemSink(10*time.Second, time.Minute)
sig := metrics.DefaultInmemSignal(inm)
metrics.NewGlobal(metrics.DefaultConfig("service-name"), inm)
// Setup the inmem sink and signal handler
inm := metrics.NewInmemSink(10*time.Second, time.Minute)
sig := metrics.DefaultInmemSignal(inm)
metrics.NewGlobal(metrics.DefaultConfig("service-name"), inm)
// Run some code
inm.SetGauge([]string{"foo"}, 42)
inm.EmitKey([]string{"bar"}, 30)
// Run some code
inm.SetGauge([]string{"foo"}, 42)
inm.EmitKey([]string{"bar"}, 30)
inm.IncrCounter([]string{"baz"}, 42)
inm.IncrCounter([]string{"baz"}, 1)
inm.IncrCounter([]string{"baz"}, 80)
inm.IncrCounter([]string{"baz"}, 42)
inm.IncrCounter([]string{"baz"}, 1)
inm.IncrCounter([]string{"baz"}, 80)
inm.AddSample([]string{"method", "wow"}, 42)
inm.AddSample([]string{"method", "wow"}, 100)
inm.AddSample([]string{"method", "wow"}, 22)
inm.AddSample([]string{"method", "wow"}, 42)
inm.AddSample([]string{"method", "wow"}, 100)
inm.AddSample([]string{"method", "wow"}, 22)
....
....
```
When a signal comes in, output like the following will be dumped to stderr:

View File

@ -5,6 +5,7 @@ package circonus
import (
"strings"
"github.com/armon/go-metrics"
cgm "github.com/circonus-labs/circonus-gometrics"
)
@ -61,6 +62,12 @@ func (s *CirconusSink) SetGauge(key []string, val float32) {
s.metrics.SetGauge(flatKey, int64(val))
}
// SetGaugeWithLabels sets value for a gauge metric with the given labels
func (s *CirconusSink) SetGaugeWithLabels(key []string, val float32, labels []metrics.Label) {
flatKey := s.flattenKeyLabels(key, labels)
s.metrics.SetGauge(flatKey, int64(val))
}
// EmitKey is not implemented in circonus
func (s *CirconusSink) EmitKey(key []string, val float32) {
// NOP
@ -72,12 +79,24 @@ func (s *CirconusSink) IncrCounter(key []string, val float32) {
s.metrics.IncrementByValue(flatKey, uint64(val))
}
// IncrCounterWithLabels increments a counter metric with the given labels
func (s *CirconusSink) IncrCounterWithLabels(key []string, val float32, labels []metrics.Label) {
flatKey := s.flattenKeyLabels(key, labels)
s.metrics.IncrementByValue(flatKey, uint64(val))
}
// AddSample adds a sample to a histogram metric
func (s *CirconusSink) AddSample(key []string, val float32) {
flatKey := s.flattenKey(key)
s.metrics.RecordValue(flatKey, float64(val))
}
// AddSampleWithLabels adds a sample to a histogram metric with the given labels
func (s *CirconusSink) AddSampleWithLabels(key []string, val float32, labels []metrics.Label) {
flatKey := s.flattenKeyLabels(key, labels)
s.metrics.RecordValue(flatKey, float64(val))
}
// Flattens key to Circonus metric name
func (s *CirconusSink) flattenKey(parts []string) string {
joined := strings.Join(parts, "`")
@ -90,3 +109,11 @@ func (s *CirconusSink) flattenKey(parts []string) string {
}
}, joined)
}
// Flattens the key along with labels for formatting, removes spaces
func (s *CirconusSink) flattenKeyLabels(parts []string, labels []metrics.Label) string {
for _, label := range labels {
parts = append(parts, label.Value)
}
return s.flattenKey(parts)
}

View File

@ -5,6 +5,7 @@ import (
"strings"
"github.com/DataDog/datadog-go/statsd"
"github.com/armon/go-metrics"
)
// DogStatsdSink provides a MetricSink that can be used
@ -45,46 +46,49 @@ func (s *DogStatsdSink) EnableHostNamePropagation() {
func (s *DogStatsdSink) flattenKey(parts []string) string {
joined := strings.Join(parts, ".")
return strings.Map(func(r rune) rune {
switch r {
case ':':
fallthrough
case ' ':
return '_'
default:
return r
}
}, joined)
return strings.Map(sanitize, joined)
}
func (s *DogStatsdSink) parseKey(key []string) ([]string, []string) {
func sanitize(r rune) rune {
switch r {
case ':':
fallthrough
case ' ':
return '_'
default:
return r
}
}
func (s *DogStatsdSink) parseKey(key []string) ([]string, []metrics.Label) {
// Since DogStatsd supports dimensionality via tags on metric keys, this sink's approach is to splice the hostname out of the key in favor of a `host` tag
// The `host` tag is either forced here, or set downstream by the DogStatsd server
var tags []string
var labels []metrics.Label
hostName := s.hostName
//Splice the hostname out of the key
// Splice the hostname out of the key
for i, el := range key {
if el == hostName {
key = append(key[:i], key[i+1:]...)
break
}
}
if s.propagateHostname {
tags = append(tags, fmt.Sprintf("host:%s", hostName))
labels = append(labels, metrics.Label{"host", hostName})
}
return key, tags
return key, labels
}
// Implementation of methods in the MetricSink interface
func (s *DogStatsdSink) SetGauge(key []string, val float32) {
s.SetGaugeWithTags(key, val, []string{})
s.SetGaugeWithLabels(key, val, nil)
}
func (s *DogStatsdSink) IncrCounter(key []string, val float32) {
s.IncrCounterWithTags(key, val, []string{})
s.IncrCounterWithLabels(key, val, nil)
}
// EmitKey is not implemented since DogStatsd does not provide a metric type that holds an
@ -93,33 +97,44 @@ func (s *DogStatsdSink) EmitKey(key []string, val float32) {
}
func (s *DogStatsdSink) AddSample(key []string, val float32) {
s.AddSampleWithTags(key, val, []string{})
s.AddSampleWithLabels(key, val, nil)
}
// The following ...WithTags methods correspond to Datadog's Tag extension to Statsd.
// The following ...WithLabels methods correspond to Datadog's Tag extension to Statsd.
// http://docs.datadoghq.com/guides/dogstatsd/#tags
func (s *DogStatsdSink) SetGaugeWithTags(key []string, val float32, tags []string) {
flatKey, tags := s.getFlatkeyAndCombinedTags(key, tags)
func (s *DogStatsdSink) SetGaugeWithLabels(key []string, val float32, labels []metrics.Label) {
flatKey, tags := s.getFlatkeyAndCombinedLabels(key, labels)
rate := 1.0
s.client.Gauge(flatKey, float64(val), tags, rate)
}
func (s *DogStatsdSink) IncrCounterWithTags(key []string, val float32, tags []string) {
flatKey, tags := s.getFlatkeyAndCombinedTags(key, tags)
func (s *DogStatsdSink) IncrCounterWithLabels(key []string, val float32, labels []metrics.Label) {
flatKey, tags := s.getFlatkeyAndCombinedLabels(key, labels)
rate := 1.0
s.client.Count(flatKey, int64(val), tags, rate)
}
func (s *DogStatsdSink) AddSampleWithTags(key []string, val float32, tags []string) {
flatKey, tags := s.getFlatkeyAndCombinedTags(key, tags)
func (s *DogStatsdSink) AddSampleWithLabels(key []string, val float32, labels []metrics.Label) {
flatKey, tags := s.getFlatkeyAndCombinedLabels(key, labels)
rate := 1.0
s.client.TimeInMilliseconds(flatKey, float64(val), tags, rate)
}
func (s *DogStatsdSink) getFlatkeyAndCombinedTags(key []string, tags []string) (flattenedKey string, combinedTags []string) {
key, hostTags := s.parseKey(key)
func (s *DogStatsdSink) getFlatkeyAndCombinedLabels(key []string, labels []metrics.Label) (string, []string) {
key, parsedLabels := s.parseKey(key)
flatKey := s.flattenKey(key)
tags = append(tags, hostTags...)
labels = append(labels, parsedLabels...)
var tags []string
for _, label := range labels {
label.Name = strings.Map(sanitize, label.Name)
label.Value = strings.Map(sanitize, label.Value)
if label.Value != "" {
tags = append(tags, fmt.Sprintf("%s:%s", label.Name, label.Value))
} else {
tags = append(tags, label.Name)
}
}
return flatKey, tags
}

View File

@ -1,8 +1,10 @@
package metrics
import (
"bytes"
"fmt"
"math"
"net/url"
"strings"
"sync"
"time"
@ -25,6 +27,8 @@ type InmemSink struct {
// intervals is a slice of the retained intervals
intervals []*IntervalMetrics
intervalLock sync.RWMutex
rateDenom float64
}
// IntervalMetrics stores the aggregated metrics
@ -36,7 +40,7 @@ type IntervalMetrics struct {
Interval time.Time
// Gauges maps the key to the last set value
Gauges map[string]float32
Gauges map[string]GaugeValue
// Points maps the string to the list of emitted values
// from EmitKey
@ -44,21 +48,21 @@ type IntervalMetrics struct {
// Counters maps the string key to a sum of the counter
// values
Counters map[string]*AggregateSample
Counters map[string]SampledValue
// Samples maps the key to an AggregateSample,
// which has the rolled up view of a sample
Samples map[string]*AggregateSample
Samples map[string]SampledValue
}
// NewIntervalMetrics creates a new IntervalMetrics for a given interval
func NewIntervalMetrics(intv time.Time) *IntervalMetrics {
return &IntervalMetrics{
Interval: intv,
Gauges: make(map[string]float32),
Gauges: make(map[string]GaugeValue),
Points: make(map[string][]float32),
Counters: make(map[string]*AggregateSample),
Samples: make(map[string]*AggregateSample),
Counters: make(map[string]SampledValue),
Samples: make(map[string]SampledValue),
}
}
@ -66,11 +70,12 @@ func NewIntervalMetrics(intv time.Time) *IntervalMetrics {
// about a sample
type AggregateSample struct {
Count int // The count of emitted pairs
Rate float64 `json:"-"` // The count of emitted pairs per time unit (usually 1 second)
Sum float64 // The sum of values
SumSq float64 // The sum of squared values
SumSq float64 `json:"-"` // The sum of squared values
Min float64 // Minimum value
Max float64 // Maximum value
LastUpdated time.Time // When value was last updated
LastUpdated time.Time `json:"-"` // When value was last updated
}
// Computes a Stddev of the values
@ -92,7 +97,7 @@ func (a *AggregateSample) Mean() float64 {
}
// Ingest is used to update a sample
func (a *AggregateSample) Ingest(v float64) {
func (a *AggregateSample) Ingest(v float64, rateDenom float64) {
a.Count++
a.Sum += v
a.SumSq += (v * v)
@ -102,6 +107,7 @@ func (a *AggregateSample) Ingest(v float64) {
if v > a.Max || a.Count == 1 {
a.Max = v
}
a.Rate = float64(a.Count) / rateDenom
a.LastUpdated = time.Now()
}
@ -116,25 +122,49 @@ func (a *AggregateSample) String() string {
}
}
// NewInmemSinkFromURL creates an InmemSink from a URL. It is used
// (and tested) from NewMetricSinkFromURL.
func NewInmemSinkFromURL(u *url.URL) (MetricSink, error) {
params := u.Query()
interval, err := time.ParseDuration(params.Get("interval"))
if err != nil {
return nil, fmt.Errorf("Bad 'interval' param: %s", err)
}
retain, err := time.ParseDuration(params.Get("retain"))
if err != nil {
return nil, fmt.Errorf("Bad 'retain' param: %s", err)
}
return NewInmemSink(interval, retain), nil
}
// NewInmemSink is used to construct a new in-memory sink.
// Uses an aggregation interval and maximum retention period.
func NewInmemSink(interval, retain time.Duration) *InmemSink {
rateTimeUnit := time.Second
i := &InmemSink{
interval: interval,
retain: retain,
maxIntervals: int(retain / interval),
rateDenom: float64(interval.Nanoseconds()) / float64(rateTimeUnit.Nanoseconds()),
}
i.intervals = make([]*IntervalMetrics, 0, i.maxIntervals)
return i
}
func (i *InmemSink) SetGauge(key []string, val float32) {
k := i.flattenKey(key)
i.SetGaugeWithLabels(key, val, nil)
}
func (i *InmemSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
k, name := i.flattenKeyLabels(key, labels)
intv := i.getInterval()
intv.Lock()
defer intv.Unlock()
intv.Gauges[k] = val
intv.Gauges[k] = GaugeValue{Name: name, Value: val, Labels: labels}
}
func (i *InmemSink) EmitKey(key []string, val float32) {
@ -148,33 +178,49 @@ func (i *InmemSink) EmitKey(key []string, val float32) {
}
func (i *InmemSink) IncrCounter(key []string, val float32) {
k := i.flattenKey(key)
i.IncrCounterWithLabels(key, val, nil)
}
func (i *InmemSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
k, name := i.flattenKeyLabels(key, labels)
intv := i.getInterval()
intv.Lock()
defer intv.Unlock()
agg := intv.Counters[k]
if agg == nil {
agg = &AggregateSample{}
agg, ok := intv.Counters[k]
if !ok {
agg = SampledValue{
Name: name,
AggregateSample: &AggregateSample{},
Labels: labels,
}
intv.Counters[k] = agg
}
agg.Ingest(float64(val))
agg.Ingest(float64(val), i.rateDenom)
}
func (i *InmemSink) AddSample(key []string, val float32) {
k := i.flattenKey(key)
i.AddSampleWithLabels(key, val, nil)
}
func (i *InmemSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
k, name := i.flattenKeyLabels(key, labels)
intv := i.getInterval()
intv.Lock()
defer intv.Unlock()
agg := intv.Samples[k]
if agg == nil {
agg = &AggregateSample{}
agg, ok := intv.Samples[k]
if !ok {
agg = SampledValue{
Name: name,
AggregateSample: &AggregateSample{},
Labels: labels,
}
intv.Samples[k] = agg
}
agg.Ingest(float64(val))
agg.Ingest(float64(val), i.rateDenom)
}
// Data is used to retrieve all the aggregated metrics
@ -236,6 +282,38 @@ func (i *InmemSink) getInterval() *IntervalMetrics {
// Flattens the key for formatting, removes spaces
func (i *InmemSink) flattenKey(parts []string) string {
joined := strings.Join(parts, ".")
return strings.Replace(joined, " ", "_", -1)
buf := &bytes.Buffer{}
replacer := strings.NewReplacer(" ", "_")
if len(parts) > 0 {
replacer.WriteString(buf, parts[0])
}
for _, part := range parts[1:] {
replacer.WriteString(buf, ".")
replacer.WriteString(buf, part)
}
return buf.String()
}
// Flattens the key for formatting along with its labels, removes spaces
func (i *InmemSink) flattenKeyLabels(parts []string, labels []Label) (string, string) {
buf := &bytes.Buffer{}
replacer := strings.NewReplacer(" ", "_")
if len(parts) > 0 {
replacer.WriteString(buf, parts[0])
}
for _, part := range parts[1:] {
replacer.WriteString(buf, ".")
replacer.WriteString(buf, part)
}
key := buf.String()
for _, label := range labels {
replacer.WriteString(buf, fmt.Sprintf(";%s=%s", label.Name, label.Value))
}
return buf.String(), key
}

118
vendor/github.com/armon/go-metrics/inmem_endpoint.go generated vendored Normal file
View File

@ -0,0 +1,118 @@
package metrics
import (
"fmt"
"net/http"
"sort"
"time"
)
// MetricsSummary holds a roll-up of metrics info for a given interval
type MetricsSummary struct {
Timestamp string
Gauges []GaugeValue
Points []PointValue
Counters []SampledValue
Samples []SampledValue
}
type GaugeValue struct {
Name string
Hash string `json:"-"`
Value float32
Labels []Label `json:"-"`
DisplayLabels map[string]string `json:"Labels"`
}
type PointValue struct {
Name string
Points []float32
}
type SampledValue struct {
Name string
Hash string `json:"-"`
*AggregateSample
Mean float64
Stddev float64
Labels []Label `json:"-"`
DisplayLabels map[string]string `json:"Labels"`
}
// DisplayMetrics returns a summary of the metrics from the most recent finished interval.
func (i *InmemSink) DisplayMetrics(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
data := i.Data()
var interval *IntervalMetrics
n := len(data)
switch {
case n == 0:
return nil, fmt.Errorf("no metric intervals have been initialized yet")
case n == 1:
// Show the current interval if it's all we have
interval = i.intervals[0]
default:
// Show the most recent finished interval if we have one
interval = i.intervals[n-2]
}
summary := MetricsSummary{
Timestamp: interval.Interval.Round(time.Second).UTC().String(),
Gauges: make([]GaugeValue, 0, len(interval.Gauges)),
Points: make([]PointValue, 0, len(interval.Points)),
}
// Format and sort the output of each metric type, so it gets displayed in a
// deterministic order.
for name, points := range interval.Points {
summary.Points = append(summary.Points, PointValue{name, points})
}
sort.Slice(summary.Points, func(i, j int) bool {
return summary.Points[i].Name < summary.Points[j].Name
})
for hash, value := range interval.Gauges {
value.Hash = hash
value.DisplayLabels = make(map[string]string)
for _, label := range value.Labels {
value.DisplayLabels[label.Name] = label.Value
}
value.Labels = nil
summary.Gauges = append(summary.Gauges, value)
}
sort.Slice(summary.Gauges, func(i, j int) bool {
return summary.Gauges[i].Hash < summary.Gauges[j].Hash
})
summary.Counters = formatSamples(interval.Counters)
summary.Samples = formatSamples(interval.Samples)
return summary, nil
}
func formatSamples(source map[string]SampledValue) []SampledValue {
output := make([]SampledValue, 0, len(source))
for hash, sample := range source {
displayLabels := make(map[string]string)
for _, label := range sample.Labels {
displayLabels[label.Name] = label.Value
}
output = append(output, SampledValue{
Name: sample.Name,
Hash: hash,
AggregateSample: sample.AggregateSample,
Mean: sample.AggregateSample.Mean(),
Stddev: sample.AggregateSample.Stddev(),
DisplayLabels: displayLabels,
})
}
sort.Slice(output, func(i, j int) bool {
return output[i].Hash < output[j].Hash
})
return output
}

View File

@ -6,6 +6,7 @@ import (
"io"
"os"
"os/signal"
"strings"
"sync"
"syscall"
)
@ -75,22 +76,25 @@ func (i *InmemSignal) dumpStats() {
data := i.inm.Data()
// Skip the last period which is still being aggregated
for i := 0; i < len(data)-1; i++ {
intv := data[i]
for j := 0; j < len(data)-1; j++ {
intv := data[j]
intv.RLock()
for name, val := range intv.Gauges {
fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val)
for _, val := range intv.Gauges {
name := i.flattenLabels(val.Name, val.Labels)
fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value)
}
for name, vals := range intv.Points {
for _, val := range vals {
fmt.Fprintf(buf, "[%v][P] '%s': %0.3f\n", intv.Interval, name, val)
}
}
for name, agg := range intv.Counters {
fmt.Fprintf(buf, "[%v][C] '%s': %s\n", intv.Interval, name, agg)
for _, agg := range intv.Counters {
name := i.flattenLabels(agg.Name, agg.Labels)
fmt.Fprintf(buf, "[%v][C] '%s': %s\n", intv.Interval, name, agg.AggregateSample)
}
for name, agg := range intv.Samples {
fmt.Fprintf(buf, "[%v][S] '%s': %s\n", intv.Interval, name, agg)
for _, agg := range intv.Samples {
name := i.flattenLabels(agg.Name, agg.Labels)
fmt.Fprintf(buf, "[%v][S] '%s': %s\n", intv.Interval, name, agg.AggregateSample)
}
intv.RUnlock()
}
@ -98,3 +102,16 @@ func (i *InmemSignal) dumpStats() {
// Write out the bytes
i.w.Write(buf.Bytes())
}
// Flattens the key for formatting along with its labels, removes spaces
func (i *InmemSignal) flattenLabels(name string, labels []Label) string {
buf := bytes.NewBufferString(name)
replacer := strings.NewReplacer(" ", "_", ":", "_")
for _, label := range labels {
replacer.WriteString(buf, ".")
replacer.WriteString(buf, label.Value)
}
return buf.String()
}

View File

@ -2,20 +2,43 @@ package metrics
import (
"runtime"
"strings"
"time"
"github.com/hashicorp/go-immutable-radix"
)
type Label struct {
Name string
Value string
}
func (m *Metrics) SetGauge(key []string, val float32) {
if m.HostName != "" && m.EnableHostname {
key = insert(0, m.HostName, key)
m.SetGaugeWithLabels(key, val, nil)
}
func (m *Metrics) SetGaugeWithLabels(key []string, val float32, labels []Label) {
if m.HostName != "" {
if m.EnableHostnameLabel {
labels = append(labels, Label{"host", m.HostName})
} else if m.EnableHostname {
key = insert(0, m.HostName, key)
}
}
if m.EnableTypePrefix {
key = insert(0, "gauge", key)
}
if m.ServiceName != "" {
key = insert(0, m.ServiceName, key)
if m.EnableServiceLabel {
labels = append(labels, Label{"service", m.ServiceName})
} else {
key = insert(0, m.ServiceName, key)
}
}
m.sink.SetGauge(key, val)
if !m.allowMetric(key) {
return
}
m.sink.SetGaugeWithLabels(key, val, labels)
}
func (m *Metrics) EmitKey(key []string, val float32) {
@ -25,40 +48,118 @@ func (m *Metrics) EmitKey(key []string, val float32) {
if m.ServiceName != "" {
key = insert(0, m.ServiceName, key)
}
if !m.allowMetric(key) {
return
}
m.sink.EmitKey(key, val)
}
func (m *Metrics) IncrCounter(key []string, val float32) {
m.IncrCounterWithLabels(key, val, nil)
}
func (m *Metrics) IncrCounterWithLabels(key []string, val float32, labels []Label) {
if m.HostName != "" && m.EnableHostnameLabel {
labels = append(labels, Label{"host", m.HostName})
}
if m.EnableTypePrefix {
key = insert(0, "counter", key)
}
if m.ServiceName != "" {
key = insert(0, m.ServiceName, key)
if m.EnableServiceLabel {
labels = append(labels, Label{"service", m.ServiceName})
} else {
key = insert(0, m.ServiceName, key)
}
}
m.sink.IncrCounter(key, val)
if !m.allowMetric(key) {
return
}
m.sink.IncrCounterWithLabels(key, val, labels)
}
func (m *Metrics) AddSample(key []string, val float32) {
m.AddSampleWithLabels(key, val, nil)
}
func (m *Metrics) AddSampleWithLabels(key []string, val float32, labels []Label) {
if m.HostName != "" && m.EnableHostnameLabel {
labels = append(labels, Label{"host", m.HostName})
}
if m.EnableTypePrefix {
key = insert(0, "sample", key)
}
if m.ServiceName != "" {
key = insert(0, m.ServiceName, key)
if m.EnableServiceLabel {
labels = append(labels, Label{"service", m.ServiceName})
} else {
key = insert(0, m.ServiceName, key)
}
}
m.sink.AddSample(key, val)
if !m.allowMetric(key) {
return
}
m.sink.AddSampleWithLabels(key, val, labels)
}
func (m *Metrics) MeasureSince(key []string, start time.Time) {
m.MeasureSinceWithLabels(key, start, nil)
}
func (m *Metrics) MeasureSinceWithLabels(key []string, start time.Time, labels []Label) {
if m.HostName != "" && m.EnableHostnameLabel {
labels = append(labels, Label{"host", m.HostName})
}
if m.EnableTypePrefix {
key = insert(0, "timer", key)
}
if m.ServiceName != "" {
key = insert(0, m.ServiceName, key)
if m.EnableServiceLabel {
labels = append(labels, Label{"service", m.ServiceName})
} else {
key = insert(0, m.ServiceName, key)
}
}
if !m.allowMetric(key) {
return
}
now := time.Now()
elapsed := now.Sub(start)
msec := float32(elapsed.Nanoseconds()) / float32(m.TimerGranularity)
m.sink.AddSample(key, msec)
m.sink.AddSampleWithLabels(key, msec, labels)
}
// UpdateFilter overwrites the existing filter with the given rules.
func (m *Metrics) UpdateFilter(allow, block []string) {
m.filterLock.Lock()
defer m.filterLock.Unlock()
m.AllowedPrefixes = allow
m.BlockedPrefixes = block
m.filter = iradix.New()
for _, prefix := range m.AllowedPrefixes {
m.filter, _, _ = m.filter.Insert([]byte(prefix), true)
}
for _, prefix := range m.BlockedPrefixes {
m.filter, _, _ = m.filter.Insert([]byte(prefix), false)
}
}
// Returns whether the metric should be allowed based on configured prefix filters
func (m *Metrics) allowMetric(key []string) bool {
m.filterLock.RLock()
defer m.filterLock.RUnlock()
if m.filter == nil || m.filter.Len() == 0 {
return m.Config.FilterDefault
}
_, allowed, ok := m.filter.Root().LongestPrefix([]byte(strings.Join(key, ".")))
if !ok {
return m.Config.FilterDefault
}
return allowed.(bool)
}
// Periodically collects runtime stats to publish

View File

@ -1,35 +1,50 @@
package metrics
import (
"fmt"
"net/url"
)
// The MetricSink interface is used to transmit metrics information
// to an external system
type MetricSink interface {
// A Gauge should retain the last value it is set to
SetGauge(key []string, val float32)
SetGaugeWithLabels(key []string, val float32, labels []Label)
// Should emit a Key/Value pair for each call
EmitKey(key []string, val float32)
// Counters should accumulate values
IncrCounter(key []string, val float32)
IncrCounterWithLabels(key []string, val float32, labels []Label)
// Samples are for timing information, where quantiles are used
AddSample(key []string, val float32)
AddSampleWithLabels(key []string, val float32, labels []Label)
}
// BlackholeSink is used to just blackhole messages
type BlackholeSink struct{}
func (*BlackholeSink) SetGauge(key []string, val float32) {}
func (*BlackholeSink) EmitKey(key []string, val float32) {}
func (*BlackholeSink) IncrCounter(key []string, val float32) {}
func (*BlackholeSink) AddSample(key []string, val float32) {}
func (*BlackholeSink) SetGauge(key []string, val float32) {}
func (*BlackholeSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {}
func (*BlackholeSink) EmitKey(key []string, val float32) {}
func (*BlackholeSink) IncrCounter(key []string, val float32) {}
func (*BlackholeSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {}
func (*BlackholeSink) AddSample(key []string, val float32) {}
func (*BlackholeSink) AddSampleWithLabels(key []string, val float32, labels []Label) {}
// FanoutSink is used to sink to fanout values to multiple sinks
type FanoutSink []MetricSink
func (fh FanoutSink) SetGauge(key []string, val float32) {
fh.SetGaugeWithLabels(key, val, nil)
}
func (fh FanoutSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
for _, s := range fh {
s.SetGauge(key, val)
s.SetGaugeWithLabels(key, val, labels)
}
}
@ -40,13 +55,61 @@ func (fh FanoutSink) EmitKey(key []string, val float32) {
}
func (fh FanoutSink) IncrCounter(key []string, val float32) {
fh.IncrCounterWithLabels(key, val, nil)
}
func (fh FanoutSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
for _, s := range fh {
s.IncrCounter(key, val)
s.IncrCounterWithLabels(key, val, labels)
}
}
func (fh FanoutSink) AddSample(key []string, val float32) {
fh.AddSampleWithLabels(key, val, nil)
}
func (fh FanoutSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
for _, s := range fh {
s.AddSample(key, val)
s.AddSampleWithLabels(key, val, labels)
}
}
// sinkURLFactoryFunc is an generic interface around the *SinkFromURL() function provided
// by each sink type
type sinkURLFactoryFunc func(*url.URL) (MetricSink, error)
// sinkRegistry supports the generic NewMetricSink function by mapping URL
// schemes to metric sink factory functions
var sinkRegistry = map[string]sinkURLFactoryFunc{
"statsd": NewStatsdSinkFromURL,
"statsite": NewStatsiteSinkFromURL,
"inmem": NewInmemSinkFromURL,
}
// NewMetricSinkFromURL allows a generic URL input to configure any of the
// supported sinks. The scheme of the URL identifies the type of the sink, the
// and query parameters are used to set options.
//
// "statsd://" - Initializes a StatsdSink. The host and port are passed through
// as the "addr" of the sink
//
// "statsite://" - Initializes a StatsiteSink. The host and port become the
// "addr" of the sink
//
// "inmem://" - Initializes an InmemSink. The host and port are ignored. The
// "interval" and "duration" query parameters must be specified with valid
// durations, see NewInmemSink for details.
func NewMetricSinkFromURL(urlStr string) (MetricSink, error) {
u, err := url.Parse(urlStr)
if err != nil {
return nil, err
}
sinkURLFactoryFunc := sinkRegistry[u.Scheme]
if sinkURLFactoryFunc == nil {
return nil, fmt.Errorf(
"cannot create metric sink, unrecognized sink name: %q", u.Scheme)
}
return sinkURLFactoryFunc(u)
}

View File

@ -2,7 +2,11 @@ package metrics
import (
"os"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/go-immutable-radix"
)
// Config is used to configure metrics settings
@ -10,26 +14,34 @@ type Config struct {
ServiceName string // Prefixed with keys to seperate services
HostName string // Hostname to use. If not provided and EnableHostname, it will be os.Hostname
EnableHostname bool // Enable prefixing gauge values with hostname
EnableHostnameLabel bool // Enable adding hostname to labels
EnableServiceLabel bool // Enable adding service to labels
EnableRuntimeMetrics bool // Enables profiling of runtime metrics (GC, Goroutines, Memory)
EnableTypePrefix bool // Prefixes key with a type ("counter", "gauge", "timer")
TimerGranularity time.Duration // Granularity of timers.
ProfileInterval time.Duration // Interval to profile runtime metrics
AllowedPrefixes []string // A list of metric prefixes to allow, with '.' as the separator
BlockedPrefixes []string // A list of metric prefixes to block, with '.' as the separator
FilterDefault bool // Whether to allow metrics by default
}
// Metrics represents an instance of a metrics sink that can
// be used to emit
type Metrics struct {
Config
lastNumGC uint32
sink MetricSink
lastNumGC uint32
sink MetricSink
filter *iradix.Tree
filterLock sync.RWMutex
}
// Shared global metrics instance
var globalMetrics *Metrics
var globalMetrics atomic.Value // *Metrics
func init() {
// Initialize to a blackhole sink to avoid errors
globalMetrics = &Metrics{sink: &BlackholeSink{}}
globalMetrics.Store(&Metrics{sink: &BlackholeSink{}})
}
// DefaultConfig provides a sane default configuration
@ -42,6 +54,7 @@ func DefaultConfig(serviceName string) *Config {
EnableTypePrefix: false, // Disable type prefix
TimerGranularity: time.Millisecond, // Timers are in milliseconds
ProfileInterval: time.Second, // Poll runtime every second
FilterDefault: true, // Don't filter metrics by default
}
// Try to get the hostname
@ -55,6 +68,7 @@ func New(conf *Config, sink MetricSink) (*Metrics, error) {
met := &Metrics{}
met.Config = *conf
met.sink = sink
met.UpdateFilter(conf.AllowedPrefixes, conf.BlockedPrefixes)
// Start the runtime collector
if conf.EnableRuntimeMetrics {
@ -68,28 +82,48 @@ func New(conf *Config, sink MetricSink) (*Metrics, error) {
func NewGlobal(conf *Config, sink MetricSink) (*Metrics, error) {
metrics, err := New(conf, sink)
if err == nil {
globalMetrics = metrics
globalMetrics.Store(metrics)
}
return metrics, err
}
// Proxy all the methods to the globalMetrics instance
func SetGauge(key []string, val float32) {
globalMetrics.SetGauge(key, val)
globalMetrics.Load().(*Metrics).SetGauge(key, val)
}
func SetGaugeWithLabels(key []string, val float32, labels []Label) {
globalMetrics.Load().(*Metrics).SetGaugeWithLabels(key, val, labels)
}
func EmitKey(key []string, val float32) {
globalMetrics.EmitKey(key, val)
globalMetrics.Load().(*Metrics).EmitKey(key, val)
}
func IncrCounter(key []string, val float32) {
globalMetrics.IncrCounter(key, val)
globalMetrics.Load().(*Metrics).IncrCounter(key, val)
}
func IncrCounterWithLabels(key []string, val float32, labels []Label) {
globalMetrics.Load().(*Metrics).IncrCounterWithLabels(key, val, labels)
}
func AddSample(key []string, val float32) {
globalMetrics.AddSample(key, val)
globalMetrics.Load().(*Metrics).AddSample(key, val)
}
func AddSampleWithLabels(key []string, val float32, labels []Label) {
globalMetrics.Load().(*Metrics).AddSampleWithLabels(key, val, labels)
}
func MeasureSince(key []string, start time.Time) {
globalMetrics.MeasureSince(key, start)
globalMetrics.Load().(*Metrics).MeasureSince(key, start)
}
func MeasureSinceWithLabels(key []string, start time.Time, labels []Label) {
globalMetrics.Load().(*Metrics).MeasureSinceWithLabels(key, start, labels)
}
func UpdateFilter(allow, block []string) {
globalMetrics.Load().(*Metrics).UpdateFilter(allow, block)
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"log"
"net"
"net/url"
"strings"
"time"
)
@ -23,6 +24,12 @@ type StatsdSink struct {
metricQueue chan string
}
// NewStatsdSinkFromURL creates an StatsdSink from a URL. It is used
// (and tested) from NewMetricSinkFromURL.
func NewStatsdSinkFromURL(u *url.URL) (MetricSink, error) {
return NewStatsdSink(u.Host)
}
// NewStatsdSink is used to create a new StatsdSink
func NewStatsdSink(addr string) (*StatsdSink, error) {
s := &StatsdSink{
@ -43,6 +50,11 @@ func (s *StatsdSink) SetGauge(key []string, val float32) {
s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
}
func (s *StatsdSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
flatKey := s.flattenKeyLabels(key, labels)
s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
}
func (s *StatsdSink) EmitKey(key []string, val float32) {
flatKey := s.flattenKey(key)
s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))
@ -53,11 +65,21 @@ func (s *StatsdSink) IncrCounter(key []string, val float32) {
s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
}
func (s *StatsdSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
flatKey := s.flattenKeyLabels(key, labels)
s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
}
func (s *StatsdSink) AddSample(key []string, val float32) {
flatKey := s.flattenKey(key)
s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
}
func (s *StatsdSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
flatKey := s.flattenKeyLabels(key, labels)
s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
}
// Flattens the key for formatting, removes spaces
func (s *StatsdSink) flattenKey(parts []string) string {
joined := strings.Join(parts, ".")
@ -73,6 +95,14 @@ func (s *StatsdSink) flattenKey(parts []string) string {
}, joined)
}
// Flattens the key along with labels for formatting, removes spaces
func (s *StatsdSink) flattenKeyLabels(parts []string, labels []Label) string {
for _, label := range labels {
parts = append(parts, label.Value)
}
return s.flattenKey(parts)
}
// Does a non-blocking push to the metrics queue
func (s *StatsdSink) pushMetric(m string) {
select {

View File

@ -5,6 +5,7 @@ import (
"fmt"
"log"
"net"
"net/url"
"strings"
"time"
)
@ -16,6 +17,12 @@ const (
flushInterval = 100 * time.Millisecond
)
// NewStatsiteSinkFromURL creates an StatsiteSink from a URL. It is used
// (and tested) from NewMetricSinkFromURL.
func NewStatsiteSinkFromURL(u *url.URL) (MetricSink, error) {
return NewStatsiteSink(u.Host)
}
// StatsiteSink provides a MetricSink that can be used with a
// statsite metrics server
type StatsiteSink struct {
@ -43,6 +50,11 @@ func (s *StatsiteSink) SetGauge(key []string, val float32) {
s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
}
func (s *StatsiteSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
flatKey := s.flattenKeyLabels(key, labels)
s.pushMetric(fmt.Sprintf("%s:%f|g\n", flatKey, val))
}
func (s *StatsiteSink) EmitKey(key []string, val float32) {
flatKey := s.flattenKey(key)
s.pushMetric(fmt.Sprintf("%s:%f|kv\n", flatKey, val))
@ -53,11 +65,21 @@ func (s *StatsiteSink) IncrCounter(key []string, val float32) {
s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
}
func (s *StatsiteSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
flatKey := s.flattenKeyLabels(key, labels)
s.pushMetric(fmt.Sprintf("%s:%f|c\n", flatKey, val))
}
func (s *StatsiteSink) AddSample(key []string, val float32) {
flatKey := s.flattenKey(key)
s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
}
func (s *StatsiteSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
flatKey := s.flattenKeyLabels(key, labels)
s.pushMetric(fmt.Sprintf("%s:%f|ms\n", flatKey, val))
}
// Flattens the key for formatting, removes spaces
func (s *StatsiteSink) flattenKey(parts []string) string {
joined := strings.Join(parts, ".")
@ -73,6 +95,14 @@ func (s *StatsiteSink) flattenKey(parts []string) string {
}, joined)
}
// Flattens the key along with labels for formatting, removes spaces
func (s *StatsiteSink) flattenKeyLabels(parts []string, labels []Label) string {
for _, label := range labels {
parts = append(parts, label.Value)
}
return s.flattenKey(parts)
}
// Does a non-blocking push to the metrics queue
func (s *StatsiteSink) pushMetric(m string) {
select {

47
vendor/vendor.json vendored
View File

@ -19,10 +19,10 @@
"revisionTime": "2016-06-22T17:32:16Z"
},
{
"checksumSHA1": "CMcHvqld6XBCDYA+9jCy6gT1O0Q=",
"checksumSHA1": "WvApwvvSe3i/3KO8300dyeFmkbI=",
"path": "github.com/DataDog/datadog-go/statsd",
"revision": "909c02b65dd8a52e8fa6072db9752a112227cf21",
"revisionTime": "2016-08-22T16:14:30Z"
"revision": "b10af4b12965a1ad08d164f57d14195b4140d8de",
"revisionTime": "2017-08-09T10:47:06Z"
},
{
"checksumSHA1": "AzjRkOQtVBTwIw4RJLTygFhJs3s=",
@ -64,20 +64,27 @@
"revision": "bbbad097214e2918d8543d5201d12bfd7bca254d"
},
{
"checksumSHA1": "0et4hA6AYqZCgYiY+c6Z17t3k3k=",
"path": "github.com/armon/go-metrics",
"revision": "06b60999766278efd6d2b5d8418a58c3d5b99e87"
"revision": "023a4bbe4bb9bfb23ee7e1afc8d0abad217641f3",
"revisionTime": "2017-08-09T01:16:44Z"
},
{
"checksumSHA1": "OmqT9Y1mAHvlAKeJh0jBHC9SH78=",
"origin": "github.com/armon",
"path": "github.com/armon/go-metrics/..",
"revision": ""
},
{
"checksumSHA1": "xCsGGM9TKBogZDfSN536KtQdLko=",
"path": "github.com/armon/go-metrics/circonus",
"revision": "3df31a1ada83e310c2e24b267c8e8b68836547b4",
"revisionTime": "2016-07-17T04:34:58Z"
"revision": "023a4bbe4bb9bfb23ee7e1afc8d0abad217641f3",
"revisionTime": "2017-08-09T01:16:44Z"
},
{
"checksumSHA1": "mAzNU3zeZGEwqjDT4ZkspFvx3TI=",
"checksumSHA1": "Dt0n1sSivvvdZQdzc4Hu/yOG+T0=",
"path": "github.com/armon/go-metrics/datadog",
"revision": "3df31a1ada83e310c2e24b267c8e8b68836547b4",
"revisionTime": "2016-07-17T04:34:58Z"
"revision": "023a4bbe4bb9bfb23ee7e1afc8d0abad217641f3",
"revisionTime": "2017-08-09T01:16:44Z"
},
{
"checksumSHA1": "gNO0JNpLzYOdInGeq7HqMZUzx9M=",
@ -366,14 +373,20 @@
{
"checksumSHA1": "iP5slJJPRZUm0rfdII8OiATAACA=",
"path": "github.com/docker/docker/pkg/idtools",
"revision": "52debcd58ac91bf68503ce60561536911b74ff05",
"revisionTime": "2016-05-20T15:17:10Z"
"revision": "02caa73df411debed164f520a6a1304778f8b88c",
"revisionTime": "2016-05-28T10:48:36Z"
},
{
"checksumSHA1": "iP5slJJPRZUm0rfdII8OiATAACA=",
"path": "github.com/docker/docker/pkg/idtools",
"revision": "02caa73df411debed164f520a6a1304778f8b88c",
"revisionTime": "2016-05-28T10:48:36Z"
"revision": "52debcd58ac91bf68503ce60561536911b74ff05",
"revisionTime": "2016-05-20T15:17:10Z"
},
{
"checksumSHA1": "tdhmIGUaoOMEDymMC23qTS7bt0g=",
"path": "github.com/docker/docker/pkg/ioutils",
"revision": "52debcd58ac91bf68503ce60561536911b74ff05",
"revisionTime": "2016-05-20T15:17:10Z"
},
{
"checksumSHA1": "tdhmIGUaoOMEDymMC23qTS7bt0g=",
@ -381,12 +394,6 @@
"revision": "da39e9a4f920a15683dd0f23923c302d4db6eed5",
"revisionTime": "2016-05-28T08:11:04Z"
},
{
"checksumSHA1": "tdhmIGUaoOMEDymMC23qTS7bt0g=",
"path": "github.com/docker/docker/pkg/ioutils",
"revision": "52debcd58ac91bf68503ce60561536911b74ff05",
"revisionTime": "2016-05-20T15:17:10Z"
},
{
"checksumSHA1": "BlFSSK7zUjPzPuxkLmM/0wpvku8=",
"path": "github.com/docker/docker/pkg/jsonlog",