diff --git a/agent/consul/rate/handler.go b/agent/consul/rate/handler.go index 15b49e6bf..25a184e26 100644 --- a/agent/consul/rate/handler.go +++ b/agent/consul/rate/handler.go @@ -9,9 +9,9 @@ import ( "reflect" "sync/atomic" - "github.com/hashicorp/go-hclog" - + "github.com/armon/go-metrics" "github.com/hashicorp/consul/agent/consul/multilimiter" + "github.com/hashicorp/go-hclog" ) var ( @@ -214,6 +214,21 @@ func (h *Handler) Allow(op Operation) error { "limit_enforced", enforced, ) + metrics.IncrCounterWithLabels([]string{"consul", "rate_limit"}, 1, []metrics.Label{ + { + Name: "limit_type", + Value: l.desc, + }, + { + Name: "op", + Value: op.Name, + }, + { + Name: "mode", + Value: l.mode.String(), + }, + }) + if enforced { // TODO(NET-1382) - use the logger to print rate limiter logs. if h.leaderStatusProvider.IsLeader() && op.Type == OperationTypeWrite { diff --git a/agent/consul/rate/handler_test.go b/agent/consul/rate/handler_test.go index 112d74541..2ac77d973 100644 --- a/agent/consul/rate/handler_test.go +++ b/agent/consul/rate/handler_test.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/consul/agent/consul/multilimiter" + "github.com/hashicorp/consul/agent/metrics" ) // @@ -42,12 +43,15 @@ func TestHandler(t *testing.T) { allow bool } testCases := map[string]struct { - op Operation - globalMode Mode - checks []limitCheck - isLeader bool - expectErr error - expectLog bool + op Operation + globalMode Mode + checks []limitCheck + isLeader bool + expectErr error + expectLog bool + expectMetric bool + expectMetricName string + expectMetricCount float64 }{ "operation exempt from limiting": { op: Operation{ @@ -55,10 +59,11 @@ func TestHandler(t *testing.T) { Name: rpcName, SourceAddr: sourceAddr, }, - globalMode: ModeEnforcing, - checks: []limitCheck{}, - expectErr: nil, - expectLog: false, + globalMode: ModeEnforcing, + checks: []limitCheck{}, + expectErr: nil, + expectLog: false, + expectMetric: false, }, "global write limit disabled": { op: Operation{ @@ -66,10 +71,11 @@ func TestHandler(t *testing.T) { Name: rpcName, SourceAddr: sourceAddr, }, - globalMode: ModeDisabled, - checks: []limitCheck{}, - expectErr: nil, - expectLog: false, + globalMode: ModeDisabled, + checks: []limitCheck{}, + expectErr: nil, + expectLog: false, + expectMetric: false, }, "global write limit within allowance": { op: Operation{ @@ -81,8 +87,9 @@ func TestHandler(t *testing.T) { checks: []limitCheck{ {limit: globalWrite, allow: true}, }, - expectErr: nil, - expectLog: false, + expectErr: nil, + expectLog: false, + expectMetric: false, }, "global write limit exceeded (permissive)": { op: Operation{ @@ -94,8 +101,11 @@ func TestHandler(t *testing.T) { checks: []limitCheck{ {limit: globalWrite, allow: false}, }, - expectErr: nil, - expectLog: true, + expectErr: nil, + expectLog: true, + expectMetric: true, + expectMetricName: "consul.rate_limit;limit_type=global/write;op=Foo.Bar;mode=permissive", + expectMetricCount: 1, }, "global write limit exceeded (enforcing, leader)": { op: Operation{ @@ -107,9 +117,12 @@ func TestHandler(t *testing.T) { checks: []limitCheck{ {limit: globalWrite, allow: false}, }, - isLeader: true, - expectErr: ErrRetryLater, - expectLog: true, + isLeader: true, + expectErr: ErrRetryLater, + expectLog: true, + expectMetric: true, + expectMetricName: "consul.rate_limit;limit_type=global/write;op=Foo.Bar;mode=enforcing", + expectMetricCount: 1, }, "global write limit exceeded (enforcing, follower)": { op: Operation{ @@ -121,9 +134,12 @@ func TestHandler(t *testing.T) { checks: []limitCheck{ {limit: globalWrite, allow: false}, }, - isLeader: false, - expectErr: ErrRetryElsewhere, - expectLog: true, + isLeader: false, + expectErr: ErrRetryElsewhere, + expectLog: true, + expectMetric: true, + expectMetricName: "consul.rate_limit;limit_type=global/write;op=Foo.Bar;mode=enforcing", + expectMetricCount: 1, }, "global read limit disabled": { op: Operation{ @@ -131,10 +147,11 @@ func TestHandler(t *testing.T) { Name: rpcName, SourceAddr: sourceAddr, }, - globalMode: ModeDisabled, - checks: []limitCheck{}, - expectErr: nil, - expectLog: false, + globalMode: ModeDisabled, + checks: []limitCheck{}, + expectErr: nil, + expectLog: false, + expectMetric: false, }, "global read limit within allowance": { op: Operation{ @@ -146,8 +163,9 @@ func TestHandler(t *testing.T) { checks: []limitCheck{ {limit: globalRead, allow: true}, }, - expectErr: nil, - expectLog: false, + expectErr: nil, + expectLog: false, + expectMetric: false, }, "global read limit exceeded (permissive)": { op: Operation{ @@ -159,8 +177,11 @@ func TestHandler(t *testing.T) { checks: []limitCheck{ {limit: globalRead, allow: false}, }, - expectErr: nil, - expectLog: true, + expectErr: nil, + expectLog: true, + expectMetric: true, + expectMetricName: "consul.rate_limit;limit_type=global/read;op=Foo.Bar;mode=permissive", + expectMetricCount: 1, }, "global read limit exceeded (enforcing, leader)": { op: Operation{ @@ -172,9 +193,12 @@ func TestHandler(t *testing.T) { checks: []limitCheck{ {limit: globalRead, allow: false}, }, - isLeader: true, - expectErr: ErrRetryElsewhere, - expectLog: true, + isLeader: true, + expectErr: ErrRetryElsewhere, + expectLog: true, + expectMetric: true, + expectMetricName: "consul.rate_limit;limit_type=global/read;op=Foo.Bar;mode=enforcing", + expectMetricCount: 1, }, "global read limit exceeded (enforcing, follower)": { op: Operation{ @@ -186,13 +210,17 @@ func TestHandler(t *testing.T) { checks: []limitCheck{ {limit: globalRead, allow: false}, }, - isLeader: false, - expectErr: ErrRetryElsewhere, - expectLog: true, + isLeader: false, + expectErr: ErrRetryElsewhere, + expectLog: true, + expectMetric: true, + expectMetricName: "consul.rate_limit;limit_type=global/read;op=Foo.Bar;mode=enforcing", + expectMetricCount: 1, }, } for desc, tc := range testCases { t.Run(desc, func(t *testing.T) { + sink := metrics.TestSetupMetrics(t, "") limiter := newMockLimiter(t) limiter.On("UpdateConfig", mock.Anything, mock.Anything).Return() for _, c := range tc.checks { @@ -224,6 +252,10 @@ func TestHandler(t *testing.T) { } else { require.Zero(t, output.Len(), "expected no logs to be emitted") } + + if tc.expectMetric { + metrics.AssertCounter(t, sink, tc.expectMetricName, tc.expectMetricCount) + } }) } } diff --git a/agent/metrics/testing.go b/agent/metrics/testing.go new file mode 100644 index 000000000..53dac409e --- /dev/null +++ b/agent/metrics/testing.go @@ -0,0 +1,101 @@ +package metrics + +import ( + "bytes" + "fmt" + "testing" + "time" + + "github.com/armon/go-metrics" + "github.com/stretchr/testify/assert" +) + +// Returns an in memory metrics sink for tests to assert metrics are emitted. +// Do not enable t.Parallel() since this relies on the global metrics instance. +func TestSetupMetrics(t *testing.T, serviceName string) *metrics.InmemSink { + // Record for ages (5 mins) so we can be confident that our assertions won't + // fail on silly long test runs due to dropped data. + s := metrics.NewInmemSink(10*time.Second, 300*time.Second) + cfg := metrics.DefaultConfig(serviceName) + cfg.EnableHostname = false + cfg.EnableRuntimeMetrics = false + metrics.NewGlobal(cfg, s) + return s +} + +// Asserts that a counter metric has the given value +func AssertCounter(t *testing.T, sink *metrics.InmemSink, name string, value float64) { + t.Helper() + + data := sink.Data() + + var got float64 + for _, intv := range data { + intv.RLock() + // Note that InMemSink uses SampledValue and treats the _Sum_ not the Count + // as the entire value. + if sample, ok := intv.Counters[name]; ok { + got += sample.Sum + } + intv.RUnlock() + } + + if !assert.Equal(t, value, got) { + // no nice way to dump this - this is copied from private method in + // InMemSink used for dumping to stdout on SIGUSR1. + buf := bytes.NewBuffer(nil) + for _, intv := range data { + intv.RLock() + for name, val := range intv.Gauges { + fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value) + } + for name, vals := range intv.Points { + for _, val := range vals { + fmt.Fprintf(buf, "[%v][P] '%s': %0.3f\n", intv.Interval, name, val) + } + } + for name, agg := range intv.Counters { + fmt.Fprintf(buf, "[%v][C] '%s': %s\n", intv.Interval, name, agg.AggregateSample) + } + for name, agg := range intv.Samples { + fmt.Fprintf(buf, "[%v][S] '%s': %s\n", intv.Interval, name, agg.AggregateSample) + } + intv.RUnlock() + } + t.Log(buf.String()) + } +} + +// Asserts that a gauge metric has the current value +func AssertGauge(t *testing.T, sink *metrics.InmemSink, name string, value float32) { + t.Helper() + + data := sink.Data() + + // Loop backward through intervals until there is a non-empty one + // Addresses flakiness around recording to one interval but accessing during the next + var got float32 + for i := len(data) - 1; i >= 0; i-- { + currentInterval := data[i] + + currentInterval.RLock() + if len(currentInterval.Gauges) > 0 { + got = currentInterval.Gauges[name].Value + currentInterval.RUnlock() + break + } + currentInterval.RUnlock() + } + + if !assert.Equal(t, value, got) { + buf := bytes.NewBuffer(nil) + for _, intv := range data { + intv.RLock() + for name, val := range intv.Gauges { + fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value) + } + intv.RUnlock() + } + t.Log(buf.String()) + } +} diff --git a/connect/proxy/listener_test.go b/connect/proxy/listener_test.go index 8a3006b69..e0f182b95 100644 --- a/connect/proxy/listener_test.go +++ b/connect/proxy/listener_test.go @@ -1,113 +1,21 @@ package proxy import ( - "bytes" "context" - "fmt" "net" "testing" - "time" "github.com/hashicorp/consul/connect" - metrics "github.com/armon/go-metrics" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" agConnect "github.com/hashicorp/consul/agent/connect" + agMetrics "github.com/hashicorp/consul/agent/metrics" "github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/sdk/freeport" "github.com/hashicorp/consul/sdk/testutil" ) -func testSetupMetrics(t *testing.T) *metrics.InmemSink { - // Record for ages (5 mins) so we can be confident that our assertions won't - // fail on silly long test runs due to dropped data. - s := metrics.NewInmemSink(10*time.Second, 300*time.Second) - cfg := metrics.DefaultConfig("consul.proxy.test") - cfg.EnableHostname = false - cfg.EnableRuntimeMetrics = false - metrics.NewGlobal(cfg, s) - return s -} - -func assertCurrentGaugeValue(t *testing.T, sink *metrics.InmemSink, - name string, value float32) { - t.Helper() - - data := sink.Data() - - // Loop backward through intervals until there is a non-empty one - // Addresses flakiness around recording to one interval but accessing during the next - var got float32 - for i := len(data) - 1; i >= 0; i-- { - currentInterval := data[i] - - currentInterval.RLock() - if len(currentInterval.Gauges) > 0 { - got = currentInterval.Gauges[name].Value - currentInterval.RUnlock() - break - } - currentInterval.RUnlock() - } - - if !assert.Equal(t, value, got) { - buf := bytes.NewBuffer(nil) - for _, intv := range data { - intv.RLock() - for name, val := range intv.Gauges { - fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value) - } - intv.RUnlock() - } - t.Log(buf.String()) - } -} - -func assertAllTimeCounterValue(t *testing.T, sink *metrics.InmemSink, - name string, value float64) { - t.Helper() - - data := sink.Data() - - var got float64 - for _, intv := range data { - intv.RLock() - // Note that InMemSink uses SampledValue and treats the _Sum_ not the Count - // as the entire value. - if sample, ok := intv.Counters[name]; ok { - got += sample.Sum - } - intv.RUnlock() - } - - if !assert.Equal(t, value, got) { - // no nice way to dump this - this is copied from private method in - // InMemSink used for dumping to stdout on SIGUSR1. - buf := bytes.NewBuffer(nil) - for _, intv := range data { - intv.RLock() - for name, val := range intv.Gauges { - fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value) - } - for name, vals := range intv.Points { - for _, val := range vals { - fmt.Fprintf(buf, "[%v][P] '%s': %0.3f\n", intv.Interval, name, val) - } - } - for name, agg := range intv.Counters { - fmt.Fprintf(buf, "[%v][C] '%s': %s\n", intv.Interval, name, agg.AggregateSample) - } - for name, agg := range intv.Samples { - fmt.Fprintf(buf, "[%v][S] '%s': %s\n", intv.Interval, name, agg.AggregateSample) - } - intv.RUnlock() - } - t.Log(buf.String()) - } -} - func TestPublicListener(t *testing.T) { // Can't enable t.Parallel since we rely on the global metrics instance. @@ -125,7 +33,7 @@ func TestPublicListener(t *testing.T) { } // Setup metrics to test they are recorded - sink := testSetupMetrics(t) + sink := agMetrics.TestSetupMetrics(t, "consul.proxy.test") svc := connect.TestService(t, "db", ca) l := NewPublicListener(svc, cfg, testutil.Logger(t)) @@ -150,14 +58,14 @@ func TestPublicListener(t *testing.T) { TestEchoConn(t, conn, "") // Check active conn is tracked in gauges - assertCurrentGaugeValue(t, sink, "consul.proxy.test.inbound.conns;dst=db", 1) + agMetrics.AssertGauge(t, sink, "consul.proxy.test.inbound.conns;dst=db", 1) // Close listener to ensure all conns are closed and have reported their metrics l.Close() // Check all the tx/rx counters got added - assertAllTimeCounterValue(t, sink, "consul.proxy.test.inbound.tx_bytes;dst=db", 11) - assertAllTimeCounterValue(t, sink, "consul.proxy.test.inbound.rx_bytes;dst=db", 11) + agMetrics.AssertCounter(t, sink, "consul.proxy.test.inbound.tx_bytes;dst=db", 11) + agMetrics.AssertCounter(t, sink, "consul.proxy.test.inbound.rx_bytes;dst=db", 11) } func TestUpstreamListener(t *testing.T) { @@ -183,7 +91,7 @@ func TestUpstreamListener(t *testing.T) { } // Setup metrics to test they are recorded - sink := testSetupMetrics(t) + sink := agMetrics.TestSetupMetrics(t, "consul.proxy.test") svc := connect.TestService(t, "web", ca) @@ -214,12 +122,12 @@ func TestUpstreamListener(t *testing.T) { TestEchoConn(t, conn, "") // Check active conn is tracked in gauges - assertCurrentGaugeValue(t, sink, "consul.proxy.test.upstream.conns;src=web;dst_type=service;dst=db", 1) + agMetrics.AssertGauge(t, sink, "consul.proxy.test.upstream.conns;src=web;dst_type=service;dst=db", 1) // Close listener to ensure all conns are closed and have reported their metrics l.Close() // Check all the tx/rx counters got added - assertAllTimeCounterValue(t, sink, "consul.proxy.test.upstream.tx_bytes;src=web;dst_type=service;dst=db", 11) - assertAllTimeCounterValue(t, sink, "consul.proxy.test.upstream.rx_bytes;src=web;dst_type=service;dst=db", 11) + agMetrics.AssertCounter(t, sink, "consul.proxy.test.upstream.tx_bytes;src=web;dst_type=service;dst=db", 11) + agMetrics.AssertCounter(t, sink, "consul.proxy.test.upstream.rx_bytes;src=web;dst_type=service;dst=db", 11) }