diff --git a/.changelog/14922.txt b/.changelog/14922.txt new file mode 100644 index 000000000..e2f04991b --- /dev/null +++ b/.changelog/14922.txt @@ -0,0 +1,3 @@ +```release-note:feature +grpc: Added metrics for external gRPC server. Added `server_type=internal|external` label to gRPC metrics. +``` diff --git a/agent/agent.go b/agent/agent.go index 3de22c6ae..d0a1a2915 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -553,7 +553,10 @@ func (a *Agent) Start(ctx context.Context) error { // This needs to happen after the initial auto-config is loaded, because TLS // can only be configured on the gRPC server at the point of creation. - a.externalGRPCServer = external.NewServer(a.logger.Named("grpc.external")) + a.externalGRPCServer = external.NewServer( + a.logger.Named("grpc.external"), + metrics.Default(), + ) if err := a.startLicenseManager(ctx); err != nil { return err diff --git a/agent/consul/server.go b/agent/consul/server.go index d6e412fe2..0972bab8d 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -845,7 +845,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler s.externalConnectCAServer.Register(srv) } - return agentgrpc.NewHandler(deps.Logger, config.RPCAddr, register) + return agentgrpc.NewHandler(deps.Logger, config.RPCAddr, register, nil) } func (s *Server) connectCARootsMonitor(ctx context.Context) { diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index b9f8e7db9..c9b8a9324 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -328,7 +328,7 @@ func newServerWithDeps(t *testing.T, c *Config, deps Deps) (*Server, error) { oldNotify() } } - grpcServer := external.NewServer(deps.Logger.Named("grpc.external")) + grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil) srv, err := NewServer(c, deps, grpcServer) if err != nil { return nil, err diff --git a/agent/grpc-external/server.go b/agent/grpc-external/server.go index 7ab72d77c..634205337 100644 --- a/agent/grpc-external/server.go +++ b/agent/grpc-external/server.go @@ -3,6 +3,7 @@ package external import ( "time" + "github.com/armon/go-metrics" middleware "github.com/grpc-ecosystem/go-grpc-middleware" recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" "google.golang.org/grpc" @@ -11,13 +12,24 @@ import ( agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware" ) +var ( + metricsLabels = []metrics.Label{{ + Name: "server_type", + Value: "external", + }} +) + // NewServer constructs a gRPC server for the external gRPC port, to which // handlers can be registered. -func NewServer(logger agentmiddleware.Logger) *grpc.Server { +func NewServer(logger agentmiddleware.Logger, metricsObj *metrics.Metrics) *grpc.Server { + if metricsObj == nil { + metricsObj = metrics.Default() + } recoveryOpts := agentmiddleware.PanicHandlerMiddlewareOpts(logger) opts := []grpc.ServerOption{ grpc.MaxConcurrentStreams(2048), + grpc.StatsHandler(agentmiddleware.NewStatsHandler(metricsObj, metricsLabels)), middleware.WithUnaryServerChain( // Add middlware interceptors to recover in case of panics. recovery.UnaryServerInterceptor(recoveryOpts...), @@ -25,6 +37,7 @@ func NewServer(logger agentmiddleware.Logger) *grpc.Server { middleware.WithStreamServerChain( // Add middlware interceptors to recover in case of panics. recovery.StreamServerInterceptor(recoveryOpts...), + agentmiddleware.NewActiveStreamCounter(metricsObj, metricsLabels).Intercept, ), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ // This must be less than the keealive.ClientParameters Time setting, otherwise diff --git a/agent/grpc-external/stats_test.go b/agent/grpc-external/stats_test.go new file mode 100644 index 000000000..8f5dffb67 --- /dev/null +++ b/agent/grpc-external/stats_test.go @@ -0,0 +1,104 @@ +package external + +import ( + "context" + "net" + "sort" + "testing" + + "github.com/armon/go-metrics" + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + + "github.com/hashicorp/go-hclog" + + "github.com/hashicorp/consul/agent/grpc-middleware/testutil" + "github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice" + "github.com/hashicorp/consul/proto/prototest" +) + +func TestServer_EmitsStats(t *testing.T) { + sink, metricsObj := testutil.NewFakeSink(t) + + srv := NewServer(hclog.Default(), metricsObj) + + testservice.RegisterSimpleServer(srv, &testservice.Simple{}) + + lis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + return srv.Serve(lis) + }) + t.Cleanup(func() { + srv.Stop() + if err := g.Wait(); err != nil { + t.Logf("grpc server error: %v", err) + } + }) + + conn, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure()) + require.NoError(t, err) + t.Cleanup(func() { conn.Close() }) + + client := testservice.NewSimpleClient(conn) + fClient, err := client.Flow(ctx, &testservice.Req{Datacenter: "mine"}) + require.NoError(t, err) + + // Wait for the first event so that we know the stream is sending. + _, err = fClient.Recv() + require.NoError(t, err) + + cancel() + conn.Close() + srv.GracefulStop() + // Wait for the server to stop so that active_streams is predictable. + require.NoError(t, g.Wait()) + + // Occasionally the active_stream=0 metric may be emitted before the + // active_conns=0 metric. The order of those metrics is not really important + // so we sort the calls to match the expected. + sort.Slice(sink.GaugeCalls, func(i, j int) bool { + if i < 2 || j < 2 { + return i < j + } + if len(sink.GaugeCalls[i].Key) < 4 || len(sink.GaugeCalls[j].Key) < 4 { + return i < j + } + return sink.GaugeCalls[i].Key[3] < sink.GaugeCalls[j].Key[3] + }) + + cmpMetricCalls := cmp.AllowUnexported(testutil.MetricCall{}) + expLabels := []metrics.Label{{ + Name: "server_type", + Value: "external", + }} + expectedGauge := []testutil.MetricCall{ + {Key: []string{"testing", "grpc", "server", "connections"}, Val: 1, Labels: expLabels}, + {Key: []string{"testing", "grpc", "server", "streams"}, Val: 1, Labels: expLabels}, + {Key: []string{"testing", "grpc", "server", "connections"}, Val: 0, Labels: expLabels}, + {Key: []string{"testing", "grpc", "server", "streams"}, Val: 0, Labels: expLabels}, + } + prototest.AssertDeepEqual(t, expectedGauge, sink.GaugeCalls, cmpMetricCalls) + + expectedCounter := []testutil.MetricCall{ + {Key: []string{"testing", "grpc", "server", "connection", "count"}, Val: 1, Labels: expLabels}, + {Key: []string{"testing", "grpc", "server", "request", "count"}, Val: 1, Labels: expLabels}, + {Key: []string{"testing", "grpc", "server", "stream", "count"}, Val: 1, Labels: expLabels}, + } + prototest.AssertDeepEqual(t, expectedCounter, sink.IncrCounterCalls, cmpMetricCalls) +} + +func logError(t *testing.T, f func() error) func() { + return func() { + if err := f(); err != nil { + t.Logf(err.Error()) + } + } +} diff --git a/agent/grpc-internal/client.go b/agent/grpc-internal/client.go index 451a5236e..711310538 100644 --- a/agent/grpc-internal/client.go +++ b/agent/grpc-internal/client.go @@ -10,6 +10,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/keepalive" + "github.com/armon/go-metrics" + agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/tlsutil" @@ -129,7 +131,7 @@ func (c *ClientConnPool) dial(datacenter string, serverType string) (*grpc.Clien grpc.WithInsecure(), grpc.WithContextDialer(c.dialer), grpc.WithDisableRetry(), - grpc.WithStatsHandler(newStatsHandler(defaultMetrics())), + grpc.WithStatsHandler(agentmiddleware.NewStatsHandler(metrics.Default(), metricsLabels)), // nolint:staticcheck // there is no other supported alternative to WithBalancerName grpc.WithBalancerName("pick_first"), // Keep alive parameters are based on the same default ones we used for diff --git a/agent/grpc-internal/client_test.go b/agent/grpc-internal/client_test.go index e36cd6aad..d9d264d80 100644 --- a/agent/grpc-internal/client_test.go +++ b/agent/grpc-internal/client_test.go @@ -14,8 +14,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/hashicorp/consul/agent/grpc-internal/internal/testservice" "github.com/hashicorp/consul/agent/grpc-internal/resolver" + "github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/sdk/freeport" diff --git a/agent/grpc-internal/handler.go b/agent/grpc-internal/handler.go index b5e7d2573..a576d656e 100644 --- a/agent/grpc-internal/handler.go +++ b/agent/grpc-internal/handler.go @@ -5,6 +5,7 @@ import ( "net" "time" + "github.com/armon/go-metrics" agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware" middleware "github.com/grpc-ecosystem/go-grpc-middleware" @@ -13,18 +14,27 @@ import ( "google.golang.org/grpc/keepalive" ) +var ( + metricsLabels = []metrics.Label{{ + Name: "server_type", + Value: "internal", + }} +) + // NewHandler returns a gRPC server that accepts connections from Handle(conn). // The register function will be called with the grpc.Server to register // gRPC services with the server. -func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server)) *Handler { - metrics := defaultMetrics() +func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server), metricsObj *metrics.Metrics) *Handler { + if metricsObj == nil { + metricsObj = metrics.Default() + } // We don't need to pass tls.Config to the server since it's multiplexed // behind the RPC listener, which already has TLS configured. recoveryOpts := agentmiddleware.PanicHandlerMiddlewareOpts(logger) opts := []grpc.ServerOption{ - grpc.StatsHandler(newStatsHandler(metrics)), + grpc.StatsHandler(agentmiddleware.NewStatsHandler(metricsObj, metricsLabels)), middleware.WithUnaryServerChain( // Add middlware interceptors to recover in case of panics. recovery.UnaryServerInterceptor(recoveryOpts...), @@ -32,7 +42,7 @@ func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server) middleware.WithStreamServerChain( // Add middlware interceptors to recover in case of panics. recovery.StreamServerInterceptor(recoveryOpts...), - (&activeStreamCounter{metrics: metrics}).Intercept, + agentmiddleware.NewActiveStreamCounter(metricsObj, metricsLabels).Intercept, ), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ MinTime: 15 * time.Second, diff --git a/agent/grpc-internal/handler_test.go b/agent/grpc-internal/handler_test.go index f53877301..4f093ac65 100644 --- a/agent/grpc-internal/handler_test.go +++ b/agent/grpc-internal/handler_test.go @@ -13,8 +13,8 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/hashicorp/consul/agent/grpc-internal/internal/testservice" "github.com/hashicorp/consul/agent/grpc-internal/resolver" + "github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice" ) func TestHandler_PanicRecoveryInterceptor(t *testing.T) { @@ -57,5 +57,6 @@ func TestHandler_PanicRecoveryInterceptor(t *testing.T) { // Checking the entire stack trace is not possible, let's // make sure that it contains a couple of expected strings. require.Contains(t, strLog, `[ERROR] panic serving grpc request: panic="panic from Something`) - require.Contains(t, strLog, `github.com/hashicorp/consul/agent/grpc-internal.(*simplePanic).Something`) + require.Contains(t, strLog, `github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice.(*SimplePanic).Something`) + } diff --git a/agent/grpc-internal/server_test.go b/agent/grpc-internal/server_test.go index 45e1ad59f..56e18da1d 100644 --- a/agent/grpc-internal/server_test.go +++ b/agent/grpc-internal/server_test.go @@ -1,7 +1,6 @@ package internal import ( - "context" "crypto/tls" "fmt" "io" @@ -15,7 +14,7 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc" - "github.com/hashicorp/consul/agent/grpc-internal/internal/testservice" + "github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/tlsutil" @@ -42,20 +41,20 @@ func (s testServer) Metadata() *metadata.Server { func newSimpleTestServer(t *testing.T, name, dc string, tlsConf *tlsutil.Configurator) testServer { return newTestServer(t, hclog.Default(), name, dc, tlsConf, func(server *grpc.Server) { - testservice.RegisterSimpleServer(server, &simple{name: name, dc: dc}) + testservice.RegisterSimpleServer(server, &testservice.Simple{Name: name, DC: dc}) }) } // newPanicTestServer sets up a simple server with handlers that panic. func newPanicTestServer(t *testing.T, logger hclog.Logger, name, dc string, tlsConf *tlsutil.Configurator) testServer { return newTestServer(t, logger, name, dc, tlsConf, func(server *grpc.Server) { - testservice.RegisterSimpleServer(server, &simplePanic{name: name, dc: dc}) + testservice.RegisterSimpleServer(server, &testservice.SimplePanic{Name: name, DC: dc}) }) } func newTestServer(t *testing.T, logger hclog.Logger, name, dc string, tlsConf *tlsutil.Configurator, register func(server *grpc.Server)) testServer { addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")} - handler := NewHandler(logger, addr, register) + handler := NewHandler(logger, addr, register, nil) lis, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) @@ -95,43 +94,6 @@ func newTestServer(t *testing.T, logger hclog.Logger, name, dc string, tlsConf * } } -type simple struct { - name string - dc string -} - -func (s *simple) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) error { - for flow.Context().Err() == nil { - resp := &testservice.Resp{ServerName: "one", Datacenter: s.dc} - if err := flow.Send(resp); err != nil { - return err - } - time.Sleep(time.Millisecond) - } - return nil -} - -func (s *simple) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) { - return &testservice.Resp{ServerName: s.name, Datacenter: s.dc}, nil -} - -type simplePanic struct { - name, dc string -} - -func (s *simplePanic) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) error { - for flow.Context().Err() == nil { - time.Sleep(time.Millisecond) - panic("panic from Flow") - } - return nil -} - -func (s *simplePanic) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) { - time.Sleep(time.Millisecond) - panic("panic from Something") -} - // fakeRPCListener mimics agent/consul.Server.listen to handle the RPCType byte. // In the future we should be able to refactor Server and extract this RPC // handling logic so that we don't need to use a fake. diff --git a/agent/grpc-internal/services/subscribe/subscribe_test.go b/agent/grpc-internal/services/subscribe/subscribe_test.go index 26a8e148c..c737b4d4a 100644 --- a/agent/grpc-internal/services/subscribe/subscribe_test.go +++ b/agent/grpc-internal/services/subscribe/subscribe_test.go @@ -370,10 +370,15 @@ var _ Backend = (*testBackend)(nil) func runTestServer(t *testing.T, server *Server) net.Addr { addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")} var grpcServer *gogrpc.Server - handler := grpc.NewHandler(hclog.New(nil), addr, func(srv *gogrpc.Server) { - grpcServer = srv - pbsubscribe.RegisterStateChangeSubscriptionServer(srv, server) - }) + handler := grpc.NewHandler( + hclog.New(nil), + addr, + func(srv *gogrpc.Server) { + grpcServer = srv + pbsubscribe.RegisterStateChangeSubscriptionServer(srv, server) + }, + nil, + ) lis, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) diff --git a/agent/grpc-internal/stats_test.go b/agent/grpc-internal/stats_test.go index a32ea5525..8ded41dd8 100644 --- a/agent/grpc-internal/stats_test.go +++ b/agent/grpc-internal/stats_test.go @@ -4,9 +4,7 @@ import ( "context" "net" "sort" - "sync" "testing" - "time" "github.com/armon/go-metrics" "github.com/google/go-cmp/cmp" @@ -16,20 +14,20 @@ import ( "github.com/hashicorp/go-hclog" - "github.com/hashicorp/consul/agent/grpc-internal/internal/testservice" + "github.com/hashicorp/consul/agent/grpc-middleware/testutil" + "github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice" "github.com/hashicorp/consul/proto/prototest" ) func noopRegister(*grpc.Server) {} func TestHandler_EmitsStats(t *testing.T) { - sink, reset := patchGlobalMetrics(t) + sink, metricsObj := testutil.NewFakeSink(t) addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")} - handler := NewHandler(hclog.Default(), addr, noopRegister) - reset() + handler := NewHandler(hclog.Default(), addr, noopRegister, metricsObj) - testservice.RegisterSimpleServer(handler.srv, &simple{}) + testservice.RegisterSimpleServer(handler.srv, &testservice.Simple{}) lis, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) @@ -71,80 +69,35 @@ func TestHandler_EmitsStats(t *testing.T) { // Occasionally the active_stream=0 metric may be emitted before the // active_conns=0 metric. The order of those metrics is not really important // so we sort the calls to match the expected. - sort.Slice(sink.gaugeCalls, func(i, j int) bool { + sort.Slice(sink.GaugeCalls, func(i, j int) bool { if i < 2 || j < 2 { return i < j } - if len(sink.gaugeCalls[i].key) < 4 || len(sink.gaugeCalls[j].key) < 4 { + if len(sink.GaugeCalls[i].Key) < 4 || len(sink.GaugeCalls[j].Key) < 4 { return i < j } - return sink.gaugeCalls[i].key[3] < sink.gaugeCalls[j].key[3] + return sink.GaugeCalls[i].Key[3] < sink.GaugeCalls[j].Key[3] }) - cmpMetricCalls := cmp.AllowUnexported(metricCall{}) - expectedGauge := []metricCall{ - {key: []string{"testing", "grpc", "server", "connections"}, val: 1}, - {key: []string{"testing", "grpc", "server", "streams"}, val: 1}, - {key: []string{"testing", "grpc", "server", "connections"}, val: 0}, - {key: []string{"testing", "grpc", "server", "streams"}, val: 0}, + cmpMetricCalls := cmp.AllowUnexported(testutil.MetricCall{}) + expLabels := []metrics.Label{{ + Name: "server_type", + Value: "internal", + }} + expectedGauge := []testutil.MetricCall{ + {Key: []string{"testing", "grpc", "server", "connections"}, Val: 1, Labels: expLabels}, + {Key: []string{"testing", "grpc", "server", "streams"}, Val: 1, Labels: expLabels}, + {Key: []string{"testing", "grpc", "server", "connections"}, Val: 0, Labels: expLabels}, + {Key: []string{"testing", "grpc", "server", "streams"}, Val: 0, Labels: expLabels}, } - prototest.AssertDeepEqual(t, expectedGauge, sink.gaugeCalls, cmpMetricCalls) + prototest.AssertDeepEqual(t, expectedGauge, sink.GaugeCalls, cmpMetricCalls) - expectedCounter := []metricCall{ - {key: []string{"testing", "grpc", "server", "connection", "count"}, val: 1}, - {key: []string{"testing", "grpc", "server", "request", "count"}, val: 1}, - {key: []string{"testing", "grpc", "server", "stream", "count"}, val: 1}, + expectedCounter := []testutil.MetricCall{ + {Key: []string{"testing", "grpc", "server", "connection", "count"}, Val: 1, Labels: expLabels}, + {Key: []string{"testing", "grpc", "server", "request", "count"}, Val: 1, Labels: expLabels}, + {Key: []string{"testing", "grpc", "server", "stream", "count"}, Val: 1, Labels: expLabels}, } - prototest.AssertDeepEqual(t, expectedCounter, sink.incrCounterCalls, cmpMetricCalls) -} - -func patchGlobalMetrics(t *testing.T) (*fakeMetricsSink, func()) { - t.Helper() - - sink := &fakeMetricsSink{} - cfg := &metrics.Config{ - ServiceName: "testing", - TimerGranularity: time.Millisecond, // Timers are in milliseconds - ProfileInterval: time.Second, // Poll runtime every second - FilterDefault: true, - } - var err error - defaultMetrics = func() *metrics.Metrics { - m, _ := metrics.New(cfg, sink) - return m - } - require.NoError(t, err) - reset := func() { - t.Helper() - defaultMetrics = metrics.Default - require.NoError(t, err, "failed to reset global metrics") - } - return sink, reset -} - -type fakeMetricsSink struct { - lock sync.Mutex - metrics.BlackholeSink - gaugeCalls []metricCall - incrCounterCalls []metricCall -} - -func (f *fakeMetricsSink) SetGaugeWithLabels(key []string, val float32, labels []metrics.Label) { - f.lock.Lock() - f.gaugeCalls = append(f.gaugeCalls, metricCall{key: key, val: val, labels: labels}) - f.lock.Unlock() -} - -func (f *fakeMetricsSink) IncrCounterWithLabels(key []string, val float32, labels []metrics.Label) { - f.lock.Lock() - f.incrCounterCalls = append(f.incrCounterCalls, metricCall{key: key, val: val, labels: labels}) - f.lock.Unlock() -} - -type metricCall struct { - key []string - val float32 - labels []metrics.Label + prototest.AssertDeepEqual(t, expectedCounter, sink.IncrCounterCalls, cmpMetricCalls) } func logError(t *testing.T, f func() error) func() { diff --git a/agent/grpc-internal/stats.go b/agent/grpc-middleware/stats.go similarity index 79% rename from agent/grpc-internal/stats.go rename to agent/grpc-middleware/stats.go index ab5226cee..3a1b3f23e 100644 --- a/agent/grpc-internal/stats.go +++ b/agent/grpc-middleware/stats.go @@ -1,4 +1,4 @@ -package internal +package middleware import ( "context" @@ -47,8 +47,6 @@ var StatsCounters = []prometheus.CounterDefinition{ }, } -var defaultMetrics = metrics.Default - // statsHandler is a grpc/stats.StatsHandler which emits connection and // request metrics to go-metrics. type statsHandler struct { @@ -57,10 +55,11 @@ type statsHandler struct { // the struct. See https://golang.org/pkg/sync/atomic/#pkg-note-BUG. activeConns uint64 metrics *metrics.Metrics + labels []metrics.Label } -func newStatsHandler(m *metrics.Metrics) *statsHandler { - return &statsHandler{metrics: m} +func NewStatsHandler(m *metrics.Metrics, labels []metrics.Label) *statsHandler { + return &statsHandler{metrics: m, labels: labels} } // TagRPC implements grpcStats.StatsHandler @@ -77,7 +76,7 @@ func (c *statsHandler) HandleRPC(_ context.Context, s stats.RPCStats) { } switch s.(type) { case *stats.InHeader: - c.metrics.IncrCounter([]string{"grpc", label, "request", "count"}, 1) + c.metrics.IncrCounterWithLabels([]string{"grpc", label, "request", "count"}, 1, c.labels) } } @@ -97,12 +96,12 @@ func (c *statsHandler) HandleConn(_ context.Context, s stats.ConnStats) { switch s.(type) { case *stats.ConnBegin: count = atomic.AddUint64(&c.activeConns, 1) - c.metrics.IncrCounter([]string{"grpc", label, "connection", "count"}, 1) + c.metrics.IncrCounterWithLabels([]string{"grpc", label, "connection", "count"}, 1, c.labels) case *stats.ConnEnd: // Decrement! count = atomic.AddUint64(&c.activeConns, ^uint64(0)) } - c.metrics.SetGauge([]string{"grpc", label, "connections"}, float32(count)) + c.metrics.SetGaugeWithLabels([]string{"grpc", label, "connections"}, float32(count), c.labels) } type activeStreamCounter struct { @@ -111,6 +110,11 @@ type activeStreamCounter struct { // the struct. See https://golang.org/pkg/sync/atomic/#pkg-note-BUG. count uint64 metrics *metrics.Metrics + labels []metrics.Label +} + +func NewActiveStreamCounter(m *metrics.Metrics, labels []metrics.Label) *activeStreamCounter { + return &activeStreamCounter{metrics: m, labels: labels} } // GRPCCountingStreamInterceptor is a grpc.ServerStreamInterceptor that emits a @@ -122,11 +126,11 @@ func (i *activeStreamCounter) Intercept( handler grpc.StreamHandler, ) error { count := atomic.AddUint64(&i.count, 1) - i.metrics.SetGauge([]string{"grpc", "server", "streams"}, float32(count)) - i.metrics.IncrCounter([]string{"grpc", "server", "stream", "count"}, 1) + i.metrics.SetGaugeWithLabels([]string{"grpc", "server", "streams"}, float32(count), i.labels) + i.metrics.IncrCounterWithLabels([]string{"grpc", "server", "stream", "count"}, 1, i.labels) defer func() { count := atomic.AddUint64(&i.count, ^uint64(0)) - i.metrics.SetGauge([]string{"grpc", "server", "streams"}, float32(count)) + i.metrics.SetGaugeWithLabels([]string{"grpc", "server", "streams"}, float32(count), i.labels) }() return handler(srv, ss) diff --git a/agent/grpc-middleware/testutil/fake_sink.go b/agent/grpc-middleware/testutil/fake_sink.go new file mode 100644 index 000000000..7439d78a2 --- /dev/null +++ b/agent/grpc-middleware/testutil/fake_sink.go @@ -0,0 +1,50 @@ +package testutil + +import ( + "sync" + "testing" + "time" + + "github.com/armon/go-metrics" + "github.com/stretchr/testify/require" +) + +func NewFakeSink(t *testing.T) (*FakeMetricsSink, *metrics.Metrics) { + t.Helper() + + sink := &FakeMetricsSink{} + cfg := &metrics.Config{ + ServiceName: "testing", + TimerGranularity: time.Millisecond, // Timers are in milliseconds + ProfileInterval: time.Second, // Poll runtime every second + FilterDefault: true, + } + m, err := metrics.New(cfg, sink) + require.NoError(t, err) + return sink, m +} + +type FakeMetricsSink struct { + lock sync.Mutex + metrics.BlackholeSink + GaugeCalls []MetricCall + IncrCounterCalls []MetricCall +} + +func (f *FakeMetricsSink) SetGaugeWithLabels(key []string, val float32, labels []metrics.Label) { + f.lock.Lock() + f.GaugeCalls = append(f.GaugeCalls, MetricCall{Key: key, Val: val, Labels: labels}) + f.lock.Unlock() +} + +func (f *FakeMetricsSink) IncrCounterWithLabels(key []string, val float32, labels []metrics.Label) { + f.lock.Lock() + f.IncrCounterCalls = append(f.IncrCounterCalls, MetricCall{Key: key, Val: val, Labels: labels}) + f.lock.Unlock() +} + +type MetricCall struct { + Key []string + Val float32 + Labels []metrics.Label +} diff --git a/agent/grpc-middleware/testutil/testservice/fake_service.go b/agent/grpc-middleware/testutil/testservice/fake_service.go new file mode 100644 index 000000000..f9999cc71 --- /dev/null +++ b/agent/grpc-middleware/testutil/testservice/fake_service.go @@ -0,0 +1,43 @@ +package testservice + +import ( + "context" + "time" +) + +type Simple struct { + Name string + DC string +} + +func (s *Simple) Flow(_ *Req, flow Simple_FlowServer) error { + for flow.Context().Err() == nil { + resp := &Resp{ServerName: "one", Datacenter: s.DC} + if err := flow.Send(resp); err != nil { + return err + } + time.Sleep(time.Millisecond) + } + return nil +} + +func (s *Simple) Something(_ context.Context, _ *Req) (*Resp, error) { + return &Resp{ServerName: s.Name, Datacenter: s.DC}, nil +} + +type SimplePanic struct { + Name, DC string +} + +func (s *SimplePanic) Flow(_ *Req, flow Simple_FlowServer) error { + for flow.Context().Err() == nil { + time.Sleep(time.Millisecond) + panic("panic from Flow") + } + return nil +} + +func (s *SimplePanic) Something(_ context.Context, _ *Req) (*Resp, error) { + time.Sleep(time.Millisecond) + panic("panic from Something") +} diff --git a/agent/grpc-internal/internal/testservice/simple.pb.binary.go b/agent/grpc-middleware/testutil/testservice/simple.pb.binary.go similarity index 90% rename from agent/grpc-internal/internal/testservice/simple.pb.binary.go rename to agent/grpc-middleware/testutil/testservice/simple.pb.binary.go index fd2d7e13f..da258818d 100644 --- a/agent/grpc-internal/internal/testservice/simple.pb.binary.go +++ b/agent/grpc-middleware/testutil/testservice/simple.pb.binary.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-go-binary. DO NOT EDIT. -// source: agent/grpc-internal/internal/testservice/simple.proto +// source: agent/grpc-middleware/testutil/testservice/simple.pb.binary.go package testservice diff --git a/agent/grpc-internal/internal/testservice/simple.pb.go b/agent/grpc-middleware/testutil/testservice/simple.pb.go similarity index 99% rename from agent/grpc-internal/internal/testservice/simple.pb.go rename to agent/grpc-middleware/testutil/testservice/simple.pb.go index dfca35fa0..25aa074f7 100644 --- a/agent/grpc-internal/internal/testservice/simple.pb.go +++ b/agent/grpc-middleware/testutil/testservice/simple.pb.go @@ -2,7 +2,7 @@ // versions: // protoc-gen-go v1.23.0 // protoc v3.15.8 -// source: agent/grpc-internal/internal/testservice/simple.proto +// source: agent/grpc-middleware/testutil/testservice/simple.proto package testservice diff --git a/agent/grpc-internal/internal/testservice/simple.proto b/agent/grpc-middleware/testutil/testservice/simple.proto similarity index 100% rename from agent/grpc-internal/internal/testservice/simple.proto rename to agent/grpc-middleware/testutil/testservice/simple.proto diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index ae822c24a..7f21b9578 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -1414,7 +1414,7 @@ func newTestServer(t *testing.T, cb func(conf *consul.Config)) testingServer { conf.ACLResolverSettings.EnterpriseMeta = *conf.AgentEnterpriseMeta() deps := newDefaultDeps(t, conf) - externalGRPCServer := external.NewServer(deps.Logger) + externalGRPCServer := external.NewServer(deps.Logger, nil) server, err := consul.NewServer(conf, deps, externalGRPCServer) require.NoError(t, err) diff --git a/agent/setup.go b/agent/setup.go index 4fdeab213..02dc03426 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -21,8 +21,9 @@ import ( "github.com/hashicorp/consul/agent/consul/usagemetrics" "github.com/hashicorp/consul/agent/consul/xdscapacity" "github.com/hashicorp/consul/agent/grpc-external/limiter" - grpc "github.com/hashicorp/consul/agent/grpc-internal" + grpcInt "github.com/hashicorp/consul/agent/grpc-internal" "github.com/hashicorp/consul/agent/grpc-internal/resolver" + grpcWare "github.com/hashicorp/consul/agent/grpc-middleware" "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" @@ -112,11 +113,11 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) Authority: cfg.Datacenter + "." + string(cfg.NodeID), }) resolver.Register(builder) - d.GRPCConnPool = grpc.NewClientConnPool(grpc.ClientConnPoolConfig{ + d.GRPCConnPool = grpcInt.NewClientConnPool(grpcInt.ClientConnPoolConfig{ Servers: builder, SrcAddr: d.ConnPool.SrcAddr, - TLSWrapper: grpc.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper()), - ALPNWrapper: grpc.ALPNWrapper(d.TLSConfigurator.OutgoingALPNRPCWrapper()), + TLSWrapper: grpcInt.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper()), + ALPNWrapper: grpcInt.ALPNWrapper(d.TLSConfigurator.OutgoingALPNRPCWrapper()), UseTLSForDC: d.TLSConfigurator.UseTLS, DialingFromServer: cfg.ServerMode, DialingFromDatacenter: cfg.Datacenter, @@ -201,7 +202,8 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil } // getPrometheusDefs reaches into every slice of prometheus defs we've defined in each part of the agent, and appends -// all of our slices into one nice slice of definitions per metric type for the Consul agent to pass to go-metrics. +// +// all of our slices into one nice slice of definitions per metric type for the Consul agent to pass to go-metrics. func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.GaugeDefinition, []prometheus.CounterDefinition, []prometheus.SummaryDefinition) { // TODO: "raft..." metrics come from the raft lib and we should migrate these to a telemetry // package within. In the mean time, we're going to define a few here because they're key to monitoring Consul. @@ -228,7 +230,7 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau cache.Gauges, consul.RPCGauges, consul.SessionGauges, - grpc.StatsGauges, + grpcWare.StatsGauges, xds.StatsGauges, usagemetrics.Gauges, consul.ReplicationGauges, @@ -286,7 +288,7 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau consul.CatalogCounters, consul.ClientCounters, consul.RPCCounters, - grpc.StatsCounters, + grpcWare.StatsCounters, local.StateCounters, xds.StatsCounters, raftCounters,