956bff398a
defaultMetrics was being set at package import time, which meant that it received an instance of the original default. But lib/telemetry.InitTelemetry sets a new global when it is called. This resulted in the metrics being sent nowhere. This commit changes defaultMetrics to be a function, so it will return the global instance when called. Since that function is called after InitTelemetry, it will return the correct metrics instance.
161 lines
4.4 KiB
Go
161 lines
4.4 KiB
Go
package grpc
|
|
|
|
import (
|
|
"context"
|
|
"net"
|
|
"sort"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/armon/go-metrics"
|
|
"github.com/google/go-cmp/cmp"
|
|
"github.com/stretchr/testify/require"
|
|
"golang.org/x/sync/errgroup"
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/hashicorp/consul/agent/grpc/internal/testservice"
|
|
)
|
|
|
|
// noopRegister is a registration callback that registers nothing on the
// server it is given.
func noopRegister(_ *grpc.Server) {}
|
|
|
|
func TestHandler_EmitsStats(t *testing.T) {
|
|
sink, reset := patchGlobalMetrics(t)
|
|
|
|
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
|
|
handler := NewHandler(addr, noopRegister)
|
|
reset()
|
|
|
|
testservice.RegisterSimpleServer(handler.srv, &simple{})
|
|
|
|
lis, err := net.Listen("tcp", "127.0.0.1:0")
|
|
require.NoError(t, err)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
t.Cleanup(cancel)
|
|
|
|
g, ctx := errgroup.WithContext(ctx)
|
|
g.Go(func() error {
|
|
return handler.srv.Serve(lis)
|
|
})
|
|
t.Cleanup(func() {
|
|
if err := handler.Shutdown(); err != nil {
|
|
t.Logf("grpc server shutdown: %v", err)
|
|
}
|
|
if err := g.Wait(); err != nil {
|
|
t.Logf("grpc server error: %v", err)
|
|
}
|
|
})
|
|
|
|
conn, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure())
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() { conn.Close() })
|
|
|
|
client := testservice.NewSimpleClient(conn)
|
|
fClient, err := client.Flow(ctx, &testservice.Req{Datacenter: "mine"})
|
|
require.NoError(t, err)
|
|
|
|
// Wait for the first event so that we know the stream is sending.
|
|
_, err = fClient.Recv()
|
|
require.NoError(t, err)
|
|
|
|
cancel()
|
|
conn.Close()
|
|
handler.srv.GracefulStop()
|
|
// Wait for the server to stop so that active_streams is predictable.
|
|
require.NoError(t, g.Wait())
|
|
|
|
// Occasionally the active_stream=0 metric may be emitted before the
|
|
// active_conns=0 metric. The order of those metrics is not really important
|
|
// so we sort the calls to match the expected.
|
|
sort.Slice(sink.gaugeCalls, func(i, j int) bool {
|
|
if i < 2 || j < 2 {
|
|
return i < j
|
|
}
|
|
if len(sink.gaugeCalls[i].key) < 4 || len(sink.gaugeCalls[j].key) < 4 {
|
|
return i < j
|
|
}
|
|
return sink.gaugeCalls[i].key[3] < sink.gaugeCalls[j].key[3]
|
|
})
|
|
|
|
cmpMetricCalls := cmp.AllowUnexported(metricCall{})
|
|
expectedGauge := []metricCall{
|
|
{key: []string{"testing", "grpc", "server", "connections"}, val: 1},
|
|
{key: []string{"testing", "grpc", "server", "streams"}, val: 1},
|
|
{key: []string{"testing", "grpc", "server", "connections"}, val: 0},
|
|
{key: []string{"testing", "grpc", "server", "streams"}, val: 0},
|
|
}
|
|
assertDeepEqual(t, expectedGauge, sink.gaugeCalls, cmpMetricCalls)
|
|
|
|
expectedCounter := []metricCall{
|
|
{key: []string{"testing", "grpc", "server", "connection", "count"}, val: 1},
|
|
{key: []string{"testing", "grpc", "server", "request", "count"}, val: 1},
|
|
{key: []string{"testing", "grpc", "server", "stream", "count"}, val: 1},
|
|
}
|
|
assertDeepEqual(t, expectedCounter, sink.incrCounterCalls, cmpMetricCalls)
|
|
}
|
|
|
|
func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
|
|
t.Helper()
|
|
if diff := cmp.Diff(x, y, opts...); diff != "" {
|
|
t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff)
|
|
}
|
|
}
|
|
|
|
func patchGlobalMetrics(t *testing.T) (*fakeMetricsSink, func()) {
|
|
t.Helper()
|
|
|
|
sink := &fakeMetricsSink{}
|
|
cfg := &metrics.Config{
|
|
ServiceName: "testing",
|
|
TimerGranularity: time.Millisecond, // Timers are in milliseconds
|
|
ProfileInterval: time.Second, // Poll runtime every second
|
|
FilterDefault: true,
|
|
}
|
|
var err error
|
|
defaultMetrics = func() *metrics.Metrics {
|
|
m, _ := metrics.New(cfg, sink)
|
|
return m
|
|
}
|
|
require.NoError(t, err)
|
|
reset := func() {
|
|
t.Helper()
|
|
defaultMetrics = metrics.Default
|
|
require.NoError(t, err, "failed to reset global metrics")
|
|
}
|
|
return sink, reset
|
|
}
|
|
|
|
type fakeMetricsSink struct {
|
|
lock sync.Mutex
|
|
metrics.BlackholeSink
|
|
gaugeCalls []metricCall
|
|
incrCounterCalls []metricCall
|
|
}
|
|
|
|
func (f *fakeMetricsSink) SetGaugeWithLabels(key []string, val float32, labels []metrics.Label) {
|
|
f.lock.Lock()
|
|
f.gaugeCalls = append(f.gaugeCalls, metricCall{key: key, val: val, labels: labels})
|
|
f.lock.Unlock()
|
|
}
|
|
|
|
func (f *fakeMetricsSink) IncrCounterWithLabels(key []string, val float32, labels []metrics.Label) {
|
|
f.lock.Lock()
|
|
f.incrCounterCalls = append(f.incrCounterCalls, metricCall{key: key, val: val, labels: labels})
|
|
f.lock.Unlock()
|
|
}
|
|
|
|
type metricCall struct {
|
|
key []string
|
|
val float32
|
|
labels []metrics.Label
|
|
}
|
|
|
|
// logError wraps f in a func() that calls f and, when f returns a non-nil
// error, writes that error's message to the test log. f is not invoked until
// the returned function is called.
//
// The message is logged as a plain argument rather than as a format string,
// so any '%' characters in the error text are printed verbatim.
func logError(t *testing.T, f func() error) func() {
	return func() {
		if err := f(); err != nil {
			t.Log(err.Error())
		}
	}
}
|