emit metrics for global rate limiting (#15891)
This commit is contained in:
parent
c088753e7b
commit
2a90faa4b1
|
@ -9,9 +9,9 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/armon/go-metrics"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/multilimiter"
|
"github.com/hashicorp/consul/agent/consul/multilimiter"
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -214,6 +214,21 @@ func (h *Handler) Allow(op Operation) error {
|
||||||
"limit_enforced", enforced,
|
"limit_enforced", enforced,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
metrics.IncrCounterWithLabels([]string{"consul", "rate_limit"}, 1, []metrics.Label{
|
||||||
|
{
|
||||||
|
Name: "limit_type",
|
||||||
|
Value: l.desc,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "op",
|
||||||
|
Value: op.Name,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "mode",
|
||||||
|
Value: l.mode.String(),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
if enforced {
|
if enforced {
|
||||||
// TODO(NET-1382) - use the logger to print rate limiter logs.
|
// TODO(NET-1382) - use the logger to print rate limiter logs.
|
||||||
if h.leaderStatusProvider.IsLeader() && op.Type == OperationTypeWrite {
|
if h.leaderStatusProvider.IsLeader() && op.Type == OperationTypeWrite {
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/multilimiter"
|
"github.com/hashicorp/consul/agent/consul/multilimiter"
|
||||||
|
"github.com/hashicorp/consul/agent/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
//
|
//
|
||||||
|
@ -42,12 +43,15 @@ func TestHandler(t *testing.T) {
|
||||||
allow bool
|
allow bool
|
||||||
}
|
}
|
||||||
testCases := map[string]struct {
|
testCases := map[string]struct {
|
||||||
op Operation
|
op Operation
|
||||||
globalMode Mode
|
globalMode Mode
|
||||||
checks []limitCheck
|
checks []limitCheck
|
||||||
isLeader bool
|
isLeader bool
|
||||||
expectErr error
|
expectErr error
|
||||||
expectLog bool
|
expectLog bool
|
||||||
|
expectMetric bool
|
||||||
|
expectMetricName string
|
||||||
|
expectMetricCount float64
|
||||||
}{
|
}{
|
||||||
"operation exempt from limiting": {
|
"operation exempt from limiting": {
|
||||||
op: Operation{
|
op: Operation{
|
||||||
|
@ -55,10 +59,11 @@ func TestHandler(t *testing.T) {
|
||||||
Name: rpcName,
|
Name: rpcName,
|
||||||
SourceAddr: sourceAddr,
|
SourceAddr: sourceAddr,
|
||||||
},
|
},
|
||||||
globalMode: ModeEnforcing,
|
globalMode: ModeEnforcing,
|
||||||
checks: []limitCheck{},
|
checks: []limitCheck{},
|
||||||
expectErr: nil,
|
expectErr: nil,
|
||||||
expectLog: false,
|
expectLog: false,
|
||||||
|
expectMetric: false,
|
||||||
},
|
},
|
||||||
"global write limit disabled": {
|
"global write limit disabled": {
|
||||||
op: Operation{
|
op: Operation{
|
||||||
|
@ -66,10 +71,11 @@ func TestHandler(t *testing.T) {
|
||||||
Name: rpcName,
|
Name: rpcName,
|
||||||
SourceAddr: sourceAddr,
|
SourceAddr: sourceAddr,
|
||||||
},
|
},
|
||||||
globalMode: ModeDisabled,
|
globalMode: ModeDisabled,
|
||||||
checks: []limitCheck{},
|
checks: []limitCheck{},
|
||||||
expectErr: nil,
|
expectErr: nil,
|
||||||
expectLog: false,
|
expectLog: false,
|
||||||
|
expectMetric: false,
|
||||||
},
|
},
|
||||||
"global write limit within allowance": {
|
"global write limit within allowance": {
|
||||||
op: Operation{
|
op: Operation{
|
||||||
|
@ -81,8 +87,9 @@ func TestHandler(t *testing.T) {
|
||||||
checks: []limitCheck{
|
checks: []limitCheck{
|
||||||
{limit: globalWrite, allow: true},
|
{limit: globalWrite, allow: true},
|
||||||
},
|
},
|
||||||
expectErr: nil,
|
expectErr: nil,
|
||||||
expectLog: false,
|
expectLog: false,
|
||||||
|
expectMetric: false,
|
||||||
},
|
},
|
||||||
"global write limit exceeded (permissive)": {
|
"global write limit exceeded (permissive)": {
|
||||||
op: Operation{
|
op: Operation{
|
||||||
|
@ -94,8 +101,11 @@ func TestHandler(t *testing.T) {
|
||||||
checks: []limitCheck{
|
checks: []limitCheck{
|
||||||
{limit: globalWrite, allow: false},
|
{limit: globalWrite, allow: false},
|
||||||
},
|
},
|
||||||
expectErr: nil,
|
expectErr: nil,
|
||||||
expectLog: true,
|
expectLog: true,
|
||||||
|
expectMetric: true,
|
||||||
|
expectMetricName: "consul.rate_limit;limit_type=global/write;op=Foo.Bar;mode=permissive",
|
||||||
|
expectMetricCount: 1,
|
||||||
},
|
},
|
||||||
"global write limit exceeded (enforcing, leader)": {
|
"global write limit exceeded (enforcing, leader)": {
|
||||||
op: Operation{
|
op: Operation{
|
||||||
|
@ -107,9 +117,12 @@ func TestHandler(t *testing.T) {
|
||||||
checks: []limitCheck{
|
checks: []limitCheck{
|
||||||
{limit: globalWrite, allow: false},
|
{limit: globalWrite, allow: false},
|
||||||
},
|
},
|
||||||
isLeader: true,
|
isLeader: true,
|
||||||
expectErr: ErrRetryLater,
|
expectErr: ErrRetryLater,
|
||||||
expectLog: true,
|
expectLog: true,
|
||||||
|
expectMetric: true,
|
||||||
|
expectMetricName: "consul.rate_limit;limit_type=global/write;op=Foo.Bar;mode=enforcing",
|
||||||
|
expectMetricCount: 1,
|
||||||
},
|
},
|
||||||
"global write limit exceeded (enforcing, follower)": {
|
"global write limit exceeded (enforcing, follower)": {
|
||||||
op: Operation{
|
op: Operation{
|
||||||
|
@ -121,9 +134,12 @@ func TestHandler(t *testing.T) {
|
||||||
checks: []limitCheck{
|
checks: []limitCheck{
|
||||||
{limit: globalWrite, allow: false},
|
{limit: globalWrite, allow: false},
|
||||||
},
|
},
|
||||||
isLeader: false,
|
isLeader: false,
|
||||||
expectErr: ErrRetryElsewhere,
|
expectErr: ErrRetryElsewhere,
|
||||||
expectLog: true,
|
expectLog: true,
|
||||||
|
expectMetric: true,
|
||||||
|
expectMetricName: "consul.rate_limit;limit_type=global/write;op=Foo.Bar;mode=enforcing",
|
||||||
|
expectMetricCount: 1,
|
||||||
},
|
},
|
||||||
"global read limit disabled": {
|
"global read limit disabled": {
|
||||||
op: Operation{
|
op: Operation{
|
||||||
|
@ -131,10 +147,11 @@ func TestHandler(t *testing.T) {
|
||||||
Name: rpcName,
|
Name: rpcName,
|
||||||
SourceAddr: sourceAddr,
|
SourceAddr: sourceAddr,
|
||||||
},
|
},
|
||||||
globalMode: ModeDisabled,
|
globalMode: ModeDisabled,
|
||||||
checks: []limitCheck{},
|
checks: []limitCheck{},
|
||||||
expectErr: nil,
|
expectErr: nil,
|
||||||
expectLog: false,
|
expectLog: false,
|
||||||
|
expectMetric: false,
|
||||||
},
|
},
|
||||||
"global read limit within allowance": {
|
"global read limit within allowance": {
|
||||||
op: Operation{
|
op: Operation{
|
||||||
|
@ -146,8 +163,9 @@ func TestHandler(t *testing.T) {
|
||||||
checks: []limitCheck{
|
checks: []limitCheck{
|
||||||
{limit: globalRead, allow: true},
|
{limit: globalRead, allow: true},
|
||||||
},
|
},
|
||||||
expectErr: nil,
|
expectErr: nil,
|
||||||
expectLog: false,
|
expectLog: false,
|
||||||
|
expectMetric: false,
|
||||||
},
|
},
|
||||||
"global read limit exceeded (permissive)": {
|
"global read limit exceeded (permissive)": {
|
||||||
op: Operation{
|
op: Operation{
|
||||||
|
@ -159,8 +177,11 @@ func TestHandler(t *testing.T) {
|
||||||
checks: []limitCheck{
|
checks: []limitCheck{
|
||||||
{limit: globalRead, allow: false},
|
{limit: globalRead, allow: false},
|
||||||
},
|
},
|
||||||
expectErr: nil,
|
expectErr: nil,
|
||||||
expectLog: true,
|
expectLog: true,
|
||||||
|
expectMetric: true,
|
||||||
|
expectMetricName: "consul.rate_limit;limit_type=global/read;op=Foo.Bar;mode=permissive",
|
||||||
|
expectMetricCount: 1,
|
||||||
},
|
},
|
||||||
"global read limit exceeded (enforcing, leader)": {
|
"global read limit exceeded (enforcing, leader)": {
|
||||||
op: Operation{
|
op: Operation{
|
||||||
|
@ -172,9 +193,12 @@ func TestHandler(t *testing.T) {
|
||||||
checks: []limitCheck{
|
checks: []limitCheck{
|
||||||
{limit: globalRead, allow: false},
|
{limit: globalRead, allow: false},
|
||||||
},
|
},
|
||||||
isLeader: true,
|
isLeader: true,
|
||||||
expectErr: ErrRetryElsewhere,
|
expectErr: ErrRetryElsewhere,
|
||||||
expectLog: true,
|
expectLog: true,
|
||||||
|
expectMetric: true,
|
||||||
|
expectMetricName: "consul.rate_limit;limit_type=global/read;op=Foo.Bar;mode=enforcing",
|
||||||
|
expectMetricCount: 1,
|
||||||
},
|
},
|
||||||
"global read limit exceeded (enforcing, follower)": {
|
"global read limit exceeded (enforcing, follower)": {
|
||||||
op: Operation{
|
op: Operation{
|
||||||
|
@ -186,13 +210,17 @@ func TestHandler(t *testing.T) {
|
||||||
checks: []limitCheck{
|
checks: []limitCheck{
|
||||||
{limit: globalRead, allow: false},
|
{limit: globalRead, allow: false},
|
||||||
},
|
},
|
||||||
isLeader: false,
|
isLeader: false,
|
||||||
expectErr: ErrRetryElsewhere,
|
expectErr: ErrRetryElsewhere,
|
||||||
expectLog: true,
|
expectLog: true,
|
||||||
|
expectMetric: true,
|
||||||
|
expectMetricName: "consul.rate_limit;limit_type=global/read;op=Foo.Bar;mode=enforcing",
|
||||||
|
expectMetricCount: 1,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for desc, tc := range testCases {
|
for desc, tc := range testCases {
|
||||||
t.Run(desc, func(t *testing.T) {
|
t.Run(desc, func(t *testing.T) {
|
||||||
|
sink := metrics.TestSetupMetrics(t, "")
|
||||||
limiter := newMockLimiter(t)
|
limiter := newMockLimiter(t)
|
||||||
limiter.On("UpdateConfig", mock.Anything, mock.Anything).Return()
|
limiter.On("UpdateConfig", mock.Anything, mock.Anything).Return()
|
||||||
for _, c := range tc.checks {
|
for _, c := range tc.checks {
|
||||||
|
@ -224,6 +252,10 @@ func TestHandler(t *testing.T) {
|
||||||
} else {
|
} else {
|
||||||
require.Zero(t, output.Len(), "expected no logs to be emitted")
|
require.Zero(t, output.Len(), "expected no logs to be emitted")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if tc.expectMetric {
|
||||||
|
metrics.AssertCounter(t, sink, tc.expectMetricName, tc.expectMetricCount)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,101 @@
|
||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/armon/go-metrics"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Returns an in memory metrics sink for tests to assert metrics are emitted.
|
||||||
|
// Do not enable t.Parallel() since this relies on the global metrics instance.
|
||||||
|
func TestSetupMetrics(t *testing.T, serviceName string) *metrics.InmemSink {
|
||||||
|
// Record for ages (5 mins) so we can be confident that our assertions won't
|
||||||
|
// fail on silly long test runs due to dropped data.
|
||||||
|
s := metrics.NewInmemSink(10*time.Second, 300*time.Second)
|
||||||
|
cfg := metrics.DefaultConfig(serviceName)
|
||||||
|
cfg.EnableHostname = false
|
||||||
|
cfg.EnableRuntimeMetrics = false
|
||||||
|
metrics.NewGlobal(cfg, s)
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
// Asserts that a counter metric has the given value
|
||||||
|
func AssertCounter(t *testing.T, sink *metrics.InmemSink, name string, value float64) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
data := sink.Data()
|
||||||
|
|
||||||
|
var got float64
|
||||||
|
for _, intv := range data {
|
||||||
|
intv.RLock()
|
||||||
|
// Note that InMemSink uses SampledValue and treats the _Sum_ not the Count
|
||||||
|
// as the entire value.
|
||||||
|
if sample, ok := intv.Counters[name]; ok {
|
||||||
|
got += sample.Sum
|
||||||
|
}
|
||||||
|
intv.RUnlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
if !assert.Equal(t, value, got) {
|
||||||
|
// no nice way to dump this - this is copied from private method in
|
||||||
|
// InMemSink used for dumping to stdout on SIGUSR1.
|
||||||
|
buf := bytes.NewBuffer(nil)
|
||||||
|
for _, intv := range data {
|
||||||
|
intv.RLock()
|
||||||
|
for name, val := range intv.Gauges {
|
||||||
|
fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value)
|
||||||
|
}
|
||||||
|
for name, vals := range intv.Points {
|
||||||
|
for _, val := range vals {
|
||||||
|
fmt.Fprintf(buf, "[%v][P] '%s': %0.3f\n", intv.Interval, name, val)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for name, agg := range intv.Counters {
|
||||||
|
fmt.Fprintf(buf, "[%v][C] '%s': %s\n", intv.Interval, name, agg.AggregateSample)
|
||||||
|
}
|
||||||
|
for name, agg := range intv.Samples {
|
||||||
|
fmt.Fprintf(buf, "[%v][S] '%s': %s\n", intv.Interval, name, agg.AggregateSample)
|
||||||
|
}
|
||||||
|
intv.RUnlock()
|
||||||
|
}
|
||||||
|
t.Log(buf.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Asserts that a gauge metric has the current value
|
||||||
|
func AssertGauge(t *testing.T, sink *metrics.InmemSink, name string, value float32) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
data := sink.Data()
|
||||||
|
|
||||||
|
// Loop backward through intervals until there is a non-empty one
|
||||||
|
// Addresses flakiness around recording to one interval but accessing during the next
|
||||||
|
var got float32
|
||||||
|
for i := len(data) - 1; i >= 0; i-- {
|
||||||
|
currentInterval := data[i]
|
||||||
|
|
||||||
|
currentInterval.RLock()
|
||||||
|
if len(currentInterval.Gauges) > 0 {
|
||||||
|
got = currentInterval.Gauges[name].Value
|
||||||
|
currentInterval.RUnlock()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
currentInterval.RUnlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
if !assert.Equal(t, value, got) {
|
||||||
|
buf := bytes.NewBuffer(nil)
|
||||||
|
for _, intv := range data {
|
||||||
|
intv.RLock()
|
||||||
|
for name, val := range intv.Gauges {
|
||||||
|
fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value)
|
||||||
|
}
|
||||||
|
intv.RUnlock()
|
||||||
|
}
|
||||||
|
t.Log(buf.String())
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,113 +1,21 @@
|
||||||
package proxy
|
package proxy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"net"
|
"net"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/connect"
|
"github.com/hashicorp/consul/connect"
|
||||||
|
|
||||||
metrics "github.com/armon/go-metrics"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
agConnect "github.com/hashicorp/consul/agent/connect"
|
agConnect "github.com/hashicorp/consul/agent/connect"
|
||||||
|
agMetrics "github.com/hashicorp/consul/agent/metrics"
|
||||||
"github.com/hashicorp/consul/ipaddr"
|
"github.com/hashicorp/consul/ipaddr"
|
||||||
"github.com/hashicorp/consul/sdk/freeport"
|
"github.com/hashicorp/consul/sdk/freeport"
|
||||||
"github.com/hashicorp/consul/sdk/testutil"
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
func testSetupMetrics(t *testing.T) *metrics.InmemSink {
|
|
||||||
// Record for ages (5 mins) so we can be confident that our assertions won't
|
|
||||||
// fail on silly long test runs due to dropped data.
|
|
||||||
s := metrics.NewInmemSink(10*time.Second, 300*time.Second)
|
|
||||||
cfg := metrics.DefaultConfig("consul.proxy.test")
|
|
||||||
cfg.EnableHostname = false
|
|
||||||
cfg.EnableRuntimeMetrics = false
|
|
||||||
metrics.NewGlobal(cfg, s)
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
func assertCurrentGaugeValue(t *testing.T, sink *metrics.InmemSink,
|
|
||||||
name string, value float32) {
|
|
||||||
t.Helper()
|
|
||||||
|
|
||||||
data := sink.Data()
|
|
||||||
|
|
||||||
// Loop backward through intervals until there is a non-empty one
|
|
||||||
// Addresses flakiness around recording to one interval but accessing during the next
|
|
||||||
var got float32
|
|
||||||
for i := len(data) - 1; i >= 0; i-- {
|
|
||||||
currentInterval := data[i]
|
|
||||||
|
|
||||||
currentInterval.RLock()
|
|
||||||
if len(currentInterval.Gauges) > 0 {
|
|
||||||
got = currentInterval.Gauges[name].Value
|
|
||||||
currentInterval.RUnlock()
|
|
||||||
break
|
|
||||||
}
|
|
||||||
currentInterval.RUnlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
if !assert.Equal(t, value, got) {
|
|
||||||
buf := bytes.NewBuffer(nil)
|
|
||||||
for _, intv := range data {
|
|
||||||
intv.RLock()
|
|
||||||
for name, val := range intv.Gauges {
|
|
||||||
fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value)
|
|
||||||
}
|
|
||||||
intv.RUnlock()
|
|
||||||
}
|
|
||||||
t.Log(buf.String())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func assertAllTimeCounterValue(t *testing.T, sink *metrics.InmemSink,
|
|
||||||
name string, value float64) {
|
|
||||||
t.Helper()
|
|
||||||
|
|
||||||
data := sink.Data()
|
|
||||||
|
|
||||||
var got float64
|
|
||||||
for _, intv := range data {
|
|
||||||
intv.RLock()
|
|
||||||
// Note that InMemSink uses SampledValue and treats the _Sum_ not the Count
|
|
||||||
// as the entire value.
|
|
||||||
if sample, ok := intv.Counters[name]; ok {
|
|
||||||
got += sample.Sum
|
|
||||||
}
|
|
||||||
intv.RUnlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
if !assert.Equal(t, value, got) {
|
|
||||||
// no nice way to dump this - this is copied from private method in
|
|
||||||
// InMemSink used for dumping to stdout on SIGUSR1.
|
|
||||||
buf := bytes.NewBuffer(nil)
|
|
||||||
for _, intv := range data {
|
|
||||||
intv.RLock()
|
|
||||||
for name, val := range intv.Gauges {
|
|
||||||
fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value)
|
|
||||||
}
|
|
||||||
for name, vals := range intv.Points {
|
|
||||||
for _, val := range vals {
|
|
||||||
fmt.Fprintf(buf, "[%v][P] '%s': %0.3f\n", intv.Interval, name, val)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for name, agg := range intv.Counters {
|
|
||||||
fmt.Fprintf(buf, "[%v][C] '%s': %s\n", intv.Interval, name, agg.AggregateSample)
|
|
||||||
}
|
|
||||||
for name, agg := range intv.Samples {
|
|
||||||
fmt.Fprintf(buf, "[%v][S] '%s': %s\n", intv.Interval, name, agg.AggregateSample)
|
|
||||||
}
|
|
||||||
intv.RUnlock()
|
|
||||||
}
|
|
||||||
t.Log(buf.String())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPublicListener(t *testing.T) {
|
func TestPublicListener(t *testing.T) {
|
||||||
// Can't enable t.Parallel since we rely on the global metrics instance.
|
// Can't enable t.Parallel since we rely on the global metrics instance.
|
||||||
|
|
||||||
|
@ -125,7 +33,7 @@ func TestPublicListener(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Setup metrics to test they are recorded
|
// Setup metrics to test they are recorded
|
||||||
sink := testSetupMetrics(t)
|
sink := agMetrics.TestSetupMetrics(t, "consul.proxy.test")
|
||||||
|
|
||||||
svc := connect.TestService(t, "db", ca)
|
svc := connect.TestService(t, "db", ca)
|
||||||
l := NewPublicListener(svc, cfg, testutil.Logger(t))
|
l := NewPublicListener(svc, cfg, testutil.Logger(t))
|
||||||
|
@ -150,14 +58,14 @@ func TestPublicListener(t *testing.T) {
|
||||||
TestEchoConn(t, conn, "")
|
TestEchoConn(t, conn, "")
|
||||||
|
|
||||||
// Check active conn is tracked in gauges
|
// Check active conn is tracked in gauges
|
||||||
assertCurrentGaugeValue(t, sink, "consul.proxy.test.inbound.conns;dst=db", 1)
|
agMetrics.AssertGauge(t, sink, "consul.proxy.test.inbound.conns;dst=db", 1)
|
||||||
|
|
||||||
// Close listener to ensure all conns are closed and have reported their metrics
|
// Close listener to ensure all conns are closed and have reported their metrics
|
||||||
l.Close()
|
l.Close()
|
||||||
|
|
||||||
// Check all the tx/rx counters got added
|
// Check all the tx/rx counters got added
|
||||||
assertAllTimeCounterValue(t, sink, "consul.proxy.test.inbound.tx_bytes;dst=db", 11)
|
agMetrics.AssertCounter(t, sink, "consul.proxy.test.inbound.tx_bytes;dst=db", 11)
|
||||||
assertAllTimeCounterValue(t, sink, "consul.proxy.test.inbound.rx_bytes;dst=db", 11)
|
agMetrics.AssertCounter(t, sink, "consul.proxy.test.inbound.rx_bytes;dst=db", 11)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestUpstreamListener(t *testing.T) {
|
func TestUpstreamListener(t *testing.T) {
|
||||||
|
@ -183,7 +91,7 @@ func TestUpstreamListener(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Setup metrics to test they are recorded
|
// Setup metrics to test they are recorded
|
||||||
sink := testSetupMetrics(t)
|
sink := agMetrics.TestSetupMetrics(t, "consul.proxy.test")
|
||||||
|
|
||||||
svc := connect.TestService(t, "web", ca)
|
svc := connect.TestService(t, "web", ca)
|
||||||
|
|
||||||
|
@ -214,12 +122,12 @@ func TestUpstreamListener(t *testing.T) {
|
||||||
TestEchoConn(t, conn, "")
|
TestEchoConn(t, conn, "")
|
||||||
|
|
||||||
// Check active conn is tracked in gauges
|
// Check active conn is tracked in gauges
|
||||||
assertCurrentGaugeValue(t, sink, "consul.proxy.test.upstream.conns;src=web;dst_type=service;dst=db", 1)
|
agMetrics.AssertGauge(t, sink, "consul.proxy.test.upstream.conns;src=web;dst_type=service;dst=db", 1)
|
||||||
|
|
||||||
// Close listener to ensure all conns are closed and have reported their metrics
|
// Close listener to ensure all conns are closed and have reported their metrics
|
||||||
l.Close()
|
l.Close()
|
||||||
|
|
||||||
// Check all the tx/rx counters got added
|
// Check all the tx/rx counters got added
|
||||||
assertAllTimeCounterValue(t, sink, "consul.proxy.test.upstream.tx_bytes;src=web;dst_type=service;dst=db", 11)
|
agMetrics.AssertCounter(t, sink, "consul.proxy.test.upstream.tx_bytes;src=web;dst_type=service;dst=db", 11)
|
||||||
assertAllTimeCounterValue(t, sink, "consul.proxy.test.upstream.rx_bytes;src=web;dst_type=service;dst=db", 11)
|
agMetrics.AssertCounter(t, sink, "consul.proxy.test.upstream.rx_bytes;src=web;dst_type=service;dst=db", 11)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue