initial work on datadog telemetry
This commit is contained in:
parent
969213b13a
commit
3e953d725a
|
@ -17,6 +17,7 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/armon/go-metrics/circonus"
|
||||
"github.com/armon/go-metrics/datadog"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/go-checkpoint"
|
||||
"github.com/hashicorp/go-syslog"
|
||||
|
@ -605,6 +606,15 @@ func (c *Command) setupTelemetry(config *Config) error {
|
|||
fanout = append(fanout, sink)
|
||||
}
|
||||
|
||||
// Configure the datadog sink
|
||||
if telConfig.DataDogAddr != "" {
|
||||
sink, err := datadog.NewDogStatsdSink(telConfig.DataDogAddr, config.NodeName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fanout = append(fanout, sink)
|
||||
}
|
||||
|
||||
// Configure the Circonus sink
|
||||
if telConfig.CirconusAPIToken != "" || telConfig.CirconusCheckSubmissionURL != "" {
|
||||
cfg := &circonus.Config{}
|
||||
|
|
|
@ -250,6 +250,7 @@ type ServerConfig struct {
|
|||
type Telemetry struct {
|
||||
StatsiteAddr string `mapstructure:"statsite_address"`
|
||||
StatsdAddr string `mapstructure:"statsd_address"`
|
||||
DataDogAddr string `mapstructure:"datadog_address"`
|
||||
DisableHostname bool `mapstructure:"disable_hostname"`
|
||||
CollectionInterval string `mapstructure:"collection_interval"`
|
||||
collectionInterval time.Duration `mapstructure:"-"`
|
||||
|
@ -761,6 +762,9 @@ func (a *Telemetry) Merge(b *Telemetry) *Telemetry {
|
|||
if b.StatsdAddr != "" {
|
||||
result.StatsdAddr = b.StatsdAddr
|
||||
}
|
||||
if b.DataDogAddr != "" {
|
||||
result.DataDogAddr = b.DataDogAddr
|
||||
}
|
||||
if b.DisableHostname {
|
||||
result.DisableHostname = true
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ func TestConfig_Merge(t *testing.T) {
|
|||
Telemetry: &Telemetry{
|
||||
StatsiteAddr: "127.0.0.1:8125",
|
||||
StatsdAddr: "127.0.0.1:8125",
|
||||
DataDogAddr: "127.0.0.1:8125",
|
||||
DisableHostname: false,
|
||||
CirconusAPIToken: "0",
|
||||
CirconusAPIApp: "nomadic",
|
||||
|
@ -148,6 +149,7 @@ func TestConfig_Merge(t *testing.T) {
|
|||
Telemetry: &Telemetry{
|
||||
StatsiteAddr: "127.0.0.2:8125",
|
||||
StatsdAddr: "127.0.0.2:8125",
|
||||
DataDogAddr: "127.0.0.1:8125",
|
||||
DisableHostname: true,
|
||||
PublishNodeMetrics: true,
|
||||
PublishAllocationMetrics: true,
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
Copyright (c) 2015 Datadog, Inc
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
|
@ -0,0 +1,52 @@
|
|||
## Overview
|
||||
|
||||
Package `statsd` provides a Go [dogstatsd](http://docs.datadoghq.com/guides/dogstatsd/) client. Dogstatsd extends Statsd, adding tags
|
||||
and histograms.
|
||||
|
||||
## Get the code
|
||||
|
||||
$ go get github.com/DataDog/datadog-go/statsd
|
||||
|
||||
## Usage
|
||||
|
||||
```go
|
||||
// Create the client
|
||||
c, err := statsd.New("127.0.0.1:8125")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
// Prefix every metric with the app name
|
||||
c.Namespace = "flubber."
|
||||
// Send the EC2 availability zone as a tag with every metric
|
||||
c.Tags = append(c.Tags, "us-east-1a")
|
||||
|
||||
// Do some metrics!
|
||||
err = c.Gauge("request.queue_depth", 12, nil, 1)
|
||||
err = c.Timing("request.duration", duration, nil, 1) // Uses a time.Duration!
|
||||
err = c.TimeInMilliseconds("request", 12, nil, 1)
|
||||
err = c.Incr("request.count_total", nil, 1)
|
||||
err = c.Decr("request.count_total", nil, 1)
|
||||
err = c.Count("request.count_total", 2, nil, 1)
|
||||
```
|
||||
|
||||
## Buffering Client
|
||||
|
||||
DogStatsD accepts packets with multiple statsd payloads in them. Using the BufferingClient via `NewBufferingClient` will buffer up commands and send them when the buffer is reached or after 100msec.
|
||||
|
||||
## Development
|
||||
|
||||
Run the tests with:
|
||||
|
||||
$ go test
|
||||
|
||||
## Documentation
|
||||
|
||||
Please see: http://godoc.org/github.com/DataDog/datadog-go/statsd
|
||||
|
||||
## License
|
||||
|
||||
go-dogstatsd is released under the [MIT license](http://www.opensource.org/licenses/mit-license.php).
|
||||
|
||||
## Credits
|
||||
|
||||
Original code by [ooyala](https://github.com/ooyala/go-dogstatsd).
|
|
@ -0,0 +1,581 @@
|
|||
// Copyright 2013 Ooyala, Inc.
|
||||
|
||||
/*
|
||||
Package statsd provides a Go dogstatsd client. Dogstatsd extends the popular statsd,
|
||||
adding tags and histograms and pushing upstream to Datadog.
|
||||
|
||||
Refer to http://docs.datadoghq.com/guides/dogstatsd/ for information about DogStatsD.
|
||||
|
||||
Example Usage:
|
||||
|
||||
// Create the client
|
||||
c, err := statsd.New("127.0.0.1:8125")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
// Prefix every metric with the app name
|
||||
c.Namespace = "flubber."
|
||||
// Send the EC2 availability zone as a tag with every metric
|
||||
c.Tags = append(c.Tags, "us-east-1a")
|
||||
err = c.Gauge("request.duration", 1.2, nil, 1)
|
||||
|
||||
statsd is based on go-statsd-client.
|
||||
*/
|
||||
package statsd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
/*
|
||||
OptimalPayloadSize defines the optimal payload size for a UDP datagram, 1432 bytes
|
||||
is optimal for regular networks with an MTU of 1500 so datagrams don't get
|
||||
fragmented. It's generally recommended not to fragment UDP datagrams as losing
|
||||
a single fragment will cause the entire datagram to be lost.
|
||||
|
||||
This can be increased if your network has a greater MTU or you don't mind UDP
|
||||
datagrams getting fragmented. The practical limit is MaxUDPPayloadSize
|
||||
*/
|
||||
const OptimalPayloadSize = 1432
|
||||
|
||||
/*
|
||||
MaxUDPPayloadSize defines the maximum payload size for a UDP datagram.
|
||||
Its value comes from the calculation: 65535 bytes Max UDP datagram size -
|
||||
8byte UDP header - 60byte max IP headers
|
||||
any number greater than that will see frames being cut out.
|
||||
*/
|
||||
const MaxUDPPayloadSize = 65467
|
||||
|
||||
// A Client is a handle for sending udp messages to dogstatsd. It is safe to
|
||||
// use one Client from multiple goroutines simultaneously.
|
||||
type Client struct {
|
||||
conn net.Conn
|
||||
// Namespace to prepend to all statsd calls
|
||||
Namespace string
|
||||
// Tags are global tags to be added to every statsd call
|
||||
Tags []string
|
||||
// BufferLength is the length of the buffer in commands.
|
||||
bufferLength int
|
||||
flushTime time.Duration
|
||||
commands []string
|
||||
buffer bytes.Buffer
|
||||
stop bool
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
// New returns a pointer to a new Client given an addr in the format "hostname:port".
|
||||
func New(addr string) (*Client, error) {
|
||||
udpAddr, err := net.ResolveUDPAddr("udp", addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn, err := net.DialUDP("udp", nil, udpAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client := &Client{conn: conn}
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// NewBuffered returns a Client that buffers its output and sends it in chunks.
|
||||
// Buflen is the length of the buffer in number of commands.
|
||||
func NewBuffered(addr string, buflen int) (*Client, error) {
|
||||
client, err := New(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client.bufferLength = buflen
|
||||
client.commands = make([]string, 0, buflen)
|
||||
client.flushTime = time.Millisecond * 100
|
||||
go client.watch()
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// format a message from its name, value, tags and rate. Also adds global
|
||||
// namespace and tags.
|
||||
func (c *Client) format(name, value string, tags []string, rate float64) string {
|
||||
var buf bytes.Buffer
|
||||
if c.Namespace != "" {
|
||||
buf.WriteString(c.Namespace)
|
||||
}
|
||||
buf.WriteString(name)
|
||||
buf.WriteString(":")
|
||||
buf.WriteString(value)
|
||||
if rate < 1 {
|
||||
buf.WriteString(`|@`)
|
||||
buf.WriteString(strconv.FormatFloat(rate, 'f', -1, 64))
|
||||
}
|
||||
|
||||
// do not append to c.Tags directly, because it's shared
|
||||
// across all invocations of this function
|
||||
tagCopy := make([]string, len(c.Tags), len(c.Tags)+len(tags))
|
||||
copy(tagCopy, c.Tags)
|
||||
tags = append(tagCopy, tags...)
|
||||
if len(tags) > 0 {
|
||||
buf.WriteString("|#")
|
||||
buf.WriteString(tags[0])
|
||||
for _, tag := range tags[1:] {
|
||||
buf.WriteString(",")
|
||||
buf.WriteString(tag)
|
||||
}
|
||||
}
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
func (c *Client) watch() {
|
||||
for _ = range time.Tick(c.flushTime) {
|
||||
if c.stop {
|
||||
return
|
||||
}
|
||||
c.Lock()
|
||||
if len(c.commands) > 0 {
|
||||
// FIXME: eating error here
|
||||
c.flush()
|
||||
}
|
||||
c.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) append(cmd string) error {
|
||||
c.commands = append(c.commands, cmd)
|
||||
// if we should flush, lets do it
|
||||
if len(c.commands) == c.bufferLength {
|
||||
if err := c.flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) joinMaxSize(cmds []string, sep string, maxSize int) ([][]byte, []int) {
|
||||
c.buffer.Reset() //clear buffer
|
||||
|
||||
var frames [][]byte
|
||||
var ncmds []int
|
||||
sepBytes := []byte(sep)
|
||||
sepLen := len(sep)
|
||||
|
||||
elem := 0
|
||||
for _, cmd := range cmds {
|
||||
needed := len(cmd)
|
||||
|
||||
if elem != 0 {
|
||||
needed = needed + sepLen
|
||||
}
|
||||
|
||||
if c.buffer.Len()+needed <= maxSize {
|
||||
if elem != 0 {
|
||||
c.buffer.Write(sepBytes)
|
||||
}
|
||||
c.buffer.WriteString(cmd)
|
||||
elem++
|
||||
} else {
|
||||
frames = append(frames, copyAndResetBuffer(&c.buffer))
|
||||
ncmds = append(ncmds, elem)
|
||||
// if cmd is bigger than maxSize it will get flushed on next loop
|
||||
c.buffer.WriteString(cmd)
|
||||
elem = 1
|
||||
}
|
||||
}
|
||||
|
||||
//add whatever is left! if there's actually something
|
||||
if c.buffer.Len() > 0 {
|
||||
frames = append(frames, copyAndResetBuffer(&c.buffer))
|
||||
ncmds = append(ncmds, elem)
|
||||
}
|
||||
|
||||
return frames, ncmds
|
||||
}
|
||||
|
||||
func copyAndResetBuffer(buf *bytes.Buffer) []byte {
|
||||
tmpBuf := make([]byte, buf.Len())
|
||||
copy(tmpBuf, buf.Bytes())
|
||||
buf.Reset()
|
||||
return tmpBuf
|
||||
}
|
||||
|
||||
// flush the commands in the buffer. Lock must be held by caller.
|
||||
func (c *Client) flush() error {
|
||||
frames, flushable := c.joinMaxSize(c.commands, "\n", OptimalPayloadSize)
|
||||
var err error
|
||||
cmdsFlushed := 0
|
||||
for i, data := range frames {
|
||||
_, e := c.conn.Write(data)
|
||||
if e != nil {
|
||||
err = e
|
||||
break
|
||||
}
|
||||
cmdsFlushed += flushable[i]
|
||||
}
|
||||
|
||||
// clear the slice with a slice op, doesn't realloc
|
||||
if cmdsFlushed == len(c.commands) {
|
||||
c.commands = c.commands[:0]
|
||||
} else {
|
||||
//this case will cause a future realloc...
|
||||
// drop problematic command though (sorry).
|
||||
c.commands = c.commands[cmdsFlushed+1:]
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Client) sendMsg(msg string) error {
|
||||
// if this client is buffered, then we'll just append this
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
if c.bufferLength > 0 {
|
||||
// return an error if message is bigger than OptimalPayloadSize
|
||||
if len(msg) > MaxUDPPayloadSize {
|
||||
return errors.New("message size exceeds MaxUDPPayloadSize")
|
||||
}
|
||||
return c.append(msg)
|
||||
}
|
||||
_, err := c.conn.Write([]byte(msg))
|
||||
return err
|
||||
}
|
||||
|
||||
// send handles sampling and sends the message over UDP. It also adds global namespace prefixes and tags.
|
||||
func (c *Client) send(name, value string, tags []string, rate float64) error {
|
||||
if c == nil {
|
||||
return nil
|
||||
}
|
||||
if rate < 1 && rand.Float64() > rate {
|
||||
return nil
|
||||
}
|
||||
data := c.format(name, value, tags, rate)
|
||||
return c.sendMsg(data)
|
||||
}
|
||||
|
||||
// Gauge measures the value of a metric at a particular time.
|
||||
func (c *Client) Gauge(name string, value float64, tags []string, rate float64) error {
|
||||
stat := fmt.Sprintf("%f|g", value)
|
||||
return c.send(name, stat, tags, rate)
|
||||
}
|
||||
|
||||
// Count tracks how many times something happened per second.
|
||||
func (c *Client) Count(name string, value int64, tags []string, rate float64) error {
|
||||
stat := fmt.Sprintf("%d|c", value)
|
||||
return c.send(name, stat, tags, rate)
|
||||
}
|
||||
|
||||
// Histogram tracks the statistical distribution of a set of values.
|
||||
func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error {
|
||||
stat := fmt.Sprintf("%f|h", value)
|
||||
return c.send(name, stat, tags, rate)
|
||||
}
|
||||
|
||||
// Decr is just Count of 1
|
||||
func (c *Client) Decr(name string, tags []string, rate float64) error {
|
||||
return c.send(name, "-1|c", tags, rate)
|
||||
}
|
||||
|
||||
// Incr is just Count of 1
|
||||
func (c *Client) Incr(name string, tags []string, rate float64) error {
|
||||
return c.send(name, "1|c", tags, rate)
|
||||
}
|
||||
|
||||
// Set counts the number of unique elements in a group.
|
||||
func (c *Client) Set(name string, value string, tags []string, rate float64) error {
|
||||
stat := fmt.Sprintf("%s|s", value)
|
||||
return c.send(name, stat, tags, rate)
|
||||
}
|
||||
|
||||
// Timing sends timing information, it is an alias for TimeInMilliseconds
|
||||
func (c *Client) Timing(name string, value time.Duration, tags []string, rate float64) error {
|
||||
return c.TimeInMilliseconds(name, value.Seconds()*1000, tags, rate)
|
||||
}
|
||||
|
||||
// TimeInMilliseconds sends timing information in milliseconds.
|
||||
// It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing)
|
||||
func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, rate float64) error {
|
||||
stat := fmt.Sprintf("%f|ms", value)
|
||||
return c.send(name, stat, tags, rate)
|
||||
}
|
||||
|
||||
// Event sends the provided Event.
|
||||
func (c *Client) Event(e *Event) error {
|
||||
stat, err := e.Encode(c.Tags...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.sendMsg(stat)
|
||||
}
|
||||
|
||||
// SimpleEvent sends an event with the provided title and text.
|
||||
func (c *Client) SimpleEvent(title, text string) error {
|
||||
e := NewEvent(title, text)
|
||||
return c.Event(e)
|
||||
}
|
||||
|
||||
// ServiceCheck sends the provided ServiceCheck.
|
||||
func (c *Client) ServiceCheck(sc *ServiceCheck) error {
|
||||
stat, err := sc.Encode(c.Tags...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.sendMsg(stat)
|
||||
}
|
||||
|
||||
// SimpleServiceCheck sends an serviceCheck with the provided name and status.
|
||||
func (c *Client) SimpleServiceCheck(name string, status serviceCheckStatus) error {
|
||||
sc := NewServiceCheck(name, status)
|
||||
return c.ServiceCheck(sc)
|
||||
}
|
||||
|
||||
// Close the client connection.
|
||||
func (c *Client) Close() error {
|
||||
if c == nil {
|
||||
return nil
|
||||
}
|
||||
c.stop = true
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
// Events support
|
||||
|
||||
type eventAlertType string
|
||||
|
||||
const (
|
||||
// Info is the "info" AlertType for events
|
||||
Info eventAlertType = "info"
|
||||
// Error is the "error" AlertType for events
|
||||
Error eventAlertType = "error"
|
||||
// Warning is the "warning" AlertType for events
|
||||
Warning eventAlertType = "warning"
|
||||
// Success is the "success" AlertType for events
|
||||
Success eventAlertType = "success"
|
||||
)
|
||||
|
||||
type eventPriority string
|
||||
|
||||
const (
|
||||
// Normal is the "normal" Priority for events
|
||||
Normal eventPriority = "normal"
|
||||
// Low is the "low" Priority for events
|
||||
Low eventPriority = "low"
|
||||
)
|
||||
|
||||
// An Event is an object that can be posted to your DataDog event stream.
|
||||
type Event struct {
|
||||
// Title of the event. Required.
|
||||
Title string
|
||||
// Text is the description of the event. Required.
|
||||
Text string
|
||||
// Timestamp is a timestamp for the event. If not provided, the dogstatsd
|
||||
// server will set this to the current time.
|
||||
Timestamp time.Time
|
||||
// Hostname for the event.
|
||||
Hostname string
|
||||
// AggregationKey groups this event with others of the same key.
|
||||
AggregationKey string
|
||||
// Priority of the event. Can be statsd.Low or statsd.Normal.
|
||||
Priority eventPriority
|
||||
// SourceTypeName is a source type for the event.
|
||||
SourceTypeName string
|
||||
// AlertType can be statsd.Info, statsd.Error, statsd.Warning, or statsd.Success.
|
||||
// If absent, the default value applied by the dogstatsd server is Info.
|
||||
AlertType eventAlertType
|
||||
// Tags for the event.
|
||||
Tags []string
|
||||
}
|
||||
|
||||
// NewEvent creates a new event with the given title and text. Error checking
|
||||
// against these values is done at send-time, or upon running e.Check.
|
||||
func NewEvent(title, text string) *Event {
|
||||
return &Event{
|
||||
Title: title,
|
||||
Text: text,
|
||||
}
|
||||
}
|
||||
|
||||
// Check verifies that an event is valid.
|
||||
func (e Event) Check() error {
|
||||
if len(e.Title) == 0 {
|
||||
return fmt.Errorf("statsd.Event title is required")
|
||||
}
|
||||
if len(e.Text) == 0 {
|
||||
return fmt.Errorf("statsd.Event text is required")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Encode returns the dogstatsd wire protocol representation for an event.
|
||||
// Tags may be passed which will be added to the encoded output but not to
|
||||
// the Event's list of tags, eg. for default tags.
|
||||
func (e Event) Encode(tags ...string) (string, error) {
|
||||
err := e.Check()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
text := e.escapedText()
|
||||
|
||||
var buffer bytes.Buffer
|
||||
buffer.WriteString("_e{")
|
||||
buffer.WriteString(strconv.FormatInt(int64(len(e.Title)), 10))
|
||||
buffer.WriteRune(',')
|
||||
buffer.WriteString(strconv.FormatInt(int64(len(text)), 10))
|
||||
buffer.WriteString("}:")
|
||||
buffer.WriteString(e.Title)
|
||||
buffer.WriteRune('|')
|
||||
buffer.WriteString(text)
|
||||
|
||||
if !e.Timestamp.IsZero() {
|
||||
buffer.WriteString("|d:")
|
||||
buffer.WriteString(strconv.FormatInt(int64(e.Timestamp.Unix()), 10))
|
||||
}
|
||||
|
||||
if len(e.Hostname) != 0 {
|
||||
buffer.WriteString("|h:")
|
||||
buffer.WriteString(e.Hostname)
|
||||
}
|
||||
|
||||
if len(e.AggregationKey) != 0 {
|
||||
buffer.WriteString("|k:")
|
||||
buffer.WriteString(e.AggregationKey)
|
||||
|
||||
}
|
||||
|
||||
if len(e.Priority) != 0 {
|
||||
buffer.WriteString("|p:")
|
||||
buffer.WriteString(string(e.Priority))
|
||||
}
|
||||
|
||||
if len(e.SourceTypeName) != 0 {
|
||||
buffer.WriteString("|s:")
|
||||
buffer.WriteString(e.SourceTypeName)
|
||||
}
|
||||
|
||||
if len(e.AlertType) != 0 {
|
||||
buffer.WriteString("|t:")
|
||||
buffer.WriteString(string(e.AlertType))
|
||||
}
|
||||
|
||||
if len(tags)+len(e.Tags) > 0 {
|
||||
all := make([]string, 0, len(tags)+len(e.Tags))
|
||||
all = append(all, tags...)
|
||||
all = append(all, e.Tags...)
|
||||
buffer.WriteString("|#")
|
||||
buffer.WriteString(all[0])
|
||||
for _, tag := range all[1:] {
|
||||
buffer.WriteString(",")
|
||||
buffer.WriteString(tag)
|
||||
}
|
||||
}
|
||||
|
||||
return buffer.String(), nil
|
||||
}
|
||||
|
||||
// ServiceCheck support
|
||||
|
||||
type serviceCheckStatus byte
|
||||
|
||||
const (
|
||||
// Ok is the "ok" ServiceCheck status
|
||||
Ok serviceCheckStatus = 0
|
||||
// Warn is the "warning" ServiceCheck status
|
||||
Warn serviceCheckStatus = 1
|
||||
// Critical is the "critical" ServiceCheck status
|
||||
Critical serviceCheckStatus = 2
|
||||
// Unknown is the "unknown" ServiceCheck status
|
||||
Unknown serviceCheckStatus = 3
|
||||
)
|
||||
|
||||
// An ServiceCheck is an object that contains status of DataDog service check.
|
||||
type ServiceCheck struct {
|
||||
// Name of the service check. Required.
|
||||
Name string
|
||||
// Status of service check. Required.
|
||||
Status serviceCheckStatus
|
||||
// Timestamp is a timestamp for the serviceCheck. If not provided, the dogstatsd
|
||||
// server will set this to the current time.
|
||||
Timestamp time.Time
|
||||
// Hostname for the serviceCheck.
|
||||
Hostname string
|
||||
// A message describing the current state of the serviceCheck.
|
||||
Message string
|
||||
// Tags for the serviceCheck.
|
||||
Tags []string
|
||||
}
|
||||
|
||||
// NewServiceCheck creates a new serviceCheck with the given name and status. Error checking
|
||||
// against these values is done at send-time, or upon running sc.Check.
|
||||
func NewServiceCheck(name string, status serviceCheckStatus) *ServiceCheck {
|
||||
return &ServiceCheck{
|
||||
Name: name,
|
||||
Status: status,
|
||||
}
|
||||
}
|
||||
|
||||
// Check verifies that an event is valid.
|
||||
func (sc ServiceCheck) Check() error {
|
||||
if len(sc.Name) == 0 {
|
||||
return fmt.Errorf("statsd.ServiceCheck name is required")
|
||||
}
|
||||
if byte(sc.Status) < 0 || byte(sc.Status) > 3 {
|
||||
return fmt.Errorf("statsd.ServiceCheck status has invalid value")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Encode returns the dogstatsd wire protocol representation for an serviceCheck.
|
||||
// Tags may be passed which will be added to the encoded output but not to
|
||||
// the Event's list of tags, eg. for default tags.
|
||||
func (sc ServiceCheck) Encode(tags ...string) (string, error) {
|
||||
err := sc.Check()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
message := sc.escapedMessage()
|
||||
|
||||
var buffer bytes.Buffer
|
||||
buffer.WriteString("_sc|")
|
||||
buffer.WriteString(sc.Name)
|
||||
buffer.WriteRune('|')
|
||||
buffer.WriteString(strconv.FormatInt(int64(sc.Status), 10))
|
||||
|
||||
if !sc.Timestamp.IsZero() {
|
||||
buffer.WriteString("|d:")
|
||||
buffer.WriteString(strconv.FormatInt(int64(sc.Timestamp.Unix()), 10))
|
||||
}
|
||||
|
||||
if len(sc.Hostname) != 0 {
|
||||
buffer.WriteString("|h:")
|
||||
buffer.WriteString(sc.Hostname)
|
||||
}
|
||||
|
||||
if len(tags)+len(sc.Tags) > 0 {
|
||||
all := make([]string, 0, len(tags)+len(sc.Tags))
|
||||
all = append(all, tags...)
|
||||
all = append(all, sc.Tags...)
|
||||
buffer.WriteString("|#")
|
||||
buffer.WriteString(all[0])
|
||||
for _, tag := range all[1:] {
|
||||
buffer.WriteString(",")
|
||||
buffer.WriteString(tag)
|
||||
}
|
||||
}
|
||||
|
||||
if len(message) != 0 {
|
||||
buffer.WriteString("|m:")
|
||||
buffer.WriteString(message)
|
||||
}
|
||||
|
||||
return buffer.String(), nil
|
||||
}
|
||||
|
||||
func (e Event) escapedText() string {
|
||||
return strings.Replace(e.Text, "\n", "\\n", -1)
|
||||
}
|
||||
|
||||
func (sc ServiceCheck) escapedMessage() string {
|
||||
msg := strings.Replace(sc.Message, "\n", "\\n", -1)
|
||||
return strings.Replace(msg, "m:", `m\:`, -1)
|
||||
}
|
|
@ -0,0 +1,125 @@
|
|||
package datadog
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/DataDog/datadog-go/statsd"
|
||||
)
|
||||
|
||||
// DogStatsdSink provides a MetricSink that can be used
|
||||
// with a dogstatsd server. It utilizes the Dogstatsd client at github.com/DataDog/datadog-go/statsd
|
||||
type DogStatsdSink struct {
|
||||
client *statsd.Client
|
||||
hostName string
|
||||
propagateHostname bool
|
||||
}
|
||||
|
||||
// NewDogStatsdSink is used to create a new DogStatsdSink with sane defaults
|
||||
func NewDogStatsdSink(addr string, hostName string) (*DogStatsdSink, error) {
|
||||
client, err := statsd.New(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sink := &DogStatsdSink{
|
||||
client: client,
|
||||
hostName: hostName,
|
||||
propagateHostname: false,
|
||||
}
|
||||
return sink, nil
|
||||
}
|
||||
|
||||
// SetTags sets common tags on the Dogstatsd Client that will be sent
|
||||
// along with all dogstatsd packets.
|
||||
// Ref: http://docs.datadoghq.com/guides/dogstatsd/#tags
|
||||
func (s *DogStatsdSink) SetTags(tags []string) {
|
||||
s.client.Tags = tags
|
||||
}
|
||||
|
||||
// EnableHostnamePropagation forces a Dogstatsd `host` tag with the value specified by `s.HostName`
|
||||
// Since the go-metrics package has its own mechanism for attaching a hostname to metrics,
|
||||
// setting the `propagateHostname` flag ensures that `s.HostName` overrides the host tag naively set by the DogStatsd server
|
||||
func (s *DogStatsdSink) EnableHostNamePropagation() {
|
||||
s.propagateHostname = true
|
||||
}
|
||||
|
||||
func (s *DogStatsdSink) flattenKey(parts []string) string {
|
||||
joined := strings.Join(parts, ".")
|
||||
return strings.Map(func(r rune) rune {
|
||||
switch r {
|
||||
case ':':
|
||||
fallthrough
|
||||
case ' ':
|
||||
return '_'
|
||||
default:
|
||||
return r
|
||||
}
|
||||
}, joined)
|
||||
}
|
||||
|
||||
func (s *DogStatsdSink) parseKey(key []string) ([]string, []string) {
|
||||
// Since DogStatsd supports dimensionality via tags on metric keys, this sink's approach is to splice the hostname out of the key in favor of a `host` tag
|
||||
// The `host` tag is either forced here, or set downstream by the DogStatsd server
|
||||
|
||||
var tags []string
|
||||
hostName := s.hostName
|
||||
|
||||
//Splice the hostname out of the key
|
||||
for i, el := range key {
|
||||
if el == hostName {
|
||||
key = append(key[:i], key[i+1:]...)
|
||||
}
|
||||
}
|
||||
|
||||
if s.propagateHostname {
|
||||
tags = append(tags, fmt.Sprintf("host:%s", hostName))
|
||||
}
|
||||
return key, tags
|
||||
}
|
||||
|
||||
// Implementation of methods in the MetricSink interface
|
||||
|
||||
func (s *DogStatsdSink) SetGauge(key []string, val float32) {
|
||||
s.SetGaugeWithTags(key, val, []string{})
|
||||
}
|
||||
|
||||
func (s *DogStatsdSink) IncrCounter(key []string, val float32) {
|
||||
s.IncrCounterWithTags(key, val, []string{})
|
||||
}
|
||||
|
||||
// EmitKey is not implemented since DogStatsd does not provide a metric type that holds an
|
||||
// arbitrary number of values
|
||||
func (s *DogStatsdSink) EmitKey(key []string, val float32) {
|
||||
}
|
||||
|
||||
func (s *DogStatsdSink) AddSample(key []string, val float32) {
|
||||
s.AddSampleWithTags(key, val, []string{})
|
||||
}
|
||||
|
||||
// The following ...WithTags methods correspond to Datadog's Tag extension to Statsd.
|
||||
// http://docs.datadoghq.com/guides/dogstatsd/#tags
|
||||
|
||||
func (s *DogStatsdSink) SetGaugeWithTags(key []string, val float32, tags []string) {
|
||||
flatKey, tags := s.getFlatkeyAndCombinedTags(key, tags)
|
||||
rate := 1.0
|
||||
s.client.Gauge(flatKey, float64(val), tags, rate)
|
||||
}
|
||||
|
||||
func (s *DogStatsdSink) IncrCounterWithTags(key []string, val float32, tags []string) {
|
||||
flatKey, tags := s.getFlatkeyAndCombinedTags(key, tags)
|
||||
rate := 1.0
|
||||
s.client.Count(flatKey, int64(val), tags, rate)
|
||||
}
|
||||
|
||||
func (s *DogStatsdSink) AddSampleWithTags(key []string, val float32, tags []string) {
|
||||
flatKey, tags := s.getFlatkeyAndCombinedTags(key, tags)
|
||||
rate := 1.0
|
||||
s.client.TimeInMilliseconds(flatKey, float64(val), tags, rate)
|
||||
}
|
||||
|
||||
func (s *DogStatsdSink) getFlatkeyAndCombinedTags(key []string, tags []string) (flattenedKey string, combinedTags []string) {
|
||||
key, hostTags := s.parseKey(key)
|
||||
flatKey := s.flattenKey(key)
|
||||
tags = append(tags, hostTags...)
|
||||
return flatKey, tags
|
||||
}
|
|
@ -6,6 +6,12 @@
|
|||
"path": "context",
|
||||
"revision": ""
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "CMcHvqld6XBCDYA+9jCy6gT1O0Q=",
|
||||
"path": "github.com/DataDog/datadog-go/statsd",
|
||||
"revision": "909c02b65dd8a52e8fa6072db9752a112227cf21",
|
||||
"revisionTime": "2016-08-22T16:14:30Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "VAQq2NnE8Qhzdf1BL00XgZNFwyY=",
|
||||
"path": "github.com/Microsoft/go-winio",
|
||||
|
@ -47,6 +53,12 @@
|
|||
"revision": "3df31a1ada83e310c2e24b267c8e8b68836547b4",
|
||||
"revisionTime": "2016-07-17T04:34:58Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "mAzNU3zeZGEwqjDT4ZkspFvx3TI=",
|
||||
"path": "github.com/armon/go-metrics/datadog",
|
||||
"revision": "3df31a1ada83e310c2e24b267c8e8b68836547b4",
|
||||
"revisionTime": "2016-07-17T04:34:58Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "gNO0JNpLzYOdInGeq7HqMZUzx9M=",
|
||||
"path": "github.com/armon/go-radix",
|
||||
|
@ -246,14 +258,14 @@
|
|||
{
|
||||
"checksumSHA1": "iP5slJJPRZUm0rfdII8OiATAACA=",
|
||||
"path": "github.com/docker/docker/pkg/idtools",
|
||||
"revision": "02caa73df411debed164f520a6a1304778f8b88c",
|
||||
"revisionTime": "2016-05-28T10:48:36Z"
|
||||
"revision": "52debcd58ac91bf68503ce60561536911b74ff05",
|
||||
"revisionTime": "2016-05-20T15:17:10Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "iP5slJJPRZUm0rfdII8OiATAACA=",
|
||||
"path": "github.com/docker/docker/pkg/idtools",
|
||||
"revision": "52debcd58ac91bf68503ce60561536911b74ff05",
|
||||
"revisionTime": "2016-05-20T15:17:10Z"
|
||||
"revision": "02caa73df411debed164f520a6a1304778f8b88c",
|
||||
"revisionTime": "2016-05-28T10:48:36Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "tdhmIGUaoOMEDymMC23qTS7bt0g=",
|
||||
|
|
Loading…
Reference in New Issue