Wire up the rate limiter to net/rpc calls (#15879)

This commit is contained in:
Semir Patel 2023-01-04 13:38:44 -06:00 committed by GitHub
parent e3997b9533
commit 8242459c66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 38 additions and 8 deletions

View File

@ -109,6 +109,7 @@ type RequestLimitsHandler interface {
Run(ctx context.Context) Run(ctx context.Context)
Allow(op Operation) error Allow(op Operation) error
UpdateConfig(cfg HandlerConfig) UpdateConfig(cfg HandlerConfig)
Register(leaderStatusProvider LeaderStatusProvider)
} }
// Handler enforces rate limits for incoming RPCs. // Handler enforces rate limits for incoming RPCs.
@ -310,3 +311,5 @@ func (nullRequestLimitsHandler) Allow(Operation) error { return nil }
func (nullRequestLimitsHandler) Run(ctx context.Context) {} func (nullRequestLimitsHandler) Run(ctx context.Context) {}
func (nullRequestLimitsHandler) UpdateConfig(cfg HandlerConfig) {} func (nullRequestLimitsHandler) UpdateConfig(cfg HandlerConfig) {}
func (nullRequestLimitsHandler) Register(leaderStatusProvider LeaderStatusProvider) {}

View File

@ -37,6 +37,11 @@ func (_m *MockRequestLimitsHandler) UpdateConfig(cfg HandlerConfig) {
_m.Called(cfg) _m.Called(cfg)
} }
// Register provides a mock function with given fields: leaderStatusProvider
func (_m *MockRequestLimitsHandler) Register(leaderStatusProvider LeaderStatusProvider) {
_m.Called(leaderStatusProvider)
}
type mockConstructorTestingTNewMockRequestLimitsHandler interface { type mockConstructorTestingTNewMockRequestLimitsHandler interface {
mock.TestingT mock.TestingT
Cleanup(func()) Cleanup(func())

View File

@ -469,14 +469,14 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
incomingRPCLimiter: incomingRPCLimiter, incomingRPCLimiter: incomingRPCLimiter,
} }
incomingRPCLimiter.Register(s)
s.hcpManager = hcp.NewManager(hcp.ManagerConfig{ s.hcpManager = hcp.NewManager(hcp.ManagerConfig{
Client: flat.HCP.Client, Client: flat.HCP.Client,
StatusFn: s.hcpServerStatus(flat), StatusFn: s.hcpServerStatus(flat),
Logger: logger.Named("hcp_manager"), Logger: logger.Named("hcp_manager"),
}) })
s.incomingRPCLimiter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
var recorder *middleware.RequestRecorder var recorder *middleware.RequestRecorder
if flat.NewRequestRecorderFunc != nil { if flat.NewRequestRecorderFunc != nil {
recorder = flat.NewRequestRecorderFunc(serverLogger, s.IsLeader, s.config.Datacenter) recorder = flat.NewRequestRecorderFunc(serverLogger, s.IsLeader, s.config.Datacenter)
@ -487,15 +487,19 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
return nil, fmt.Errorf("cannot initialize server with a nil RPC request recorder") return nil, fmt.Errorf("cannot initialize server with a nil RPC request recorder")
} }
if flat.GetNetRPCInterceptorFunc == nil { rpcServerOpts := []func(*rpc.Server){
s.rpcServer = rpc.NewServer() rpc.WithPreBodyInterceptor(middleware.GetNetRPCRateLimitingInterceptor(s.incomingRPCLimiter)),
s.insecureRPCServer = rpc.NewServer()
} else {
s.rpcServer = rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(flat.GetNetRPCInterceptorFunc(recorder)))
s.insecureRPCServer = rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(flat.GetNetRPCInterceptorFunc(recorder)))
} }
if flat.GetNetRPCInterceptorFunc != nil {
rpcServerOpts = append(rpcServerOpts, rpc.WithServerServiceCallInterceptor(flat.GetNetRPCInterceptorFunc(recorder)))
}
s.rpcServer = rpc.NewServerWithOpts(rpcServerOpts...)
s.insecureRPCServer = rpc.NewServerWithOpts(rpcServerOpts...)
s.rpcRecorder = recorder s.rpcRecorder = recorder
s.incomingRPCLimiter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
go s.publisher.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) go s.publisher.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})

View File

@ -1,6 +1,7 @@
package middleware package middleware
import ( import (
"net"
"reflect" "reflect"
"strconv" "strconv"
"strings" "strings"
@ -9,6 +10,7 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus" "github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul-net-rpc/net/rpc" "github.com/hashicorp/consul-net-rpc/net/rpc"
rpcRate "github.com/hashicorp/consul/agent/consul/rate"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
) )
@ -157,3 +159,19 @@ func GetNetRPCInterceptor(recorder *RequestRecorder) rpc.ServerServiceCallInterc
recorder.Record(reqServiceMethod, RPCTypeNetRPC, reqStart, argv.Interface(), err != nil) recorder.Record(reqServiceMethod, RPCTypeNetRPC, reqStart, argv.Interface(), err != nil)
} }
} }
func GetNetRPCRateLimitingInterceptor(requestLimitsHandler rpcRate.RequestLimitsHandler) rpc.PreBodyInterceptor {
return func(reqServiceMethod string, sourceAddr net.Addr) error {
op := rpcRate.Operation{
Name: reqServiceMethod,
SourceAddr: sourceAddr,
Type: rpcRateLimitSpecs[reqServiceMethod],
}
// net/rpc does not provide a way to encode the nuances of the
// error response (retry or retry elsewhere) so the error string
// from the rate limiter is all that we have.
return requestLimitsHandler.Allow(op)
}
}