Added ratelimit to handle throtling cache (#8226)
This implements a solution for #7863 It does: Add a new config cache.entry_fetch_rate to limit the number of calls/s for a given cache entry, default value = rate.Inf Add cache.entry_fetch_max_burst size of rate limit (default value = 2) The new configuration now supports the following syntax for instance to allow 1 query every 3s: command line HCL: -hcl 'cache = { entry_fetch_rate = 0.333}' in JSON { "cache": { "entry_fetch_rate": 0.333 } }
This commit is contained in:
parent
ec612d7744
commit
947d8eb039
|
@ -664,7 +664,7 @@ func (a *Agent) Start(ctx context.Context) error {
|
|||
a.sync = ae.NewStateSyncer(a.State, c.AEInterval, a.shutdownCh, a.logger)
|
||||
|
||||
// create the cache
|
||||
a.cache = cache.New(nil)
|
||||
a.cache = cache.New(c.Cache)
|
||||
|
||||
// create the config for the rpc server/client
|
||||
consulCfg, err := a.consulConfig()
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -857,6 +858,98 @@ func testAgent_AddServiceNoRemoteExec(t *testing.T, extraHCL string) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCacheRateLimit(test *testing.T) {
|
||||
test.Parallel()
|
||||
tests := []struct {
|
||||
// count := number of updates performed (1 every 10ms)
|
||||
count int
|
||||
// rateLimit rate limiting of cache
|
||||
rateLimit float64
|
||||
// Minimum number of updates to see from a cache perspective
|
||||
// We add a value with tolerance to work even on a loaded CI
|
||||
minUpdates int
|
||||
}{
|
||||
// 250 => we have a test running for at least 2.5s
|
||||
{250, 0.5, 1},
|
||||
{250, 1, 1},
|
||||
{300, 2, 2},
|
||||
}
|
||||
for _, currentTest := range tests {
|
||||
test.Run(fmt.Sprintf("rate_limit_at_%v", currentTest.rateLimit), func(t *testing.T) {
|
||||
tt := currentTest
|
||||
t.Parallel()
|
||||
a := NewTestAgent(t, fmt.Sprintf("cache = { entry_fetch_rate = %v, entry_fetch_max_burst = 1 }", tt.rateLimit))
|
||||
defer a.Shutdown()
|
||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
||||
|
||||
var wg sync.WaitGroup
|
||||
stillProcessing := true
|
||||
|
||||
injectService := func(i int) {
|
||||
srv := &structs.NodeService{
|
||||
Service: "redis",
|
||||
ID: "redis",
|
||||
Port: 1024 + i,
|
||||
Address: fmt.Sprintf("10.0.1.%d", i%255),
|
||||
}
|
||||
|
||||
err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
|
||||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
runUpdates := func() {
|
||||
wg.Add(tt.count)
|
||||
for i := 0; i < tt.count; i++ {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
injectService(i)
|
||||
wg.Done()
|
||||
}
|
||||
stillProcessing = false
|
||||
}
|
||||
|
||||
getIndex := func(t *testing.T, oldIndex int) int {
|
||||
req, err := http.NewRequest("GET", fmt.Sprintf("/v1/health/service/redis?cached&wait=5s&index=%d", oldIndex), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
a.srv.Handler.ServeHTTP(resp, req)
|
||||
// Key doesn't actually exist so we should get 404
|
||||
if got, want := resp.Code, http.StatusOK; got != want {
|
||||
t.Fatalf("bad response code got %d want %d", got, want)
|
||||
}
|
||||
index, err := strconv.Atoi(resp.Header().Get("X-Consul-Index"))
|
||||
require.NoError(t, err)
|
||||
return index
|
||||
}
|
||||
|
||||
{
|
||||
start := time.Now()
|
||||
injectService(0)
|
||||
// Get the first index
|
||||
index := getIndex(t, 0)
|
||||
require.Greater(t, index, 2)
|
||||
go runUpdates()
|
||||
numberOfUpdates := 0
|
||||
for stillProcessing {
|
||||
oldIndex := index
|
||||
index = getIndex(t, oldIndex)
|
||||
require.GreaterOrEqual(t, index, oldIndex, "index must be increasing only")
|
||||
numberOfUpdates++
|
||||
}
|
||||
elapsed := time.Since(start)
|
||||
qps := float64(time.Second) * float64(numberOfUpdates) / float64(elapsed)
|
||||
summary := fmt.Sprintf("received %v updates in %v aka %f qps, target max was: %f qps", numberOfUpdates, elapsed, qps, tt.rateLimit)
|
||||
|
||||
// We must never go beyond the limit, we give 10% margin to avoid having values like 1.05 instead of 1 due to precision of clock
|
||||
require.LessOrEqual(t, qps, 1.1*tt.rateLimit, fmt.Sprintf("it should never get more requests than ratelimit, had: %s", summary))
|
||||
// We must have at least being notified a few times
|
||||
require.GreaterOrEqual(t, numberOfUpdates, tt.minUpdates, fmt.Sprintf("It should have received a minimum of %d updates, had: %s", tt.minUpdates, summary))
|
||||
}
|
||||
wg.Wait()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAddServiceIPv4TaggedDefault(t *testing.T) {
|
||||
t.Helper()
|
||||
|
||||
|
|
39
agent/cache/cache.go
vendored
39
agent/cache/cache.go
vendored
|
@ -25,6 +25,7 @@ import (
|
|||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
//go:generate mockery -all -inpkg
|
||||
|
@ -81,6 +82,10 @@ type Cache struct {
|
|||
stopped uint32
|
||||
// stopCh is closed when Close is called
|
||||
stopCh chan struct{}
|
||||
// options includes a per Cache Rate limiter specification to avoid performing too many queries
|
||||
options Options
|
||||
rateLimitContext context.Context
|
||||
rateLimitCancel context.CancelFunc
|
||||
}
|
||||
|
||||
// typeEntry is a single type that is registered with a Cache.
|
||||
|
@ -122,23 +127,29 @@ type ResultMeta struct {
|
|||
|
||||
// Options are options for the Cache.
|
||||
type Options struct {
|
||||
// Nothing currently, reserved.
|
||||
// EntryFetchMaxBurst max burst size of RateLimit for a single cache entry
|
||||
EntryFetchMaxBurst int
|
||||
// EntryFetchRate represents the max calls/sec for a single cache entry
|
||||
EntryFetchRate rate.Limit
|
||||
}
|
||||
|
||||
// New creates a new cache with the given RPC client and reasonable defaults.
|
||||
// Further settings can be tweaked on the returned value.
|
||||
func New(*Options) *Cache {
|
||||
func New(options Options) *Cache {
|
||||
// Initialize the heap. The buffer of 1 is really important because
|
||||
// its possible for the expiry loop to trigger the heap to update
|
||||
// itself and it'd block forever otherwise.
|
||||
h := &expiryHeap{NotifyCh: make(chan struct{}, 1)}
|
||||
heap.Init(h)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
c := &Cache{
|
||||
types: make(map[string]typeEntry),
|
||||
entries: make(map[string]cacheEntry),
|
||||
entriesExpiryHeap: h,
|
||||
stopCh: make(chan struct{}),
|
||||
options: options,
|
||||
rateLimitContext: ctx,
|
||||
rateLimitCancel: cancel,
|
||||
}
|
||||
|
||||
// Start the expiry watcher
|
||||
|
@ -454,7 +465,14 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
|
|||
// If we don't have an entry, then create it. The entry must be marked
|
||||
// as invalid so that it isn't returned as a valid value for a zero index.
|
||||
if !ok {
|
||||
entry = cacheEntry{Valid: false, Waiter: make(chan struct{})}
|
||||
entry = cacheEntry{
|
||||
Valid: false,
|
||||
Waiter: make(chan struct{}),
|
||||
FetchRateLimiter: rate.NewLimiter(
|
||||
c.options.EntryFetchRate,
|
||||
c.options.EntryFetchMaxBurst,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
// Set that we're fetching to true, which makes it so that future
|
||||
|
@ -504,7 +522,13 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
|
|||
Index: entry.Index,
|
||||
}
|
||||
}
|
||||
|
||||
if err := entry.FetchRateLimiter.Wait(c.rateLimitContext); err != nil {
|
||||
if connectedTimer != nil {
|
||||
connectedTimer.Stop()
|
||||
}
|
||||
entry.Error = fmt.Errorf("rateLimitContext canceled: %s", err.Error())
|
||||
return
|
||||
}
|
||||
// Start building the new entry by blocking on the fetch.
|
||||
result, err := r.Fetch(fOpts)
|
||||
if connectedTimer != nil {
|
||||
|
@ -728,6 +752,7 @@ func (c *Cache) Close() error {
|
|||
if wasStopped == 0 {
|
||||
// First time only, close stop chan
|
||||
close(c.stopCh)
|
||||
c.rateLimitCancel()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -747,6 +772,10 @@ func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) erro
|
|||
FetchedAt: time.Now(),
|
||||
Waiter: make(chan struct{}),
|
||||
Expiry: &cacheEntryExpiry{Key: key},
|
||||
FetchRateLimiter: rate.NewLimiter(
|
||||
c.options.EntryFetchRate,
|
||||
c.options.EntryFetchMaxBurst,
|
||||
),
|
||||
}
|
||||
c.entriesLock.Lock()
|
||||
c.entries[key] = newEntry
|
||||
|
|
4
agent/cache/entry.go
vendored
4
agent/cache/entry.go
vendored
|
@ -3,6 +3,8 @@ package cache
|
|||
import (
|
||||
"container/heap"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// cacheEntry stores a single cache entry.
|
||||
|
@ -41,6 +43,8 @@ type cacheEntry struct {
|
|||
// background request has be blocking for at least 5 seconds, which ever
|
||||
// happens first.
|
||||
RefreshLostContact time.Time
|
||||
// FetchRateLimiter limits the rate at which fetch is called for this entry.
|
||||
FetchRateLimiter *rate.Limiter
|
||||
}
|
||||
|
||||
// cacheEntryExpiry contains the expiration information for a cache
|
||||
|
|
3
agent/cache/testing.go
vendored
3
agent/cache/testing.go
vendored
|
@ -8,12 +8,13 @@ import (
|
|||
"github.com/mitchellh/go-testing-interface"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// TestCache returns a Cache instance configuring for testing.
|
||||
func TestCache(t testing.T) *Cache {
|
||||
// Simple but lets us do some fine-tuning later if we want to.
|
||||
return New(nil)
|
||||
return New(Options{EntryFetchRate: rate.Inf, EntryFetchMaxBurst: 2})
|
||||
}
|
||||
|
||||
// TestCacheGetCh returns a channel that returns the result of the Get call.
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/checks"
|
||||
"github.com/hashicorp/consul/agent/connect/ca"
|
||||
"github.com/hashicorp/consul/agent/consul"
|
||||
|
@ -32,6 +33,15 @@ import (
|
|||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// The following constants are default values for some settings
|
||||
// Ensure to update documentation if you modify those values
|
||||
const (
|
||||
// DefaultEntryFetchMaxBurst is the default value for cache.entry_fetch_max_burst
|
||||
DefaultEntryFetchMaxBurst = 2
|
||||
// DefaultEntryFetchRate is the default value for cache.entry_fetch_rate
|
||||
DefaultEntryFetchRate = float64(rate.Inf)
|
||||
)
|
||||
|
||||
// Builder constructs a valid runtime configuration from multiple
|
||||
// configuration sources.
|
||||
//
|
||||
|
@ -887,6 +897,12 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
|
|||
BindAddr: bindAddr,
|
||||
Bootstrap: b.boolVal(c.Bootstrap),
|
||||
BootstrapExpect: b.intVal(c.BootstrapExpect),
|
||||
Cache: cache.Options{
|
||||
EntryFetchRate: rate.Limit(
|
||||
b.float64ValWithDefault(c.Cache.EntryFetchRate, DefaultEntryFetchRate)),
|
||||
EntryFetchMaxBurst: b.intValWithDefault(
|
||||
c.Cache.EntryFetchMaxBurst, DefaultEntryFetchMaxBurst),
|
||||
},
|
||||
CAFile: b.stringVal(c.CAFile),
|
||||
CAPath: b.stringVal(c.CAPath),
|
||||
CertFile: b.stringVal(c.CertFile),
|
||||
|
@ -1014,6 +1030,13 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
|
|||
Watches: c.Watches,
|
||||
}
|
||||
|
||||
if rt.Cache.EntryFetchMaxBurst <= 0 {
|
||||
return RuntimeConfig{}, fmt.Errorf("cache.entry_fetch_max_burst must be strictly positive, was: %v", rt.Cache.EntryFetchMaxBurst)
|
||||
}
|
||||
if rt.Cache.EntryFetchRate <= 0 {
|
||||
return RuntimeConfig{}, fmt.Errorf("cache.entry_fetch_rate must be strictly positive, was: %v", rt.Cache.EntryFetchRate)
|
||||
}
|
||||
|
||||
if entCfg, err := b.BuildEnterpriseRuntimeConfig(&c); err != nil {
|
||||
return RuntimeConfig{}, err
|
||||
} else {
|
||||
|
@ -1645,14 +1668,18 @@ func (b *Builder) stringVal(v *string) string {
|
|||
return b.stringValWithDefault(v, "")
|
||||
}
|
||||
|
||||
func (b *Builder) float64Val(v *float64) float64 {
|
||||
func (b *Builder) float64ValWithDefault(v *float64, defaultVal float64) float64 {
|
||||
if v == nil {
|
||||
return 0
|
||||
return defaultVal
|
||||
}
|
||||
|
||||
return *v
|
||||
}
|
||||
|
||||
func (b *Builder) float64Val(v *float64) float64 {
|
||||
return b.float64ValWithDefault(v, 0)
|
||||
}
|
||||
|
||||
func (b *Builder) cidrsVal(name string, v []string) (nets []*net.IPNet) {
|
||||
if v == nil {
|
||||
return
|
||||
|
|
|
@ -58,6 +58,14 @@ func Parse(data string, format string) (c Config, md mapstructure.Metadata, err
|
|||
return c, md, nil
|
||||
}
|
||||
|
||||
// Cache is the tunning configuration for cache, values are optional
|
||||
type Cache struct {
|
||||
// EntryFetchMaxBurst max burst size of RateLimit for a single cache entry
|
||||
EntryFetchMaxBurst *int `json:"entry_fetch_max_burst,omitempty" hcl:"entry_fetch_max_burst" mapstructure:"entry_fetch_max_burst"`
|
||||
// EntryFetchRate represents the max calls/sec for a single cache entry
|
||||
EntryFetchRate *float64 `json:"entry_fetch_rate,omitempty" hcl:"entry_fetch_rate" mapstructure:"entry_fetch_rate"`
|
||||
}
|
||||
|
||||
// Config defines the format of a configuration file in either JSON or
|
||||
// HCL format.
|
||||
//
|
||||
|
@ -101,6 +109,7 @@ type Config struct {
|
|||
BindAddr *string `json:"bind_addr,omitempty" hcl:"bind_addr" mapstructure:"bind_addr"`
|
||||
Bootstrap *bool `json:"bootstrap,omitempty" hcl:"bootstrap" mapstructure:"bootstrap"`
|
||||
BootstrapExpect *int `json:"bootstrap_expect,omitempty" hcl:"bootstrap_expect" mapstructure:"bootstrap_expect"`
|
||||
Cache Cache `json:"cache,omitempty" hcl:"cache" mapstructure:"cache"`
|
||||
CAFile *string `json:"ca_file,omitempty" hcl:"ca_file" mapstructure:"ca_file"`
|
||||
CAPath *string `json:"ca_path,omitempty" hcl:"ca_path" mapstructure:"ca_path"`
|
||||
CertFile *string `json:"cert_file,omitempty" hcl:"cert_file" mapstructure:"cert_file"`
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
|
@ -444,6 +445,9 @@ type RuntimeConfig struct {
|
|||
// flag: -bootstrap-expect=int
|
||||
BootstrapExpect int
|
||||
|
||||
// Cache represent cache configuration of agent
|
||||
Cache cache.Options
|
||||
|
||||
// CAFile is a path to a certificate authority file. This is used with
|
||||
// VerifyIncoming or VerifyOutgoing to verify the TLS connection.
|
||||
//
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/checks"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
|
@ -4411,6 +4412,10 @@ func TestFullConfig(t *testing.T) {
|
|||
"bind_addr": "16.99.34.17",
|
||||
"bootstrap": true,
|
||||
"bootstrap_expect": 53,
|
||||
"cache": {
|
||||
"entry_fetch_max_burst": 42,
|
||||
"entry_fetch_rate": 0.334
|
||||
},
|
||||
"ca_file": "erA7T0PM",
|
||||
"ca_path": "mQEN1Mfp",
|
||||
"cert_file": "7s4QAzDk",
|
||||
|
@ -5071,6 +5076,10 @@ func TestFullConfig(t *testing.T) {
|
|||
bind_addr = "16.99.34.17"
|
||||
bootstrap = true
|
||||
bootstrap_expect = 53
|
||||
cache = {
|
||||
entry_fetch_max_burst = 42
|
||||
entry_fetch_rate = 0.334
|
||||
},
|
||||
ca_file = "erA7T0PM"
|
||||
ca_path = "mQEN1Mfp"
|
||||
cert_file = "7s4QAzDk"
|
||||
|
@ -5797,6 +5806,10 @@ func TestFullConfig(t *testing.T) {
|
|||
BindAddr: ipAddr("16.99.34.17"),
|
||||
Bootstrap: true,
|
||||
BootstrapExpect: 53,
|
||||
Cache: cache.Options{
|
||||
EntryFetchMaxBurst: 42,
|
||||
EntryFetchRate: 0.334,
|
||||
},
|
||||
CAFile: "erA7T0PM",
|
||||
CAPath: "mQEN1Mfp",
|
||||
CertFile: "7s4QAzDk",
|
||||
|
@ -6679,6 +6692,10 @@ func TestSanitize(t *testing.T) {
|
|||
&net.TCPAddr{IP: net.ParseIP("1.2.3.4"), Port: 5678},
|
||||
&net.UnixAddr{Name: "/var/run/foo"},
|
||||
},
|
||||
Cache: cache.Options{
|
||||
EntryFetchMaxBurst: 42,
|
||||
EntryFetchRate: 0.334,
|
||||
},
|
||||
ConsulCoordinateUpdatePeriod: 15 * time.Second,
|
||||
RetryJoinLAN: []string{
|
||||
"foo=bar key=baz secret=boom bang=bar",
|
||||
|
@ -6749,6 +6766,10 @@ func TestSanitize(t *testing.T) {
|
|||
"BindAddr": "127.0.0.1",
|
||||
"Bootstrap": false,
|
||||
"BootstrapExpect": 0,
|
||||
"Cache": {
|
||||
"EntryFetchMaxBurst": 42,
|
||||
"EntryFetchRate": 0.334
|
||||
},
|
||||
"CAFile": "",
|
||||
"CAPath": "",
|
||||
"CertFile": "",
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
|
||||
"github.com/mitchellh/copystructure"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
|
@ -461,7 +462,7 @@ func TestManager_deliverLatest(t *testing.T) {
|
|||
// None of these need to do anything to test this method just be valid
|
||||
logger := testutil.Logger(t)
|
||||
cfg := ManagerConfig{
|
||||
Cache: cache.New(nil),
|
||||
Cache: cache.New(cache.Options{EntryFetchRate: rate.Inf, EntryFetchMaxBurst: 2}),
|
||||
State: local.NewState(local.Config{}, logger, &token.Store{}),
|
||||
Source: &structs.QuerySource{
|
||||
Node: "node1",
|
||||
|
|
|
@ -959,6 +959,24 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
|
|||
|
||||
- `bind_addr` Equivalent to the [`-bind` command-line flag](#_bind).
|
||||
|
||||
- `cache` Cache configuration of agent. The configurable values are the following:
|
||||
|
||||
- `entry_fetch_max_burst`: The size of the token bucket used to recharge the rate-limit per
|
||||
cache entry. The default value is 2 and means that when cache has not been updated
|
||||
for a long time, 2 successive queries can be made as long as the rate-limit is not
|
||||
reached.
|
||||
|
||||
- `entry_fetch_rate`: configures the rate-limit at which the cache may refresh a single
|
||||
entry. On a cluster with many changes/s, watching changes in the cache might put high
|
||||
pressure on the servers. This ensures the number of requests for a single cache entry
|
||||
will never go beyond this limit, even when a given service changes every 1/100s.
|
||||
Since this is a per cache entry limit, having a highly unstable service will only rate
|
||||
limit the watched on this service, but not the other services/entries.
|
||||
The value is strictly positive, expressed in queries per second as a float,
|
||||
1 means 1 query per second, 0.1 mean 1 request every 10s maximum.
|
||||
The default value is "No limit" and should be tuned on large
|
||||
clusters to avoid performing too many RPCs on entries changing a lot.
|
||||
|
||||
- `ca_file` This provides a file path to a PEM-encoded certificate
|
||||
authority. The certificate authority is used to check the authenticity of client
|
||||
and server connections with the appropriate [`verify_incoming`](#verify_incoming)
|
||||
|
|
Loading…
Reference in a new issue