1e40f00567
Occasionally this test would flake. The flakes were fixed by: 1. Stopping the service and retrying to check on metrics. This way we also include the active_streams going to 0 in the metric calls. 2. Using a reference to the global Metrics. This way when other tests have background goroutines that are still shutting down, they won't emit metrics to the metric instance with the fake Sink. The stats test can patch the local reference to the global, so the existing statHandlers will continue to emit to the global, but the stats test will send all metrics to the replacement.
89 lines
2.2 KiB
Go
89 lines
2.2 KiB
Go
package grpc
|
|
|
|
import (
|
|
"context"
|
|
"sync/atomic"
|
|
|
|
"github.com/armon/go-metrics"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/stats"
|
|
)
|
|
|
|
var defaultMetrics = metrics.Default()
|
|
|
|
// statsHandler is a grpc/stats.StatsHandler which emits connection and
|
|
// request metrics to go-metrics.
|
|
type statsHandler struct {
|
|
metrics *metrics.Metrics
|
|
activeConns uint64 // must be 8-byte aligned for atomic access
|
|
}
|
|
|
|
func newStatsHandler() *statsHandler {
|
|
return &statsHandler{metrics: defaultMetrics}
|
|
}
|
|
|
|
// TagRPC implements grpcStats.StatsHandler
|
|
func (c *statsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
|
|
// No-op
|
|
return ctx
|
|
}
|
|
|
|
// HandleRPC implements grpcStats.StatsHandler
|
|
func (c *statsHandler) HandleRPC(_ context.Context, s stats.RPCStats) {
|
|
label := "server"
|
|
if s.IsClient() {
|
|
label = "client"
|
|
}
|
|
switch s.(type) {
|
|
case *stats.InHeader:
|
|
c.metrics.IncrCounter([]string{"grpc", label, "request"}, 1)
|
|
}
|
|
}
|
|
|
|
// TagConn implements grpcStats.StatsHandler
|
|
func (c *statsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
|
|
// No-op
|
|
return ctx
|
|
}
|
|
|
|
// HandleConn implements grpcStats.StatsHandler
|
|
func (c *statsHandler) HandleConn(_ context.Context, s stats.ConnStats) {
|
|
label := "server"
|
|
if s.IsClient() {
|
|
label = "client"
|
|
}
|
|
var count uint64
|
|
switch s.(type) {
|
|
case *stats.ConnBegin:
|
|
count = atomic.AddUint64(&c.activeConns, 1)
|
|
case *stats.ConnEnd:
|
|
// Decrement!
|
|
count = atomic.AddUint64(&c.activeConns, ^uint64(0))
|
|
}
|
|
c.metrics.SetGauge([]string{"grpc", label, "active_conns"}, float32(count))
|
|
}
|
|
|
|
type activeStreamCounter struct {
|
|
// count of the number of open streaming RPCs on a server. It is accessed
|
|
// atomically.
|
|
count uint64
|
|
}
|
|
|
|
// GRPCCountingStreamInterceptor is a grpc.ServerStreamInterceptor that emits a
|
|
// a metric of the count of open streams.
|
|
func (i *activeStreamCounter) Intercept(
|
|
srv interface{},
|
|
ss grpc.ServerStream,
|
|
_ *grpc.StreamServerInfo,
|
|
handler grpc.StreamHandler,
|
|
) error {
|
|
count := atomic.AddUint64(&i.count, 1)
|
|
defaultMetrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count))
|
|
defer func() {
|
|
count := atomic.AddUint64(&i.count, ^uint64(0))
|
|
defaultMetrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count))
|
|
}()
|
|
|
|
return handler(srv, ss)
|
|
}
|