agent/grpc: pass metrics to constructor

Instead of referencing a package var. This does not fix the flaky test, but it seems more correct.
This commit is contained in:
Daniel Nephin 2020-10-14 15:43:48 -04:00
parent 70fea7a77e
commit 87793cd090
4 changed files with 16 additions and 14 deletions

View File

@ -61,8 +61,7 @@ func (c *ClientConnPool) ClientConn(datacenter string) (*grpc.ClientConn, error)
grpc.WithInsecure(), grpc.WithInsecure(),
grpc.WithContextDialer(c.dialer), grpc.WithContextDialer(c.dialer),
grpc.WithDisableRetry(), grpc.WithDisableRetry(),
// TODO: previously this statsHandler was shared with the Handler. Is that necessary? grpc.WithStatsHandler(newStatsHandler(defaultMetrics)),
grpc.WithStatsHandler(newStatsHandler()),
// nolint:staticcheck // there is no other supported alternative to WithBalancerName // nolint:staticcheck // there is no other supported alternative to WithBalancerName
grpc.WithBalancerName("pick_first")) grpc.WithBalancerName("pick_first"))
if err != nil { if err != nil {

View File

@ -17,8 +17,8 @@ func NewHandler(addr net.Addr, register func(server *grpc.Server)) *Handler {
// We don't need to pass tls.Config to the server since it's multiplexed // We don't need to pass tls.Config to the server since it's multiplexed
// behind the RPC listener, which already has TLS configured. // behind the RPC listener, which already has TLS configured.
srv := grpc.NewServer( srv := grpc.NewServer(
grpc.StatsHandler(newStatsHandler()), grpc.StatsHandler(newStatsHandler(defaultMetrics)),
grpc.StreamInterceptor((&activeStreamCounter{}).Intercept), grpc.StreamInterceptor((&activeStreamCounter{metrics: defaultMetrics}).Intercept),
) )
register(srv) register(srv)

View File

@ -18,8 +18,8 @@ type statsHandler struct {
activeConns uint64 // must be 8-byte aligned for atomic access activeConns uint64 // must be 8-byte aligned for atomic access
} }
func newStatsHandler() *statsHandler { func newStatsHandler(m *metrics.Metrics) *statsHandler {
return &statsHandler{metrics: defaultMetrics} return &statsHandler{metrics: m}
} }
// TagRPC implements grpcStats.StatsHandler // TagRPC implements grpcStats.StatsHandler
@ -64,6 +64,7 @@ func (c *statsHandler) HandleConn(_ context.Context, s stats.ConnStats) {
} }
type activeStreamCounter struct { type activeStreamCounter struct {
metrics *metrics.Metrics
// count of the number of open streaming RPCs on a server. It is accessed // count of the number of open streaming RPCs on a server. It is accessed
// atomically. // atomically.
count uint64 count uint64
@ -78,10 +79,10 @@ func (i *activeStreamCounter) Intercept(
handler grpc.StreamHandler, handler grpc.StreamHandler,
) error { ) error {
count := atomic.AddUint64(&i.count, 1) count := atomic.AddUint64(&i.count, 1)
defaultMetrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count)) i.metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count))
defer func() { defer func() {
count := atomic.AddUint64(&i.count, ^uint64(0)) count := atomic.AddUint64(&i.count, ^uint64(0))
defaultMetrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count)) i.metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count))
}() }()
return handler(srv, ss) return handler(srv, ss)

View File

@ -20,10 +20,11 @@ import (
func noopRegister(*grpc.Server) {} func noopRegister(*grpc.Server) {}
func TestHandler_EmitsStats(t *testing.T) { func TestHandler_EmitsStats(t *testing.T) {
sink := patchGlobalMetrics(t) sink, reset := patchGlobalMetrics(t)
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")} addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
handler := NewHandler(addr, noopRegister) handler := NewHandler(addr, noopRegister)
reset()
testservice.RegisterSimpleServer(handler.srv, &simple{}) testservice.RegisterSimpleServer(handler.srv, &simple{})
@ -99,7 +100,7 @@ func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
} }
} }
func patchGlobalMetrics(t *testing.T) *fakeMetricsSink { func patchGlobalMetrics(t *testing.T) (*fakeMetricsSink, func()) {
t.Helper() t.Helper()
sink := &fakeMetricsSink{} sink := &fakeMetricsSink{}
@ -112,11 +113,12 @@ func patchGlobalMetrics(t *testing.T) *fakeMetricsSink {
var err error var err error
defaultMetrics, err = metrics.New(cfg, sink) defaultMetrics, err = metrics.New(cfg, sink)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { reset := func() {
_, err = metrics.NewGlobal(cfg, &metrics.BlackholeSink{}) t.Helper()
defaultMetrics, err = metrics.New(cfg, &metrics.BlackholeSink{})
require.NoError(t, err, "failed to reset global metrics") require.NoError(t, err, "failed to reset global metrics")
}) }
return sink return sink, reset
} }
type fakeMetricsSink struct { type fakeMetricsSink struct {