diff --git a/.changelog/8696.txt b/.changelog/8696.txt new file mode 100644 index 000000000..eda07dd57 --- /dev/null +++ b/.changelog/8696.txt @@ -0,0 +1,4 @@ +```release-note:bug +config: Fixed a bug where `rpc_max_conns_per_client` could not be changed by reloading the +config. +``` diff --git a/agent/acl_test.go b/agent/acl_test.go index e5d4df594..7145fda34 100644 --- a/agent/acl_test.go +++ b/agent/acl_test.go @@ -162,7 +162,7 @@ func (a *TestACLAgent) Shutdown() error { func (a *TestACLAgent) Stats() map[string]map[string]string { return nil } -func (a *TestACLAgent) ReloadConfig(config *consul.Config) error { +func (a *TestACLAgent) ReloadConfig(_ consul.ReloadableConfig) error { return fmt.Errorf("Unimplemented") } diff --git a/agent/agent.go b/agent/agent.go index 1d48cd85d..21e7b98a1 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -145,7 +145,7 @@ type delegate interface { SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error Shutdown() error Stats() map[string]map[string]string - ReloadConfig(config *consul.Config) error + ReloadConfig(config consul.ReloadableConfig) error enterpriseDelegate } @@ -1143,7 +1143,7 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co // Rate limiting for RPC calls. if runtimeCfg.RPCRateLimit > 0 { - cfg.RPCRate = runtimeCfg.RPCRateLimit + cfg.RPCRateLimit = runtimeCfg.RPCRateLimit } if runtimeCfg.RPCMaxBurst > 0 { cfg.RPCMaxBurst = runtimeCfg.RPCMaxBurst @@ -3517,11 +3517,6 @@ func (a *Agent) DisableNodeMaintenance() { a.logger.Info("Node left maintenance mode") } -func (a *Agent) loadLimits(conf *config.RuntimeConfig) { - a.config.RPCRateLimit = conf.RPCRateLimit - a.config.RPCMaxBurst = conf.RPCMaxBurst -} - // ReloadConfig will atomically reload all configuration, including // all services, checks, tokens, metadata, dnsServer configs, etc. // It will also reload all ongoing watches. @@ -3602,8 +3597,6 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error { return fmt.Errorf("Failed reloading watches: %v", err) } - a.loadLimits(newCfg) - a.httpConnLimiter.SetConfig(connlimit.Config{ MaxConnsPerClientIP: newCfg.HTTPMaxConnsPerClient, }) @@ -3614,24 +3607,18 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error { } } - // this only gets used by the consulConfig function and since - // that is only ever done during init and reload here then - // an in place modification is safe as reloads cannot be - // concurrent due to both gaining a full lock on the stateLock - a.config.ConfigEntryBootstrap = newCfg.ConfigEntryBootstrap - err := a.reloadEnterprise(newCfg) if err != nil { return err } - // create the config for the rpc server/client - consulCfg, err := newConsulConfig(a.config, a.logger) - if err != nil { - return err + cc := consul.ReloadableConfig{ + RPCRateLimit: newCfg.RPCRateLimit, + RPCMaxBurst: newCfg.RPCMaxBurst, + RPCMaxConnsPerClient: newCfg.RPCMaxConnsPerClient, + ConfigEntryBootstrap: newCfg.ConfigEntryBootstrap, } - - if err := a.delegate.ReloadConfig(consulCfg); err != nil { + if err := a.delegate.ReloadConfig(cc); err != nil { return err } diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index 0d42589e5..73dc267b1 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -21,6 +21,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/debug" "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/structs" @@ -37,6 +38,7 @@ import ( "github.com/hashicorp/serf/serf" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/time/rate" ) func makeReadOnlyAgentACL(t *testing.T, srv *HTTPHandlers) string { @@ -1452,6 +1454,8 @@ func TestAgent_Reload(t *testing.T) { `, }) + shim := &delegateConfigReloadShim{delegate: a.delegate} + a.delegate = shim if err := a.reloadConfigInternal(cfg2); err != nil { t.Fatalf("got error %v want nil", err) } @@ -1459,13 +1463,8 @@ func TestAgent_Reload(t *testing.T) { t.Fatal("missing redis-reloaded service") } - if a.config.RPCRateLimit != 2 { - t.Fatalf("RPC rate not set correctly. Got %v. Want 2", a.config.RPCRateLimit) - } - - if a.config.RPCMaxBurst != 200 { - t.Fatalf("RPC max burst not set correctly. Got %v. Want 200", a.config.RPCMaxBurst) - } + require.Equal(t, rate.Limit(2), shim.newCfg.RPCRateLimit) + require.Equal(t, 200, shim.newCfg.RPCMaxBurst) for _, wp := range a.watchPlans { if !wp.IsStopped() { @@ -1474,6 +1473,16 @@ func TestAgent_Reload(t *testing.T) { } } +type delegateConfigReloadShim struct { + delegate + newCfg consul.ReloadableConfig +} + +func (s *delegateConfigReloadShim) ReloadConfig(cfg consul.ReloadableConfig) error { + s.newCfg = cfg + return s.delegate.ReloadConfig(cfg) +} + // TestAgent_ReloadDoesNotTriggerWatch Ensure watches not triggered after reload // see https://github.com/hashicorp/consul/issues/7446 func TestAgent_ReloadDoesNotTriggerWatch(t *testing.T) { diff --git a/agent/config/runtime.go b/agent/config/runtime.go index a1e5baa0f..c5f387218 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -921,8 +921,8 @@ type RuntimeConfig struct { // RPCRateLimit and RPCMaxBurst control how frequently RPC calls are allowed // to happen. In any large enough time interval, rate limiter limits the - // rate to RPCRate tokens per second, with a maximum burst size of - // RPCMaxBurst events. As a special case, if RPCRate == Inf (the infinite + // rate to RPCRateLimit tokens per second, with a maximum burst size of + // RPCMaxBurst events. As a special case, if RPCRateLimit == Inf (the infinite // rate), RPCMaxBurst is ignored. // // See https://en.wikipedia.org/wiki/Token_bucket for more about token diff --git a/agent/consul/client.go b/agent/consul/client.go index d2ae9a1ed..7b55dfaec 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -113,7 +113,7 @@ func NewClient(config *Config, deps Deps) (*Client, error) { tlsConfigurator: deps.TLSConfigurator, } - c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst)) + c.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst)) if err := c.initEnterprise(); err != nil { c.Shutdown() @@ -403,7 +403,7 @@ func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) { // ReloadConfig is used to have the Client do an online reload of // relevant configuration information -func (c *Client) ReloadConfig(config *Config) error { - c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst)) +func (c *Client) ReloadConfig(config ReloadableConfig) error { + c.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst)) return nil } diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go index a65c9f454..8253c2512 100644 --- a/agent/consul/client_test.go +++ b/agent/consul/client_test.go @@ -9,6 +9,12 @@ import ( "testing" "time" + "github.com/hashicorp/go-hclog" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/serf/serf" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" + "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/structs" @@ -18,11 +24,6 @@ import ( "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/tlsutil" - "github.com/hashicorp/go-hclog" - msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" - "github.com/hashicorp/serf/serf" - "github.com/stretchr/testify/require" - "golang.org/x/time/rate" ) func testClientConfig(t *testing.T) (string, *Config) { @@ -532,7 +533,7 @@ func TestClient_RPC_RateLimit(t *testing.T) { testrpc.WaitForLeader(t, s1.RPC, "dc1") _, conf2 := testClientConfig(t) - conf2.RPCRate = 2 + conf2.RPCRateLimit = 2 conf2.RPCMaxBurst = 2 c1 := newClient(t, conf2) @@ -601,7 +602,7 @@ func TestClient_SnapshotRPC_RateLimit(t *testing.T) { testrpc.WaitForLeader(t, s1.RPC, "dc1") _, conf1 := testClientConfig(t) - conf1.RPCRate = 2 + conf1.RPCRateLimit = 2 conf1.RPCMaxBurst = 2 c1 := newClient(t, conf1) @@ -762,23 +763,25 @@ func TestClientServer_UserEvent(t *testing.T) { } } -func TestClient_Reload(t *testing.T) { - t.Parallel() - dir1, c := testClientWithConfig(t, func(c *Config) { - c.RPCRate = 500 - c.RPCMaxBurst = 5000 - }) - defer os.RemoveAll(dir1) - defer c.Shutdown() +func TestClient_ReloadConfig(t *testing.T) { + _, cfg := testClientConfig(t) + cfg.RPCRateLimit = rate.Limit(500) + cfg.RPCMaxBurst = 5000 + deps := newDefaultDeps(t, &Config{NodeName: "node1", Datacenter: "dc1"}) + c, err := NewClient(cfg, deps) + require.NoError(t, err) limiter := c.rpcLimiter.Load().(*rate.Limiter) require.Equal(t, rate.Limit(500), limiter.Limit()) require.Equal(t, 5000, limiter.Burst()) - c.config.RPCRate = 1000 - c.config.RPCMaxBurst = 10000 + rc := ReloadableConfig{ + RPCRateLimit: 1000, + RPCMaxBurst: 10000, + RPCMaxConnsPerClient: 0, + } + require.NoError(t, c.ReloadConfig(rc)) - require.NoError(t, c.ReloadConfig(c.config)) limiter = c.rpcLimiter.Load().(*rate.Limiter) require.Equal(t, rate.Limit(1000), limiter.Limit()) require.Equal(t, 10000, limiter.Burst()) diff --git a/agent/consul/config.go b/agent/consul/config.go index 7b4cbb507..509b4cfe9 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -416,16 +416,16 @@ type Config struct { // place, and a small jitter is applied to avoid a thundering herd. RPCHoldTimeout time.Duration - // RPCRate and RPCMaxBurst control how frequently RPC calls are allowed + // RPCRateLimit and RPCMaxBurst control how frequently RPC calls are allowed // to happen. In any large enough time interval, rate limiter limits the - // rate to RPCRate tokens per second, with a maximum burst size of - // RPCMaxBurst events. As a special case, if RPCRate == Inf (the infinite + // rate to RPCRateLimit tokens per second, with a maximum burst size of + // RPCMaxBurst events. As a special case, if RPCRateLimit == Inf (the infinite // rate), RPCMaxBurst is ignored. // // See https://en.wikipedia.org/wiki/Token_bucket for more about token // buckets. - RPCRate rate.Limit - RPCMaxBurst int + RPCRateLimit rate.Limit + RPCMaxBurst int // RPCMaxConnsPerClient is the limit of how many concurrent connections are // allowed from a single source IP. @@ -582,8 +582,8 @@ func DefaultConfig() *Config { CheckOutputMaxSize: checks.DefaultBufSize, - RPCRate: rate.Inf, - RPCMaxBurst: 1000, + RPCRateLimit: rate.Inf, + RPCMaxBurst: 1000, TLSMinVersion: "tls10", @@ -655,3 +655,12 @@ func DefaultConfig() *Config { type RPCConfig struct { EnableStreaming bool } + +// ReloadableConfig is the configuration that is passed to ReloadConfig when +// application config is reloaded. +type ReloadableConfig struct { + RPCRateLimit rate.Limit + RPCMaxBurst int + RPCMaxConnsPerClient int + ConfigEntryBootstrap []structs.ConfigEntry +} diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index cd80ddfa7..4c283867e 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -729,9 +729,12 @@ func TestRPC_RPCMaxConnsPerClient(t *testing.T) { defer conn4.Close() // Reload config with higher limit - newCfg := *s1.config - newCfg.RPCMaxConnsPerClient = 10 - require.NoError(t, s1.ReloadConfig(&newCfg)) + rc := ReloadableConfig{ + RPCRateLimit: s1.config.RPCRateLimit, + RPCMaxBurst: s1.config.RPCMaxBurst, + RPCMaxConnsPerClient: 10, + } + require.NoError(t, s1.ReloadConfig(rc)) // Now another conn should be allowed conn5 := connectClient(t, s1, tc.magicByte, tc.tlsEnabled, true, "conn5") diff --git a/agent/consul/server.go b/agent/consul/server.go index 7b01bce67..72be18c16 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -395,7 +395,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) { return nil, err } - s.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst)) + s.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst)) configReplicatorConfig := ReplicatorConfig{ Name: logging.ConfigEntry, @@ -1384,8 +1384,8 @@ func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) { // ReloadConfig is used to have the Server do an online reload of // relevant configuration information -func (s *Server) ReloadConfig(config *Config) error { - s.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst)) +func (s *Server) ReloadConfig(config ReloadableConfig) error { + s.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst)) s.rpcConnLimiter.SetConfig(connlimit.Config{ MaxConnsPerClientIP: config.RPCMaxConnsPerClient, }) diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 4b862b23e..5fd40b06f 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -16,9 +16,13 @@ import ( "time" "github.com/google/tcpproxy" + "github.com/hashicorp/memberlist" + "github.com/hashicorp/consul/agent/connect/ca" "github.com/hashicorp/consul/ipaddr" - "github.com/hashicorp/memberlist" + + "github.com/hashicorp/go-uuid" + "golang.org/x/time/rate" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/metadata" @@ -30,8 +34,6 @@ import ( "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" - "github.com/hashicorp/go-uuid" - "golang.org/x/time/rate" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1458,14 +1460,13 @@ func TestServer_RevokeLeadershipIdempotent(t *testing.T) { s1.revokeLeadership() } -func TestServer_Reload(t *testing.T) { +func TestServer_ReloadConfig(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } - t.Parallel() - global_entry_init := &structs.ProxyConfigEntry{ + entryInit := &structs.ProxyConfigEntry{ Kind: structs.ProxyDefaults, Name: structs.ProxyConfigGlobal, Config: map[string]interface{}{ @@ -1478,7 +1479,7 @@ func TestServer_Reload(t *testing.T) { dir1, s := testServerWithConfig(t, func(c *Config) { c.Build = "1.5.0" - c.RPCRate = 500 + c.RPCRateLimit = 500 c.RPCMaxBurst = 5000 }) defer os.RemoveAll(dir1) @@ -1486,28 +1487,25 @@ func TestServer_Reload(t *testing.T) { testrpc.WaitForTestAgent(t, s.RPC, "dc1") - s.config.ConfigEntryBootstrap = []structs.ConfigEntry{ - global_entry_init, - } - limiter := s.rpcLimiter.Load().(*rate.Limiter) require.Equal(t, rate.Limit(500), limiter.Limit()) require.Equal(t, 5000, limiter.Burst()) - // Change rate limit - s.config.RPCRate = 1000 - s.config.RPCMaxBurst = 10000 - - s.ReloadConfig(s.config) + rc := ReloadableConfig{ + RPCRateLimit: 1000, + RPCMaxBurst: 10000, + ConfigEntryBootstrap: []structs.ConfigEntry{entryInit}, + } + require.NoError(t, s.ReloadConfig(rc)) _, entry, err := s.fsm.State().ConfigEntry(nil, structs.ProxyDefaults, structs.ProxyConfigGlobal, structs.DefaultEnterpriseMeta()) require.NoError(t, err) require.NotNil(t, entry) global, ok := entry.(*structs.ProxyConfigEntry) require.True(t, ok) - require.Equal(t, global_entry_init.Kind, global.Kind) - require.Equal(t, global_entry_init.Name, global.Name) - require.Equal(t, global_entry_init.Config, global.Config) + require.Equal(t, entryInit.Kind, global.Kind) + require.Equal(t, entryInit.Name, global.Name) + require.Equal(t, entryInit.Config, global.Config) // Check rate limiter got updated limiter = s.rpcLimiter.Load().(*rate.Limiter) @@ -1522,7 +1520,7 @@ func TestServer_RPC_RateLimit(t *testing.T) { t.Parallel() _, conf1 := testServerConfig(t) - conf1.RPCRate = 2 + conf1.RPCRateLimit = 2 conf1.RPCMaxBurst = 2 s1, err := newServer(t, conf1) if err != nil {