Update metric names and add a legacy config flag

This commit is contained in:
Kyle Havlovitz 2017-10-04 16:43:27 -07:00
parent 23b057f46b
commit 0063516e5e
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
25 changed files with 419 additions and 13 deletions

View File

@ -144,9 +144,11 @@ func (m *aclManager) lookupACL(a *Agent, id string) (acl.ACL, error) {
}
if cached != nil && time.Now().Before(cached.Expires) {
metrics.IncrCounter([]string{"consul", "acl", "cache_hit"}, 1)
metrics.IncrCounter([]string{"acl", "cache_hit"}, 1)
return cached.ACL, nil
}
metrics.IncrCounter([]string{"consul", "acl", "cache_miss"}, 1)
metrics.IncrCounter([]string{"acl", "cache_miss"}, 1)
// At this point we might have a stale cached ACL, or none at all, so
// try to contact the servers.

View File

@ -437,6 +437,14 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
}
}
// Add a filter rule if needed for enabling the deprecated metric names
enableDeprecatedNames := b.boolVal(c.Telemetry.EnableDeprecatedNames)
if enableDeprecatedNames {
telemetryAllowedPrefixes = append(telemetryAllowedPrefixes, "consul.consul")
} else {
telemetryBlockedPrefixes = append(telemetryBlockedPrefixes, "consul.consul")
}
// raft performance scaling
performanceRaftMultiplier := b.intVal(c.Performance.RaftMultiplier)
if performanceRaftMultiplier < 1 || uint(performanceRaftMultiplier) > consul.MaxRaftMultiplier {

View File

@ -368,6 +368,7 @@ type Telemetry struct {
MetricsPrefix *string `json:"metrics_prefix,omitempty" hcl:"metrics_prefix" mapstructure:"metrics_prefix"`
StatsdAddr *string `json:"statsd_address,omitempty" hcl:"statsd_address" mapstructure:"statsd_address"`
StatsiteAddr *string `json:"statsite_address,omitempty" hcl:"statsite_address" mapstructure:"statsite_address"`
EnableDeprecatedNames *bool `json:"enable_deprecated_names" hcl:"enable_deprecated_names" mapstructure:"enable_deprecated_names"`
}
type Ports struct {

View File

@ -48,17 +48,17 @@ type RuntimeConfig struct {
ConsulSerfWANSuspicionMult int
ConsulServerHealthInterval time.Duration
ACLAgentMasterToken string
ACLAgentToken string
ACLDatacenter string
ACLDefaultPolicy string
ACLDownPolicy string
ACLEnforceVersion8 bool
ACLAgentMasterToken string
ACLAgentToken string
ACLDatacenter string
ACLDefaultPolicy string
ACLDownPolicy string
ACLEnforceVersion8 bool
ACLEnableKeyListPolicy bool
ACLMasterToken string
ACLReplicationToken string
ACLTTL time.Duration
ACLToken string
ACLMasterToken string
ACLReplicationToken string
ACLTTL time.Duration
ACLToken string
AutopilotCleanupDeadServers bool
AutopilotDisableUpgradeMigration bool

View File

@ -1658,8 +1658,8 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
`},
patch: func(rt *RuntimeConfig) {
rt.DataDir = dataDir
rt.TelemetryAllowedPrefixes = []string{"foo"}
rt.TelemetryBlockedPrefixes = []string{"bar"}
rt.TelemetryAllowedPrefixes = append([]string{"foo"}, rt.TelemetryAllowedPrefixes...)
rt.TelemetryBlockedPrefixes = append([]string{"bar"}, rt.TelemetryBlockedPrefixes...)
},
warns: []string{`Filter rule must begin with either '+' or '-': "nix"`},
},
@ -2312,6 +2312,7 @@ func TestFullConfig(t *testing.T) {
"dogstatsd_tags": [ "3N81zSUB","Xtj8AnXZ" ],
"filter_default": true,
"prefix_filter": [ "+oJotS8XJ","-cazlEhGn" ],
"enable_deprecated_names": true,
"metrics_prefix": "ftO6DySn",
"statsd_address": "drce87cy",
"statsite_address": "HpFwKB8R"
@ -2728,6 +2729,7 @@ func TestFullConfig(t *testing.T) {
dogstatsd_tags = [ "3N81zSUB","Xtj8AnXZ" ]
filter_default = true
prefix_filter = [ "+oJotS8XJ","-cazlEhGn" ]
enable_deprecated_names = true
metrics_prefix = "ftO6DySn"
statsd_address = "drce87cy"
statsite_address = "HpFwKB8R"
@ -3266,7 +3268,7 @@ func TestFullConfig(t *testing.T) {
TelemetryDogstatsdAddr: "0wSndumK",
TelemetryDogstatsdTags: []string{"3N81zSUB", "Xtj8AnXZ"},
TelemetryFilterDefault: true,
TelemetryAllowedPrefixes: []string{"oJotS8XJ"},
TelemetryAllowedPrefixes: []string{"oJotS8XJ", "consul.consul"},
TelemetryBlockedPrefixes: []string{"cazlEhGn"},
TelemetryMetricsPrefix: "ftO6DySn",
TelemetryStatsdAddr: "drce87cy",

View File

@ -42,6 +42,7 @@ type aclCacheEntry struct {
// using its replicated ACLs during an outage.
func (s *Server) aclLocalFault(id string) (string, string, error) {
defer metrics.MeasureSince([]string{"consul", "acl", "fault"}, time.Now())
defer metrics.MeasureSince([]string{"acl", "fault"}, time.Now())
// Query the state store.
state := s.fsm.State()
@ -75,6 +76,7 @@ func (s *Server) resolveToken(id string) (acl.ACL, error) {
return nil, nil
}
defer metrics.MeasureSince([]string{"consul", "acl", "resolveToken"}, time.Now())
defer metrics.MeasureSince([]string{"acl", "resolveToken"}, time.Now())
// Handle the anonymous token
if len(id) == 0 {
@ -158,9 +160,11 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
// Check for live cache.
if cached != nil && time.Now().Before(cached.Expires) {
metrics.IncrCounter([]string{"consul", "acl", "cache_hit"}, 1)
metrics.IncrCounter([]string{"acl", "cache_hit"}, 1)
return cached.ACL, nil
}
metrics.IncrCounter([]string{"consul", "acl", "cache_miss"}, 1)
metrics.IncrCounter([]string{"acl", "cache_miss"}, 1)
// Attempt to refresh the policy from the ACL datacenter via an RPC.
args := structs.ACLPolicyRequest{
@ -223,6 +227,7 @@ func (c *aclCache) lookupACL(id, authDC string) (acl.ACL, error) {
// Note we use the local TTL here, so this'll be used for that
// amount of time even once the ACL datacenter becomes available.
metrics.IncrCounter([]string{"consul", "acl", "replication_hit"}, 1)
metrics.IncrCounter([]string{"acl", "replication_hit"}, 1)
reply.ETag = makeACLETag(parent, policy)
reply.TTL = c.config.ACLTTL
reply.Parent = parent

View File

@ -146,6 +146,7 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
return err
}
defer metrics.MeasureSince([]string{"consul", "acl", "apply"}, time.Now())
defer metrics.MeasureSince([]string{"acl", "apply"}, time.Now())
// Verify we are allowed to serve this request
if a.srv.config.ACLDatacenter != a.srv.config.Datacenter {

View File

@ -150,6 +150,7 @@ func (s *Server) fetchLocalACLs() (structs.ACLs, error) {
// have replicated to, so this is expected to block until something changes.
func (s *Server) fetchRemoteACLs(lastRemoteIndex uint64) (*structs.IndexedACLs, error) {
defer metrics.MeasureSince([]string{"consul", "leader", "fetchRemoteACLs"}, time.Now())
defer metrics.MeasureSince([]string{"leader", "fetchRemoteACLs"}, time.Now())
args := structs.DCSpecificRequest{
Datacenter: s.config.ACLDatacenter,
@ -170,6 +171,7 @@ func (s *Server) fetchRemoteACLs(lastRemoteIndex uint64) (*structs.IndexedACLs,
// local ACLs in-line with the remote ACLs from the ACL datacenter.
func (s *Server) updateLocalACLs(changes structs.ACLRequests) error {
defer metrics.MeasureSince([]string{"consul", "leader", "updateLocalACLs"}, time.Now())
defer metrics.MeasureSince([]string{"leader", "updateLocalACLs"}, time.Now())
minTimePerOp := time.Second / time.Duration(s.config.ACLReplicationApplyLimit)
for _, change := range changes {
@ -217,6 +219,7 @@ func (s *Server) replicateACLs(lastRemoteIndex uint64) (uint64, error) {
// periods of time. This metric is a good measure of how expensive the
// replication process is.
defer metrics.MeasureSince([]string{"consul", "leader", "replicateACLs"}, time.Now())
defer metrics.MeasureSince([]string{"leader", "replicateACLs"}, time.Now())
local, err := s.fetchLocalACLs()
if err != nil {

View File

@ -360,10 +360,13 @@ func (s *Server) updateClusterHealth() error {
// Heartbeat a metric for monitoring if we're the leader
if s.IsLeader() {
metrics.SetGauge([]string{"consul", "autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
metrics.SetGauge([]string{"autopilot", "failure_tolerance"}, float32(clusterHealth.FailureTolerance))
if clusterHealth.Healthy {
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 1)
metrics.SetGauge([]string{"autopilot", "healthy"}, 1)
} else {
metrics.SetGauge([]string{"consul", "autopilot", "healthy"}, 0)
metrics.SetGauge([]string{"autopilot", "healthy"}, 0)
}
}

View File

@ -25,6 +25,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
return err
}
defer metrics.MeasureSince([]string{"consul", "catalog", "register"}, time.Now())
defer metrics.MeasureSince([]string{"catalog", "register"}, time.Now())
// Verify the args.
if args.Node == "" || args.Address == "" {
@ -114,6 +115,7 @@ func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) e
return err
}
defer metrics.MeasureSince([]string{"consul", "catalog", "deregister"}, time.Now())
defer metrics.MeasureSince([]string{"catalog", "deregister"}, time.Now())
// Verify the args
if args.Node == "" {
@ -272,13 +274,19 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
if err == nil {
metrics.IncrCounterWithLabels([]string{"consul", "catalog", "service", "query"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
metrics.IncrCounterWithLabels([]string{"catalog", "service", "query"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
if args.ServiceTag != "" {
metrics.IncrCounterWithLabels([]string{"consul", "catalog", "service", "query-tag"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
metrics.IncrCounterWithLabels([]string{"catalog", "service", "query-tag"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
}
if len(reply.ServiceNodes) == 0 {
metrics.IncrCounterWithLabels([]string{"consul", "catalog", "service", "not-found"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
metrics.IncrCounterWithLabels([]string{"catalog", "service", "not-found"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
}
}
return err

View File

@ -240,8 +240,10 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// Enforce the RPC limit.
metrics.IncrCounter([]string{"consul", "client", "rpc"}, 1)
metrics.IncrCounter([]string{"client", "rpc"}, 1)
if !c.rpcLimiter.Allow() {
metrics.IncrCounter([]string{"consul", "client", "rpc", "exceeded"}, 1)
metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
return structs.ErrRPCRateExceeded
}
@ -267,8 +269,10 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io
// Enforce the RPC limit.
metrics.IncrCounter([]string{"consul", "client", "rpc"}, 1)
metrics.IncrCounter([]string{"client", "rpc"}, 1)
if !c.rpcLimiter.Allow() {
metrics.IncrCounter([]string{"consul", "client", "rpc", "exceeded"}, 1)
metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
return structs.ErrRPCRateExceeded
}

View File

@ -125,6 +125,7 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
func (c *consulFSM) applyRegister(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"consul", "fsm", "register"}, time.Now())
defer metrics.MeasureSince([]string{"fsm", "register"}, time.Now())
var req structs.RegisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
@ -140,6 +141,7 @@ func (c *consulFSM) applyRegister(buf []byte, index uint64) interface{} {
func (c *consulFSM) applyDeregister(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"consul", "fsm", "deregister"}, time.Now())
defer metrics.MeasureSince([]string{"fsm", "deregister"}, time.Now())
var req structs.DeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
@ -174,6 +176,8 @@ func (c *consulFSM) applyKVSOperation(buf []byte, index uint64) interface{} {
}
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "kvs"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
defer metrics.MeasureSinceWithLabels([]string{"fsm", "kvs"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case api.KVSet:
return c.state.KVSSet(index, &req.DirEnt)
@ -219,6 +223,8 @@ func (c *consulFSM) applySessionOperation(buf []byte, index uint64) interface{}
}
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "session"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
defer metrics.MeasureSinceWithLabels([]string{"fsm", "session"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case structs.SessionCreate:
if err := c.state.SessionCreate(index, &req.Session); err != nil {
@ -240,6 +246,8 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} {
}
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "acl"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case structs.ACLBootstrapInit:
enabled, err := c.state.ACLBootstrapInit(index)
@ -272,6 +280,8 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{
}
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "tombstone"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
defer metrics.MeasureSinceWithLabels([]string{"fsm", "tombstone"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case structs.TombstoneReap:
return c.state.ReapTombstones(req.ReapIndex)
@ -291,6 +301,7 @@ func (c *consulFSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interfa
panic(fmt.Errorf("failed to decode batch updates: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", "batch-update"}, time.Now())
defer metrics.MeasureSince([]string{"fsm", "coordinate", "batch-update"}, time.Now())
if err := c.state.CoordinateBatchUpdate(index, updates); err != nil {
return err
}
@ -307,6 +318,8 @@ func (c *consulFSM) applyPreparedQueryOperation(buf []byte, index uint64) interf
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "prepared-query"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
defer metrics.MeasureSinceWithLabels([]string{"fsm", "prepared-query"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case structs.PreparedQueryCreate, structs.PreparedQueryUpdate:
return c.state.PreparedQuerySet(index, req.Query)
@ -324,6 +337,7 @@ func (c *consulFSM) applyTxn(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "txn"}, time.Now())
defer metrics.MeasureSince([]string{"fsm", "txn"}, time.Now())
results, errors := c.state.TxnRW(index, req.Ops)
return structs.TxnResponse{
Results: results,
@ -337,6 +351,7 @@ func (c *consulFSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "autopilot"}, time.Now())
defer metrics.MeasureSince([]string{"fsm", "autopilot"}, time.Now())
if req.CAS {
act, err := c.state.AutopilotCASConfig(index, req.Config.ModifyIndex, &req.Config)
@ -506,6 +521,7 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
defer metrics.MeasureSince([]string{"consul", "fsm", "persist"}, time.Now())
defer metrics.MeasureSince([]string{"fsm", "persist"}, time.Now())
// Register the nodes
encoder := codec.NewEncoder(sink, msgpackHandle)

View File

@ -141,13 +141,19 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
if err == nil {
metrics.IncrCounterWithLabels([]string{"consul", "health", "service", "query"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
metrics.IncrCounterWithLabels([]string{"health", "service", "query"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
if args.ServiceTag != "" {
metrics.IncrCounterWithLabels([]string{"consul", "health", "service", "query-tag"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
metrics.IncrCounterWithLabels([]string{"health", "service", "query-tag"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
}
if len(reply.Nodes) == 0 {
metrics.IncrCounterWithLabels([]string{"consul", "health", "service", "not-found"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
metrics.IncrCounterWithLabels([]string{"health", "service", "not-found"}, 1,
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
}
}
return err

View File

@ -82,6 +82,7 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
return err
}
defer metrics.MeasureSince([]string{"consul", "kvs", "apply"}, time.Now())
defer metrics.MeasureSince([]string{"kvs", "apply"}, time.Now())
// Perform the pre-apply checks.
acl, err := k.srv.resolveToken(args.Token)

View File

@ -100,6 +100,7 @@ RECONCILE:
return
}
metrics.MeasureSince([]string{"consul", "leader", "barrier"}, start)
metrics.MeasureSince([]string{"leader", "barrier"}, start)
// Check if we need to handle initial leadership actions
if !establishedLeader {
@ -411,6 +412,7 @@ func (s *Server) reconcileMember(member serf.Member) error {
return nil
}
defer metrics.MeasureSince([]string{"consul", "leader", "reconcileMember"}, time.Now())
defer metrics.MeasureSince([]string{"leader", "reconcileMember"}, time.Now())
var err error
switch member.Status {
case serf.StatusAlive:
@ -774,6 +776,7 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error {
// to avoid blocking.
func (s *Server) reapTombstones(index uint64) {
defer metrics.MeasureSince([]string{"consul", "leader", "reapTombstones"}, time.Now())
defer metrics.MeasureSince([]string{"leader", "reapTombstones"}, time.Now())
req := structs.TombstoneRequest{
Datacenter: s.config.Datacenter,
Op: structs.TombstoneReap,

View File

@ -33,6 +33,7 @@ func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string)
return err
}
defer metrics.MeasureSince([]string{"consul", "prepared-query", "apply"}, time.Now())
defer metrics.MeasureSince([]string{"prepared-query", "apply"}, time.Now())
// Validate the ID. We must create new IDs before applying to the Raft
// log since it's not deterministic.
@ -287,6 +288,7 @@ func (p *PreparedQuery) Explain(args *structs.PreparedQueryExecuteRequest,
return err
}
defer metrics.MeasureSince([]string{"consul", "prepared-query", "explain"}, time.Now())
defer metrics.MeasureSince([]string{"prepared-query", "explain"}, time.Now())
// We have to do this ourselves since we are not doing a blocking RPC.
p.srv.setQueryMeta(&reply.QueryMeta)
@ -334,6 +336,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
return err
}
defer metrics.MeasureSince([]string{"consul", "prepared-query", "execute"}, time.Now())
defer metrics.MeasureSince([]string{"prepared-query", "execute"}, time.Now())
// We have to do this ourselves since we are not doing a blocking RPC.
p.srv.setQueryMeta(&reply.QueryMeta)
@ -444,6 +447,7 @@ func (p *PreparedQuery) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRe
return err
}
defer metrics.MeasureSince([]string{"consul", "prepared-query", "execute_remote"}, time.Now())
defer metrics.MeasureSince([]string{"prepared-query", "execute_remote"}, time.Now())
// We have to do this ourselves since we are not doing a blocking RPC.
p.srv.setQueryMeta(&reply.QueryMeta)

View File

@ -60,6 +60,7 @@ func (s *Server) listen(listener net.Listener) {
go s.handleConn(conn, false)
metrics.IncrCounter([]string{"consul", "rpc", "accept_conn"}, 1)
metrics.IncrCounter([]string{"rpc", "accept_conn"}, 1)
}
}
@ -97,6 +98,7 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
case pool.RPCRaft:
metrics.IncrCounter([]string{"consul", "rpc", "raft_handoff"}, 1)
metrics.IncrCounter([]string{"rpc", "raft_handoff"}, 1)
s.raftLayer.Handoff(conn)
case pool.RPCTLS:
@ -155,10 +157,12 @@ func (s *Server) handleConsulConn(conn net.Conn) {
if err != io.EOF && !strings.Contains(err.Error(), "closed") {
s.logger.Printf("[ERR] consul.rpc: RPC error: %v %s", err, logConn(conn))
metrics.IncrCounter([]string{"consul", "rpc", "request_error"}, 1)
metrics.IncrCounter([]string{"rpc", "request_error"}, 1)
}
return
}
metrics.IncrCounter([]string{"consul", "rpc", "request"}, 1)
metrics.IncrCounter([]string{"rpc", "request"}, 1)
}
}
@ -263,6 +267,8 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
metrics.IncrCounterWithLabels([]string{"consul", "rpc", "cross-dc"}, 1,
[]metrics.Label{{Name: "datacenter", Value: dc}})
metrics.IncrCounterWithLabels([]string{"rpc", "cross-dc"}, 1,
[]metrics.Label{{Name: "datacenter", Value: dc}})
if err := s.connPool.RPC(dc, server.Addr, server.Version, method, server.UseTLS, args, reply); err != nil {
manager.NotifyFailedServer(server)
s.logger.Printf("[ERR] consul: RPC failed to server %s in DC %q: %v", server.Addr, dc, err)
@ -372,6 +378,7 @@ RUN_QUERY:
// Run the query.
metrics.IncrCounter([]string{"consul", "rpc", "query"}, 1)
metrics.IncrCounter([]string{"rpc", "query"}, 1)
// Operate on a consistent set of state. This makes sure that the
// abandon channel goes with the state that the caller is using to
@ -422,6 +429,7 @@ func (s *Server) setQueryMeta(m *structs.QueryMeta) {
// read. This is done by verifying leadership before the read.
func (s *Server) consistentRead() error {
defer metrics.MeasureSince([]string{"consul", "rpc", "consistentRead"}, time.Now())
defer metrics.MeasureSince([]string{"rpc", "consistentRead"}, time.Now())
future := s.raft.VerifyLeader()
if err := future.Error(); err != nil {
return err //fail fast if leader verification fails

View File

@ -60,6 +60,7 @@ func (s *Server) floodSegments(config *Config) {
// left nodes are de-registered.
func (s *Server) reconcile() (err error) {
defer metrics.MeasureSince([]string{"consul", "leader", "reconcile"}, time.Now())
defer metrics.MeasureSince([]string{"leader", "reconcile"}, time.Now())
members := s.serfLAN.Members()
knownMembers := make(map[string]struct{})
for _, member := range members {

View File

@ -24,6 +24,7 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error {
return err
}
defer metrics.MeasureSince([]string{"consul", "session", "apply"}, time.Now())
defer metrics.MeasureSince([]string{"session", "apply"}, time.Now())
// Verify the args
if args.Session.ID == "" && args.Op == structs.SessionDestroy {
@ -222,6 +223,7 @@ func (s *Session) Renew(args *structs.SessionSpecificRequest,
return err
}
defer metrics.MeasureSince([]string{"consul", "session", "renew"}, time.Now())
defer metrics.MeasureSince([]string{"session", "renew"}, time.Now())
// Get the session, from local state.
state := s.srv.fsm.State()

View File

@ -85,6 +85,7 @@ func (s *Server) createSessionTimer(id string, ttl time.Duration) {
// need to invalidate the session.
func (s *Server) invalidateSession(id string) {
defer metrics.MeasureSince([]string{"consul", "session_ttl", "invalidate"}, time.Now())
defer metrics.MeasureSince([]string{"session_ttl", "invalidate"}, time.Now())
// Clear the session timer
s.sessionTimers.Del(id)
@ -134,6 +135,7 @@ func (s *Server) sessionStats() {
select {
case <-time.After(5 * time.Second):
metrics.SetGauge([]string{"consul", "session_ttl", "active"}, float32(s.sessionTimers.Len()))
metrics.SetGauge([]string{"session_ttl", "active"}, float32(s.sessionTimers.Len()))
case <-s.shutdownCh:
return

View File

@ -47,6 +47,7 @@ func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error
return err
}
defer metrics.MeasureSince([]string{"consul", "txn", "apply"}, time.Now())
defer metrics.MeasureSince([]string{"txn", "apply"}, time.Now())
// Run the pre-checks before we send the transaction into Raft.
acl, err := t.srv.resolveToken(args.Token)
@ -90,6 +91,7 @@ func (t *Txn) Read(args *structs.TxnReadRequest, reply *structs.TxnReadResponse)
return err
}
defer metrics.MeasureSince([]string{"consul", "txn", "read"}, time.Now())
defer metrics.MeasureSince([]string{"txn", "read"}, time.Now())
// We have to do this ourselves since we are not doing a blocking RPC.
t.srv.setQueryMeta(&reply.QueryMeta)

View File

@ -156,6 +156,8 @@ func (d *DNSServer) handlePtr(resp dns.ResponseWriter, req *dns.Msg) {
defer func(s time.Time) {
metrics.MeasureSinceWithLabels([]string{"consul", "dns", "ptr_query"}, s,
[]metrics.Label{{Name: "node", Value: d.agent.config.NodeName}})
metrics.MeasureSinceWithLabels([]string{"dns", "ptr_query"}, s,
[]metrics.Label{{Name: "node", Value: d.agent.config.NodeName}})
d.logger.Printf("[DEBUG] dns: request for %v (%v) from client %s (%s)",
q, time.Now().Sub(s), resp.RemoteAddr().String(),
resp.RemoteAddr().Network())
@ -226,6 +228,8 @@ func (d *DNSServer) handleQuery(resp dns.ResponseWriter, req *dns.Msg) {
defer func(s time.Time) {
metrics.MeasureSinceWithLabels([]string{"consul", "dns", "domain_query"}, s,
[]metrics.Label{{Name: "node", Value: d.agent.config.NodeName}})
metrics.MeasureSinceWithLabels([]string{"dns", "domain_query"}, s,
[]metrics.Label{{Name: "node", Value: d.agent.config.NodeName}})
d.logger.Printf("[DEBUG] dns: request for %v (%v) from client %s (%s)",
q, time.Now().Sub(s), resp.RemoteAddr().String(),
resp.RemoteAddr().Network())
@ -516,6 +520,7 @@ RPC:
goto RPC
} else if out.LastContact > staleCounterThreshold {
metrics.IncrCounter([]string{"consul", "dns", "stale_queries"}, 1)
metrics.IncrCounter([]string{"dns", "stale_queries"}, 1)
}
}
@ -761,6 +766,7 @@ func (d *DNSServer) lookupServiceNodes(datacenter, service, tag string) (structs
if args.AllowStale && out.LastContact > staleCounterThreshold {
metrics.IncrCounter([]string{"consul", "dns", "stale_queries"}, 1)
metrics.IncrCounter([]string{"dns", "stale_queries"}, 1)
}
// redo the request the response was too stale
@ -887,6 +893,7 @@ RPC:
goto RPC
} else if out.LastContact > staleCounterThreshold {
metrics.IncrCounter([]string{"consul", "dns", "stale_queries"}, 1)
metrics.IncrCounter([]string{"dns", "stale_queries"}, 1)
}
}

View File

@ -223,6 +223,8 @@ func startupTelemetry(conf *config.RuntimeConfig) (*metrics.InmemSink, error) {
metricsConf := metrics.DefaultConfig(conf.TelemetryMetricsPrefix)
metricsConf.EnableHostname = !conf.TelemetryDisableHostname
metricsConf.FilterDefault = conf.TelemetryFilterDefault
metricsConf.AllowedPrefixes = conf.TelemetryAllowedPrefixes
metricsConf.BlockedPrefixes = conf.TelemetryBlockedPrefixes
var sinks metrics.FanoutSink
addSink := func(name string, fn func(*config.RuntimeConfig, string) (metrics.MetricSink, error)) error {

View File

@ -1168,6 +1168,10 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
is overlap between two rules, the more specific rule will take precedence. Blocking will take priority if the same
prefix is listed multiple times.
* <a name="telemetry-enable_deprecated_names"></a><a href="#telemetry-enable_deprecated_names">`enable_deprecated_names`
</a>Added in Consul 1.0, this enables old metric names of the format `consul.consul...` to be sent alongside
other metrics. Defaults to false.
* <a name="telemetry-statsd_address"></a><a href="#telemetry-statsd_address">`statsd_address`</a> This provides the
address of a statsd instance in the format `host:port`. If provided, Consul will send various telemetry information to that instance for
aggregation. This can be used to capture runtime information. This sends UDP packets only and can be used with

View File

@ -92,6 +92,42 @@ These metrics are used to monitor the health of specific Consul agents.
<td>number of objects</td>
<td>gauge</td>
</tr>
<tr>
<td>`consul.acl.cache_hit`</td>
<td>The number of ACL cache hits.</td>
<td>hits</td>
<td>counter</td>
</tr>
<tr>
<td>`consul.acl.cache_miss`</td>
<td>The number of ACL cache misses.</td>
<td>misses</td>
<td>counter</td>
</tr>
<tr>
<td>`consul.acl.replication_hit`</td>
<td>The number of ACL replication cache hits (when not running in the ACL datacenter).</td>
<td>hits</td>
<td>counter</td>
</tr>
<tr>
<td>`consul.dns.stale_queries`</td>
<td>This increments when an agent serves a query within the allowed stale threshold.</td>
<td>queries</td>
<td>counter</td>
</tr>
<tr>
<td>`consul.dns.ptr_query.<node>`</td>
<td>This measures the time spent handling a reverse DNS query for the given node.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.dns.domain_query.<node>`</td>
<td>This measures the time spent handling a domain query for the given node.</td>
<td>ms</td>
<td>timer</td>
</tr>
</table>
## Server Health
@ -147,6 +183,239 @@ These metrics are used to monitor the health of the Consul servers.
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.acl.apply`</td>
<td>This measures the time it takes to complete an update to the ACL store.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.acl.fault`</td>
<td>This measures the time it takes to fault in the rules for an ACL during a cache miss.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.acl.fetchRemoteACLs`</td>
<td>This measures the time it takes to fetch remote ACLs during replication.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.acl.updateLocalACLs`</td>
<td>This measures the time it takes to apply replication changes to the local ACL store.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.acl.replicateACLs`</td>
<td>This measures the time it takes to do one pass of the ACL replication algorithm.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.acl.resolveToken`</td>
<td>This measures the time it takes to resolve an ACL token.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.rpc.accept_conn`</td>
<td>This increments when a server accepts an RPC connection.</td>
<td>connections</td>
<td>counter</td>
</tr>
<tr>
<td>`consul.catalog.register`</td>
<td>This measures the time it takes to complete a catalog register operation.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.catalog.deregister`</td>
<td>This measures the time it takes to complete a catalog deregister operation.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.fsm.register`</td>
<td>This measures the time it takes to apply a catalog register operation to the FSM.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.fsm.deregister`</td>
<td>This measures the time it takes to apply a catalog deregister operation to the FSM.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.fsm.acl.<op>`</td>
<td>This measures the time it takes to apply the given ACL operation to the FSM.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.fsm.session.<op>`</td>
<td>This measures the time it takes to apply the given session operation to the FSM.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.fsm.kvs.<op>`</td>
<td>This measures the time it takes to apply the given KV operation to the FSM.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.fsm.tombstone.<op>`</td>
<td>This measures the time it takes to apply the given tombstone operation to the FSM.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.fsm.coordinate.batch-update`</td>
<td>This measures the time it takes to apply the given batch coordinate update to the FSM.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.fsm.prepared-query.<op>`</td>
<td>This measures the time it takes to apply the given prepared query update operation to the FSM.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.fsm.txn`</td>
<td>This measures the time it takes to apply the given transaction update to the FSM.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.fsm.autopilot`</td>
<td>This measures the time it takes to apply the given autopilot update to the FSM.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.fsm.persist`</td>
<td>This measures the time it takes to persist the FSM to a raft snapshot.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.kvs.apply`</td>
<td>This measures the time it takes to complete an update to the KV store.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.leader.barrier`</td>
<td>This measures the time spent waiting for the raft barrier upon gaining leadership.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.leader.reconcile`</td>
<td>This measures the time spent updating the raft store from the serf member information.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.leader.reconcileMember`</td>
<td>This measures the time spent updating the raft store for a single serf member's information.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.leader.reapTombstones`</td>
<td>This measures the time spent clearing tombstones.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.prepared-query.apply`</td>
<td>This measures the time it takes to apply a prepared query update.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.prepared-query.explain`</td>
<td>This measures the time it takes to process a prepared query explain request.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.prepared-query.execute`</td>
<td>This measures the time it takes to process a prepared query execute request.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.prepared-query.execute`</td>
<td>This measures the time it takes to process a prepared query execute request that was forwarded to another datacenter.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.rpc.raft_handoff`</td>
<td>This increments when a server accepts a Raft-related RPC connection.</td>
<td>connections</td>
<td>counter</td>
</tr>
<tr>
<td>`consul.rpc.request_error`</td>
<td>This increments when a server returns an error from an RPC request.</td>
<td>errors</td>
<td>counter</td>
</tr>
<tr>
<td>`consul.rpc.request`</td>
<td>This increments when a server receives a Consul-related RPC request.</td>
<td>requests</td>
<td>counter</td>
</tr>
<tr>
<td>`consul.rpc.query`</td>
<td>This increments when a server receives a (potentially blocking) RPC query.</td>
<td>queries</td>
<td>counter</td>
</tr>
<tr>
<td>`consul.rpc.consistentRead`</td>
<td>This measures the time spent confirming that a consistent read can be performed.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.session.apply`</td>
<td>This measures the time spent applying a session update.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.session.renew`</td>
<td>This measures the time spent renewing a session.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.session_ttl.invalidate`</td>
<td>This measures the time spent invalidating an expired session.</td>
<td>ms</td>
<td>timer</td>
</tr>
<tr>
<td>`consul.txn.apply`</td>
<td>This measures the time spent applying a transaction operation.</td>
<td>ms</td>
<td>timer</td>
</tr>
<td>`consul.txn.read`</td>
<td>This measures the time spent returning a read transaction.</td>
<td>ms</td>
<td>timer</td>
</tr>
</table>
## Cluster Health
@ -214,4 +483,46 @@ These metrics give insight into the health of the cluster as a whole.
<td>boolean</td>
<td>gauge</td>
</tr>
<tr>
<td>`consul.session_ttl.active`</td>
<td>This tracks the active number of sessions being tracked.</td>
<td>sessions</td>
<td>gauge</td>
</tr>
<tr>
<td>`consul.catalog.service.query.<service>`</td>
<td>This increments for each catalog query for the given service.</td>
<td>queries</td>
<td>counter</td>
</tr>
<tr>
<td>`consul.catalog.service.query-tag.<service>.<tag>`</td>
<td>This increments for each catalog query for the given service with the given tag.</td>
<td>queries</td>
<td>counter</td>
</tr>
<tr>
<td>`consul.catalog.service.not-found.<service>`</td>
<td>This increments for each catalog query where the given service could not be found.</td>
<td>queries</td>
<td>counter</td>
</tr>
<tr>
<td>`consul.health.service.query.<service>`</td>
<td>This increments for each health query for the given service.</td>
<td>queries</td>
<td>counter</td>
</tr>
<tr>
<td>`consul.health.service.query-tag.<service>.<tag>`</td>
<td>This increments for each health query for the given service with the given tag.</td>
<td>queries</td>
<td>counter</td>
</tr>
<tr>
<td>`consul.health.service.not-found.<service>`</td>
<td>This increments for each health query where the given service could not be found.</td>
<td>queries</td>
<td>counter</td>
</tr>
</table>