proxycfg: rate-limit delivery of config snapshots (#14960)
Adds a user-configurable rate limiter to proxycfg snapshot delivery, with a default limit of 250 updates per second. This addresses a problem observed in our load testing of Consul Dataplane where updating a "global" resource such as a wildcard intention or the proxy-defaults config entry could starve the Raft or Memberlist goroutines of CPU time, causing general cluster instability.
This commit is contained in:
parent
6c355134e8
commit
3b9297f95a
|
@ -0,0 +1,3 @@
|
|||
```release-note:enhancement
|
||||
xds: Added a rate limiter to the delivery of proxy config updates, to prevent updates to "global" resources such as wildcard intentions from overwhelming servers (see: `xds.update_max_per_second` config field)
|
||||
```
|
|
@ -683,6 +683,7 @@ func (a *Agent) Start(ctx context.Context) error {
|
|||
},
|
||||
TLSConfigurator: a.tlsConfigurator,
|
||||
IntentionDefaultAllow: intentionDefaultAllow,
|
||||
UpdateRateLimit: a.config.XDSUpdateRateLimit,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -4133,6 +4134,8 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
|
|||
}
|
||||
}
|
||||
|
||||
a.proxyConfig.SetUpdateRateLimit(newCfg.XDSUpdateRateLimit)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ import (
|
|||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/time/rate"
|
||||
"google.golang.org/grpc"
|
||||
"gopkg.in/square/go-jose.v2/jwt"
|
||||
|
||||
|
@ -4155,6 +4156,28 @@ func TestAgent_ReloadConfigTLSConfigFailure(t *testing.T) {
|
|||
assertDeepEqual(t, expectedCaPoolByFile, tlsConf.ClientCAs, cmpCertPool)
|
||||
}
|
||||
|
||||
func TestAgent_ReloadConfig_XDSUpdateRateLimit(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
cfg := fmt.Sprintf(`data_dir = %q`, testutil.TempDir(t, "agent"))
|
||||
|
||||
a := NewTestAgent(t, cfg)
|
||||
defer a.Shutdown()
|
||||
|
||||
c := TestConfig(
|
||||
testutil.Logger(t),
|
||||
config.FileSource{
|
||||
Name: t.Name(),
|
||||
Format: "hcl",
|
||||
Data: cfg + ` xds { update_max_per_second = 1000 }`,
|
||||
},
|
||||
)
|
||||
require.NoError(t, a.reloadConfigInternal(c))
|
||||
require.Equal(t, rate.Limit(1000), a.proxyConfig.UpdateRateLimit())
|
||||
}
|
||||
|
||||
func TestAgent_consulConfig_AutoEncryptAllowTLS(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
|
|
|
@ -1075,6 +1075,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
|
|||
UnixSocketMode: stringVal(c.UnixSocket.Mode),
|
||||
UnixSocketUser: stringVal(c.UnixSocket.User),
|
||||
Watches: c.Watches,
|
||||
XDSUpdateRateLimit: rate.Limit(float64Val(c.XDS.UpdateMaxPerSecond)),
|
||||
AutoReloadConfigCoalesceInterval: 1 * time.Second,
|
||||
}
|
||||
|
||||
|
|
|
@ -237,6 +237,7 @@ type Config struct {
|
|||
TaggedAddresses map[string]string `mapstructure:"tagged_addresses"`
|
||||
Telemetry Telemetry `mapstructure:"telemetry"`
|
||||
TranslateWANAddrs *bool `mapstructure:"translate_wan_addrs"`
|
||||
XDS XDS `mapstructure:"xds"`
|
||||
|
||||
// DEPRECATED (ui-config) - moved to the ui_config stanza
|
||||
UI *bool `mapstructure:"ui"`
|
||||
|
@ -909,3 +910,7 @@ type Peering struct {
|
|||
// This always gets overridden in NonUserSource()
|
||||
TestAllowPeerRegistrations *bool `mapstructure:"test_allow_peer_registrations"`
|
||||
}
|
||||
|
||||
type XDS struct {
|
||||
UpdateMaxPerSecond *float64 `mapstructure:"update_max_per_second"`
|
||||
}
|
||||
|
|
|
@ -135,6 +135,9 @@ func DefaultSource() Source {
|
|||
raft_snapshot_interval = "` + cfg.RaftConfig.SnapshotInterval.String() + `"
|
||||
raft_trailing_logs = ` + strconv.Itoa(int(cfg.RaftConfig.TrailingLogs)) + `
|
||||
|
||||
xds {
|
||||
update_max_per_second = 250
|
||||
}
|
||||
`,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1440,6 +1440,15 @@ type RuntimeConfig struct {
|
|||
//
|
||||
Watches []map[string]interface{}
|
||||
|
||||
// XDSUpdateRateLimit controls the maximum rate at which proxy config updates
|
||||
// will be delivered, across all connected xDS streams. This is used to stop
|
||||
// updates to "global" resources (e.g. wildcard intentions) from saturating
|
||||
// system resources at the expense of other work, such as raft and gossip,
|
||||
// which could cause general cluster instability.
|
||||
//
|
||||
// hcl: xds { update_max_per_second = (float64|MaxFloat64) }
|
||||
XDSUpdateRateLimit rate.Limit
|
||||
|
||||
// AutoReloadConfigCoalesceInterval Coalesce Interval for auto reload config
|
||||
AutoReloadConfigCoalesceInterval time.Duration
|
||||
|
||||
|
|
|
@ -4547,6 +4547,7 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
|
|||
rt.HTTPMaxConnsPerClient = 200
|
||||
rt.RPCMaxConnsPerClient = 100
|
||||
rt.SegmentLimit = 64
|
||||
rt.XDSUpdateRateLimit = 250
|
||||
},
|
||||
})
|
||||
|
||||
|
@ -6517,6 +6518,7 @@ func TestLoad_FullConfig(t *testing.T) {
|
|||
"args": []interface{}{"dltjDJ2a", "flEa7C2d"},
|
||||
},
|
||||
},
|
||||
XDSUpdateRateLimit: 9526.2,
|
||||
RaftBoltDBConfig: consul.RaftBoltDBConfig{NoFreelistSync: true},
|
||||
AutoReloadConfigCoalesceInterval: 1 * time.Second,
|
||||
}
|
||||
|
|
|
@ -479,5 +479,6 @@
|
|||
"Version": "",
|
||||
"VersionMetadata": "",
|
||||
"VersionPrerelease": "",
|
||||
"Watches": []
|
||||
"Watches": [],
|
||||
"XDSUpdateRateLimit": 0
|
||||
}
|
|
@ -755,3 +755,6 @@ watches = [{
|
|||
key = "sl3Dffu7"
|
||||
args = ["dltjDJ2a", "flEa7C2d"]
|
||||
}]
|
||||
xds {
|
||||
update_max_per_second = 9526.2
|
||||
}
|
||||
|
|
|
@ -752,5 +752,8 @@
|
|||
"key": "sl3Dffu7",
|
||||
"args": ["dltjDJ2a", "flEa7C2d"]
|
||||
}
|
||||
]
|
||||
],
|
||||
"xds": {
|
||||
"update_max_per_second": 9526.2
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
|
@ -46,6 +47,8 @@ type CancelFunc func()
|
|||
type Manager struct {
|
||||
ManagerConfig
|
||||
|
||||
rateLimiter *rate.Limiter
|
||||
|
||||
mu sync.Mutex
|
||||
proxies map[ProxyID]*state
|
||||
watchers map[ProxyID]map[uint64]chan *ConfigSnapshot
|
||||
|
@ -75,6 +78,15 @@ type ManagerConfig struct {
|
|||
// information to proxies that need to make intention decisions on their
|
||||
// own.
|
||||
IntentionDefaultAllow bool
|
||||
|
||||
// UpdateRateLimit controls the rate at which config snapshots are delivered
|
||||
// when updates are received from data sources. This enables us to reduce the
|
||||
// impact of updates to "global" resources (e.g. proxy-defaults and wildcard
|
||||
// intentions) that could otherwise saturate system resources, and cause Raft
|
||||
// or gossip instability.
|
||||
//
|
||||
// Defaults to rate.Inf (no rate limit).
|
||||
UpdateRateLimit rate.Limit
|
||||
}
|
||||
|
||||
// NewManager constructs a Manager.
|
||||
|
@ -82,14 +94,30 @@ func NewManager(cfg ManagerConfig) (*Manager, error) {
|
|||
if cfg.Source == nil || cfg.Logger == nil {
|
||||
return nil, errors.New("all ManagerConfig fields must be provided")
|
||||
}
|
||||
|
||||
if cfg.UpdateRateLimit == 0 {
|
||||
cfg.UpdateRateLimit = rate.Inf
|
||||
}
|
||||
|
||||
m := &Manager{
|
||||
ManagerConfig: cfg,
|
||||
proxies: make(map[ProxyID]*state),
|
||||
watchers: make(map[ProxyID]map[uint64]chan *ConfigSnapshot),
|
||||
rateLimiter: rate.NewLimiter(cfg.UpdateRateLimit, 1),
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// UpdateRateLimit returns the configured update rate limit (see ManagerConfig).
|
||||
func (m *Manager) UpdateRateLimit() rate.Limit {
|
||||
return m.rateLimiter.Limit()
|
||||
}
|
||||
|
||||
// SetUpdateRateLimit configures the update rate limit (see ManagerConfig).
|
||||
func (m *Manager) SetUpdateRateLimit(l rate.Limit) {
|
||||
m.rateLimiter.SetLimit(l)
|
||||
}
|
||||
|
||||
// RegisteredProxies returns a list of the proxies tracked by Manager, filtered
|
||||
// by source.
|
||||
func (m *Manager) RegisteredProxies(source ProxySource) []ProxyID {
|
||||
|
@ -143,7 +171,7 @@ func (m *Manager) Register(id ProxyID, ns *structs.NodeService, source ProxySour
|
|||
}
|
||||
|
||||
var err error
|
||||
state, err = newState(id, ns, source, token, stateConfig)
|
||||
state, err = newState(id, ns, source, token, stateConfig, m.rateLimiter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
|
@ -79,6 +80,8 @@ type state struct {
|
|||
ch chan UpdateEvent
|
||||
snapCh chan ConfigSnapshot
|
||||
reqCh chan chan *ConfigSnapshot
|
||||
|
||||
rateLimiter *rate.Limiter
|
||||
}
|
||||
|
||||
// failed returns whether run exited because a data source is in an
|
||||
|
@ -148,7 +151,7 @@ func copyProxyConfig(ns *structs.NodeService) (structs.ConnectProxyConfig, error
|
|||
//
|
||||
// The returned state needs its required dependencies to be set before Watch
|
||||
// can be called.
|
||||
func newState(id ProxyID, ns *structs.NodeService, source ProxySource, token string, config stateConfig) (*state, error) {
|
||||
func newState(id ProxyID, ns *structs.NodeService, source ProxySource, token string, config stateConfig, rateLimiter *rate.Limiter) (*state, error) {
|
||||
// 10 is fairly arbitrary here but allow for the 3 mandatory and a
|
||||
// reasonable number of upstream watches to all deliver their initial
|
||||
// messages in parallel without blocking the cache.Notify loops. It's not a
|
||||
|
@ -176,6 +179,7 @@ func newState(id ProxyID, ns *structs.NodeService, source ProxySource, token str
|
|||
ch: ch,
|
||||
snapCh: make(chan ConfigSnapshot, 1),
|
||||
reqCh: make(chan chan *ConfigSnapshot, 1),
|
||||
rateLimiter: rateLimiter,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -303,6 +307,20 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
|
|||
sendCh := make(chan struct{})
|
||||
var coalesceTimer *time.Timer
|
||||
|
||||
scheduleUpdate := func() {
|
||||
// Wait for MAX(<rate limiter delay>, coalesceTimeout)
|
||||
delay := s.rateLimiter.Reserve().Delay()
|
||||
if delay < coalesceTimeout {
|
||||
delay = coalesceTimeout
|
||||
}
|
||||
coalesceTimer = time.AfterFunc(delay, func() {
|
||||
// This runs in another goroutine so we can't just do the send
|
||||
// directly here as access to snap is racy. Instead, signal the main
|
||||
// loop above.
|
||||
sendCh <- struct{}{}
|
||||
})
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
@ -345,9 +363,7 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
|
|||
s.logger.Trace("Failed to deliver new snapshot to proxy config watchers")
|
||||
|
||||
// Reset the timer to retry later. This is to ensure we attempt to redeliver the updated snapshot shortly.
|
||||
coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
|
||||
sendCh <- struct{}{}
|
||||
})
|
||||
scheduleUpdate()
|
||||
|
||||
// Do not reset coalesceTimer since we just queued a timer-based refresh
|
||||
continue
|
||||
|
@ -375,15 +391,10 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
|
|||
// Check if snap is complete enough to be a valid config to deliver to a
|
||||
// proxy yet.
|
||||
if snap.Valid() {
|
||||
// Don't send it right away, set a short timer that will wait for updates
|
||||
// from any of the other cache values and deliver them all together.
|
||||
if coalesceTimer == nil {
|
||||
coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
|
||||
// This runs in another goroutine so we can't just do the send
|
||||
// directly here as access to snap is racy. Instead, signal the main
|
||||
// loop above.
|
||||
sendCh <- struct{}{}
|
||||
})
|
||||
// Don't send it right away, set a short timer that will wait for updates
|
||||
// from any of the other cache values and deliver them all together.
|
||||
scheduleUpdate()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
|
@ -106,7 +107,7 @@ func TestStateChanged(t *testing.T) {
|
|||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
proxyID := ProxyID{ServiceID: tt.ns.CompoundServiceID()}
|
||||
state, err := newState(proxyID, tt.ns, testSource, tt.token, stateConfig{logger: hclog.New(nil)})
|
||||
state, err := newState(proxyID, tt.ns, testSource, tt.token, stateConfig{logger: hclog.New(nil)}, rate.NewLimiter(rate.Inf, 1))
|
||||
require.NoError(t, err)
|
||||
otherNS, otherToken := tt.mutate(*tt.ns, tt.token)
|
||||
require.Equal(t, tt.want, state.Changed(otherNS, otherToken))
|
||||
|
@ -3463,7 +3464,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
}
|
||||
wr := recordWatches(&sc)
|
||||
|
||||
state, err := newState(proxyID, &tc.ns, testSource, "", sc)
|
||||
state, err := newState(proxyID, &tc.ns, testSource, "", sc, rate.NewLimiter(rate.Inf, 0))
|
||||
|
||||
// verify building the initial state worked
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -2206,3 +2206,15 @@ tls {
|
|||
Consul will not enable TLS for the HTTP or gRPC API unless the `https` port has
|
||||
been assigned a port number `> 0`. We recommend using `8501` for `https` as this
|
||||
default will automatically work with some tooling.
|
||||
|
||||
## xDS Server Parameters
|
||||
|
||||
- `xds`: This object allows you to configure the behavior of Consul's
|
||||
[xDS protocol](https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol)
|
||||
server.
|
||||
|
||||
- `update_max_per_second`: Specifies the number of proxy configuration updates across all connected xDS streams that are allowed per second. This configuration prevents updates to global resources, such as wildcard intentions, from consuming system resources at the expense of other processes, such as Raft and Gossip, which could cause general cluster instability.
|
||||
|
||||
The default value is `250`. It is based on a load test of 5,000 streams connected to a single server with two CPU cores.
|
||||
|
||||
If necessary, you can lower or increase the limit without a rolling restart by using the `consul reload` command or by sending the server a `SIGHUP`.
|
||||
|
|
Loading…
Reference in New Issue