// Copyright 2013 Ooyala, Inc. /* Package statsd provides a Go dogstatsd client. Dogstatsd extends the popular statsd, adding tags and histograms and pushing upstream to Datadog. Refer to http://docs.datadoghq.com/guides/dogstatsd/ for information about DogStatsD. Example Usage: // Create the client c, err := statsd.New("127.0.0.1:8125") if err != nil { log.Fatal(err) } // Prefix every metric with the app name c.Namespace = "flubber." // Send the EC2 availability zone as a tag with every metric c.Tags = append(c.Tags, "us-east-1a") err = c.Gauge("request.duration", 1.2, nil, 1) statsd is based on go-statsd-client. */ package statsd import ( "bytes" "errors" "fmt" "io" "math/rand" "os" "strconv" "strings" "sync" "time" ) /* OptimalPayloadSize defines the optimal payload size for a UDP datagram, 1432 bytes is optimal for regular networks with an MTU of 1500 so datagrams don't get fragmented. It's generally recommended not to fragment UDP datagrams as losing a single fragment will cause the entire datagram to be lost. This can be increased if your network has a greater MTU or you don't mind UDP datagrams getting fragmented. The practical limit is MaxUDPPayloadSize */ const OptimalPayloadSize = 1432 /* MaxUDPPayloadSize defines the maximum payload size for a UDP datagram. Its value comes from the calculation: 65535 bytes Max UDP datagram size - 8byte UDP header - 60byte max IP headers any number greater than that will see frames being cut out. */ const MaxUDPPayloadSize = 65467 /* UnixAddressPrefix holds the prefix to use to enable Unix Domain Socket traffic instead of UDP. */ const UnixAddressPrefix = "unix://" // Client-side entity ID injection for container tagging const ( entityIDEnvName = "DD_ENTITY_ID" entityIDTagName = "dd.internal.entity_id" ) /* Stat suffixes */ var ( gaugeSuffix = []byte("|g") countSuffix = []byte("|c") histogramSuffix = []byte("|h") distributionSuffix = []byte("|d") decrSuffix = []byte("-1|c") incrSuffix = []byte("1|c") setSuffix = []byte("|s") timingSuffix = []byte("|ms") ) // A statsdWriter offers a standard interface regardless of the underlying // protocol. For now UDS and UPD writers are available. type statsdWriter interface { Write(data []byte) (n int, err error) SetWriteTimeout(time.Duration) error Close() error } // A Client is a handle for sending messages to dogstatsd. It is safe to // use one Client from multiple goroutines simultaneously. type Client struct { // Writer handles the underlying networking protocol writer statsdWriter // Namespace to prepend to all statsd calls Namespace string // Tags are global tags to be added to every statsd call Tags []string // skipErrors turns off error passing and allows UDS to emulate UDP behaviour SkipErrors bool // BufferLength is the length of the buffer in commands. bufferLength int flushTime time.Duration commands [][]byte buffer bytes.Buffer stop chan struct{} sync.Mutex } // New returns a pointer to a new Client given an addr in the format "hostname:port" or // "unix:///path/to/socket". func New(addr string, options ...Option) (*Client, error) { o, err := resolveOptions(options) if err != nil { return nil, err } var w statsdWriter if !strings.HasPrefix(addr, UnixAddressPrefix) { w, err = newUDPWriter(addr) } else if o.AsyncUDS { w, err = newAsyncUdsWriter(addr[len(UnixAddressPrefix)-1:]) } else { w, err = newBlockingUdsWriter(addr[len(UnixAddressPrefix)-1:]) } if err != nil { return nil, err } w.SetWriteTimeout(o.WriteTimeoutUDS) c := Client{ Namespace: o.Namespace, Tags: o.Tags, writer: w, } // Inject DD_ENTITY_ID as a constant tag if found entityID := os.Getenv(entityIDEnvName) if entityID != "" { entityTag := fmt.Sprintf("%s:%s", entityIDTagName, entityID) c.Tags = append(c.Tags, entityTag) } if o.Buffered { c.bufferLength = o.MaxMessagesPerPayload c.commands = make([][]byte, 0, o.MaxMessagesPerPayload) c.flushTime = time.Millisecond * 100 c.stop = make(chan struct{}, 1) go c.watch() } return &c, nil } // NewWithWriter creates a new Client with given writer. Writer is a // io.WriteCloser + SetWriteTimeout(time.Duration) error func NewWithWriter(w statsdWriter) (*Client, error) { client := &Client{writer: w, SkipErrors: false} // Inject DD_ENTITY_ID as a constant tag if found entityID := os.Getenv(entityIDEnvName) if entityID != "" { entityTag := fmt.Sprintf("%s:%s", entityIDTagName, entityID) client.Tags = append(client.Tags, entityTag) } return client, nil } // NewBuffered returns a Client that buffers its output and sends it in chunks. // Buflen is the length of the buffer in number of commands. // // When addr is empty, the client will default to a UDP client and use the DD_AGENT_HOST // and (optionally) the DD_DOGSTATSD_PORT environment variables to build the target address. func NewBuffered(addr string, buflen int) (*Client, error) { return New(addr, Buffered(), WithMaxMessagesPerPayload(buflen)) } // format a message from its name, value, tags and rate. Also adds global // namespace and tags. func (c *Client) format(name string, value interface{}, suffix []byte, tags []string, rate float64) []byte { // preallocated buffer, stack allocated as long as it doesn't escape buf := make([]byte, 0, 200) if c.Namespace != "" { buf = append(buf, c.Namespace...) } buf = append(buf, name...) buf = append(buf, ':') switch val := value.(type) { case float64: buf = strconv.AppendFloat(buf, val, 'f', 6, 64) case int64: buf = strconv.AppendInt(buf, val, 10) case string: buf = append(buf, val...) default: // do nothing } buf = append(buf, suffix...) if rate < 1 { buf = append(buf, "|@"...) buf = strconv.AppendFloat(buf, rate, 'f', -1, 64) } buf = appendTagString(buf, c.Tags, tags) // non-zeroing copy to avoid referencing a larger than necessary underlying array return append([]byte(nil), buf...) } // SetWriteTimeout allows the user to set a custom UDS write timeout. Not supported for UDP. func (c *Client) SetWriteTimeout(d time.Duration) error { if c == nil { return fmt.Errorf("Client is nil") } return c.writer.SetWriteTimeout(d) } func (c *Client) watch() { ticker := time.NewTicker(c.flushTime) for { select { case <-ticker.C: c.Lock() if len(c.commands) > 0 { // FIXME: eating error here c.flushLocked() } c.Unlock() case <-c.stop: ticker.Stop() return } } } func (c *Client) append(cmd []byte) error { c.Lock() defer c.Unlock() c.commands = append(c.commands, cmd) // if we should flush, lets do it if len(c.commands) == c.bufferLength { if err := c.flushLocked(); err != nil { return err } } return nil } func (c *Client) joinMaxSize(cmds [][]byte, sep string, maxSize int) ([][]byte, []int) { c.buffer.Reset() //clear buffer var frames [][]byte var ncmds []int sepBytes := []byte(sep) sepLen := len(sep) elem := 0 for _, cmd := range cmds { needed := len(cmd) if elem != 0 { needed = needed + sepLen } if c.buffer.Len()+needed <= maxSize { if elem != 0 { c.buffer.Write(sepBytes) } c.buffer.Write(cmd) elem++ } else { frames = append(frames, copyAndResetBuffer(&c.buffer)) ncmds = append(ncmds, elem) // if cmd is bigger than maxSize it will get flushed on next loop c.buffer.Write(cmd) elem = 1 } } //add whatever is left! if there's actually something if c.buffer.Len() > 0 { frames = append(frames, copyAndResetBuffer(&c.buffer)) ncmds = append(ncmds, elem) } return frames, ncmds } func copyAndResetBuffer(buf *bytes.Buffer) []byte { tmpBuf := make([]byte, buf.Len()) copy(tmpBuf, buf.Bytes()) buf.Reset() return tmpBuf } // Flush forces a flush of the pending commands in the buffer func (c *Client) Flush() error { if c == nil { return fmt.Errorf("Client is nil") } c.Lock() defer c.Unlock() return c.flushLocked() } // flush the commands in the buffer. Lock must be held by caller. func (c *Client) flushLocked() error { frames, flushable := c.joinMaxSize(c.commands, "\n", OptimalPayloadSize) var err error cmdsFlushed := 0 for i, data := range frames { _, e := c.writer.Write(data) if e != nil { err = e break } cmdsFlushed += flushable[i] } // clear the slice with a slice op, doesn't realloc if cmdsFlushed == len(c.commands) { c.commands = c.commands[:0] } else { //this case will cause a future realloc... // drop problematic command though (sorry). c.commands = c.commands[cmdsFlushed+1:] } return err } func (c *Client) sendMsg(msg []byte) error { // return an error if message is bigger than MaxUDPPayloadSize if len(msg) > MaxUDPPayloadSize { return errors.New("message size exceeds MaxUDPPayloadSize") } // if this client is buffered, then we'll just append this if c.bufferLength > 0 { return c.append(msg) } _, err := c.writer.Write(msg) if c.SkipErrors { return nil } return err } // send handles sampling and sends the message over UDP. It also adds global namespace prefixes and tags. func (c *Client) send(name string, value interface{}, suffix []byte, tags []string, rate float64) error { if c == nil { return fmt.Errorf("Client is nil") } if rate < 1 && rand.Float64() > rate { return nil } data := c.format(name, value, suffix, tags, rate) return c.sendMsg(data) } // Gauge measures the value of a metric at a particular time. func (c *Client) Gauge(name string, value float64, tags []string, rate float64) error { return c.send(name, value, gaugeSuffix, tags, rate) } // Count tracks how many times something happened per second. func (c *Client) Count(name string, value int64, tags []string, rate float64) error { return c.send(name, value, countSuffix, tags, rate) } // Histogram tracks the statistical distribution of a set of values on each host. func (c *Client) Histogram(name string, value float64, tags []string, rate float64) error { return c.send(name, value, histogramSuffix, tags, rate) } // Distribution tracks the statistical distribution of a set of values across your infrastructure. func (c *Client) Distribution(name string, value float64, tags []string, rate float64) error { return c.send(name, value, distributionSuffix, tags, rate) } // Decr is just Count of -1 func (c *Client) Decr(name string, tags []string, rate float64) error { return c.send(name, nil, decrSuffix, tags, rate) } // Incr is just Count of 1 func (c *Client) Incr(name string, tags []string, rate float64) error { return c.send(name, nil, incrSuffix, tags, rate) } // Set counts the number of unique elements in a group. func (c *Client) Set(name string, value string, tags []string, rate float64) error { return c.send(name, value, setSuffix, tags, rate) } // Timing sends timing information, it is an alias for TimeInMilliseconds func (c *Client) Timing(name string, value time.Duration, tags []string, rate float64) error { return c.TimeInMilliseconds(name, value.Seconds()*1000, tags, rate) } // TimeInMilliseconds sends timing information in milliseconds. // It is flushed by statsd with percentiles, mean and other info (https://github.com/etsy/statsd/blob/master/docs/metric_types.md#timing) func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, rate float64) error { return c.send(name, value, timingSuffix, tags, rate) } // Event sends the provided Event. func (c *Client) Event(e *Event) error { if c == nil { return fmt.Errorf("Client is nil") } stat, err := e.Encode(c.Tags...) if err != nil { return err } return c.sendMsg([]byte(stat)) } // SimpleEvent sends an event with the provided title and text. func (c *Client) SimpleEvent(title, text string) error { e := NewEvent(title, text) return c.Event(e) } // ServiceCheck sends the provided ServiceCheck. func (c *Client) ServiceCheck(sc *ServiceCheck) error { if c == nil { return fmt.Errorf("Client is nil") } stat, err := sc.Encode(c.Tags...) if err != nil { return err } return c.sendMsg([]byte(stat)) } // SimpleServiceCheck sends an serviceCheck with the provided name and status. func (c *Client) SimpleServiceCheck(name string, status ServiceCheckStatus) error { sc := NewServiceCheck(name, status) return c.ServiceCheck(sc) } // Close the client connection. func (c *Client) Close() error { if c == nil { return fmt.Errorf("Client is nil") } select { case c.stop <- struct{}{}: default: } // if this client is buffered, flush before closing the writer if c.bufferLength > 0 { if err := c.Flush(); err != nil { return err } } return c.writer.Close() } // Events support // EventAlertType and EventAlertPriority became exported types after this issue was submitted: https://github.com/DataDog/datadog-go/issues/41 // The reason why they got exported is so that client code can directly use the types. // EventAlertType is the alert type for events type EventAlertType string const ( // Info is the "info" AlertType for events Info EventAlertType = "info" // Error is the "error" AlertType for events Error EventAlertType = "error" // Warning is the "warning" AlertType for events Warning EventAlertType = "warning" // Success is the "success" AlertType for events Success EventAlertType = "success" ) // EventPriority is the event priority for events type EventPriority string const ( // Normal is the "normal" Priority for events Normal EventPriority = "normal" // Low is the "low" Priority for events Low EventPriority = "low" ) // An Event is an object that can be posted to your DataDog event stream. type Event struct { // Title of the event. Required. Title string // Text is the description of the event. Required. Text string // Timestamp is a timestamp for the event. If not provided, the dogstatsd // server will set this to the current time. Timestamp time.Time // Hostname for the event. Hostname string // AggregationKey groups this event with others of the same key. AggregationKey string // Priority of the event. Can be statsd.Low or statsd.Normal. Priority EventPriority // SourceTypeName is a source type for the event. SourceTypeName string // AlertType can be statsd.Info, statsd.Error, statsd.Warning, or statsd.Success. // If absent, the default value applied by the dogstatsd server is Info. AlertType EventAlertType // Tags for the event. Tags []string } // NewEvent creates a new event with the given title and text. Error checking // against these values is done at send-time, or upon running e.Check. func NewEvent(title, text string) *Event { return &Event{ Title: title, Text: text, } } // Check verifies that an event is valid. func (e Event) Check() error { if len(e.Title) == 0 { return fmt.Errorf("statsd.Event title is required") } if len(e.Text) == 0 { return fmt.Errorf("statsd.Event text is required") } return nil } // Encode returns the dogstatsd wire protocol representation for an event. // Tags may be passed which will be added to the encoded output but not to // the Event's list of tags, eg. for default tags. func (e Event) Encode(tags ...string) (string, error) { err := e.Check() if err != nil { return "", err } text := e.escapedText() var buffer bytes.Buffer buffer.WriteString("_e{") buffer.WriteString(strconv.FormatInt(int64(len(e.Title)), 10)) buffer.WriteRune(',') buffer.WriteString(strconv.FormatInt(int64(len(text)), 10)) buffer.WriteString("}:") buffer.WriteString(e.Title) buffer.WriteRune('|') buffer.WriteString(text) if !e.Timestamp.IsZero() { buffer.WriteString("|d:") buffer.WriteString(strconv.FormatInt(int64(e.Timestamp.Unix()), 10)) } if len(e.Hostname) != 0 { buffer.WriteString("|h:") buffer.WriteString(e.Hostname) } if len(e.AggregationKey) != 0 { buffer.WriteString("|k:") buffer.WriteString(e.AggregationKey) } if len(e.Priority) != 0 { buffer.WriteString("|p:") buffer.WriteString(string(e.Priority)) } if len(e.SourceTypeName) != 0 { buffer.WriteString("|s:") buffer.WriteString(e.SourceTypeName) } if len(e.AlertType) != 0 { buffer.WriteString("|t:") buffer.WriteString(string(e.AlertType)) } writeTagString(&buffer, tags, e.Tags) return buffer.String(), nil } // ServiceCheckStatus support type ServiceCheckStatus byte const ( // Ok is the "ok" ServiceCheck status Ok ServiceCheckStatus = 0 // Warn is the "warning" ServiceCheck status Warn ServiceCheckStatus = 1 // Critical is the "critical" ServiceCheck status Critical ServiceCheckStatus = 2 // Unknown is the "unknown" ServiceCheck status Unknown ServiceCheckStatus = 3 ) // An ServiceCheck is an object that contains status of DataDog service check. type ServiceCheck struct { // Name of the service check. Required. Name string // Status of service check. Required. Status ServiceCheckStatus // Timestamp is a timestamp for the serviceCheck. If not provided, the dogstatsd // server will set this to the current time. Timestamp time.Time // Hostname for the serviceCheck. Hostname string // A message describing the current state of the serviceCheck. Message string // Tags for the serviceCheck. Tags []string } // NewServiceCheck creates a new serviceCheck with the given name and status. Error checking // against these values is done at send-time, or upon running sc.Check. func NewServiceCheck(name string, status ServiceCheckStatus) *ServiceCheck { return &ServiceCheck{ Name: name, Status: status, } } // Check verifies that an event is valid. func (sc ServiceCheck) Check() error { if len(sc.Name) == 0 { return fmt.Errorf("statsd.ServiceCheck name is required") } if byte(sc.Status) < 0 || byte(sc.Status) > 3 { return fmt.Errorf("statsd.ServiceCheck status has invalid value") } return nil } // Encode returns the dogstatsd wire protocol representation for an serviceCheck. // Tags may be passed which will be added to the encoded output but not to // the Event's list of tags, eg. for default tags. func (sc ServiceCheck) Encode(tags ...string) (string, error) { err := sc.Check() if err != nil { return "", err } message := sc.escapedMessage() var buffer bytes.Buffer buffer.WriteString("_sc|") buffer.WriteString(sc.Name) buffer.WriteRune('|') buffer.WriteString(strconv.FormatInt(int64(sc.Status), 10)) if !sc.Timestamp.IsZero() { buffer.WriteString("|d:") buffer.WriteString(strconv.FormatInt(int64(sc.Timestamp.Unix()), 10)) } if len(sc.Hostname) != 0 { buffer.WriteString("|h:") buffer.WriteString(sc.Hostname) } writeTagString(&buffer, tags, sc.Tags) if len(message) != 0 { buffer.WriteString("|m:") buffer.WriteString(message) } return buffer.String(), nil } func (e Event) escapedText() string { return strings.Replace(e.Text, "\n", "\\n", -1) } func (sc ServiceCheck) escapedMessage() string { msg := strings.Replace(sc.Message, "\n", "\\n", -1) return strings.Replace(msg, "m:", `m\:`, -1) } func removeNewlines(str string) string { return strings.Replace(str, "\n", "", -1) } func writeTagString(w io.Writer, tagList1, tagList2 []string) { // the tag lists may be shared with other callers, so we cannot modify // them in any way (which means we cannot append to them either) // therefore we must make an entirely separate copy just for this call totalLen := len(tagList1) + len(tagList2) if totalLen == 0 { return } tags := make([]string, 0, totalLen) tags = append(tags, tagList1...) tags = append(tags, tagList2...) io.WriteString(w, "|#") io.WriteString(w, removeNewlines(tags[0])) for _, tag := range tags[1:] { io.WriteString(w, ",") io.WriteString(w, removeNewlines(tag)) } } func appendTagString(buf []byte, tagList1, tagList2 []string) []byte { if len(tagList1) == 0 { if len(tagList2) == 0 { return buf } tagList1 = tagList2 tagList2 = nil } buf = append(buf, "|#"...) buf = appendWithoutNewlines(buf, tagList1[0]) for _, tag := range tagList1[1:] { buf = append(buf, ',') buf = appendWithoutNewlines(buf, tag) } for _, tag := range tagList2 { buf = append(buf, ',') buf = appendWithoutNewlines(buf, tag) } return buf } func appendWithoutNewlines(buf []byte, s string) []byte { // fastpath for strings without newlines if strings.IndexByte(s, '\n') == -1 { return append(buf, s...) } for _, b := range []byte(s) { if b != '\n' { buf = append(buf, b) } } return buf }