inject logger and create logdrop sink (#15822)

* inject logger and create logdrop sink

* init sink with an empty struct instead of nil

* wrap a logger instead of a sink and add a discard logger to avoid double logging

* fix compile errors

* fix linter errors

* Fix bug where log arguments aren't properly formatted

* Move log sink construction outside of handler

* Add prometheus definition and docs for log drop counter

Co-authored-by: Daniel Upton <daniel@floppy.co>
This commit is contained in:
Dhia Ayachi 2023-01-06 13:33:53 -05:00 committed by GitHub
parent 01a0142d1f
commit f17bc5ed73
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 92 additions and 73 deletions

View file

@ -603,7 +603,12 @@ func (a *Agent) Start(ctx context.Context) error {
// Setup either the client or the server.
if c.ServerMode {
serverLogger := a.baseDeps.Logger.NamedIntercept(logging.ConsulServer)
incomingRPCLimiter := consul.ConfiguredIncomingRPCLimiter(serverLogger, consulCfg)
incomingRPCLimiter := consul.ConfiguredIncomingRPCLimiter(
&lib.StopChannelContext{StopCh: a.shutdownCh},
serverLogger,
consulCfg,
)
a.externalGRPCServer = external.NewServer(
a.logger.Named("grpc.external"),

View file

@ -9,8 +9,9 @@ import (
"reflect"
"sync/atomic"
"github.com/hashicorp/consul/agent/consul/multilimiter"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/consul/multilimiter"
)
var (
@ -119,8 +120,6 @@ type Handler struct {
limiter multilimiter.RateLimiter
// TODO: replace this with the real logger.
// https://github.com/hashicorp/consul/pull/15822
logger hclog.Logger
}
@ -205,8 +204,7 @@ func (h *Handler) Allow(op Operation) error {
continue
}
// TODO: metrics.
// TODO: is this the correct log-level?
// TODO(NET-1382): is this the correct log-level?
enforced := l.mode == ModeEnforcing
h.logger.Trace("RPC exceeded allowed rate limit",
@ -217,6 +215,7 @@ func (h *Handler) Allow(op Operation) error {
)
if enforced {
// TODO(NET-1382) - use the logger to print rate limiter logs.
if h.leaderStatusProvider.IsLeader() && op.Type == OperationTypeWrite {
return ErrRetryLater
}

View file

@ -0,0 +1,10 @@
package rate
import "github.com/armon/go-metrics/prometheus"
var Counters = []prometheus.CounterDefinition{
{
Name: []string{"rpc", "rate_limit", "log_dropped"},
Help: "Increments whenever a log that is emitted because an RPC exceeded a rate limit gets dropped because the output buffer is full.",
},
}

View file

@ -49,6 +49,7 @@ import (
agentgrpc "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/agent/grpc-internal/services/subscribe"
"github.com/hashicorp/consul/agent/hcp"
logdrop "github.com/hashicorp/consul/agent/log-drop"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
@ -1846,7 +1847,7 @@ func (s *Server) hcpServerStatus(deps Deps) hcp.StatusCallback {
}
}
func ConfiguredIncomingRPCLimiter(serverLogger hclog.InterceptLogger, consulCfg *Config) *rpcRate.Handler {
func ConfiguredIncomingRPCLimiter(ctx context.Context, serverLogger hclog.InterceptLogger, consulCfg *Config) *rpcRate.Handler {
mlCfg := &multilimiter.Config{ReconcileCheckLimit: 30 * time.Second, ReconcileCheckInterval: time.Second}
limitsConfig := &RequestLimits{
Mode: rpcRate.RequestLimitsModeFromNameWithDefault(consulCfg.RequestLimitsMode),
@ -1854,14 +1855,15 @@ func ConfiguredIncomingRPCLimiter(serverLogger hclog.InterceptLogger, consulCfg
WriteRate: consulCfg.RequestLimitsWriteRate,
}
sink := logdrop.NewLogDropSink(ctx, 100, serverLogger.Named("rpc-rate-limit"), func(l logdrop.Log) {
metrics.IncrCounter([]string{"rpc", "rate_limit", "log_dropped"}, 1)
})
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{Output: io.Discard})
logger.RegisterSink(sink)
rateLimiterConfig := convertConsulConfigToRateLimitHandlerConfig(*limitsConfig, mlCfg)
incomingRPCLimiter := rpcRate.NewHandler(
*rateLimiterConfig,
serverLogger.Named("rpc-rate-limit"),
)
return incomingRPCLimiter
return rpcRate.NewHandler(*rateLimiterConfig, logger)
}
func convertConsulConfigToRateLimitHandlerConfig(limitsConfig RequestLimits, multilimiterConfig *multilimiter.Config) *rpcRate.HandlerConfig {

View file

@ -2,34 +2,33 @@ package logdrop
import (
"context"
"github.com/hashicorp/go-hclog"
)
// SinkAdapter mimic the interface from hclog.SinkAdapter
// Logger mimic the interface from hclog.Logger
//
//go:generate mockery --name SinkAdapter --inpackage
type SinkAdapter interface {
Accept(name string, level hclog.Level, msg string, args ...interface{})
//go:generate mockery --name Logger --inpackage
type Logger interface {
Log(level hclog.Level, msg string, args ...interface{})
}
type Log struct {
n string
s string
i []interface{}
l hclog.Level
}
type logDropSink struct {
sink SinkAdapter
logger Logger
logCh chan Log
name string
dropFn func(l Log)
}
// Accept consume a log and push it into a channel,
// if the channel is filled it will call dropFn
func (r *logDropSink) Accept(name string, level hclog.Level, msg string, args ...interface{}) {
r.pushLog(Log{n: name, l: level, s: msg, i: args})
func (r *logDropSink) Accept(_ string, level hclog.Level, msg string, args ...interface{}) {
r.pushLog(Log{l: level, s: msg, i: args})
}
func (r *logDropSink) pushLog(l Log) {
@ -44,21 +43,20 @@ func (r *logDropSink) logConsumer(ctx context.Context) {
for {
select {
case l := <-r.logCh:
r.sink.Accept(l.n, l.l, l.s, l.i)
r.logger.Log(l.l, l.s, l.i...)
case <-ctx.Done():
return
}
}
}
// NewLogDropSink create a log SinkAdapter that wrap another SinkAdapter
// NewLogDropSink create a log Logger that wrap another Logger
// It also create a go routine for consuming logs, the given context need to be canceled
// to properly deallocate the SinkAdapter.
func NewLogDropSink(ctx context.Context, name string, depth int, sink SinkAdapter, dropFn func(l Log)) hclog.SinkAdapter {
// to properly deallocate the Logger.
func NewLogDropSink(ctx context.Context, depth int, logger Logger, dropFn func(l Log)) hclog.SinkAdapter {
r := &logDropSink{
sink: sink,
logger: logger,
logCh: make(chan Log, depth),
name: name,
dropFn: dropFn,
}
go r.logConsumer(ctx)

View file

@ -2,37 +2,39 @@ package logdrop
import (
"context"
"github.com/hashicorp/consul/sdk/testutil/retry"
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"testing"
"time"
"github.com/hashicorp/consul/sdk/testutil/retry"
)
func TestNewLogDrop(t *testing.T) {
mockLogger := NewMockSinkAdapter(t)
mockLogger.On("Accept", "test Accept", hclog.Info, "hello", []interface{}{"test", 0}).Return()
ld := NewLogDropSink(context.Background(), "test", 10, mockLogger, func(_ Log) {})
mockLogger := NewMockLogger(t)
mockLogger.On("Log", hclog.Info, "hello", "test", 0).Return()
ld := NewLogDropSink(context.Background(), 10, mockLogger, func(_ Log) {})
require.NotNil(t, ld)
ld.Accept("test Accept", hclog.Info, "hello", "test", 0)
ld.Accept("test Log", hclog.Info, "hello", "test", 0)
retry.Run(t, func(r *retry.R) {
mockLogger.AssertNumberOfCalls(r, "Accept", 1)
mockLogger.AssertNumberOfCalls(r, "Log", 1)
})
}
func TestLogDroppedWhenChannelFilled(t *testing.T) {
mockLogger := NewMockSinkAdapter(t)
mockLogger := NewMockLogger(t)
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
block := make(chan interface{})
mockLogger.On("Accept", "test", hclog.Debug, "hello", []interface{}(nil)).Run(func(args mock.Arguments) {
mockLogger.On("Log", hclog.Debug, "hello").Run(func(args mock.Arguments) {
<-block
})
var called = make(chan interface{})
ld := NewLogDropSink(ctx, "test", 1, mockLogger, func(l Log) {
ld := NewLogDropSink(ctx, 1, mockLogger, func(l Log) {
close(called)
})
for i := 0; i < 2; i++ {
@ -46,6 +48,6 @@ func TestLogDroppedWhenChannelFilled(t *testing.T) {
t.Fatal("timeout waiting for drop func to be called")
}
retry.Run(t, func(r *retry.R) {
mockLogger.AssertNumberOfCalls(r, "Accept", 1)
mockLogger.AssertNumberOfCalls(r, "Log", 1)
})
}

View file

@ -0,0 +1,33 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
package logdrop
import (
hclog "github.com/hashicorp/go-hclog"
mock "github.com/stretchr/testify/mock"
testing "testing"
)
// MockLogger is an autogenerated mock type for the Logger type
type MockLogger struct {
mock.Mock
}
// Log provides a mock function with given fields: level, msg, args
func (_m *MockLogger) Log(level hclog.Level, msg string, args ...interface{}) {
var _ca []interface{}
_ca = append(_ca, level, msg)
_ca = append(_ca, args...)
_m.Called(_ca...)
}
// NewMockLogger creates a new instance of MockLogger. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockLogger(t testing.TB) *MockLogger {
mock := &MockLogger{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View file

@ -1,33 +0,0 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
package logdrop
import (
hclog "github.com/hashicorp/go-hclog"
mock "github.com/stretchr/testify/mock"
testing "testing"
)
// MockSinkAdapter is an autogenerated mock type for the SinkAdapter type
type MockSinkAdapter struct {
mock.Mock
}
// Accept provides a mock function with given fields: name, level, msg, args
func (_m *MockSinkAdapter) Accept(name string, level hclog.Level, msg string, args ...interface{}) {
var _ca []interface{}
_ca = append(_ca, name, level, msg)
_ca = append(_ca, args...)
_m.Called(_ca...)
}
// NewMockSinkAdapter creates a new instance of MockSinkAdapter. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockSinkAdapter(t testing.TB) *MockSinkAdapter {
mock := &MockSinkAdapter{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View file

@ -16,6 +16,7 @@ import (
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
"github.com/hashicorp/consul/agent/consul/xdscapacity"
@ -313,6 +314,7 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau
local.StateCounters,
xds.StatsCounters,
raftCounters,
rate.Counters,
}
// Flatten definitions
// NOTE(kit): Do we actually want to create a set here so we can ensure definition names are unique?

View file

@ -477,6 +477,7 @@ These metrics are used to monitor the health of the Consul servers.
| `consul.raft.transition.heartbeat_timeout` | The number of times an agent has transitioned to the Candidate state, after receive no heartbeat messages from the last known leader. | timeouts / interval | counter |
| `consul.raft.verify_leader` | This metric doesn't have a direct correlation to the leader change. It just counts the number of times an agent checks if it is still the leader or not. For example, during every consistent read, the check is done. Depending on the load in the system, this metric count can be high as it is incremented each time a consistent read is completed. | checks / interval | Counter |
| `consul.rpc.accept_conn` | Increments when a server accepts an RPC connection. | connections | counter |
| `consul.rpc.rate_limit.log_dropped` | Increments whenever a log that is emitted because an RPC exceeded a rate limit gets dropped because the output buffer is full. | log messages dropped | counter |
| `consul.catalog.register` | Measures the time it takes to complete a catalog register operation. | ms | timer |
| `consul.catalog.deregister` | Measures the time it takes to complete a catalog deregister operation. | ms | timer |
| `consul.server.isLeader` | Track if a server is a leader(1) or not(0) | 1 or 0 | gauge |