agent/consul: make Client/Server config reloading more obvious

I believe this commit also fixes a bug: previously RPCMaxConnsPerClient was not re-read from the RuntimeConfig, so passing the rebuilt config to Server.ReloadConfig never actually changed that value.
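
To make that failure mode concrete, here is a minimal standalone sketch (types and names invented here, heavily simplified from the real agent code) of the old pattern: only the rate-limit fields were copied from the reloaded config into the agent's stored config before the consul config was rebuilt, so a changed conns-per-client limit was silently dropped.

package main

import "fmt"

type runtimeConfig struct {
	rpcRateLimit         float64
	rpcMaxBurst          int
	rpcMaxConnsPerClient int
}

// oldLoadLimits mirrors the loadLimits helper removed in this commit:
// it copies the two rate fields and nothing else.
func oldLoadLimits(stored *runtimeConfig, reloaded runtimeConfig) {
	stored.rpcRateLimit = reloaded.rpcRateLimit
	stored.rpcMaxBurst = reloaded.rpcMaxBurst
	// rpcMaxConnsPerClient is never copied, so the stale value is what
	// gets passed along to ReloadConfig.
}

func main() {
	stored := runtimeConfig{rpcMaxConnsPerClient: 100}
	reloaded := runtimeConfig{rpcRateLimit: 1000, rpcMaxBurst: 10000, rpcMaxConnsPerClient: 10}
	oldLoadLimits(&stored, reloaded)
	fmt.Println(stored.rpcMaxConnsPerClient) // prints 100: the new limit of 10 was lost
}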

Also improve the test runtime by cutting out a lot of unnecessary work.
Daniel Nephin 2020-09-16 13:28:03 -04:00
parent f2b504873a
commit e5320c2db6
8 changed files with 62 additions and 62 deletions

agent/acl_test.go

@@ -162,7 +162,7 @@ func (a *TestACLAgent) Shutdown() error {
 func (a *TestACLAgent) Stats() map[string]map[string]string {
     return nil
 }

-func (a *TestACLAgent) ReloadConfig(config *consul.Config) error {
+func (a *TestACLAgent) ReloadConfig(_ consul.ReloadableConfig) error {
     return fmt.Errorf("Unimplemented")
 }

agent/agent.go

@@ -145,7 +145,7 @@ type delegate interface {
     SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error
     Shutdown() error
     Stats() map[string]map[string]string
-    ReloadConfig(config *consul.Config) error
+    ReloadConfig(config consul.ReloadableConfig) error
     enterpriseDelegate
 }
@@ -3517,11 +3517,6 @@ func (a *Agent) DisableNodeMaintenance() {
     a.logger.Info("Node left maintenance mode")
 }

-func (a *Agent) loadLimits(conf *config.RuntimeConfig) {
-    a.config.RPCRateLimit = conf.RPCRateLimit
-    a.config.RPCMaxBurst = conf.RPCMaxBurst
-}
-
 // ReloadConfig will atomically reload all configuration, including
 // all services, checks, tokens, metadata, dnsServer configs, etc.
 // It will also reload all ongoing watches.
@@ -3602,8 +3597,6 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
         return fmt.Errorf("Failed reloading watches: %v", err)
     }

-    a.loadLimits(newCfg)
-
     a.httpConnLimiter.SetConfig(connlimit.Config{
         MaxConnsPerClientIP: newCfg.HTTPMaxConnsPerClient,
     })
@@ -3614,24 +3607,18 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
         }
     }

-    // this only gets used by the consulConfig function and since
-    // that is only ever done during init and reload here then
-    // an in place modification is safe as reloads cannot be
-    // concurrent due to both gaining a full lock on the stateLock
-    a.config.ConfigEntryBootstrap = newCfg.ConfigEntryBootstrap
-
     err := a.reloadEnterprise(newCfg)
     if err != nil {
         return err
     }

-    // create the config for the rpc server/client
-    consulCfg, err := newConsulConfig(a.config, a.logger)
-    if err != nil {
-        return err
+    cc := consul.ReloadableConfig{
+        RPCRateLimit:         newCfg.RPCRateLimit,
+        RPCMaxBurst:          newCfg.RPCMaxBurst,
+        RPCMaxConnsPerClient: newCfg.RPCMaxConnsPerClient,
+        ConfigEntryBootstrap: newCfg.ConfigEntryBootstrap,
     }
-
-    if err := a.delegate.ReloadConfig(consulCfg); err != nil {
+    if err := a.delegate.ReloadConfig(cc); err != nil {
         return err
     }

agent/consul/client.go

@@ -403,7 +403,7 @@ func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) {

 // ReloadConfig is used to have the Client do an online reload of
 // relevant configuration information
-func (c *Client) ReloadConfig(config *Config) error {
-    c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
+func (c *Client) ReloadConfig(config ReloadableConfig) error {
+    c.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst))
     return nil
 }

agent/consul/client_test.go

@@ -9,6 +9,12 @@ import (
     "testing"
     "time"

+    "github.com/hashicorp/go-hclog"
+    msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
+    "github.com/hashicorp/serf/serf"
+    "github.com/stretchr/testify/require"
+    "golang.org/x/time/rate"
+
     "github.com/hashicorp/consul/agent/pool"
     "github.com/hashicorp/consul/agent/router"
     "github.com/hashicorp/consul/agent/structs"
@@ -18,11 +24,6 @@ import (
     "github.com/hashicorp/consul/sdk/testutil/retry"
     "github.com/hashicorp/consul/testrpc"
     "github.com/hashicorp/consul/tlsutil"
-    "github.com/hashicorp/go-hclog"
-    msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
-    "github.com/hashicorp/serf/serf"
-    "github.com/stretchr/testify/require"
-    "golang.org/x/time/rate"
 )

 func testClientConfig(t *testing.T) (string, *Config) {
@@ -762,23 +763,25 @@ func TestClientServer_UserEvent(t *testing.T) {
     }
 }

-func TestClient_Reload(t *testing.T) {
-    t.Parallel()
-    dir1, c := testClientWithConfig(t, func(c *Config) {
-        c.RPCRate = 500
-        c.RPCMaxBurst = 5000
-    })
-    defer os.RemoveAll(dir1)
+func TestClient_ReloadConfig(t *testing.T) {
+    _, cfg := testClientConfig(t)
+    cfg.RPCRate = rate.Limit(500)
+    cfg.RPCMaxBurst = 5000
+    deps := newDefaultDeps(t, &Config{NodeName: "node1", Datacenter: "dc1"})
+    c, err := NewClient(cfg, deps)
+    require.NoError(t, err)
     defer c.Shutdown()

     limiter := c.rpcLimiter.Load().(*rate.Limiter)
     require.Equal(t, rate.Limit(500), limiter.Limit())
     require.Equal(t, 5000, limiter.Burst())

-    c.config.RPCRate = 1000
-    c.config.RPCMaxBurst = 10000
-
-    require.NoError(t, c.ReloadConfig(c.config))
+    rc := ReloadableConfig{
+        RPCRateLimit:         1000,
+        RPCMaxBurst:          10000,
+        RPCMaxConnsPerClient: 0,
+    }
+    require.NoError(t, c.ReloadConfig(rc))

     limiter = c.rpcLimiter.Load().(*rate.Limiter)
     require.Equal(t, rate.Limit(1000), limiter.Limit())
     require.Equal(t, 10000, limiter.Burst())

agent/consul/config.go

@@ -655,3 +655,12 @@ func DefaultConfig() *Config {
 type RPCConfig struct {
     EnableStreaming bool
 }
+
+// ReloadableConfig is the configuration that is passed to ReloadConfig when
+// application config is reloaded.
+type ReloadableConfig struct {
+    RPCRateLimit         rate.Limit
+    RPCMaxBurst          int
+    RPCMaxConnsPerClient int
+    ConfigEntryBootstrap []structs.ConfigEntry
+}

agent/consul/rpc_test.go

@@ -729,9 +729,12 @@ func TestRPC_RPCMaxConnsPerClient(t *testing.T) {
             defer conn4.Close()

             // Reload config with higher limit
-            newCfg := *s1.config
-            newCfg.RPCMaxConnsPerClient = 10
-            require.NoError(t, s1.ReloadConfig(&newCfg))
+            rc := ReloadableConfig{
+                RPCRateLimit:         s1.config.RPCRate,
+                RPCMaxBurst:          s1.config.RPCMaxBurst,
+                RPCMaxConnsPerClient: 10,
+            }
+            require.NoError(t, s1.ReloadConfig(rc))

             // Now another conn should be allowed
             conn5 := connectClient(t, s1, tc.magicByte, tc.tlsEnabled, true, "conn5")

agent/consul/server.go

@@ -1384,8 +1384,8 @@ func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {

 // ReloadConfig is used to have the Server do an online reload of
 // relevant configuration information
-func (s *Server) ReloadConfig(config *Config) error {
-    s.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
+func (s *Server) ReloadConfig(config ReloadableConfig) error {
+    s.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst))
     s.rpcConnLimiter.SetConfig(connlimit.Config{
         MaxConnsPerClientIP: config.RPCMaxConnsPerClient,
     })
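
The hot swap above relies on storing the limiter in an atomic.Value, so in-flight RPC checks never observe a partially updated limiter. A minimal standalone sketch of that pattern (names invented here, not Consul's actual code):

package main

import (
	"fmt"
	"sync/atomic"

	"golang.org/x/time/rate"
)

type server struct {
	rpcLimiter atomic.Value // holds a *rate.Limiter
}

// allowRPC loads the current limiter atomically before checking it.
func (s *server) allowRPC() bool {
	return s.rpcLimiter.Load().(*rate.Limiter).Allow()
}

// reload swaps in a freshly built limiter; concurrent readers pick
// it up on their next Load without any locking.
func (s *server) reload(limit rate.Limit, burst int) {
	s.rpcLimiter.Store(rate.NewLimiter(limit, burst))
}

func main() {
	s := &server{}
	s.reload(500, 5000)
	fmt.Println(s.allowRPC()) // true: tokens available under the initial limit
	s.reload(1000, 10000)     // online reload, no restart needed
	fmt.Println(s.allowRPC())
}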

agent/consul/server_test.go

@@ -16,9 +16,13 @@ import (
     "time"

     "github.com/google/tcpproxy"
+    "github.com/hashicorp/memberlist"
+
     "github.com/hashicorp/consul/agent/connect/ca"
     "github.com/hashicorp/consul/ipaddr"
-    "github.com/hashicorp/memberlist"
+
+    "github.com/hashicorp/go-uuid"
+    "golang.org/x/time/rate"

     "github.com/hashicorp/consul/agent/connect"
     "github.com/hashicorp/consul/agent/metadata"
@@ -30,8 +34,6 @@ import (
     "github.com/hashicorp/consul/testrpc"
     "github.com/hashicorp/consul/tlsutil"
     "github.com/hashicorp/consul/types"
-    "github.com/hashicorp/go-uuid"
-    "golang.org/x/time/rate"

     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
@@ -1458,14 +1460,13 @@ func TestServer_RevokeLeadershipIdempotent(t *testing.T) {
     s1.revokeLeadership()
 }

-func TestServer_Reload(t *testing.T) {
+func TestServer_ReloadConfig(t *testing.T) {
     if testing.Short() {
         t.Skip("too slow for testing.Short")
     }
-
     t.Parallel()

-    global_entry_init := &structs.ProxyConfigEntry{
+    entryInit := &structs.ProxyConfigEntry{
         Kind: structs.ProxyDefaults,
         Name: structs.ProxyConfigGlobal,
         Config: map[string]interface{}{
@@ -1486,28 +1487,25 @@
     testrpc.WaitForTestAgent(t, s.RPC, "dc1")

-    s.config.ConfigEntryBootstrap = []structs.ConfigEntry{
-        global_entry_init,
-    }
-
     limiter := s.rpcLimiter.Load().(*rate.Limiter)
     require.Equal(t, rate.Limit(500), limiter.Limit())
     require.Equal(t, 5000, limiter.Burst())

-    // Change rate limit
-    s.config.RPCRate = 1000
-    s.config.RPCMaxBurst = 10000
-
-    s.ReloadConfig(s.config)
+    rc := ReloadableConfig{
+        RPCRateLimit:         1000,
+        RPCMaxBurst:          10000,
+        ConfigEntryBootstrap: []structs.ConfigEntry{entryInit},
+    }
+    require.NoError(t, s.ReloadConfig(rc))

     _, entry, err := s.fsm.State().ConfigEntry(nil, structs.ProxyDefaults, structs.ProxyConfigGlobal, structs.DefaultEnterpriseMeta())
     require.NoError(t, err)
     require.NotNil(t, entry)

     global, ok := entry.(*structs.ProxyConfigEntry)
     require.True(t, ok)
-    require.Equal(t, global_entry_init.Kind, global.Kind)
-    require.Equal(t, global_entry_init.Name, global.Name)
-    require.Equal(t, global_entry_init.Config, global.Config)
+    require.Equal(t, entryInit.Kind, global.Kind)
+    require.Equal(t, entryInit.Name, global.Name)
+    require.Equal(t, entryInit.Config, global.Config)

     // Check rate limiter got updated
     limiter = s.rpcLimiter.Load().(*rate.Limiter)