From 4894848993b07f9c2f58a2bdd02cba73d79ef150 Mon Sep 17 00:00:00 2001 From: Dan Upton Date: Tue, 13 Dec 2022 11:41:54 +0000 Subject: [PATCH] server: add placeholder glue for rate limit handler (#15539) Adds a no-op implementation of the rate-limit handler and exposes it on the consul.Server struct. It allows us to start working on the net/rpc and gRPC interceptors and config (re)loading logic, without having to implement the full handler up-front. Co-authored-by: John Murret Co-authored-by: Dhia Ayachi --- agent/consul/multilimiter/multilimiter.go | 5 +- agent/consul/rate/handler.go | 151 ++++++++++++++++++++++ agent/consul/server.go | 14 ++ 3 files changed, 168 insertions(+), 2 deletions(-) create mode 100644 agent/consul/rate/handler.go diff --git a/agent/consul/multilimiter/multilimiter.go b/agent/consul/multilimiter/multilimiter.go index 539cced1c..9de1bdaa1 100644 --- a/agent/consul/multilimiter/multilimiter.go +++ b/agent/consul/multilimiter/multilimiter.go @@ -3,11 +3,12 @@ package multilimiter import ( "bytes" "context" - radix "github.com/hashicorp/go-immutable-radix" - "golang.org/x/time/rate" "sync" "sync/atomic" "time" + + radix "github.com/hashicorp/go-immutable-radix" + "golang.org/x/time/rate" ) var _ RateLimiter = &MultiLimiter{} diff --git a/agent/consul/rate/handler.go b/agent/consul/rate/handler.go new file mode 100644 index 000000000..189c0fa0f --- /dev/null +++ b/agent/consul/rate/handler.go @@ -0,0 +1,151 @@ +// Package rate implements server-side RPC rate limiting. +package rate + +import ( + "context" + "errors" + "net" + "sync/atomic" + + "github.com/hashicorp/consul/agent/consul/multilimiter" +) + +var ( + // ErrRetryElsewhere indicates that the operation was not allowed because the + // rate limit was exhausted, but may succeed on a different server. + // + // Results in a RESOURCE_EXHAUSTED or "429 Too Many Requests" response. 
+ ErrRetryElsewhere = errors.New("rate limit exceeded, try a different server") + + // ErrRetryLater indicates that the operation was not allowed because the rate + // limit was exhausted, and trying a different server won't help (e.g. because + // the operation can only be performed on the leader). + // + // Results in an UNAVAILABLE or "503 Service Unavailable" response. + ErrRetryLater = errors.New("rate limit exceeded, try again later") +) + +// Mode determines the action that will be taken when a rate limit has been +// exhausted (e.g. log and allow, or reject). +type Mode int + +const ( + // ModePermissive causes the handler to log the rate-limited operation but + // still allow it to proceed. + ModePermissive Mode = iota + + // ModeEnforcing causes the handler to reject the rate-limited operation. + ModeEnforcing +) + +// OperationType is the type of operation the client is attempting to perform. +type OperationType int + +const ( + // OperationTypeRead represents a read operation. + OperationTypeRead OperationType = iota + + // OperationTypeWrite represents a write operation. + OperationTypeWrite +) + +// Operation the client is attempting to perform. +type Operation struct { + // Name of the RPC endpoint (e.g. "Foo.Bar" for net/rpc and "/foo.service/Bar" for gRPC). + Name string + + // SourceAddr is the client's (or forwarding server's) IP address. + SourceAddr net.Addr + + // Type of operation to be performed (e.g. read or write). + Type OperationType +} + +// Handler enforces rate limits for incoming RPCs. +type Handler struct { + cfg *atomic.Pointer[HandlerConfig] + delegate HandlerDelegate + + limiter multilimiter.RateLimiter +} + +type HandlerConfig struct { + multilimiter.Config + + // GlobalMode configures the action that will be taken when a global rate-limit + // has been exhausted. + // + // Note: in the future there'll be a separate Mode for IP-based limits. 
+ GlobalMode Mode + + // GlobalWriteConfig configures the global rate limiter for write operations. + GlobalWriteConfig multilimiter.LimiterConfig + + // GlobalReadConfig configures the global rate limiter for read operations. + GlobalReadConfig multilimiter.LimiterConfig +} + +type HandlerDelegate interface { + // IsLeader is used to determine whether the operation is being performed + // against the cluster leader, such that if it can _only_ be performed by + // the leader (e.g. write operations) we don't tell clients to retry against + // a different server. + IsLeader() bool +} + +// NewHandler creates a new RPC rate limit handler. +func NewHandler(cfg HandlerConfig, delegate HandlerDelegate) *Handler { + limiter := multilimiter.NewMultiLimiter(cfg.Config) + limiter.UpdateConfig(cfg.GlobalWriteConfig, globalWrite) + limiter.UpdateConfig(cfg.GlobalReadConfig, globalRead) + + h := &Handler{ + cfg: new(atomic.Pointer[HandlerConfig]), + delegate: delegate, + limiter: limiter, + } + h.cfg.Store(&cfg) + + return h +} + +// Run the limiter cleanup routine until the given context is canceled. +// +// Note: this starts a goroutine. +func (h *Handler) Run(ctx context.Context) { + h.limiter.Run(ctx) +} + +// Allow returns an error if the given operation is not allowed to proceed +// because of an exhausted rate-limit. +func (h *Handler) Allow(op Operation) error { + // TODO(NET-1383): actually implement the rate limiting logic. + // + // Example: + // if !h.limiter.Allow(globalWrite) { + // } + return nil +} + +// TODO(NET-1379): call this on `consul reload`. +func (h *Handler) UpdateConfig(cfg HandlerConfig) { + h.cfg.Store(&cfg) + h.limiter.UpdateConfig(cfg.GlobalWriteConfig, globalWrite) + h.limiter.UpdateConfig(cfg.GlobalReadConfig, globalRead) +} + +var ( + // globalWrite identifies the global rate limit applied to write operations. + globalWrite = globalLimit("global.write") + + // globalRead identifies the global rate limit applied to read operations. 
+ globalRead = globalLimit("global.read") +) + +// globalLimit represents a limit that applies to all writes or reads. +type globalLimit []byte + +// Key satisfies the multilimiter.LimitedEntity interface. +func (prefix globalLimit) Key() multilimiter.KeyType { + return multilimiter.Key(prefix, nil) +} diff --git a/agent/consul/server.go b/agent/consul/server.go index 24cf98e56..0d4a0f0f3 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -5,6 +5,7 @@ import ( "crypto/x509" "errors" "fmt" + "github.com/hashicorp/consul/agent/consul/multilimiter" "io" "net" "os" @@ -33,6 +34,7 @@ import ( "github.com/hashicorp/consul/agent/consul/authmethod" "github.com/hashicorp/consul/agent/consul/authmethod/ssoauth" "github.com/hashicorp/consul/agent/consul/fsm" + rpcRate "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/usagemetrics" @@ -275,6 +277,9 @@ type Server struct { grpcHandler connHandler rpcServer *rpc.Server + // incomingRPCLimiter rate-limits incoming net/rpc and gRPC calls. + incomingRPCLimiter *rpcRate.Handler + // insecureRPCServer is a RPC server that is configure with // IncomingInsecureRPCConfig to allow clients to call AutoEncrypt.Sign // to request client certificates. At this point a client doesn't have @@ -462,6 +467,15 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser Logger: logger.Named("hcp_manager"), }) + // TODO(NET-1380, NET-1381): thread this into the net/rpc and gRPC interceptors. + s.incomingRPCLimiter = rpcRate.NewHandler(rpcRate.HandlerConfig{ + // TODO(server-rate-limit): revisit these values based on the final multilimiter implementation + Config: multilimiter.Config{ReconcileCheckLimit: 30 * time.Second, ReconcileCheckInterval: time.Second}, + // TODO(NET-1379): pass in _real_ configuration. 
+ GlobalMode: rpcRate.ModePermissive, + }, s) + s.incomingRPCLimiter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) + var recorder *middleware.RequestRecorder if flat.NewRequestRecorderFunc != nil { recorder = flat.NewRequestRecorderFunc(serverLogger, s.IsLeader, s.config.Datacenter)