From 700c693b33d8d2ed68e1c870d61a251662acc923 Mon Sep 17 00:00:00 2001 From: John Murret Date: Tue, 13 Dec 2022 13:09:55 -0700 Subject: [PATCH] adding config for request_limits (#15531) * server: add placeholder glue for rate limit handler This commit adds a no-op implementation of the rate-limit handler and adds it to the `consul.Server` struct and setup code. This allows us to start working on the net/rpc and gRPC interceptors and config logic. * Add handler errors * Set the global read and write limits * fixing multilimiter moving packages * Fix typo * Simplify globalLimit usage * add multilimiter and tests * exporting LimitedEntity * Apply suggestions from code review Co-authored-by: John Murret * add config update and rename config params * add doc string and split config * Apply suggestions from code review Co-authored-by: Dan Upton * use timer to avoid go routine leak and change the interface * add comments to tests * fix failing test * add prefix with config edge, refactor tests * Apply suggestions from code review Co-authored-by: Dan Upton * refactor to apply configs for limiters under a prefix * add fuzz tests and fix bugs found. Refactor reconcile loop to have a simpler logic * make KeyType an exported type * split the config and limiter trees to fix race conditions in config update * rename variables * fix race in test and remove dead code * fix reconcile loop to not create a timer on each loop * add extra benchmark tests and fix tests * fix benchmark test to pass value to func * server: add placeholder glue for rate limit handler This commit adds a no-op implementation of the rate-limit handler and adds it to the `consul.Server` struct and setup code. This allows us to start working on the net/rpc and gRPC interceptors and config logic. * Set the global read and write limits * fixing multilimiter moving packages * add server configuration for global rate limiting. 
* remove agent test * remove added stuff from handler * remove added stuff from multilimiter * removing unnecessary TODOs * Removing TODO comment from handler * adding in defaulting to infinite * add disabled status in there * adding in documentation for disabled mode. * make disabled the default. * Add mock and agent test * adding documentation and missing mock file. * Fixing test TestLoad_IntegrationWithFlags * updating docs based on PR feedback. * Updating Request Limits mode to use int based on PR feedback. * Adding RequestLimits struct so we have a nested struct in ReloadableConfig. * fixing linting references * Update agent/consul/rate/handler.go Co-authored-by: Dan Upton * Update agent/consul/config.go Co-authored-by: Dan Upton * removing the ignore of the request limits in JSON. adding builder logic to convert any read rate or write rate less than 0 to rate.Inf * added conversion function to convert request limits object to handler config. * Updating docs to reflect gRPC and RPC are rate limited and, as a result, HTTP requests are as well. * Updating values for TestLoad_FullConfig() so that they were different and discernible. * Updating TestRuntimeConfig_Sanitize * Fixing TestLoad_IntegrationWithFlags test * putting nil check in place * fixing rebase * removing change for missing error checks. 
will put in another PR * Rebasing after default multilimiter config change * resolving rebase issues * updating reference for incomingRPCLimiter to use interface * updating interface * Updating interfaces * Fixing mock reference Co-authored-by: Daniel Upton Co-authored-by: Dhia Ayachi --- agent/agent.go | 18 +- agent/agent_test.go | 22 ++ agent/config/builder.go | 26 ++ agent/config/config.go | 25 +- agent/config/default.go | 5 + agent/config/runtime.go | 36 +- agent/config/runtime_test.go | 8 + .../TestRuntimeConfig_Sanitize.golden | 3 + agent/config/testdata/full-config.hcl | 5 + agent/config/testdata/full-config.json | 311 +++++++++++++----- agent/consul/config.go | 33 ++ agent/consul/multilimiter/multilimiter.go | 5 +- agent/consul/rate/handler.go | 62 +++- .../consul/rate/mock_RequestLimitsHandler.go | 53 +++ agent/consul/server.go | 51 ++- agent/consul/server_test.go | 30 +- agent/grpc-external/server.go | 3 +- agent/grpc-external/stats_test.go | 3 +- agent/grpc-internal/handler.go | 3 +- agent/grpc-internal/server_test.go | 4 +- .../services/subscribe/subscribe_test.go | 4 +- agent/grpc-internal/stats_test.go | 4 +- agent/grpc-middleware/mock_RateLimiter.go | 39 --- agent/grpc-middleware/rate.go | 16 +- agent/grpc-middleware/rate_test.go | 2 +- agent/rpc/peering/service_test.go | 3 +- docs/config/checklist-adding-config-fields.md | 2 +- .../docs/agent/config/config-files.mdx | 4 + 28 files changed, 601 insertions(+), 179 deletions(-) create mode 100644 agent/consul/rate/mock_RequestLimitsHandler.go delete mode 100644 agent/grpc-middleware/mock_RateLimiter.go diff --git a/agent/agent.go b/agent/agent.go index 670ec8591..3998ba3ab 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -40,6 +40,7 @@ import ( "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" + rpcRate "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/consul/servercert" 
"github.com/hashicorp/consul/agent/dns" external "github.com/hashicorp/consul/agent/grpc-external" @@ -564,7 +565,8 @@ func (a *Agent) Start(ctx context.Context) error { } // gRPC calls are only rate-limited on server, not client agents. - grpcRateLimiter := middleware.NullRateLimiter() + var grpcRateLimiter rpcRate.RequestLimitsHandler + grpcRateLimiter = rpcRate.NullRateLimiter() if s, ok := a.delegate.(*consul.Server); ok { grpcRateLimiter = s.IncomingRPCLimiter() } @@ -1479,6 +1481,10 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co cfg.PeeringEnabled = runtimeCfg.PeeringEnabled cfg.PeeringTestAllowPeerRegistrations = runtimeCfg.PeeringTestAllowPeerRegistrations + cfg.RequestLimitsMode = runtimeCfg.RequestLimitsMode.String() + cfg.RequestLimitsReadRate = runtimeCfg.RequestLimitsReadRate + cfg.RequestLimitsWriteRate = runtimeCfg.RequestLimitsWriteRate + enterpriseConsulConfig(cfg, runtimeCfg) return cfg, nil } @@ -4034,17 +4040,18 @@ func (a *Agent) reloadConfig(autoReload bool) error { {a.config.TLS.HTTPS, newCfg.TLS.HTTPS}, } { if f.oldCfg.KeyFile != f.newCfg.KeyFile { - err = a.configFileWatcher.Replace(f.oldCfg.KeyFile, f.newCfg.KeyFile) + a.configFileWatcher.Replace(f.oldCfg.KeyFile, f.newCfg.KeyFile) if err != nil { return err } } if f.oldCfg.CertFile != f.newCfg.CertFile { - err = a.configFileWatcher.Replace(f.oldCfg.CertFile, f.newCfg.CertFile) + a.configFileWatcher.Replace(f.oldCfg.CertFile, f.newCfg.CertFile) if err != nil { return err } } + if revertStaticConfig(f.oldCfg, f.newCfg) { a.logger.Warn("Changes to your configuration were detected that for security reasons cannot be automatically applied by 'auto_reload_config'. Manually reload your configuration (e.g. 
with 'consul reload') to apply these changes.", "StaticRuntimeConfig", f.oldCfg, "StaticRuntimeConfig From file", f.newCfg) } @@ -4145,6 +4152,11 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error { } cc := consul.ReloadableConfig{ + RequestLimits: &consul.RequestLimits{ + Mode: newCfg.RequestLimitsMode, + ReadRate: newCfg.RequestLimitsReadRate, + WriteRate: newCfg.RequestLimitsWriteRate, + }, RPCClientTimeout: newCfg.RPCClientTimeout, RPCRateLimit: newCfg.RPCRateLimit, RPCMaxBurst: newCfg.RPCMaxBurst, diff --git a/agent/agent_test.go b/agent/agent_test.go index d32c4981d..9d9f710a6 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -4243,6 +4243,28 @@ func TestAgent_consulConfig_RaftTrailingLogs(t *testing.T) { require.Equal(t, uint64(812345), a.consulConfig().RaftConfig.TrailingLogs) } +func TestAgent_consulConfig_RequestLimits(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + hcl := ` + limits { + request_limits { + mode = "enforcing" + read_rate = 8888 + write_rate = 9999 + } + } + ` + a := NewTestAgent(t, hcl) + defer a.Shutdown() + require.Equal(t, "enforcing", a.consulConfig().RequestLimitsMode) + require.Equal(t, rate.Limit(8888), a.consulConfig().RequestLimitsReadRate) + require.Equal(t, rate.Limit(9999), a.consulConfig().RequestLimitsWriteRate) +} + func TestAgent_grpcInjectAddr(t *testing.T) { tt := []struct { name string diff --git a/agent/config/builder.go b/agent/config/builder.go index f071bd206..a28ddab2a 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -32,6 +32,7 @@ import ( "github.com/hashicorp/consul/agent/connect/ca" "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/consul/authmethod/ssoauth" + consulrate "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/dns" "github.com/hashicorp/consul/agent/rpc/middleware" "github.com/hashicorp/consul/agent/structs" @@ -1045,6 +1046,9 @@ 
func (b *builder) build() (rt RuntimeConfig, err error) { ReconnectTimeoutLAN: b.durationVal("reconnect_timeout", c.ReconnectTimeoutLAN), ReconnectTimeoutWAN: b.durationVal("reconnect_timeout_wan", c.ReconnectTimeoutWAN), RejoinAfterLeave: boolVal(c.RejoinAfterLeave), + RequestLimitsMode: b.requestsLimitsModeVal(stringVal(c.Limits.RequestLimits.Mode)), + RequestLimitsReadRate: limitVal(c.Limits.RequestLimits.ReadRate), + RequestLimitsWriteRate: limitVal(c.Limits.RequestLimits.WriteRate), RetryJoinIntervalLAN: b.durationVal("retry_interval", c.RetryJoinIntervalLAN), RetryJoinIntervalWAN: b.durationVal("retry_interval_wan", c.RetryJoinIntervalWAN), RetryJoinLAN: b.expandAllOptionalAddrs("retry_join", c.RetryJoinLAN), @@ -1778,6 +1782,19 @@ func (b *builder) dnsRecursorStrategyVal(v string) dns.RecursorStrategy { return out } +func (b *builder) requestsLimitsModeVal(v string) consulrate.Mode { + var out consulrate.Mode + + mode, ok := consulrate.RequestLimitsModeFromName(v) + if !ok { + b.err = multierror.Append(b.err, fmt.Errorf("limits.request_limits.mode: invalid mode: %q", v)) + } else { + out = mode + } + + return out +} + func (b *builder) exposeConfVal(v *ExposeConfig) structs.ExposeConfig { var out structs.ExposeConfig if v == nil { @@ -1993,6 +2010,15 @@ func float64Val(v *float64) float64 { return float64ValWithDefault(v, 0) } +func limitVal(v *float64) rate.Limit { + f := float64Val(v) + if f < 0 { + return rate.Inf + } + + return rate.Limit(f) +} + func (b *builder) cidrsVal(name string, v []string) (nets []*net.IPNet) { if v == nil { return diff --git a/agent/config/config.go b/agent/config/config.go index e19922541..186ebb10a 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -712,16 +712,23 @@ type UnixSocket struct { User *string `mapstructure:"user"` } +type RequestLimits struct { + Mode *string `mapstructure:"mode"` + ReadRate *float64 `mapstructure:"read_rate"` + WriteRate *float64 `mapstructure:"write_rate"` +} + type Limits struct 
{ - HTTPMaxConnsPerClient *int `mapstructure:"http_max_conns_per_client"` - HTTPSHandshakeTimeout *string `mapstructure:"https_handshake_timeout"` - RPCClientTimeout *string `mapstructure:"rpc_client_timeout"` - RPCHandshakeTimeout *string `mapstructure:"rpc_handshake_timeout"` - RPCMaxBurst *int `mapstructure:"rpc_max_burst"` - RPCMaxConnsPerClient *int `mapstructure:"rpc_max_conns_per_client"` - RPCRate *float64 `mapstructure:"rpc_rate"` - KVMaxValueSize *uint64 `mapstructure:"kv_max_value_size"` - TxnMaxReqLen *uint64 `mapstructure:"txn_max_req_len"` + HTTPMaxConnsPerClient *int `mapstructure:"http_max_conns_per_client"` + HTTPSHandshakeTimeout *string `mapstructure:"https_handshake_timeout"` + RequestLimits RequestLimits `mapstructure:"request_limits"` + RPCClientTimeout *string `mapstructure:"rpc_client_timeout"` + RPCHandshakeTimeout *string `mapstructure:"rpc_handshake_timeout"` + RPCMaxBurst *int `mapstructure:"rpc_max_burst"` + RPCMaxConnsPerClient *int `mapstructure:"rpc_max_conns_per_client"` + RPCRate *float64 `mapstructure:"rpc_rate"` + KVMaxValueSize *uint64 `mapstructure:"kv_max_value_size"` + TxnMaxReqLen *uint64 `mapstructure:"txn_max_req_len"` } type Segment struct { diff --git a/agent/config/default.go b/agent/config/default.go index e80a0d6c3..2615579c1 100644 --- a/agent/config/default.go +++ b/agent/config/default.go @@ -97,6 +97,11 @@ func DefaultSource() Source { limits = { http_max_conns_per_client = 200 https_handshake_timeout = "5s" + request_limits = { + mode = "disabled" + read_rate = -1 + write_rate = -1 + } rpc_handshake_timeout = "5s" rpc_client_timeout = "60s" rpc_rate = -1 diff --git a/agent/config/runtime.go b/agent/config/runtime.go index 199e2ac09..6d31f0543 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/consul" + consulrate "github.com/hashicorp/consul/agent/consul/rate" 
"github.com/hashicorp/consul/agent/dns" hcpconfig "github.com/hashicorp/consul/agent/hcp/config" "github.com/hashicorp/consul/agent/structs" @@ -923,14 +924,14 @@ type RuntimeConfig struct { // See https://en.wikipedia.org/wiki/Token_bucket for more about token // buckets. // - // hcl: limit { rpc_rate = (float64|MaxFloat64) rpc_max_burst = int } + // hcl: limits { rpc_rate = (float64|MaxFloat64) rpc_max_burst = int } RPCRateLimit rate.Limit RPCMaxBurst int // RPCMaxConnsPerClient limits the number of concurrent TCP connections the // RPC server will accept from any single source IP address. // - // hcl: limits{ rpc_max_conns_per_client = 100 } + // hcl: limits { rpc_max_conns_per_client = 100 } RPCMaxConnsPerClient int // RPCProtocol is the Consul protocol version to use. @@ -1009,6 +1010,37 @@ type RuntimeConfig struct { // flag: -rejoin RejoinAfterLeave bool + // RequestLimitsMode will disable or enable rate limiting. If not disabled, it + // enforces the action that will occur when RequestLimitsReadRate + // or RequestLimitsWriteRate is exceeded. The default value of "disabled" will + // prevent any rate limiting from occurring. A value of "enforcing" will block + // the request from processing by returning an error. A value of + // "permissive" will not block the request and will allow the request to + // continue processing. + // + // hcl: limits { request_limits { mode = "permissive" } } + RequestLimitsMode consulrate.Mode + + // RequestLimitsReadRate controls how frequently RPC, gRPC, and HTTP + // queries are allowed to happen. In any large enough time interval, rate + // limiter limits the rate to RequestLimitsReadRate tokens per second. + // + // See https://en.wikipedia.org/wiki/Token_bucket for more about token + // buckets. + // + // hcl: limits { request_limits { read_rate = (float64|MaxFloat64) } } + RequestLimitsReadRate rate.Limit + + // RequestLimitsWriteRate controls how frequently RPC, gRPC, and HTTP + // writes are allowed to happen. 
In any large enough time interval, rate + // limiter limits the rate to RequestLimitsWriteRate tokens per second. + // + // See https://en.wikipedia.org/wiki/Token_bucket for more about token + // buckets. + // + // hcl: limits { request_limits { write_rate = (float64|MaxFloat64) } } + RequestLimitsWriteRate rate.Limit + // RetryJoinIntervalLAN specifies the amount of time to wait in between join // attempts on agent start. The minimum allowed value is 1 second and // the default is 30s. diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index e5e571fee..842cf16a0 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -20,6 +20,7 @@ import ( "github.com/armon/go-metrics/prometheus" "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/require" + "golang.org/x/time/rate" hcpconfig "github.com/hashicorp/consul/agent/hcp/config" @@ -27,6 +28,7 @@ import ( "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/consul" + consulrate "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/lib" @@ -4615,6 +4617,9 @@ func TestLoad_IntegrationWithFlags(t *testing.T) { rt.HTTPSHandshakeTimeout = 5 * time.Second rt.HTTPMaxConnsPerClient = 200 rt.RPCMaxConnsPerClient = 100 + rt.RequestLimitsMode = consulrate.ModeDisabled + rt.RequestLimitsReadRate = rate.Inf + rt.RequestLimitsWriteRate = rate.Inf rt.SegmentLimit = 64 rt.XDSUpdateRateLimit = 250 }, @@ -6163,6 +6168,9 @@ func TestLoad_FullConfig(t *testing.T) { RaftTrailingLogs: 83749, ReconnectTimeoutLAN: 23739 * time.Second, ReconnectTimeoutWAN: 26694 * time.Second, + RequestLimitsMode: consulrate.ModePermissive, + RequestLimitsReadRate: 99.0, + RequestLimitsWriteRate: 101.0, RejoinAfterLeave: true, RetryJoinIntervalLAN: 8067 * time.Second, RetryJoinIntervalWAN: 28866 * time.Second, diff --git 
a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden index 98317d3cf..a568789c5 100644 --- a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden +++ b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden @@ -278,6 +278,9 @@ "ReconnectTimeoutLAN": "0s", "ReconnectTimeoutWAN": "0s", "RejoinAfterLeave": false, + "RequestLimitsMode": 0, + "RequestLimitsReadRate": 0, + "RequestLimitsWriteRate": 0, "RetryJoinIntervalLAN": "0s", "RetryJoinIntervalWAN": "0s", "RetryJoinLAN": [ diff --git a/agent/config/testdata/full-config.hcl b/agent/config/testdata/full-config.hcl index f49946419..f237451f0 100644 --- a/agent/config/testdata/full-config.hcl +++ b/agent/config/testdata/full-config.hcl @@ -305,6 +305,11 @@ limits { rpc_max_conns_per_client = 2954 kv_max_value_size = 1234567800 txn_max_req_len = 567800000 + request_limits { + mode = "permissive" + read_rate = 99.0 + write_rate = 101.0 + } } log_level = "k1zo9Spt" log_json = true diff --git a/agent/config/testdata/full-config.json b/agent/config/testdata/full-config.json index e578e6f33..6a07dff88 100644 --- a/agent/config/testdata/full-config.json +++ b/agent/config/testdata/full-config.json @@ -9,25 +9,25 @@ "acl_replication_token": "LMmgy5dO", "acl_token": "O1El0wan", "acl_ttl": "18060s", - "acl" : { - "enabled" : true, - "down_policy" : "03eb2aee", - "default_policy" : "72c2e7a0", + "acl": { + "enabled": true, + "down_policy": "03eb2aee", + "default_policy": "72c2e7a0", "enable_key_list_policy": true, "enable_token_persistence": true, "policy_ttl": "1123s", "role_ttl": "9876s", "token_ttl": "3321s", - "enable_token_replication" : true, + "enable_token_replication": true, "msp_disable_bootstrap": true, - "tokens" : { - "master" : "8a19ac27", - "initial_management" : "3820e09a", - "agent_master" : "64fd0e08", - "agent_recovery" : "1dba6aba", - "replication" : "5795983a", - "agent" : "bed2377c", - "default" : "418fdff1", + "tokens": { + "master": 
"8a19ac27", + "initial_management": "3820e09a", + "agent_master": "64fd0e08", + "agent_recovery": "1dba6aba", + "replication": "5795983a", + "agent": "bed2377c", + "default": "418fdff1", "managed_service_provider": [ { "accessor_id": "first", @@ -57,9 +57,15 @@ "enabled": false, "intro_token": "OpBPGRwt", "intro_token_file": "gFvAXwI8", - "dns_sans": ["6zdaWg9J"], - "ip_sans": ["198.18.99.99"], - "server_addresses": ["198.18.100.1"], + "dns_sans": [ + "6zdaWg9J" + ], + "ip_sans": [ + "198.18.99.99" + ], + "server_addresses": [ + "198.18.100.1" + ], "authorization": { "enabled": true, "static": { @@ -71,9 +77,15 @@ "foo": "bar" }, "bound_issuer": "consul", - "bound_audiences": ["consul-cluster-1"], - "claim_assertions": ["value.node == \"${node}\""], - "jwt_validation_pub_keys": ["-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAERVchfCZng4mmdvQz1+sJHRN40snC\nYt8NjYOnbnScEXMkyoUmASr88gb7jaVAVt3RYASAbgBjB2Z+EUizWkx5Tg==\n-----END PUBLIC KEY-----"] + "bound_audiences": [ + "consul-cluster-1" + ], + "claim_assertions": [ + "value.node == \"${node}\"" + ], + "jwt_validation_pub_keys": [ + "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAERVchfCZng4mmdvQz1+sJHRN40snC\nYt8NjYOnbnScEXMkyoUmASr88gb7jaVAVt3RYASAbgBjB2Z+EUizWkx5Tg==\n-----END PUBLIC KEY-----" + ] } } }, @@ -82,7 +94,7 @@ "disable_upgrade_migration": true, "last_contact_threshold": "12705s", "max_trailing_logs": 17849, - "min_quorum": 3, + "min_quorum": 3, "redundancy_zone_tag": "3IsufDJf", "server_stabilization_time": "23057s", "upgrade_version_tag": "W9pDwFAL" @@ -104,11 +116,20 @@ "service_id": "L8G0QNmR", "token": "oo4BCTgJ", "status": "qLykAl5u", - "args": ["f3BemRjy", "e5zgpef7"], + "args": [ + "f3BemRjy", + "e5zgpef7" + ], "http": "29B93haH", "header": { - "hBq0zn1q": [ "2a9o9ZKP", "vKwA5lR6" ], - "f3r6xFtM": [ "RyuIdDWv", "QbxEcIUM" ] + "hBq0zn1q": [ + "2a9o9ZKP", + "vKwA5lR6" + ], + "f3r6xFtM": [ + "RyuIdDWv", + "QbxEcIUM" + ] }, "method": "Dou0nGT5", "body": "5PBQd2OT", 
@@ -134,11 +155,20 @@ "service_id": "lSulPcyz", "token": "toO59sh8", "status": "9RlWsXMV", - "args": ["4BAJttck", "4D2NPtTQ"], + "args": [ + "4BAJttck", + "4D2NPtTQ" + ], "http": "dohLcyQ2", "header": { - "ZBfTin3L": [ "1sDbEqYG", "lJGASsWK" ], - "Ui0nU99X": [ "LMccm3Qe", "k5H5RggQ" ] + "ZBfTin3L": [ + "1sDbEqYG", + "lJGASsWK" + ], + "Ui0nU99X": [ + "LMccm3Qe", + "k5H5RggQ" + ] }, "method": "aldrIQ4l", "body": "wSjTy7dg", @@ -163,11 +193,20 @@ "service_id": "CmUUcRna", "token": "a3nQzHuy", "status": "irj26nf3", - "args": ["9s526ogY", "gSlOHj1w"], + "args": [ + "9s526ogY", + "gSlOHj1w" + ], "http": "yzhgsQ7Y", "header": { - "zcqwA8dO": [ "qb1zx0DL", "sXCxPFsD" ], - "qxvdnSE9": [ "6wBPUYdF", "YYh8wtSZ" ] + "zcqwA8dO": [ + "qb1zx0DL", + "sXCxPFsD" + ], + "qxvdnSE9": [ + "6wBPUYdF", + "YYh8wtSZ" + ] }, "method": "gLrztrNw", "body": "0jkKgGUC", @@ -202,15 +241,21 @@ }, "auto_encrypt": { "tls": false, - "dns_san": ["a.com", "b.com"], - "ip_san": ["192.168.4.139", "192.168.4.140"], + "dns_san": [ + "a.com", + "b.com" + ], + "ip_san": [ + "192.168.4.139", + "192.168.4.140" + ], "allow_tls": true }, "cloud": { "resource_id": "N43DsscE", "client_id": "6WvsDZCP", - "client_secret": "lCSMHOpB", - "hostname": "DH4bh7aC", + "client_secret": "lCSMHOpB", + "hostname": "DH4bh7aC", "auth_url": "332nCdR2", "scada_address": "aoeusth232" }, @@ -226,21 +271,21 @@ "enable_mesh_gateway_wan_federation": false, "enabled": true }, - "gossip_lan" : { + "gossip_lan": { "gossip_nodes": 6, - "gossip_interval" : "25252s", - "retransmit_mult" : 1234, - "suspicion_mult" : 1235, - "probe_interval" : "101ms", - "probe_timeout" : "102ms" + "gossip_interval": "25252s", + "retransmit_mult": 1234, + "suspicion_mult": 1235, + "probe_interval": "101ms", + "probe_timeout": "102ms" }, - "gossip_wan" : { - "gossip_nodes" : 2, - "gossip_interval" : "6966s", - "retransmit_mult" : 16384, - "suspicion_mult" : 16385, - "probe_interval" : "103ms", - "probe_timeout" : "104ms" + "gossip_wan": { + "gossip_nodes": 2, + 
"gossip_interval": "6966s", + "retransmit_mult": 16384, + "suspicion_mult": 16385, + "probe_interval": "103ms", + "probe_timeout": "104ms" }, "datacenter": "rzo029wg", "default_query_time": "16743s", @@ -283,8 +328,15 @@ "encrypt_verify_incoming": true, "encrypt_verify_outgoing": true, "http_config": { - "block_endpoints": [ "RBvAFcGD", "fWOWFznh" ], - "allow_write_http_from": [ "127.0.0.1/8", "22.33.44.55/32", "0.0.0.0/0" ], + "block_endpoints": [ + "RBvAFcGD", + "fWOWFznh" + ], + "allow_write_http_from": [ + "127.0.0.1/8", + "22.33.44.55/32", + "0.0.0.0/0" + ], "response_headers": { "M6TKa9NP": "xjuxjOzQ", "JRCrHZed": "rl0mTx81" @@ -304,7 +356,12 @@ "rpc_max_burst": 44848, "rpc_max_conns_per_client": 2954, "kv_max_value_size": 1234567800, - "txn_max_req_len": 567800000 + "txn_max_req_len": 567800000, + "request_limits": { + "mode": "permissive", + "read_rate": 99.0, + "write_rate": 101.0 + } }, "log_level": "k1zo9Spt", "log_json": true, @@ -340,7 +397,10 @@ }, "protocol": 30793, "primary_datacenter": "ejtmd43d", - "primary_gateways": [ "aej8eeZo", "roh2KahS" ], + "primary_gateways": [ + "aej8eeZo", + "roh2KahS" + ], "primary_gateways_interval": "18866s", "raft_protocol": 3, "raft_snapshot_threshold": 16384, @@ -352,15 +412,26 @@ "read_replica": true, "reconnect_timeout": "23739s", "reconnect_timeout_wan": "26694s", - "recursors": [ "63.38.39.58", "92.49.18.18" ], + "recursors": [ + "63.38.39.58", + "92.49.18.18" + ], "rejoin_after_leave": true, "retry_interval": "8067s", "retry_interval_wan": "28866s", - "retry_join": [ "pbsSFY7U", "l0qLtWij" ], - "retry_join_wan": [ "PFsR02Ye", "rJdQIhER" ], + "retry_join": [ + "pbsSFY7U", + "l0qLtWij" + ], + "retry_join_wan": [ + "PFsR02Ye", + "rJdQIhER" + ], "retry_max": 913, "retry_max_wan": 23160, - "rpc": {"enable_streaming": true}, + "rpc": { + "enable_streaming": true + }, "segment_limit": 123, "serf_lan": "99.43.63.15", "serf_wan": "67.88.33.19", @@ -382,7 +453,10 @@ "port": 6109 } }, - "tags": ["nkwshvM5", "NTDWn3ek"], 
+ "tags": [ + "nkwshvM5", + "NTDWn3ek" + ], "address": "cOlSOhbp", "token": "msy7iWER", "port": 24237, @@ -396,11 +470,19 @@ "name": "iehanzuq", "status": "rCvn53TH", "notes": "fti5lfF3", - "args": ["16WRUmwS", "QWk7j7ae"], + "args": [ + "16WRUmwS", + "QWk7j7ae" + ], "http": "dl3Fgme3", "header": { - "rjm4DEd3": ["2m3m2Fls"], - "l4HwQ112": ["fk56MNlo", "dhLK56aZ"] + "rjm4DEd3": [ + "2m3m2Fls" + ], + "l4HwQ112": [ + "fk56MNlo", + "dhLK56aZ" + ] }, "method": "9afLm3Mj", "body": "wVVL2V6f", @@ -424,11 +506,20 @@ "name": "sgV4F7Pk", "notes": "yP5nKbW0", "status": "7oLMEyfu", - "args": ["5wEZtZpv", "0Ihyk8cS"], + "args": [ + "5wEZtZpv", + "0Ihyk8cS" + ], "http": "KyDjGY9H", "header": { - "gv5qefTz": [ "5Olo2pMG", "PvvKWQU5" ], - "SHOVq1Vv": [ "jntFhyym", "GYJh32pp" ] + "gv5qefTz": [ + "5Olo2pMG", + "PvvKWQU5" + ], + "SHOVq1Vv": [ + "jntFhyym", + "GYJh32pp" + ] }, "method": "T66MFBfR", "body": "OwGjTFQi", @@ -451,11 +542,20 @@ "name": "IEqrzrsd", "notes": "SVqApqeM", "status": "XXkVoZXt", - "args": ["wD05Bvao", "rLYB7kQC"], + "args": [ + "wD05Bvao", + "rLYB7kQC" + ], "http": "kyICZsn8", "header": { - "4ebP5vL4": [ "G20SrL5Q", "DwPKlMbo" ], - "p2UI34Qz": [ "UsG1D0Qh", "NHhRiB6s" ] + "4ebP5vL4": [ + "G20SrL5Q", + "DwPKlMbo" + ], + "p2UI34Qz": [ + "UsG1D0Qh", + "NHhRiB6s" + ] }, "method": "ciYHWors", "body": "lUVLGYU7", @@ -482,7 +582,10 @@ { "id": "wI1dzxS4", "name": "7IszXMQ1", - "tags": ["0Zwg8l6v", "zebELdN5"], + "tags": [ + "0Zwg8l6v", + "zebELdN5" + ], "address": "9RhqPSPB", "token": "myjKJkWH", "port": 72219, @@ -492,11 +595,19 @@ "name": "atDGP7n5", "status": "pDQKEhWL", "notes": "Yt8EDLev", - "args": ["81EDZLPa", "bPY5X8xd"], + "args": [ + "81EDZLPa", + "bPY5X8xd" + ], "http": "qzHYvmJO", "header": { - "UkpmZ3a3": ["2dfzXuxZ"], - "cVFpko4u": ["gGqdEB6k", "9LsRo22u"] + "UkpmZ3a3": [ + "2dfzXuxZ" + ], + "cVFpko4u": [ + "gGqdEB6k", + "9LsRo22u" + ] }, "method": "X5DrovFc", "body": "WeikigLh", @@ -521,7 +632,10 @@ { "id": "MRHVMZuD", "name": "6L6BVfgH", - "tags": 
["7Ale4y6o", "PMBW08hy"], + "tags": [ + "7Ale4y6o", + "PMBW08hy" + ], "address": "R6H6g8h0", "token": "ZgY8gjMI", "port": 38292, @@ -536,11 +650,20 @@ "name": "9OOS93ne", "notes": "CQy86DH0", "status": "P0SWDvrk", - "args": ["EXvkYIuG", "BATOyt6h"], + "args": [ + "EXvkYIuG", + "BATOyt6h" + ], "http": "u97ByEiW", "header": { - "MUlReo8L": [ "AUZG7wHG", "gsN0Dc2N" ], - "1UJXjVrT": [ "OJgxzTfk", "xZZrFsq7" ] + "MUlReo8L": [ + "AUZG7wHG", + "gsN0Dc2N" + ], + "1UJXjVrT": [ + "OJgxzTfk", + "xZZrFsq7" + ] }, "method": "5wkAxCUE", "body": "7CRjCJyz", @@ -621,8 +744,8 @@ "destination_namespace": "9nakw0td", "destination_partition": "part-9nakw0td", "destination_type": "prepared_query", - "local_bind_socket_path": "/foo/bar/upstream", - "local_bind_socket_mode": "0600" + "local_bind_socket_path": "/foo/bar/upstream", + "local_bind_socket_mode": "0600" } ] } @@ -634,15 +757,21 @@ "port": 27147, "proxy": { "config": { - "1CuJHVfw" : "Kzqsa7yc" + "1CuJHVfw": "Kzqsa7yc" } } } ], "session_ttl_min": "26627s", "skip_leave_on_interrupt": true, - "start_join": [ "LR3hGDoG", "MwVpZ4Up" ], - "start_join_wan": [ "EbFSc3nA", "kwXTh623" ], + "start_join": [ + "LR3hGDoG", + "MwVpZ4Up" + ], + "start_join_wan": [ + "EbFSc3nA", + "kwXTh623" + ], "syslog_facility": "hHv79Uia", "tagged_addresses": { "7MYgHrYH": "dALJAhLD", @@ -664,10 +793,16 @@ "circonus_submission_url": "gTcbS93G", "disable_hostname": true, "dogstatsd_addr": "0wSndumK", - "dogstatsd_tags": [ "3N81zSUB","Xtj8AnXZ" ], + "dogstatsd_tags": [ + "3N81zSUB", + "Xtj8AnXZ" + ], "retry_failed_connection": true, "filter_default": true, - "prefix_filter": [ "+oJotS8XJ","-cazlEhGn" ], + "prefix_filter": [ + "+oJotS8XJ", + "-cazlEhGn" + ], "metrics_prefix": "ftO6DySn", "prometheus_retention_time": "15s", "statsd_address": "drce87cy", @@ -722,7 +857,10 @@ "dir": "pVncV4Ey", "content_path": "qp1WRhYH", "metrics_provider": "sgnaoa_lower_case", - "metrics_provider_files": ["sgnaMFoa", "dicnwkTH"], + "metrics_provider_files": [ + "sgnaMFoa", + 
"dicnwkTH" + ], "metrics_provider_options_json": "{\"DIbVQadX\": 1}", "metrics_proxy": { "base_url": "http://foo.bar", @@ -732,7 +870,10 @@ "value": "TYBgnN2F" } ], - "path_allowlist": ["/aSh3cu", "/eiK/2Th"] + "path_allowlist": [ + "/aSh3cu", + "/eiK/2Th" + ] }, "dashboard_url_templates": { "u2eziu2n_lower_case": "http://lkjasd.otr" @@ -754,11 +895,15 @@ "datacenter": "GyE6jpeW", "key": "j9lF1Tve", "handler": "90N7S4LN" - }, { + }, + { "type": "keyprefix", "datacenter": "fYrl3F5d", "key": "sl3Dffu7", - "args": ["dltjDJ2a", "flEa7C2d"] + "args": [ + "dltjDJ2a", + "flEa7C2d" + ] } ], "xds": { diff --git a/agent/consul/config.go b/agent/consul/config.go index 72bd9b396..b76abdff7 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -12,6 +12,7 @@ import ( "golang.org/x/time/rate" "github.com/hashicorp/consul/agent/checks" + consulrate "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/structs" libserf "github.com/hashicorp/consul/lib/serf" "github.com/hashicorp/consul/tlsutil" @@ -318,6 +319,25 @@ type Config struct { // CheckOutputMaxSize control the max size of output of checks CheckOutputMaxSize int + // RequestLimitsMode will disable or enable rate limiting. If not disabled, it + // enforces the action that will occur when RequestLimitsReadRate + // or RequestLimitsWriteRate is exceeded. The default value of "disabled" will + // prevent any rate limiting from occurring. A value of "enforcing" will block + // the request from processing by returning an error. A value of + // "permissive" will not block the request and will allow the request to + // continue processing. + RequestLimitsMode string + + // RequestLimitsReadRate controls how frequently RPC, gRPC, and HTTP + // queries are allowed to happen. In any large enough time interval, rate + // limiter limits the rate to RequestLimitsReadRate tokens per second. 
+ RequestLimitsReadRate rate.Limit + + // RequestLimitsWriteRate controls how frequently RPC, gRPC, and HTTP + // writes are allowed to happen. In any large enough time interval, rate + // limiter limits the rate to RequestLimitsWriteRate tokens per second. + RequestLimitsWriteRate rate.Limit + // RPCHandshakeTimeout limits how long we will wait for the initial magic byte // on an RPC client connection. It also governs how long we will wait for a // TLS handshake when TLS is configured however the timout applies separately @@ -501,6 +521,10 @@ func DefaultConfig() *Config { CheckOutputMaxSize: checks.DefaultBufSize, + RequestLimitsMode: "disabled", + RequestLimitsReadRate: rate.Inf, // ops / sec + RequestLimitsWriteRate: rate.Inf, // ops / sec + RPCRateLimit: rate.Inf, RPCMaxBurst: 1000, @@ -620,9 +644,18 @@ type RPCConfig struct { EnableStreaming bool } +// RequestLimits is configuration for server rate limiting that is a part of +// ReloadableConfig. +type RequestLimits struct { + Mode consulrate.Mode + ReadRate rate.Limit + WriteRate rate.Limit +} + // ReloadableConfig is the configuration that is passed to ReloadConfig when // application config is reloaded. 
type ReloadableConfig struct { + RequestLimits *RequestLimits RPCClientTimeout time.Duration RPCRateLimit rate.Limit RPCMaxBurst int diff --git a/agent/consul/multilimiter/multilimiter.go b/agent/consul/multilimiter/multilimiter.go index 9de1bdaa1..539cced1c 100644 --- a/agent/consul/multilimiter/multilimiter.go +++ b/agent/consul/multilimiter/multilimiter.go @@ -3,12 +3,11 @@ package multilimiter import ( "bytes" "context" + radix "github.com/hashicorp/go-immutable-radix" + "golang.org/x/time/rate" "sync" "sync/atomic" "time" - - radix "github.com/hashicorp/go-immutable-radix" - "golang.org/x/time/rate" ) var _ RateLimiter = &MultiLimiter{} diff --git a/agent/consul/rate/handler.go b/agent/consul/rate/handler.go index 189c0fa0f..de20ab668 100644 --- a/agent/consul/rate/handler.go +++ b/agent/consul/rate/handler.go @@ -30,14 +30,51 @@ var ( type Mode int const ( + // ModeDisabled causes rate limiting to be bypassed. + ModeDisabled Mode = iota + // ModePermissive causes the handler to log the rate-limited operation but // still allow it to proceed. - ModePermissive Mode = iota + ModePermissive - // ModeEnforcing causes the handler to reject the rate-limted operation. + // ModeEnforcing causes the handler to reject the rate-limited operation. ModeEnforcing ) +var modeToName = map[Mode]string{ + ModeDisabled: "disabled", + ModeEnforcing: "enforcing", + ModePermissive: "permissive", +} +var modeFromName = func() map[string]Mode { + vals := map[string]Mode{ + "": ModeDisabled, + } + for k, v := range modeToName { + vals[v] = k + } + return vals +}() + +func (m Mode) String() string { + return modeToName[m] +} + +// RequestLimitsModeFromName returns the Mode for name and reports whether name is a known mode ("" maps to ModeDisabled). +func RequestLimitsModeFromName(name string) (Mode, bool) { + s, ok := modeFromName[name] + return s, ok +} + +// RequestLimitsModeFromNameWithDefault returns the Mode for name, falling back to ModePermissive when name is not a known mode.
+func RequestLimitsModeFromNameWithDefault(name string) Mode { + s, ok := modeFromName[name] + if !ok { + return ModePermissive + } + return s +} + // OperationType is the type of operation the client is attempting to perform. type OperationType int @@ -61,6 +98,13 @@ type Operation struct { Type OperationType } +//go:generate mockery --name RequestLimitsHandler --inpackage --filename mock_RequestLimitsHandler_test.go +type RequestLimitsHandler interface { + Run(ctx context.Context) + Allow(op Operation) error + UpdateConfig(cfg HandlerConfig) +} + // Handler enforces rate limits for incoming RPCs. type Handler struct { cfg *atomic.Pointer[HandlerConfig] @@ -127,7 +171,6 @@ func (h *Handler) Allow(op Operation) error { return nil } -// TODO(NET-1379): call this on `consul reload`. func (h *Handler) UpdateConfig(cfg HandlerConfig) { h.cfg.Store(&cfg) h.limiter.UpdateConfig(cfg.GlobalWriteConfig, globalWrite) @@ -149,3 +192,16 @@ type globalLimit []byte func (prefix globalLimit) Key() multilimiter.KeyType { return multilimiter.Key(prefix, nil) } + +// NullRateLimiter returns a RateLimiter that allows every operation. +func NullRateLimiter() RequestLimitsHandler { + return nullRateLimiter{} +} + +type nullRateLimiter struct{} + +func (nullRateLimiter) Allow(Operation) error { return nil } + +func (nullRateLimiter) Run(ctx context.Context) {} + +func (nullRateLimiter) UpdateConfig(cfg HandlerConfig) {} diff --git a/agent/consul/rate/mock_RequestLimitsHandler.go b/agent/consul/rate/mock_RequestLimitsHandler.go new file mode 100644 index 000000000..02569e56e --- /dev/null +++ b/agent/consul/rate/mock_RequestLimitsHandler.go @@ -0,0 +1,53 @@ +// Code generated by mockery v2.15.0. DO NOT EDIT. 
+ +package rate + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// MockRequestLimitsHandler is an autogenerated mock type for the RequestLimitsHandler type +type MockRequestLimitsHandler struct { + mock.Mock +} + +// Allow provides a mock function with given fields: op +func (_m *MockRequestLimitsHandler) Allow(op Operation) error { + ret := _m.Called(op) + + var r0 error + if rf, ok := ret.Get(0).(func(Operation) error); ok { + r0 = rf(op) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Run provides a mock function with given fields: ctx +func (_m *MockRequestLimitsHandler) Run(ctx context.Context) { + _m.Called(ctx) +} + +// UpdateConfig provides a mock function with given fields: cfg +func (_m *MockRequestLimitsHandler) UpdateConfig(cfg HandlerConfig) { + _m.Called(cfg) +} + +type mockConstructorTestingTNewMockRequestLimitsHandler interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockRequestLimitsHandler creates a new instance of MockRequestLimitsHandler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. 
+func NewMockRequestLimitsHandler(t mockConstructorTestingTNewMockRequestLimitsHandler) *MockRequestLimitsHandler { + mock := &MockRequestLimitsHandler{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/agent/consul/server.go b/agent/consul/server.go index 3ac9fbb25..c37f415e5 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -5,7 +5,6 @@ import ( "crypto/x509" "errors" "fmt" - "github.com/hashicorp/consul/agent/consul/multilimiter" "io" "net" "os" @@ -35,6 +34,7 @@ import ( "github.com/hashicorp/consul/agent/consul/authmethod" "github.com/hashicorp/consul/agent/consul/authmethod/ssoauth" "github.com/hashicorp/consul/agent/consul/fsm" + "github.com/hashicorp/consul/agent/consul/multilimiter" rpcRate "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" @@ -144,6 +144,8 @@ const ( PoolKindSegment = "segment" ) +const requestLimitsBurstMultiplier = 10 + // Server is Consul server which manages the service discovery, // health checking, DC forwarding, Raft, and multiple Serf pools. type Server struct { @@ -279,7 +281,7 @@ type Server struct { rpcServer *rpc.Server // incomingRPCLimiter rate-limits incoming net/rpc and gRPC calls. - incomingRPCLimiter *rpcRate.Handler + incomingRPCLimiter rpcRate.RequestLimitsHandler // insecureRPCServer is a RPC server that is configure with // IncomingInsecureRPCConfig to allow clients to call AutoEncrypt.Sign @@ -396,6 +398,7 @@ type Server struct { EnterpriseServer operatorServer *operator.Server } + type connHandler interface { Run() error Handle(conn net.Conn) @@ -469,12 +472,16 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser }) // TODO(NET-1380, NET-1381): thread this into the net/rpc and gRPC interceptors. 
- s.incomingRPCLimiter = rpcRate.NewHandler(rpcRate.HandlerConfig{ - // TODO(server-rate-limit): revisit those value based on the multilimiter final implementation - Config: multilimiter.Config{ReconcileCheckLimit: 30 * time.Second, ReconcileCheckInterval: time.Second}, - // TODO(NET-1379): pass in _real_ configuration. - GlobalMode: rpcRate.ModePermissive, - }, s) + if s.incomingRPCLimiter == nil { + mlCfg := &multilimiter.Config{ReconcileCheckLimit: 30 * time.Second, ReconcileCheckInterval: time.Second} + limitsConfig := &RequestLimits{ + Mode: rpcRate.RequestLimitsModeFromNameWithDefault(config.RequestLimitsMode), + ReadRate: config.RequestLimitsReadRate, + WriteRate: config.RequestLimitsWriteRate, + } + + s.incomingRPCLimiter = rpcRate.NewHandler(*s.convertConsulConfigToRateLimitHandlerConfig(*limitsConfig, mlCfg), s) + } s.incomingRPCLimiter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) var recorder *middleware.RequestRecorder @@ -1680,6 +1687,11 @@ func (s *Server) ReloadConfig(config ReloadableConfig) error { } s.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst)) + + if config.RequestLimits != nil { + s.incomingRPCLimiter.UpdateConfig(*s.convertConsulConfigToRateLimitHandlerConfig(*config.RequestLimits, nil)) + } + s.rpcConnLimiter.SetConfig(connlimit.Config{ MaxConnsPerClientIP: config.RPCMaxConnsPerClient, }) @@ -1830,10 +1842,31 @@ func (s *Server) hcpServerStatus(deps Deps) hcp.StatusCallback { } } +// convertConsulConfigToRateLimitHandlerConfig creates a rate limit handler config +// from the relevant fields in the consul runtime config.
+func (s *Server) convertConsulConfigToRateLimitHandlerConfig(limitsConfig RequestLimits, multilimiterConfig *multilimiter.Config) *rpcRate.HandlerConfig { + hc := &rpcRate.HandlerConfig{ + GlobalMode: limitsConfig.Mode, + GlobalReadConfig: multilimiter.LimiterConfig{ + Rate: limitsConfig.ReadRate, + Burst: int(limitsConfig.ReadRate) * requestLimitsBurstMultiplier, + }, + GlobalWriteConfig: multilimiter.LimiterConfig{ + Rate: limitsConfig.WriteRate, + Burst: int(limitsConfig.WriteRate) * requestLimitsBurstMultiplier, + }, + } + if multilimiterConfig != nil { + hc.Config = *multilimiterConfig + } + + return hc +} + // IncomingRPCLimiter returns the server's configured rate limit handler for // incoming RPCs. This is necessary because the external gRPC server is created // by the agent (as it is also used for xDS). -func (s *Server) IncomingRPCLimiter() *rpcRate.Handler { return s.incomingRPCLimiter } +func (s *Server) IncomingRPCLimiter() rpcRate.RequestLimitsHandler { return s.incomingRPCLimiter } // peersInfoContent is used to help operators understand what happened to the // peers.json file. 
This is written to a file called peers.info in the same diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index b1be492cd..4cb7b5d97 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -29,6 +29,8 @@ import ( "github.com/hashicorp/consul-net-rpc/net/rpc" "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/consul/multilimiter" + rpcRate "github.com/hashicorp/consul/agent/consul/rate" external "github.com/hashicorp/consul/agent/grpc-external" grpcmiddleware "github.com/hashicorp/consul/agent/grpc-middleware" "github.com/hashicorp/consul/agent/metadata" @@ -331,7 +333,7 @@ func newServerWithDeps(t *testing.T, c *Config, deps Deps) (*Server, error) { oldNotify() } } - grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil, deps.TLSConfigurator, grpcmiddleware.NullRateLimiter()) + grpcServer := external.NewServer(deps.Logger.Named("grpc.external"), nil, deps.TLSConfigurator, rpcRate.NullRateLimiter()) srv, err := NewServer(c, deps, grpcServer) if err != nil { return nil, err @@ -1818,6 +1820,9 @@ func TestServer_ReloadConfig(t *testing.T) { c.Build = "1.5.0" c.RPCRateLimit = 500 c.RPCMaxBurst = 5000 + c.RequestLimitsMode = "permissive" + c.RequestLimitsReadRate = 500 + c.RequestLimitsWriteRate = 500 c.RPCClientTimeout = 60 * time.Second // Set one raft param to be non-default in the initial config, others are // default. 
@@ -1835,6 +1840,11 @@ func TestServer_ReloadConfig(t *testing.T) { require.Equal(t, 60*time.Second, s.connPool.RPCClientTimeout()) rc := ReloadableConfig{ + RequestLimits: &RequestLimits{ + Mode: rpcRate.ModeEnforcing, + ReadRate: 1000, + WriteRate: 1100, + }, RPCClientTimeout: 2 * time.Minute, RPCRateLimit: 1000, RPCMaxBurst: 10000, @@ -1848,6 +1858,11 @@ func TestServer_ReloadConfig(t *testing.T) { // Leave other raft fields default } + + mockHandler := rpcRate.NewMockRequestLimitsHandler(t) + mockHandler.On("UpdateConfig", mock.Anything).Return(func(cfg rpcRate.HandlerConfig) {}) + + s.incomingRPCLimiter = mockHandler require.NoError(t, s.ReloadConfig(rc)) _, entry, err := s.fsm.State().ConfigEntry(nil, structs.ProxyDefaults, structs.ProxyConfigGlobal, structs.DefaultEnterpriseMetaInDefaultPartition()) @@ -1864,6 +1879,19 @@ func TestServer_ReloadConfig(t *testing.T) { require.Equal(t, rate.Limit(1000), limiter.Limit()) require.Equal(t, 10000, limiter.Burst()) + // Check the incoming RPC rate limiter got updated + mockHandler.AssertCalled(t, "UpdateConfig", rpcRate.HandlerConfig{ + GlobalMode: rc.RequestLimits.Mode, + GlobalReadConfig: multilimiter.LimiterConfig{ + Rate: rc.RequestLimits.ReadRate, + Burst: int(rc.RequestLimits.ReadRate) * requestLimitsBurstMultiplier, + }, + GlobalWriteConfig: multilimiter.LimiterConfig{ + Rate: rc.RequestLimits.WriteRate, + Burst: int(rc.RequestLimits.WriteRate) * requestLimitsBurstMultiplier, + }, + }) + // Check RPC client timeout got updated require.Equal(t, 2*time.Minute, s.connPool.RPCClientTimeout()) diff --git a/agent/grpc-external/server.go b/agent/grpc-external/server.go index 98de599c8..3ea687f8c 100644 --- a/agent/grpc-external/server.go +++ b/agent/grpc-external/server.go @@ -10,6 +10,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" + "github.com/hashicorp/consul/agent/consul/rate" agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware" 
"github.com/hashicorp/consul/tlsutil" ) @@ -23,7 +24,7 @@ var ( // NewServer constructs a gRPC server for the external gRPC port, to which // handlers can be registered. -func NewServer(logger agentmiddleware.Logger, metricsObj *metrics.Metrics, tls *tlsutil.Configurator, limiter agentmiddleware.RateLimiter) *grpc.Server { +func NewServer(logger agentmiddleware.Logger, metricsObj *metrics.Metrics, tls *tlsutil.Configurator, limiter rate.RequestLimitsHandler) *grpc.Server { if metricsObj == nil { metricsObj = metrics.Default() } diff --git a/agent/grpc-external/stats_test.go b/agent/grpc-external/stats_test.go index c62eb6559..40d850996 100644 --- a/agent/grpc-external/stats_test.go +++ b/agent/grpc-external/stats_test.go @@ -14,6 +14,7 @@ import ( "github.com/hashicorp/go-hclog" + "github.com/hashicorp/consul/agent/consul/rate" grpcmiddleware "github.com/hashicorp/consul/agent/grpc-middleware" "github.com/hashicorp/consul/agent/grpc-middleware/testutil" "github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice" @@ -23,7 +24,7 @@ import ( func TestServer_EmitsStats(t *testing.T) { sink, metricsObj := testutil.NewFakeSink(t) - srv := NewServer(hclog.Default(), metricsObj, nil, grpcmiddleware.NullRateLimiter()) + srv := NewServer(hclog.Default(), metricsObj, nil, rate.NullRateLimiter()) testservice.RegisterSimpleServer(srv, &testservice.Simple{}) diff --git a/agent/grpc-internal/handler.go b/agent/grpc-internal/handler.go index fc563df4e..f78e17c53 100644 --- a/agent/grpc-internal/handler.go +++ b/agent/grpc-internal/handler.go @@ -11,6 +11,7 @@ import ( middleware "github.com/grpc-ecosystem/go-grpc-middleware" recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" + "github.com/hashicorp/consul/agent/consul/rate" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" ) @@ -25,7 +26,7 @@ var ( // NewHandler returns a gRPC server that accepts connections from Handle(conn). 
// The register function will be called with the grpc.Server to register // gRPC services with the server. -func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server), metricsObj *metrics.Metrics, rateLimiter agentmiddleware.RateLimiter) *Handler { +func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server), metricsObj *metrics.Metrics, rateLimiter rate.RequestLimitsHandler) *Handler { if metricsObj == nil { metricsObj = metrics.Default() } diff --git a/agent/grpc-internal/server_test.go b/agent/grpc-internal/server_test.go index 1cd66ec0c..4e76ac594 100644 --- a/agent/grpc-internal/server_test.go +++ b/agent/grpc-internal/server_test.go @@ -14,7 +14,7 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc" - middleware "github.com/hashicorp/consul/agent/grpc-middleware" + "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" @@ -55,7 +55,7 @@ func newPanicTestServer(t *testing.T, logger hclog.Logger, name, dc string, tlsC func newTestServer(t *testing.T, logger hclog.Logger, name, dc string, tlsConf *tlsutil.Configurator, register func(server *grpc.Server)) testServer { addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")} - handler := NewHandler(logger, addr, register, nil, middleware.NullRateLimiter()) + handler := NewHandler(logger, addr, register, nil, rate.NullRateLimiter()) lis, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) diff --git a/agent/grpc-internal/services/subscribe/subscribe_test.go b/agent/grpc-internal/services/subscribe/subscribe_test.go index c304543f4..2ef6a41e8 100644 --- a/agent/grpc-internal/services/subscribe/subscribe_test.go +++ b/agent/grpc-internal/services/subscribe/subscribe_test.go @@ -19,10 +19,10 @@ import ( "google.golang.org/grpc/status" "github.com/hashicorp/consul/acl" + 
"github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" grpc "github.com/hashicorp/consul/agent/grpc-internal" - middleware "github.com/hashicorp/consul/agent/grpc-middleware" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/proto/pbcommon" @@ -381,7 +381,7 @@ func runTestServer(t *testing.T, server *Server) net.Addr { pbsubscribe.RegisterStateChangeSubscriptionServer(srv, server) }, nil, - middleware.NullRateLimiter(), + rate.NullRateLimiter(), ) lis, err := net.Listen("tcp", "127.0.0.1:0") diff --git a/agent/grpc-internal/stats_test.go b/agent/grpc-internal/stats_test.go index 2672beba4..13f71b79b 100644 --- a/agent/grpc-internal/stats_test.go +++ b/agent/grpc-internal/stats_test.go @@ -14,7 +14,7 @@ import ( "github.com/hashicorp/go-hclog" - middleware "github.com/hashicorp/consul/agent/grpc-middleware" + "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/grpc-middleware/testutil" "github.com/hashicorp/consul/agent/grpc-middleware/testutil/testservice" "github.com/hashicorp/consul/proto/prototest" @@ -26,7 +26,7 @@ func TestHandler_EmitsStats(t *testing.T) { sink, metricsObj := testutil.NewFakeSink(t) addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")} - handler := NewHandler(hclog.Default(), addr, noopRegister, metricsObj, middleware.NullRateLimiter()) + handler := NewHandler(hclog.Default(), addr, noopRegister, metricsObj, rate.NullRateLimiter()) testservice.RegisterSimpleServer(handler.srv, &testservice.Simple{}) diff --git a/agent/grpc-middleware/mock_RateLimiter.go b/agent/grpc-middleware/mock_RateLimiter.go deleted file mode 100644 index 9f427b7bc..000000000 --- a/agent/grpc-middleware/mock_RateLimiter.go +++ /dev/null @@ -1,39 +0,0 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. 
- -package middleware - -import ( - testing "testing" - - rate "github.com/hashicorp/consul/agent/consul/rate" - mock "github.com/stretchr/testify/mock" -) - -// MockRateLimiter is an autogenerated mock type for the RateLimiter type -type MockRateLimiter struct { - mock.Mock -} - -// Allow provides a mock function with given fields: _a0 -func (_m *MockRateLimiter) Allow(_a0 rate.Operation) error { - ret := _m.Called(_a0) - - var r0 error - if rf, ok := ret.Get(0).(func(rate.Operation) error); ok { - r0 = rf(_a0) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// NewMockRateLimiter creates a new instance of MockRateLimiter. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockRateLimiter(t testing.TB) *MockRateLimiter { - mock := &MockRateLimiter{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/agent/grpc-middleware/rate.go b/agent/grpc-middleware/rate.go index 5683d6940..d2254000f 100644 --- a/agent/grpc-middleware/rate.go +++ b/agent/grpc-middleware/rate.go @@ -17,7 +17,7 @@ import ( // ServerRateLimiterMiddleware implements a ServerInHandle function to perform // RPC rate limiting at the cheapest possible point (before the full request has // been decoded). -func ServerRateLimiterMiddleware(limiter RateLimiter, panicHandler recovery.RecoveryHandlerFunc) tap.ServerInHandle { +func ServerRateLimiterMiddleware(limiter rate.RequestLimitsHandler, panicHandler recovery.RecoveryHandlerFunc) tap.ServerInHandle { return func(ctx context.Context, info *tap.Info) (_ context.Context, retErr error) { // This function is called before unary and stream RPC interceptors, so we // must handle our own panics here. 
@@ -56,17 +56,3 @@ func ServerRateLimiterMiddleware(limiter RateLimiter, panicHandler recovery.Reco } } } - -//go:generate mockery --name RateLimiter --inpackage -type RateLimiter interface { - Allow(rate.Operation) error -} - -// NullRateLimiter returns a RateLimiter that allows every operation. -func NullRateLimiter() RateLimiter { - return nullRateLimiter{} -} - -type nullRateLimiter struct{} - -func (nullRateLimiter) Allow(rate.Operation) error { return nil } diff --git a/agent/grpc-middleware/rate_test.go b/agent/grpc-middleware/rate_test.go index 1c5d41704..2d9c1eb3e 100644 --- a/agent/grpc-middleware/rate_test.go +++ b/agent/grpc-middleware/rate_test.go @@ -21,7 +21,7 @@ import ( ) func TestServerRateLimiterMiddleware_Integration(t *testing.T) { - limiter := NewMockRateLimiter(t) + limiter := rate.NewMockRequestLimitsHandler(t) server := grpc.NewServer( grpc.InTapHandle(ServerRateLimiterMiddleware(limiter, NewPanicHandler(hclog.NewNullLogger()))), diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index c971ccf72..029bfa2c5 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -22,6 +22,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul" + "github.com/hashicorp/consul/agent/consul/rate" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" external "github.com/hashicorp/consul/agent/grpc-external" @@ -1591,7 +1592,7 @@ func newTestServer(t *testing.T, cb func(conf *consul.Config)) testingServer { conf.ACLResolverSettings.EnterpriseMeta = *conf.AgentEnterpriseMeta() deps := newDefaultDeps(t, conf) - externalGRPCServer := external.NewServer(deps.Logger, nil, deps.TLSConfigurator, agentmiddleware.NullRateLimiter()) + externalGRPCServer := external.NewServer(deps.Logger, nil, deps.TLSConfigurator, rate.NullRateLimiter()) server, err := consul.NewServer(conf, deps, 
externalGRPCServer) require.NoError(t, err) diff --git a/docs/config/checklist-adding-config-fields.md b/docs/config/checklist-adding-config-fields.md index 7a47eb841..59737f7a4 100644 --- a/docs/config/checklist-adding-config-fields.md +++ b/docs/config/checklist-adding-config-fields.md @@ -46,7 +46,7 @@ There are four specific cases covered with increasing complexity: - [ ] Add a test case to the table test `TestLoad_IntegrationWithFlags` in `agent/config/runtime_test.go`. - [ ] If the config needs to be defaulted for the test server used in unit tests, - also add it to `DefaultConfig()` in `agent/consul/defaults.go`. + also add it to `DefaultConfig()` in `agent/consul/config.go`. - [ ] **If** your config should take effect on a reload/HUP. - [ ] Add necessary code to to trigger a safe (locked or atomic) update to any state the feature needs changing. This needs to be added to one or diff --git a/website/content/docs/agent/config/config-files.mdx b/website/content/docs/agent/config/config-files.mdx index 31809605a..f562d1605 100644 --- a/website/content/docs/agent/config/config-files.mdx +++ b/website/content/docs/agent/config/config-files.mdx @@ -541,6 +541,10 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." - `http_max_conns_per_client` - Configures a limit of how many concurrent TCP connections a single client IP address is allowed to open to the agent's HTTP(S) server. This affects the HTTP(S) servers in both client and server agents. Default value is `200`. - `https_handshake_timeout` - Configures the limit for how long the HTTPS server in both client and server agents will wait for a client to complete a TLS handshake. This should be kept conservative as it limits how many connections an unauthenticated attacker can open if `verify_incoming` is being using to authenticate clients (strongly recommended in production). Default value is `5s`. 
+ - `request_limits` - This object provides configuration for rate limiting RPC and gRPC requests on the Consul server. As a result of rate limiting gRPC and RPC requests, HTTP requests to the Consul server are rate limited. + - `mode` - Configures whether rate limiting is enabled or not as well as how it behaves through the use of 3 possible modes. The default value of "disabled" will prevent any rate limiting from occurring. A value of "permissive" will cause the system to track requests against the `read_rate` and `write_rate` but will only log violations and will not block and will allow the request to continue processing. A value of "enforcing" also tracks requests against the `read_rate` and `write_rate` but in addition to logging violations, the system will block the request from processing by returning an error. + - `read_rate` - Configures how frequently RPC, gRPC, and HTTP queries are allowed to happen. The rate limiter limits the rate to tokens per second equal to this value. See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. + - `write_rate` - Configures how frequently RPC, gRPC, and HTTP writes are allowed to happen. The rate limiter limits the rate to tokens per second equal to this value. See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. - `rpc_handshake_timeout` - Configures the limit for how long servers will wait after a client TCP connection is established before they complete the connection handshake. When TLS is used, the same timeout applies to the TLS handshake separately from the initial protocol negotiation. All Consul clients should perform this immediately on establishing a new connection. This should be kept conservative as it limits how many connections an unauthenticated attacker can open if `verify_incoming` is being using to authenticate clients (strongly recommended in production).
When `verify_incoming` is true on servers, this limits how long the connection socket and associated goroutines will be held open before the client successfully authenticates. Default value is `5s`. - `rpc_client_timeout` - Configures the limit for how long a client is allowed to read from an RPC connection. This is used to set an upper bound for calls to eventually terminate so that RPC connections are not held indefinitely. Blocking queries can override this timeout. Default is `60s`. - `rpc_max_conns_per_client` - Configures a limit of how many concurrent TCP connections a single source IP address is allowed to open to a single server. It affects both clients connections and other server connections. In general Consul clients multiplex many RPC calls over a single TCP connection so this can typically be kept low. It needs to be more than one though since servers open at least one additional connection for raft RPC, possibly more for WAN federation when using network areas, and snapshot requests from clients run over a separate TCP conn. A reasonably low limit significantly reduces the ability of an unauthenticated attacker to consume unbounded resources by holding open many connections. You may need to increase this if WAN federated servers connect via proxies or NAT gateways or similar causing many legitimate connections from a single source IP. Default value is `100` which is designed to be extremely conservative to limit issues with certain deployment patterns. Most deployments can probably reduce this safely. 100 connections on modern server hardware should not cause a significant impact on resource usage from an unauthenticated attacker though.