connect: tame thundering herd of CSRs on CA rotation (#5228)

* Support rate limiting and concurrency limiting CSR requests on servers; handle CA rotations gracefully with jitter and backoff-on-rate-limit in client

* Add CSR rate limiting docs

* Fix config naming and add tests for new CA configs
Paul Banks 2019-01-22 17:19:36 +00:00 committed by GitHub
parent fd1c1a656b
commit 1c4dfbcd2e
20 changed files with 1184 additions and 82 deletions

View File

@@ -1084,15 +1084,12 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
	if a.config.ConnectCAProvider != "" {
		base.CAConfig.Provider = a.config.ConnectCAProvider
	}
-	// Merge with the default config if it's the consul provider.
-	if a.config.ConnectCAProvider == "consul" {
-		for k, v := range a.config.ConnectCAConfig {
-			base.CAConfig.Config[k] = v
-		}
-	} else {
-		base.CAConfig.Config = a.config.ConnectCAConfig
-	}
+	// Merge connect CA Config regardless of provider (since there are some
+	// common config options valid to all like leaf TTL).
+	for k, v := range a.config.ConnectCAConfig {
+		base.CAConfig.Config[k] = v
+	}
	}
}

View File

@@ -12,21 +12,38 @@ import (
	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/agent/connect"
+	"github.com/hashicorp/consul/agent/consul"
	"github.com/hashicorp/consul/agent/structs"
)

// Recommended name for registration.
const ConnectCALeafName = "connect-ca-leaf"

-// caChangeInitialSpreadDefault is the jitter we apply after noticing the CA
-// changed before requesting a new cert. Since we don't know how many services
-// are in the cluster we can't be too smart about setting this so it's a
-// tradeoff between not making root rotations take unnecessarily long on small
-// clusters and not hammering the servers too hard on large ones. Note that
-// servers will soon have CSR rate limiting that will limit the impact on big
-// clusters, but a small spread in the initial requests still seems like a good
-// idea and limits how many clients will hit the rate limit.
-const caChangeInitialSpreadDefault = 20 * time.Second
+// caChangeJitterWindow is the time over which we spread each round of retries
+// when attempting to get a new certificate following a root rotation. It's
+// selected to be a trade-off between not making rotation unnecessarily slow on
+// a tiny cluster while not hammering the servers on a huge cluster
+// unnecessarily hard. Servers rate limit to protect themselves from the
+// expensive crypto work, but in practice having 10k+ RPCs all land in the same
+// second will cause a major disruption even on large servers due to
+// downloading the payloads, parsing msgpack etc. Instead we pick a window that
+// for now is fixed but later might be either user configurable (not nice since
+// it would become another hard-to-tune value) or set dynamically by the server
+// based on its knowledge of how many certs need to be rotated. Currently the
+// server doesn't know that, so we pick something that is reasonable. We err on
+// the side of being slower than we need in trivial cases but gentler for large
+// deployments. 30s means that even with a cluster of 10k service instances,
+// the server only has to cope with ~333 RPCs a second, which shouldn't be too
+// bad if it's rate limiting the actual expensive crypto work.
+//
+// The actual backoff strategy when we are rate limited is to have each cert
+// retry only once within each window of this size, at a point in the window
+// selected at random. This performs much better than exponential backoff in
+// terms of getting things rotated quickly with more predictable load, and so
+// fewer rate limited requests. See the full simulation this is based on at
+// https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md for
+// more detail.
+const caChangeJitterWindow = 30 * time.Second

// ConnectCALeaf supports fetching and generating Connect leaf
// certificates.
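
The retry scheduling this comment describes can be sketched in isolation. The helper below is illustrative only (randomStagger is a hypothetical stand-in for Consul's lib.RandomStagger); it mirrors the windowed backoff implemented in generateNewLeaf further down:

	package main

	import (
		"fmt"
		"math/rand"
		"time"
	)

	// randomStagger returns a random duration in [0, interval), standing in
	// for lib.RandomStagger.
	func randomStagger(interval time.Duration) time.Duration {
		return time.Duration(rand.Int63n(int64(interval)))
	}

	// nextCSRAttempt picks the next retry time after a rate limit error: choose
	// a random delay within the jitter window, push the window start out once
	// per consecutive failure, and attempt again at windowStart+delay.
	func nextCSRAttempt(rotationStart time.Time, consecutiveErrs int, window time.Duration) time.Time {
		delay := randomStagger(window)
		windowStart := rotationStart.Add(time.Duration(consecutiveErrs) * delay)
		return windowStart.Add(delay)
	}

	func main() {
		start := time.Now()
		for errs := 0; errs < 3; errs++ {
			fmt.Println(nextCSRAttempt(start, errs, 30*time.Second))
		}
	}

With each cert retrying at most once per window, a 10k-instance cluster spreads its retries to roughly 333 RPCs a second instead of arriving all at once.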
@@ -75,6 +92,9 @@ type ConnectCALeaf struct {
// Fetch. Pointers themselves are OK, but if we point to another struct that we
// call a method or modify in some way that would directly mutate the cache and
// cause problems. We'd need to deep-clone in that case in Fetch below.
+// time.Time technically contains a pointer to the Location but we ignore that
+// since all times we get from our wall clock should point to the same Location
+// anyway.
type fetchState struct {
	// authorityKeyID is the key ID of the CA root that signed the current cert.
	// This is just to save parsing the whole cert every time we have to check if
@@ -84,6 +104,18 @@ type fetchState struct {
	// forceExpireAfter is used to coordinate renewing certs after a CA rotation
	// in a staggered way so that we don't overwhelm the servers.
	forceExpireAfter time.Time
+
+	// activeRootRotationStart is set when the root has changed and we need to get
+	// a new cert but haven't got one yet. forceExpireAfter will be set to the
+	// next scheduled time we should try our CSR, but this is needed to calculate
+	// the retry windows if we are rate limited when we try. See comment on
+	// caChangeJitterWindow above for more.
+	activeRootRotationStart time.Time
+
+	// consecutiveRateLimitErrs stores how many rate limit errors we've hit. We
+	// use this to choose a new window for the next retry. See comment on
+	// caChangeJitterWindow above for more.
+	consecutiveRateLimitErrs int
}

// fetchStart is called on each fetch that is about to block and wait for
@@ -277,22 +309,44 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
		existing, ok = opts.LastResult.Value.(*structs.IssuedCert)
		if !ok {
			return result, fmt.Errorf(
-				"Internal cache failure: last value wrong type: %T", req)
+				"Internal cache failure: last value wrong type: %T", opts.LastResult.Value)
		}
-		state, ok = opts.LastResult.State.(fetchState)
-		if !ok {
-			return result, fmt.Errorf(
-				"Internal cache failure: last state wrong type: %T", req)
+		if opts.LastResult.State != nil {
+			state, ok = opts.LastResult.State.(fetchState)
+			if !ok {
+				return result, fmt.Errorf(
+					"Internal cache failure: last state wrong type: %T", opts.LastResult.State)
+			}
		}
+	} else {
+		state = fetchState{}
	}

	// Handle brand new request first as it's simplest.
	if existing == nil {
-		return c.generateNewLeaf(reqReal, &state)
+		return c.generateNewLeaf(reqReal, result)
	}

+	// Setup result to mirror the current value for if we timeout or hit a rate
+	// limit. This allows us to update the state (e.g. for backoff or retry
+	// coordination on root change) even if we don't get a new cert.
+	result.Value = existing
+	result.Index = existing.ModifyIndex
+	result.State = state
+
+	// Since state is not a pointer, we can't just set it once in result and then
+	// continue to update it later since we will be updating only our copy.
+	// Instead we have a helper function that is used to make sure the state is
+	// updated in the result when we return.
+	lastResultWithNewState := func() cache.FetchResult {
+		return cache.FetchResult{
+			Value: existing,
+			Index: existing.ModifyIndex,
+			State: state,
+		}
+	}
+
+	// Beyond this point we need to only return lastResultWithNewState() not just
+	// result since otherwise we might "lose" state updates we expect not to.
+
	// We have a certificate in cache already. Check it's still valid.
	now := time.Now()
	minExpire, maxExpire := calculateSoftExpiry(now, existing)
@@ -306,7 +360,7 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
	if expiresAt == now || expiresAt.Before(now) {
		// Already expired, just make a new one right away
-		return c.generateNewLeaf(reqReal, &state)
+		return c.generateNewLeaf(reqReal, lastResultWithNewState())
	}

	// We are about to block and wait for a change or timeout.
@@ -318,16 +372,18 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
	// reload latest CA from cache.
	rootUpdateCh := make(chan struct{}, 1)

+	// The roots may have changed in between blocking calls. We need to verify
+	// that the existing cert was signed by the current root. If it was we still
+	// want to do the whole jitter thing. We could code that again here but it's
+	// identical to the select case below so we just trigger our own update chan
+	// and let the logic below handle checking if the CA actually changed; in the
+	// common case where it didn't, it's a no-op anyway.
+	rootUpdateCh <- struct{}{}
+
	// Subscribe our chan to get root update notification.
	c.fetchStart(rootUpdateCh)
	defer c.fetchDone(rootUpdateCh)

-	// Setup result to mirror the current value for if we timeout. This allows us
-	// to update the state even if we don't generate a new cert.
-	result.Value = existing
-	result.Index = existing.ModifyIndex
-	result.State = state
-
	// Setup the timeout chan outside the loop so we don't keep bumping the
	// timeout later if we loop around.
	timeoutCh := time.After(opts.Timeout)
@@ -341,31 +397,35 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
		select {
		case <-timeoutCh:
			// We timed out the request with same cert.
-			return result, nil
+			return lastResultWithNewState(), nil

		case <-expiresCh:
			// Cert expired or was force-expired by a root change.
-			return c.generateNewLeaf(reqReal, &state)
+			return c.generateNewLeaf(reqReal, lastResultWithNewState())

		case <-rootUpdateCh:
			// A root cache change occurred, reload roots from cache.
			roots, err := c.rootsFromCache()
			if err != nil {
-				return result, err
+				return lastResultWithNewState(), err
			}

			// Handle _possibly_ changed roots. We still need to verify the new active
			// root is not the same as the one our current cert was signed by since we
			// can be notified spuriously if we are the first request since the
-			// rootsWatcher didn't know about the CA we were signed by.
+			// rootsWatcher didn't know about the CA we were signed by. We also rely
+			// on this on every request to do the initial check that the current roots
+			// are the same ones the current cert was signed by.
			if activeRootHasKey(roots, state.authorityKeyID) {
				// Current active CA is the same one that signed our current cert so
				// keep waiting for a change.
				continue
			}

+			state.activeRootRotationStart = time.Now()
+
			// CA root changed. We add some jitter here to avoid a thundering herd.
-			// See docs on caChangeInitialJitter const.
-			delay := lib.RandomStagger(caChangeInitialSpreadDefault)
+			// See docs on caChangeJitterWindow const.
+			delay := lib.RandomStagger(caChangeJitterWindow)
			if c.TestOverrideCAChangeInitialDelay > 0 {
				delay = c.TestOverrideCAChangeInitialDelay
			}
@@ -374,7 +434,7 @@ func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache
			// the cache state so the next request will notice we still need to renew
			// and do it at the right time. This is cleared once a new cert is
			// returned by generateNewLeaf.
-			state.forceExpireAfter = time.Now().Add(delay)
+			state.forceExpireAfter = state.activeRootRotationStart.Add(delay)

			// If the delay time is within the current timeout, we want to renew the
			// cert as soon as it's up. We change the expire time and chan so that when we
			// loop back around, we'll wait at most delay until generating a new cert.
@@ -416,9 +476,20 @@ func (c *ConnectCALeaf) rootsFromCache() (*structs.IndexedCARoots, error) {
}

// generateNewLeaf does the actual work of creating a new private key,
-// generating a CSR and getting it signed by the servers.
-func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest, state *fetchState) (cache.FetchResult, error) {
-	var result cache.FetchResult
+// generating a CSR and getting it signed by the servers. The result argument
+// represents the last result currently in cache, if any, along with its state.
+func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest,
+	result cache.FetchResult) (cache.FetchResult, error) {
+
+	var state fetchState
+	if result.State != nil {
+		var ok bool
+		state, ok = result.State.(fetchState)
+		if !ok {
+			return result, fmt.Errorf(
+				"Internal cache failure: result state wrong type: %T", result.State)
+		}
+	}

	// Need to lookup RootCAs response to discover trust domain. This should be a
	// cache hit.
@@ -458,12 +529,55 @@ func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest, state *fetchS
		CSR: csr,
	}
	if err := c.RPC.RPC("ConnectCA.Sign", &args, &reply); err != nil {
+		if err.Error() == consul.ErrRateLimited.Error() {
+			if result.Value == nil {
+				// This was a first fetch - we have no good value in cache. In this
+				// case we just return the error to the caller rather than rely on the
+				// surprising semi-blocking behavior of waiting until the rate limit is
+				// appeased or we time out. It's likely the caller isn't expecting this
+				// to block since it's an initial fetch. This also massively simplifies
+				// this edge case.
+				return result, err
+			}
+
+			if state.activeRootRotationStart.IsZero() {
+				// We hit a rate limit error by chance - for example a cert expired
+				// before the root rotation was observed (not triggered by rotation)
+				// but while the server is working through high load from a recent
+				// rotation. Just pretend there is a rotation and the retry logic here
+				// will start jittering and retrying in the same way from now.
+				state.activeRootRotationStart = time.Now()
+			}
+
+			// Increment the error count in the state
+			state.consecutiveRateLimitErrs++
+
+			delay := lib.RandomStagger(caChangeJitterWindow)
+			if c.TestOverrideCAChangeInitialDelay > 0 {
+				delay = c.TestOverrideCAChangeInitialDelay
+			}
+
+			// Find the start of the next window we can retry in. See the comment on
+			// caChangeJitterWindow for details of why we use this strategy.
+			windowStart := state.activeRootRotationStart.Add(
+				time.Duration(state.consecutiveRateLimitErrs) * delay)
+
+			// Pick a random time in that window
+			state.forceExpireAfter = windowStart.Add(delay)
+
+			// Return a result with the existing cert but the new state - the cache
+			// will see this as no change. Note that we always have an existing
+			// result here due to the nil value check above.
+			result.State = state
+			return result, nil
+		}
		return result, err
	}
	reply.PrivateKeyPEM = pkPEM

-	// Reset the forcedExpiry in the state
+	// Reset rotation state
	state.forceExpireAfter = time.Time{}
+	state.consecutiveRateLimitErrs = 0
+	state.activeRootRotationStart = time.Time{}

	cert, err := connect.ParseCert(reply.CertPEM)
	if err != nil {

@@ -475,7 +589,7 @@ func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest, state *fetchS
	result.Value = &reply
	// Store value not pointer so we don't accidentally mutate the cache entry
	// state in Fetch.
-	result.State = *state
+	result.State = state
	result.Index = reply.ModifyIndex
	return result, nil
}

View File

@@ -10,6 +10,7 @@ import (
	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/agent/connect"
+	"github.com/hashicorp/consul/agent/consul"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
@@ -162,17 +163,28 @@ func TestConnectCALeaf_changingRoots(t *testing.T) {
		QueryMeta: structs.QueryMeta{Index: 1},
	}

+	// We need this later but it needs to be defined so we sign the second CSR
+	// with it, otherwise we break the cert root checking.
+	caRoot2 := connect.TestCA(t, nil)
+
	// Instrument ConnectCA.Sign to return signed cert
	var resp *structs.IssuedCert
	var idx uint64
	rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
		Run(func(args mock.Arguments) {
+			ca := caRoot
+			cIdx := atomic.AddUint64(&idx, 1)
+			if cIdx > 1 {
+				// Second time round use the new CA
+				ca = caRoot2
+			}
			reply := args.Get(2).(*structs.IssuedCert)
-			leaf, _ := connect.TestLeaf(t, "web", caRoot)
+			leaf, _ := connect.TestLeaf(t, "web", ca)
			reply.CertPEM = leaf
			reply.ValidAfter = time.Now().Add(-1 * time.Hour)
			reply.ValidBefore = time.Now().Add(11 * time.Hour)
-			reply.CreateIndex = atomic.AddUint64(&idx, 1)
+			reply.CreateIndex = cIdx
			reply.ModifyIndex = reply.CreateIndex
			resp = reply
		})
@@ -205,7 +217,6 @@ func TestConnectCALeaf_changingRoots(t *testing.T) {
	// Let's send in new roots, which should trigger the sign req. We need to take
	// care to set the new root as active
-	caRoot2 := connect.TestCA(t, nil)
	caRoot2.Active = true
	caRoot.Active = false
	rootsCh <- structs.IndexedCARoots{
@@ -225,6 +236,9 @@ func TestConnectCALeaf_changingRoots(t *testing.T) {
		require.Equal(resp, v.Value)
		// 3 since the second CA "update" used up 2
		require.Equal(uint64(3), v.Index)
+		// Set the LastResult for subsequent fetches
+		opts.LastResult = &v
+		opts.MinIndex = 3
	}

	// Third fetch should block
@@ -305,7 +319,14 @@ func TestConnectCALeaf_changingRootsJitterBetweenCalls(t *testing.T) {
	}

	// Let's send in new roots, which should eventually trigger the sign req. We
-	// need to take care to set the new root as active
+	// need to take care to set the new root as active. Note that this is
+	// implicitly testing that root updates that happen in between leaf blocking
+	// queries are still noticed too. At this point no leaf blocking query is
+	// running so the root watch should be stopped. By pushing this update, the
+	// next blocking query will _immediately_ see the new root, which means it
+	// needs to correctly notice that it is not the same one that generated the
+	// current cert and start the rotation. This is good, just not obvious that
+	// the behavior is actually well tested here when it is.
	caRoot2 := connect.TestCA(t, nil)
	caRoot2.Active = true
	caRoot.Active = false
@@ -380,6 +401,291 @@ func TestConnectCALeaf_changingRootsJitterBetweenCalls(t *testing.T) {
	}
}
// Tests that if the root changes in between blocking calls we still pick it up.
func TestConnectCALeaf_changingRootsBetweenBlockingCalls(t *testing.T) {
t.Parallel()
require := require.New(t)
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ, rootsCh := testCALeafType(t, rpc)
defer close(rootsCh)
caRoot := connect.TestCA(t, nil)
caRoot.Active = true
rootsCh <- structs.IndexedCARoots{
ActiveRootID: caRoot.ID,
TrustDomain: "fake-trust-domain.consul",
Roots: []*structs.CARoot{
caRoot,
},
QueryMeta: structs.QueryMeta{Index: 1},
}
// Instrument ConnectCA.Sign to return signed cert
var resp *structs.IssuedCert
var idx uint64
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
leaf, _ := connect.TestLeaf(t, "web", caRoot)
reply.CertPEM = leaf
reply.ValidAfter = time.Now().Add(-1 * time.Hour)
reply.ValidBefore = time.Now().Add(11 * time.Hour)
reply.CreateIndex = atomic.AddUint64(&idx, 1)
reply.ModifyIndex = reply.CreateIndex
resp = reply
})
// We'll reuse the fetch options and request. A short timeout is important
// since we wait the full timeout before changing roots.
opts := cache.FetchOptions{MinIndex: 0, Timeout: 35 * time.Millisecond}
req := &ConnectCALeafRequest{Datacenter: "dc1", Service: "web"}
// First fetch should return immediately
fetchCh := TestFetchCh(t, typ, opts, req)
select {
case <-time.After(100 * time.Millisecond):
t.Fatal("shouldn't block waiting for fetch")
case result := <-fetchCh:
v := mustFetchResult(t, result)
require.Equal(resp, v.Value)
require.Equal(uint64(1), v.Index)
// Set the LastResult for subsequent fetches
opts.LastResult = &v
}
// Next fetch should block for the full timeout
start := time.Now()
fetchCh = TestFetchCh(t, typ, opts, req)
select {
case <-time.After(100 * time.Millisecond):
t.Fatal("shouldn't block for too long waiting for fetch")
case result := <-fetchCh:
v := mustFetchResult(t, result)
require.Equal(resp, v.Value)
// Still the initial cached result
require.Equal(uint64(1), v.Index)
// Sanity check that it waited
require.True(time.Since(start) > opts.Timeout)
// Set the LastResult for subsequent fetches
opts.LastResult = &v
}
// No active requests, simulate root change now
caRoot2 := connect.TestCA(t, nil)
caRoot2.Active = true
caRoot.Active = false
rootsCh <- structs.IndexedCARoots{
ActiveRootID: caRoot2.ID,
TrustDomain: "fake-trust-domain.consul",
Roots: []*structs.CARoot{
caRoot2,
caRoot,
},
QueryMeta: structs.QueryMeta{Index: atomic.AddUint64(&idx, 1)},
}
earliestRootDelivery := time.Now()
// We should get the new cert immediately on next fetch (since test override
// root change jitter to be 1 nanosecond so no delay expected).
fetchCh = TestFetchCh(t, typ, opts, req)
select {
case <-time.After(100 * time.Millisecond):
t.Fatal("shouldn't block too long waiting for fetch")
case result := <-fetchCh:
v := mustFetchResult(t, result)
require.Equal(resp, v.Value)
// Index should be 3 since root change consumed 2
require.Equal(uint64(3), v.Index)
// Sanity check that we didn't wait too long
require.True(time.Since(earliestRootDelivery) < opts.Timeout)
// Set the LastResult for subsequent fetches
opts.LastResult = &v
}
}
func TestConnectCALeaf_CSRRateLimiting(t *testing.T) {
t.Parallel()
require := require.New(t)
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ, rootsCh := testCALeafType(t, rpc)
defer close(rootsCh)
// Each jitter window will be only 100 ms long to make testing quick but
// highly likely not to fail based on scheduling issues.
typ.TestOverrideCAChangeInitialDelay = 100 * time.Millisecond
// Setup root that will be returned by the mocked Root cache fetch
caRoot := connect.TestCA(t, nil)
caRoot.Active = true
rootsCh <- structs.IndexedCARoots{
ActiveRootID: caRoot.ID,
TrustDomain: "fake-trust-domain.consul",
Roots: []*structs.CARoot{
caRoot,
},
QueryMeta: structs.QueryMeta{Index: 1},
}
// Instrument ConnectCA.Sign
var resp *structs.IssuedCert
var idx, rateLimitedRPCs uint64
genCert := func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
leaf, _ := connect.TestLeaf(t, "web", caRoot)
reply.CertPEM = leaf
reply.ValidAfter = time.Now().Add(-1 * time.Hour)
reply.ValidBefore = time.Now().Add(11 * time.Hour)
reply.CreateIndex = atomic.AddUint64(&idx, 1)
reply.ModifyIndex = reply.CreateIndex
resp = reply
}
incRateLimit := func(args mock.Arguments) {
atomic.AddUint64(&rateLimitedRPCs, 1)
}
// First call returns a rate limit error. This is important as it checks
// behavior when cache is empty and we have to return a nil Value but need to
// save state to do the right thing for retry.
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).
Return(consul.ErrRateLimited).Once().Run(incRateLimit)
// Then succeed on second call
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).
Return(nil).Run(genCert).Once()
// Then be rate limited again on several further calls
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).
Return(consul.ErrRateLimited).Twice().Run(incRateLimit)
// Then fine after that
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).
Return(nil).Run(genCert)
opts := cache.FetchOptions{MinIndex: 0, Timeout: 10 * time.Minute}
req := &ConnectCALeafRequest{Datacenter: "dc1", Service: "web"}
// First fetch should return rate limit error directly - client is expected to
// back off itself.
fetchCh := TestFetchCh(t, typ, opts, req)
select {
case <-time.After(200 * time.Millisecond):
t.Fatal("shouldn't block longer than one jitter window for success")
case result := <-fetchCh:
switch v := result.(type) {
case error:
require.Error(v)
require.Equal(consul.ErrRateLimited.Error(), v.Error())
case cache.FetchResult:
t.Fatalf("Expected error")
}
}
// Second call should return correct cert immediately.
fetchCh = TestFetchCh(t, typ, opts, req)
select {
case <-time.After(100 * time.Millisecond):
t.Fatal("shouldn't block waiting for fetch")
case result := <-fetchCh:
v := mustFetchResult(t, result)
require.Equal(resp, v.Value)
require.Equal(uint64(1), v.Index)
// Set the LastResult for subsequent fetches
opts.LastResult = &v
// Set MinIndex
opts.MinIndex = 1
}
// Send in new roots, which should trigger the next sign req. We need to take
// care to set the new root as active
caRoot2 := connect.TestCA(t, nil)
caRoot2.Active = true
caRoot.Active = false
rootsCh <- structs.IndexedCARoots{
ActiveRootID: caRoot2.ID,
TrustDomain: "fake-trust-domain.consul",
Roots: []*structs.CARoot{
caRoot2,
caRoot,
},
QueryMeta: structs.QueryMeta{Index: atomic.AddUint64(&idx, 1)},
}
earliestRootDelivery := time.Now()
// Sanity check state
require.Equal(uint64(1), atomic.LoadUint64(&rateLimitedRPCs))
// After root rotation jitter has been waited out, a new CSR will
// be attempted but will fail and return the previous cached result with no
// error since we will try again soon.
fetchCh = TestFetchCh(t, typ, opts, req)
select {
case <-time.After(200 * time.Millisecond):
t.Fatal("shouldn't block too long waiting for fetch")
case result := <-fetchCh:
// We should block for _at least_ one jitter period since we set that to
// 100ms and in test override mode we always pick the max jitter not a
// random amount.
require.True(time.Since(earliestRootDelivery) > 100*time.Millisecond)
require.Equal(uint64(2), atomic.LoadUint64(&rateLimitedRPCs))
v := mustFetchResult(t, result)
require.Equal(resp, v.Value)
// 1 since this should still be the original cached result as we failed to
// get a new cert.
require.Equal(uint64(1), v.Index)
// Set the LastResult for subsequent fetches
opts.LastResult = &v
}
// Root rotation state is now only captured in the opts.LastResult.State so a
// subsequent call should also wait for 100ms and then attempt to generate a
// new cert since we failed last time.
fetchCh = TestFetchCh(t, typ, opts, req)
select {
case <-time.After(200 * time.Millisecond):
t.Fatal("shouldn't block too long waiting for fetch")
case result := <-fetchCh:
// We should block for _at least_ two jitter periods now.
require.True(time.Since(earliestRootDelivery) > 200*time.Millisecond)
require.Equal(uint64(3), atomic.LoadUint64(&rateLimitedRPCs))
v := mustFetchResult(t, result)
require.Equal(resp, v.Value)
// 1 since this should still be the original cached result as we failed to
// get a new cert.
require.Equal(uint64(1), v.Index)
// Set the LastResult for subsequent fetches
opts.LastResult = &v
}
// Now we've had two rate limit failures and seen root rotation state work
// across both the blocking request that observed the rotation and the
// subsequent one. The next request should wait out the rest of the backoff
// and then actually fetch a new cert at last!
fetchCh = TestFetchCh(t, typ, opts, req)
select {
case <-time.After(200 * time.Millisecond):
t.Fatal("shouldn't block too long waiting for fetch")
case result := <-fetchCh:
// We should block for _at least_ three jitter periods now.
require.True(time.Since(earliestRootDelivery) > 300*time.Millisecond)
require.Equal(uint64(3), atomic.LoadUint64(&rateLimitedRPCs))
v := mustFetchResult(t, result)
require.Equal(resp, v.Value)
// 3 since the rootCA change used 2
require.Equal(uint64(3), v.Index)
// Set the LastResult for subsequent fetches
opts.LastResult = &v
}
}
// This test runs multiple concurrent callers watching different leaf certs and
// tries to ensure that the background root watch activity behaves correctly.
func TestConnectCALeaf_watchRootsDedupingMultipleCallers(t *testing.T) {

View File

@@ -501,6 +501,15 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
		// This is a valid entry with a result
		newEntry.Valid = true
-	}
+	} else if result.State != nil && err == nil {
+		// Also set state if it's non-nil but Value is nil. This is important in
+		// the case we are returning nil due to a timeout or a transient error
+		// like rate limiting that we want to mask from the user - there is no
+		// result yet but we want to manage retrying internally before we return
+		// an error to the user. The retrying state is in State so we need to
+		// still update that in the entry even if we don't have an actual result
+		// yet (e.g. hit a rate limit on first request for a leaf certificate).
+		newEntry.State = result.State
+	}

	// Error handling

View File

@@ -383,7 +383,7 @@ func TestCacheGet_emptyFetchResult(t *testing.T) {
	// Configure the type
	typ.Static(FetchResult{Value: 42, State: 31, Index: 1}, nil).Times(1)
-	// Return different State, it should be ignored
+	// Return different State, it should NOT be ignored
	typ.Static(FetchResult{Value: nil, State: 32}, nil).Run(func(args mock.Arguments) {
		// We should get back the original state
		opts := args.Get(0).(FetchOptions)
@@ -414,8 +414,8 @@ func TestCacheGet_emptyFetchResult(t *testing.T) {
		t.Fatal("timed out")
	}

-	// Next request should get the first returned state too since last Fetch
-	// returned nil result.
+	// Next request should get the SECOND returned state even though the fetch
+	// returns nil and so the previous result is used.
	req = TestRequest(t, RequestInfo{
		Key: "hello", MinIndex: 1, Timeout: 100 * time.Millisecond})
	result, meta, err = c.Get("t", req)
@@ -424,7 +424,7 @@ func TestCacheGet_emptyFetchResult(t *testing.T) {
	require.False(meta.Hit)
	select {
	case state := <-stateCh:
-		require.Equal(31, state)
+		require.Equal(32, state)
	case <-time.After(20 * time.Millisecond):
		t.Fatal("timed out")
	}

agent/cache/type.go
View File

@@ -26,7 +26,8 @@ type Type interface {
	// Consul so this allows cache types to be implemented with no extra logic.
	// Second, FetchResult can return an unset value and index. In this case, the
	// cache will reuse the last value automatically. If an unset Value is
-	// returned, the State field will also be ignored currently.
+	// returned, the State field will still be updated, which allows maintaining
+	// metadata even when there is no result.
	Fetch(FetchOptions, Request) (FetchResult, error)

	// SupportsBlocking should return true if the type supports blocking queries.

View File

@@ -576,7 +576,9 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
		"tls_skip_verify": "TLSSkipVerify",

		// Common CA config
		"leaf_cert_ttl":      "LeafCertTTL",
+		"csr_max_per_second": "CSRMaxPerSecond",
+		"csr_max_concurrent": "CSRMaxConcurrent",
	})
}

View File

@@ -3009,8 +3009,10 @@ func TestFullConfig(t *testing.T) {
	"connect": {
		"ca_provider": "consul",
		"ca_config": {
-			"RotationPeriod": "90h",
-			"LeafCertTTL": "1h"
+			"rotation_period": "90h",
+			"leaf_cert_ttl": "1h",
+			"csr_max_per_second": 100,
+			"csr_max_concurrent": 2
		},
		"enabled": true,
		"proxy_defaults": {
@@ -3558,6 +3560,10 @@ func TestFullConfig(t *testing.T) {
	ca_config {
		rotation_period = "90h"
		leaf_cert_ttl = "1h"
+		# hack float since json parses numbers as float and we have to
+		# assert against the same thing
+		csr_max_per_second = 100.0
+		csr_max_concurrent = 2.0
	}
	enabled = true
	proxy_defaults {
@@ -4211,8 +4217,10 @@ func TestFullConfig(t *testing.T) {
	ConnectSidecarMaxPort: 9999,
	ConnectCAProvider:     "consul",
	ConnectCAConfig: map[string]interface{}{
		"RotationPeriod": "90h",
		"LeafCertTTL":    "1h",
+		"CSRMaxPerSecond":  float64(100),
+		"CSRMaxConcurrent": float64(2),
	},
	ConnectProxyAllowManagedRoot:             false,
	ConnectProxyAllowManagedAPIRegistration: false,

View File

@@ -1,12 +1,18 @@
package consul

import (
+	"context"
	"errors"
	"fmt"
	"reflect"
	"strings"
+	"sync"
	"time"

+	"github.com/hashicorp/consul/lib/semaphore"
+	"golang.org/x/time/rate"
+
	"github.com/hashicorp/consul/acl"
	"github.com/hashicorp/consul/agent/connect"
	"github.com/hashicorp/consul/agent/consul/state"
@@ -14,12 +20,84 @@ import (
	"github.com/hashicorp/go-memdb"
)

-var ErrConnectNotEnabled = errors.New("Connect must be enabled in order to use this endpoint")
+var (
+	// Err strings. net/rpc doesn't have a way to transport typed/rich errors so
+	// we currently rely on sniffing the error string in a few cases where we need
+	// to change client behavior. These are the canonical error strings to use.
+	// Note though that client code can't use `err == consul.Err*` directly since
+	// the error returned by RPC will be a plain error.errorString created by the
+	// net/rpc client so will not be the same _instance_ that this package
+	// variable points to. Clients need to compare using `err.Error() ==
+	// consul.ErrRateLimited.Error()` which is very sad. Short of replacing our
+	// RPC mechanism it's hard to know how to make that much better though.
+	ErrConnectNotEnabled = errors.New("Connect must be enabled in order to use this endpoint")
+	ErrRateLimited       = errors.New("Rate limit reached, try again later")
+)
+
+const (
+	// csrLimitWait is the maximum time we'll wait for a slot when CSR concurrency
+	// limiting or rate limiting is occurring. It's intentionally short so small
+	// batches of requests can be accommodated when the server has capacity
+	// (assuming signing one cert takes much less than this) while failing
+	// requests fast when a thundering herd comes along.
+	csrLimitWait = 500 * time.Millisecond
+)

// ConnectCA manages the Connect CA.
type ConnectCA struct {
	// srv is a pointer back to the server.
	srv *Server
+
+	// csrRateLimiter limits the rate of signing new certs if configured. Lazily
+	// initialized from current config to support dynamic changes.
+	// csrRateLimiterMu must be held while dereferencing the pointer or storing a
+	// new one, but methods can be called on the limiter object outside of the
+	// locked section. This is done only in the getCSRRateLimiterWithLimit method.
+	csrRateLimiter   *rate.Limiter
+	csrRateLimiterMu sync.RWMutex
+
+	// csrConcurrencyLimiter is a dynamically resizable semaphore used to limit
+	// Sign RPC concurrency if configured. The zero value is usable as soon as
+	// SetSize is called, which we do dynamically in the RPC handler to avoid
+	// having to hook elaborate synchronization mechanisms through the CA config
+	// endpoint and config reload etc.
+	csrConcurrencyLimiter semaphore.Dynamic
+}
+
+// getCSRRateLimiterWithLimit returns a rate.Limiter with the desired limit set.
+// It uses the shared server-wide limiter unless the limit has been changed in
+// config or the limiter has not been set up yet, in which case it just-in-time
+// configures the new limiter. We assume that limit changes are relatively rare
+// and that all callers (there is currently only one) use the same config value
+// as the limit. There might be some flapping if there are multiple concurrent
+// requests in flight at the time the config changes where A sees the new value
+// and updates, B sees the old but then gets this lock second and changes back.
+// Eventually though, and very soon (once all current RPCs are complete), we are
+// guaranteed to have the correct limit set by the next RPC that comes in so I
+// assume this is fine. If we observe strange behavior because of it, we could
+// add hysteresis that prevents changes too soon after a previous change but
+// that seems unnecessary for now.
+func (s *ConnectCA) getCSRRateLimiterWithLimit(limit rate.Limit) *rate.Limiter {
+	s.csrRateLimiterMu.RLock()
+	lim := s.csrRateLimiter
+	s.csrRateLimiterMu.RUnlock()
+
+	// If there is a current limiter with the same limit, return it. This should
+	// be the common case.
+	if lim != nil && lim.Limit() == limit {
+		return lim
+	}
+
+	// Need to change the limiter, get the write lock
+	s.csrRateLimiterMu.Lock()
+	defer s.csrRateLimiterMu.Unlock()
+
+	// No limiter yet, or limit changed in CA config, reconfigure a new limiter.
+	// We use a burst of 1 for a hard limit. Note that either bursting or waiting
+	// is necessary to get expected behavior in the face of random arrival times,
+	// but we don't need both and we use Wait with a small delay to smooth noise.
+	// See https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md.
+	s.csrRateLimiter = rate.NewLimiter(limit, 1)
+	return s.csrRateLimiter
}

// ConfigurationGet returns the configuration for the CA.
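
Because of the string-only error transport described in the comment above, callers detect rate limiting by comparing messages rather than error values. A minimal caller-side sketch (illustrative, not part of the patch):

	// err came back from an RPC such as c.RPC.RPC("ConnectCA.Sign", &args, &reply).
	// We can't use == or a type assertion because net/rpc reconstructs the
	// error from its string, so compare messages instead.
	if err != nil && err.Error() == consul.ErrRateLimited.Error() {
		// Treat as backpressure: keep the old cert and schedule a jittered retry.
	}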
@@ -370,6 +448,28 @@ func (s *ConnectCA) Sign(
			"we are %s", serviceID.Datacenter, s.srv.config.Datacenter)
	}

+	commonCfg, err := config.GetCommonConfig()
+	if err != nil {
+		return err
+	}
+	if commonCfg.CSRMaxPerSecond > 0 {
+		lim := s.getCSRRateLimiterWithLimit(rate.Limit(commonCfg.CSRMaxPerSecond))
+		// Wait up to the small threshold we allow for a token.
+		ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait)
+		defer cancel()
+		if lim.Wait(ctx) != nil {
+			return ErrRateLimited
+		}
+	} else if commonCfg.CSRMaxConcurrent > 0 {
+		s.csrConcurrencyLimiter.SetSize(int64(commonCfg.CSRMaxConcurrent))
+		ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait)
+		defer cancel()
+		if err := s.csrConcurrencyLimiter.Acquire(ctx); err != nil {
+			return ErrRateLimited
+		}
+		defer s.csrConcurrencyLimiter.Release()
+	}
+
	// All seems to be in order, actually sign it.
	pem, err := provider.Sign(csr)
	if err != nil {
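
The bounded wait above (try for at most csrLimitWait, then shed load) is the key to failing fast under a thundering herd. A self-contained sketch of the same pattern using golang.org/x/time/rate, with made-up numbers rather than the patch's configuration:

	package main

	import (
		"context"
		"errors"
		"fmt"
		"time"

		"golang.org/x/time/rate"
	)

	var errRateLimited = errors.New("Rate limit reached, try again later")

	// signWithLimit admits a request only if a token is available within
	// maxWait; burst is 1 so arrivals beyond the rate must wait or fail fast.
	func signWithLimit(lim *rate.Limiter, maxWait time.Duration) error {
		ctx, cancel := context.WithTimeout(context.Background(), maxWait)
		defer cancel()
		if err := lim.Wait(ctx); err != nil {
			return errRateLimited
		}
		// The expensive signing work would happen here.
		return nil
	}

	func main() {
		lim := rate.NewLimiter(rate.Limit(1), 1) // hypothetical: 1 CSR/sec
		for i := 0; i < 3; i++ {
			fmt.Println(signWithLimit(lim, 500*time.Millisecond))
		}
	}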

View File

@@ -5,6 +5,7 @@ import (
	"encoding/pem"
	"fmt"
	"os"
+	"sync"
	"testing"
	"time"
@@ -348,6 +349,201 @@ func TestConnectCASign(t *testing.T) {
	assert.Equal(spiffeId.URI().String(), reply.ServiceURI)
}
// Bench how long the Signing RPC takes. This was used to ballpark a reasonable
// default rate limit to protect servers from thundering herds of signing
// requests on root rotation.
func BenchmarkConnectCASign(b *testing.B) {
t := &testing.T{}
require := require.New(b)
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Generate a CSR and request signing
spiffeID := connect.TestSpiffeIDService(b, "web")
csr, _ := connect.TestCSR(b, spiffeID)
args := &structs.CASignRequest{
Datacenter: "dc1",
CSR: csr,
}
var reply structs.IssuedCert
b.ResetTimer()
for n := 0; n < b.N; n++ {
require.NoError(msgpackrpc.CallWithCodec(codec, "ConnectCA.Sign", args, &reply))
}
}
func TestConnectCASign_rateLimit(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = true
c.CAConfig.Config = map[string]interface{}{
// It actually doesn't work as expected with some higher values because
// the token bucket is initialized with max(10%, 1) burst which for small
// values is 1 and then the test completes so fast it doesn't actually
// replenish any tokens so you only get the burst allowed through. This is
// OK, running the test slower is likely to be more brittle anyway since
// it will become more timing dependent whether the actual rate the
// requests are made matches the expectation from the sleeps etc.
"CSRMaxPerSecond": 1,
}
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Generate a CSR and request signing a few times in a loop.
spiffeID := connect.TestSpiffeIDService(t, "web")
csr, _ := connect.TestCSR(t, spiffeID)
args := &structs.CASignRequest{
Datacenter: "dc1",
CSR: csr,
}
var reply structs.IssuedCert
errs := make([]error, 10)
for i := 0; i < len(errs); i++ {
errs[i] = msgpackrpc.CallWithCodec(codec, "ConnectCA.Sign", args, &reply)
}
limitedCount := 0
successCount := 0
for _, err := range errs {
if err == nil {
successCount++
} else if err.Error() == ErrRateLimited.Error() {
limitedCount++
} else {
require.NoError(err)
}
}
// I've only ever seen this as 1/9 however if the test runs slowly on an
// over-subscribed CPU (e.g. in CI) it's possible that later requests could
// have had their token replenished and succeed so we allow a little slack -
// the test here isn't really the exact token bucket response more a sanity
// check that some limiting is being applied. Note that we can't just measure
// the time it took to send them all and infer how many should have succeeded
// without some complex modelling of the token bucket algorithm.
require.Truef(successCount >= 1, "at least 1 CSRs should have succeeded, got %d", successCount)
require.Truef(limitedCount >= 7, "at least 7 CSRs should have been rate limited, got %d", limitedCount)
}
func TestConnectCASign_concurrencyLimit(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.Bootstrap = true
c.CAConfig.Config = map[string]interface{}{
// Must disable the rate limit since it takes precedence
"CSRMaxPerSecond": 0,
"CSRMaxConcurrent": 1,
}
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Generate a CSR and request signing a few times in a loop.
spiffeID := connect.TestSpiffeIDService(t, "web")
csr, _ := connect.TestCSR(t, spiffeID)
args := &structs.CASignRequest{
Datacenter: "dc1",
CSR: csr,
}
var wg sync.WaitGroup
errs := make(chan error, 10)
times := make(chan time.Duration, cap(errs))
start := time.Now()
for i := 0; i < cap(errs); i++ {
wg.Add(1)
go func() {
defer wg.Done()
codec := rpcClient(t, s1)
defer codec.Close()
var reply structs.IssuedCert
errs <- msgpackrpc.CallWithCodec(codec, "ConnectCA.Sign", args, &reply)
times <- time.Since(start)
}()
}
wg.Wait()
close(errs)
limitedCount := 0
successCount := 0
var minTime, maxTime time.Duration
for err := range errs {
elapsed := <-times
if elapsed < minTime || minTime == 0 {
minTime = elapsed
}
if elapsed > maxTime {
maxTime = elapsed
}
if err == nil {
successCount++
} else if err.Error() == ErrRateLimited.Error() {
limitedCount++
} else {
require.NoError(err)
}
}
// These are very hand wavy - on my mac times look like this:
// 2.776009ms
// 3.705813ms
// 4.527212ms
// 5.267755ms
// 6.119809ms
// 6.958083ms
// 7.869179ms
// 8.675058ms
// 9.512281ms
// 10.238183ms
//
// But it's indistinguishable from noise - even if you disable the concurrency
// limiter you get pretty much the same pattern/spread.
//
// On the other hand it's only timing that stops us from hitting the 500ms
// timeout. On a highly CPU constrained CI box this could be brittle if we
// assert that we never get rate limited.
//
// So this test is not super strong - but it's a sanity check at least that
// things don't break when configured this way, and through manual
// inspection/debug logging etc. we can verify it's actually doing the
// concurrency limit thing. If you add a 100ms sleep into the sign endpoint
// after the rate limit code for example it makes it much more obvious:
//
// With 100ms sleep and no concurrency limit:
// min=109ms, max=118ms
// With concurrency limit of 1:
// min=106ms, max=538ms (with ~half hitting the 500ms timeout)
//
// Without instrumenting the endpoint to make the RPC take an artificially
// long time it's hard to know what else we can do to actively detect that the
// requests were serialized.
t.Logf("min=%s, max=%s", minTime, maxTime)
//t.Fail() // Uncomment to see the time spread logged
require.Truef(successCount >= 1, "at least 1 CSRs should have succeeded, got %d", successCount)
}
func TestConnectCASignValidation(t *testing.T) {
	t.Parallel()

View File

@@ -4,7 +4,7 @@ func init() {
	registerEndpoint(func(s *Server) interface{} { return &ACL{s} })
	registerEndpoint(func(s *Server) interface{} { return &Catalog{s} })
	registerEndpoint(func(s *Server) interface{} { return NewCoordinate(s) })
-	registerEndpoint(func(s *Server) interface{} { return &ConnectCA{s} })
+	registerEndpoint(func(s *Server) interface{} { return &ConnectCA{srv: s} })
	registerEndpoint(func(s *Server) interface{} { return &Health{s} })
	registerEndpoint(func(s *Server) interface{} { return &Intention{s} })
	registerEndpoint(func(s *Server) interface{} { return &Internal{s} })

View File

@@ -174,7 +174,6 @@ func (s *Store) caSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.CAConf
	if err != nil {
		return fmt.Errorf("failed CA config lookup: %s", err)
	}
-
	// Set the indexes, prevent the cluster ID from changing.
	if prev != nil {
		existing := prev.(*structs.CAConfiguration)

View File

@@ -18,6 +18,7 @@ import (
	metrics "github.com/armon/go-metrics"
	"github.com/hashicorp/consul/acl"
	"github.com/hashicorp/consul/agent/cache"
+	"github.com/hashicorp/consul/agent/consul"
	"github.com/hashicorp/consul/agent/structs"
	cleanhttp "github.com/hashicorp/go-cleanhttp"
	"github.com/mitchellh/mapstructure"
@@ -335,6 +336,11 @@ func (s *HTTPServer) wrap(handler endpoint, methods []string) http.HandlerFunc {
		return ok
	}

+	isTooManyRequests := func(err error) bool {
+		// Sadly net/rpc can't do nice typed errors so this is all we've got
+		return err.Error() == consul.ErrRateLimited.Error()
+	}
+
	addAllowHeader := func(methods []string) {
		resp.Header().Add("Allow", strings.Join(methods, ","))
	}
@@ -358,6 +364,9 @@ func (s *HTTPServer) wrap(handler endpoint, methods []string) http.HandlerFunc {
	case isBadRequest(err):
		resp.WriteHeader(http.StatusBadRequest)
		fmt.Fprint(resp, err.Error())
+	case isTooManyRequests(err):
+		resp.WriteHeader(http.StatusTooManyRequests)
+		fmt.Fprint(resp, err.Error())
	default:
		resp.WriteHeader(http.StatusInternalServerError)
		fmt.Fprint(resp, err.Error())

View File

@@ -222,6 +222,10 @@ func (c *CAConfiguration) GetCommonConfig() (*CommonCAProviderConfig, error) {
	}

	var config CommonCAProviderConfig
+
+	// Set defaults
+	config.CSRMaxPerSecond = 50 // See doc comment for rationale here.
+
	decodeConf := &mapstructure.DecoderConfig{
		DecodeHook: ParseDurationFunc(),
		Result:     &config,
@@ -244,6 +248,30 @@ type CommonCAProviderConfig struct {
	LeafCertTTL time.Duration

	SkipValidate bool
+
+	// CSRMaxPerSecond is a rate limit on processing Connect Certificate Signing
+	// Requests on the servers. It applies to all CA providers so it can be used
+	// to limit the rate to an external CA too. 0 disables the rate limit.
+	// Defaults to 50, which is low enough to prevent overload of a reasonably
+	// sized production server while allowing a cluster with 1000 service
+	// instances to complete a rotation in 20 seconds. For reference, a quad-core
+	// 2017 MacBook Pro can process 100 signing RPCs a second while using less
+	// than half of one core. For large clusters with powerful servers it's
+	// advisable to increase this rate or to disable this limit and instead rely
+	// on CSRMaxConcurrent to only consume a subset of the server's cores.
+	CSRMaxPerSecond float32
+
+	// CSRMaxConcurrent is a limit on how many concurrent CSR signing requests
+	// will be processed in parallel. New incoming signing requests will try for
+	// `consul.csrLimitWait` (currently 500ms) to get a slot before being
+	// rejected with a "rate limited" backpressure response. This effectively
+	// sets how many CPU cores can be occupied by Connect CA signing activity
+	// and should be a (small) subset of your server's available cores to allow
+	// other tasks to complete when a barrage of CSRs comes in (e.g. after a CA
+	// root rotation). Setting it to 0 disables the limit, attempting to sign
+	// certs immediately in the RPC goroutine. This is 0 by default and
+	// CSRMaxPerSecond is used instead. This setting is ignored if
+	// CSRMaxPerSecond is non-zero.
+	CSRMaxConcurrent int
}

func (c CommonCAProviderConfig) Validate() error {

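The interplay of the two settings documented above reduces to a precedence rule. A sketch (the helper is hypothetical; the field names are from the struct above):

	// effectiveLimiter reports which strategy the Sign endpoint applies: the
	// token-bucket rate limit wins whenever it is set, and the concurrency
	// limit is consulted only when CSRMaxPerSecond is zero.
	func effectiveLimiter(cfg CommonCAProviderConfig) string {
		switch {
		case cfg.CSRMaxPerSecond > 0:
			return "rate limit"
		case cfg.CSRMaxConcurrent > 0:
			return "concurrency limit"
		default:
			return "unlimited"
		}
	}
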
View File

@@ -18,12 +18,14 @@ func TestCAConfiguration_GetCommonConfig(t *testing.T) {
		name: "basic defaults",
		cfg: &CAConfiguration{
			Config: map[string]interface{}{
				"RotationPeriod":  "2160h",
				"LeafCertTTL":     "72h",
+				"CSRMaxPerSecond": "50",
			},
		},
		want: &CommonCAProviderConfig{
			LeafCertTTL: 72 * time.Hour,
+			CSRMaxPerSecond: 50,
		},
	},
	{
@@ -41,7 +43,8 @@ func TestCAConfiguration_GetCommonConfig(t *testing.T) {
			},
		},
		want: &CommonCAProviderConfig{
-			LeafCertTTL: 72 * time.Hour,
+			LeafCertTTL:     72 * time.Hour,
+			CSRMaxPerSecond: 50, // The default value
		},
	},
}

View File

@@ -23,7 +23,10 @@ type CAConfig struct {
// CommonCAProviderConfig is the common options available to all CA providers.
type CommonCAProviderConfig struct {
	LeafCertTTL time.Duration
+	SkipValidate bool
+	CSRMaxPerSecond  float32
+	CSRMaxConcurrent int
}

// ConsulCAProviderConfig is the config for the built-in Consul CA provider.
@@ -41,7 +44,6 @@ func ParseConsulCAConfig(raw map[string]interface{}) (*ConsulCAProviderConfig, e
	var config ConsulCAProviderConfig
	decodeConf := &mapstructure.DecoderConfig{
		DecodeHook:       mapstructure.StringToTimeDurationHookFunc(),
-		ErrorUnused:      true,
		Result:           &config,
		WeaklyTypedInput: true,
	}

lib/semaphore/semaphore.go (new file)
View File

@@ -0,0 +1,106 @@
// Package semaphore implements a simple semaphore that is based on
// golang.org/x/sync/semaphore but doesn't support weights. Its advantage over
// a simple buffered chan is that the capacity of the semaphore (i.e. the number
// of slots available) can be changed dynamically at runtime without waiting for
// all existing work to stop. This makes it easier to implement e.g. concurrency
// limits on certain operations that can be reconfigured at runtime.
package semaphore
import (
"container/list"
"context"
"sync"
)
// Dynamic implements a semaphore whose capacity can be changed dynamically at
// run time.
type Dynamic struct {
size int64
cur int64
waiters list.List
mu sync.Mutex
}
// NewDynamic returns a dynamic semaphore with the given initial capacity. Note
// that this is for convenience and to match golang.org/x/sync/semaphore however
// it's possible to use a zero-value semaphore provided SetSize is called before
// use.
func NewDynamic(n int64) *Dynamic {
return &Dynamic{
size: n,
}
}
// SetSize dynamically updates the number of available slots. If there are more
// than n slots currently acquired, no further acquires will succeed until
// sufficient have been released to take the total outstanding below n again.
func (s *Dynamic) SetSize(n int64) error {
s.mu.Lock()
defer s.mu.Unlock()
s.size = n
return nil
}
// Acquire attempts to acquire one "slot" in the semaphore, blocking only until
// ctx is Done. On success, returns nil. On failure, returns ctx.Err() and leaves
// the semaphore unchanged.
//
// If ctx is already done, Acquire may still succeed without blocking.
func (s *Dynamic) Acquire(ctx context.Context) error {
s.mu.Lock()
if s.cur < s.size {
s.cur++
s.mu.Unlock()
return nil
}
// Need to wait, add to waiter list
ready := make(chan struct{})
elem := s.waiters.PushBack(ready)
s.mu.Unlock()
select {
case <-ctx.Done():
err := ctx.Err()
s.mu.Lock()
select {
case <-ready:
// Acquired the semaphore after we were canceled. Rather than trying to
// fix up the queue, just pretend we didn't notice the cancellation.
err = nil
default:
s.waiters.Remove(elem)
}
s.mu.Unlock()
return err
case <-ready:
return nil
}
}
// Release releases the semaphore. It panics if Release is called on an
// empty semaphore.
func (s *Dynamic) Release() {
s.mu.Lock()
defer s.mu.Unlock()
if s.cur < 1 {
panic("semaphore: bad release")
}
next := s.waiters.Front()
// If there are no waiters, just decrement and we're done
if next == nil {
s.cur--
return
}
// Need to yield our slot to the next waiter.
// Remove them from the list
s.waiters.Remove(next)
// And trigger its chan before we release the lock
close(next.Value.(chan struct{}))
// Note we _don't_ decrement s.cur since the slot was yielded directly.
}
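For illustration only (not part of this commit), typical use looks like a bounded worker whose capacity can be re-tuned at runtime without draining in-flight work; the names below are examples:

// Illustrative use of semaphore.Dynamic; not part of this commit.
package main

import (
	"context"
	"fmt"

	"github.com/hashicorp/consul/lib/semaphore"
)

func main() {
	sem := semaphore.NewDynamic(2) // start with 2 concurrent slots

	if err := sem.Acquire(context.Background()); err != nil {
		fmt.Println("acquire failed:", err)
		return
	}
	// ... do some bounded work while holding the slot ...
	sem.Release()

	// Reconfigure at runtime, e.g. when a concurrency limit is reloaded.
	// Already-acquired slots keep running; new acquires see the new size.
	sem.SetSize(4)
}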


@ -0,0 +1,119 @@
package semaphore
// Based on https://github.com/golang/sync/blob/master/semaphore/semaphore_test.go
import (
"context"
"math/rand"
"runtime"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
)
const maxSleep = 1 * time.Millisecond
func HammerDynamic(sem *Dynamic, loops int) {
for i := 0; i < loops; i++ {
sem.Acquire(context.Background())
time.Sleep(time.Duration(rand.Int63n(int64(maxSleep/time.Nanosecond))) * time.Nanosecond)
sem.Release()
}
}
// TestDynamic hammers the semaphore from all available cores to ensure we
// don't hit a panic and the race detector doesn't notice anything wonky.
func TestDynamic(t *testing.T) {
t.Parallel()
n := runtime.GOMAXPROCS(0)
loops := 10000 / n
sem := NewDynamic(int64(n))
var wg sync.WaitGroup
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
HammerDynamic(sem, loops)
}()
}
wg.Wait()
}
func TestDynamicPanic(t *testing.T) {
t.Parallel()
defer func() {
if recover() == nil {
t.Fatal("release of an unacquired dynamic semaphore did not panic")
}
}()
w := NewDynamic(1)
w.Release()
}
func checkAcquire(t *testing.T, sem *Dynamic, wantAcquire bool) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
err := sem.Acquire(ctx)
if wantAcquire {
require.NoErrorf(t, err, "failed to acquire when we should have")
} else {
require.Error(t, err, "should have failed to acquire when full")
}
}
func TestDynamicAcquire(t *testing.T) {
t.Parallel()
ctx := context.Background()
sem := NewDynamic(2)
// Consume one slot [free: 1]
sem.Acquire(ctx)
// Should be able to consume another [free: 0]
checkAcquire(t, sem, true)
// Should fail to consume another [free: 0]
checkAcquire(t, sem, false)
// Release 2
sem.Release()
sem.Release()
// Should be able to consume another [free: 1]
checkAcquire(t, sem, true)
// Should be able to consume another [free: 0]
checkAcquire(t, sem, true)
// Should fail to consume another [free: 0]
checkAcquire(t, sem, false)
// Now expand the semaphore and we should be able to acquire again [free: 2]
sem.SetSize(4)
// Should be able to consume another [free: 1]
checkAcquire(t, sem, true)
// Should be able to consume another [free: 0]
checkAcquire(t, sem, true)
// Should fail to consume another [free: 0]
checkAcquire(t, sem, false)
// Shrinking it should work [free: 0]
sem.SetSize(3)
// Should fail to consume another [free: 0]
checkAcquire(t, sem, false)
// Release one [free: 0] (4 slots were in use, size is now only 3)
sem.Release()
// Should fail to consume another [free: 0]
checkAcquire(t, sem, false)
sem.Release()
// Should be able to consume another [free: 1]
checkAcquire(t, sem, true)
}


@ -857,23 +857,72 @@ default will automatically work with some tooling.

<p>There are also a number of common configuration options supported by all providers:</p>

* <a name="ca_leaf_cert_ttl"></a><a href="#ca_leaf_cert_ttl">`leaf_cert_ttl`</a> The upper bound on the
  lease duration of a leaf certificate issued for a service. In most
  cases a new leaf certificate will be requested by a proxy before this
  limit is reached. This is also the effective limit on how long a
  server outage can last (with no leader) before network connections
  will start being rejected, and as a result the default is `72h` to
  last through a weekend without intervention. This value cannot be
  lower than 1 hour or higher than 1 year.

  This value is also used when rotating out old root certificates from
  the cluster. When a root certificate has been inactive (rotated out)
  for more than twice the *current* `leaf_cert_ttl`, it will be removed
  from the trusted list.

* <a name="ca_csr_max_per_second"></a><a
  href="#ca_csr_max_per_second">`csr_max_per_second`</a> Sets a rate
  limit on the maximum number of Certificate Signing Requests (CSRs) the
  servers will accept. This is used to prevent CA rotation from causing
  unbounded CPU usage on servers. It defaults to 50 which is
  conservative - a 2017 MacBook can process about 100 per second using
  only ~40% of one CPU core - but sufficient for deployments of up to
  ~1500 service instances before the time it takes to rotate is
  impacted. For larger deployments we recommend increasing this based
  on the expected number of server instances and server resources, or
  using `csr_max_concurrent` instead if servers have more than one
  core. Setting this to zero disables rate limiting. Added in 1.4.1.
  See the example configuration after this list.

* <a name="ca_csr_max_concurrent"></a><a
  href="#ca_csr_max_concurrent">`csr_max_concurrent`</a> Sets a limit
  on how many Certificate Signing Requests will be processed
  concurrently. Defaults to 0 (disabled). This is useful when you have
  more than one or two cores available to the server. For example on an
  8-core server, setting this to 1 will ensure that even during a CA
  rotation no more than one server core on the leader will be consumed
  at a time by certificate generation. Setting this is recommended
  _instead_ of `csr_max_per_second` when you know there are multiple
  cores available, since it is simpler to reason about limiting CSR
  resources this way without artificially slowing down rotations. Added
  in 1.4.1.

* <a name="connect_proxy"></a><a href="#connect_proxy">`proxy`</a>
  [**Deprecated**](/docs/connect/proxies/managed-deprecated.html) This
  object allows setting options for the Connect proxies. The following
  sub-keys are available:

  * <a name="connect_proxy_allow_managed_registration"></a><a
    href="#connect_proxy_allow_managed_registration">`allow_managed_api_registration`</a>
    [**Deprecated**](/docs/connect/proxies/managed-deprecated.html)
    Allows managed proxies to be configured with services that are
    registered via the Agent HTTP API. Enabling this would allow anyone
    with permission to register a service to define a command to execute
    for the proxy. By default, this is false to protect against
    arbitrary process execution.

  * <a name="connect_proxy_allow_managed_root"></a><a
    href="#connect_proxy_allow_managed_root">`allow_managed_root`</a>
    [**Deprecated**](/docs/connect/proxies/managed-deprecated.html)
    Allows Consul to start managed proxies if Consul is running as root
    (EUID of the process is zero). We recommend running Consul as a
    non-root user. By default, this is false to protect against
    inadvertently running external processes as root.

  * <a name="connect_proxy_defaults"></a><a
    href="#connect_proxy_defaults">`proxy_defaults`</a>
    [**Deprecated**](/docs/connect/proxies/managed-deprecated.html) This
    object configures the default proxy settings for service definitions
    with [managed proxies](/docs/connect/proxies/managed-deprecated.html)
    (now deprecated). It accepts the fields `exec_mode`, `daemon_command`,
    and `config`. These are used as default values for the respective
    fields in the service definition.

* <a name="datacenter"></a><a href="#datacenter">`datacenter`</a> Equivalent to the
  [`-datacenter` command-line flag](#_datacenter).
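As an illustration only (this snippet is not from the official docs), the common CA options above could be set in an agent configuration file like so:

```hcl
connect {
  enabled     = true
  ca_provider = "consul"
  ca_config {
    leaf_cert_ttl      = "72h"
    csr_max_per_second = 100 # raise the default of 50 for a larger cluster
    # csr_max_concurrent = 1 # alternative: bound signing to one core
  }
}
```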


@ -79,9 +79,21 @@ So a scaling factor of `5` (i.e. `raft_multiplier: 5`) updates the following val
~> **NOTE** Wide networks with more latency will perform better with larger values of `raft_multiplier`.

The trade-off is between leader stability and time to recover from an actual
leader failure. A short multiplier minimizes failure detection and election time
but may be triggered frequently in high latency situations. This can cause
constant leadership churn and associated unavailability. A high multiplier
reduces the chances that spurious failures will cause leadership churn but it
does this at the expense of taking longer to detect real failures and thus takes
longer to restore cluster availability.

Leadership instability can also be caused by under-provisioned CPU resources and
is more likely in environments where CPU cycles are shared with other workloads.
In order for a server to remain the leader, it must send frequent heartbeat
messages to all other servers every few hundred milliseconds. If some number of
these are missing or late due to the leader not having sufficient CPU to send
them on time, the other servers will detect it as failed and hold a new
election.

It's best to benchmark with a realistic workload when choosing a production server for Consul.

Here are some general recommendations:
@ -159,3 +171,45 @@ To prevent any CPU spikes from a misconfigured client, RPC requests to the serve

~> **NOTE** Rate limiting is configured on the client agent only.

In addition, two [performance indicators](/docs/agent/telemetry.html) &mdash; `consul.runtime.alloc_bytes` and `consul.runtime.heap_objects` &mdash; can help diagnose if the current sizing is not adequately meeting the load.
## Connect Certificate Signing CPU Limits

If you enable [Connect](/docs/connect/index.html), the leader server will need
to perform public key signing operations for every service instance in the
cluster. Typically these operations are fast on modern hardware, however when
the CA is changed or its key is rotated, the leader will face an influx of
requests for new certificates for every running service instance.

While the client agents distribute these randomly over 30 seconds to avoid an
immediate thundering herd, they don't have enough information to tune that
period based on the number of certificates in use in the cluster, so picking a
longer smear period would make rotations artificially slow for small clusters.
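A sketch of that client-side spread, for illustration (the fixed 30 second window reflects the behavior described above; this is not the agent's actual code):

```go
package main

import (
	"math/rand"
	"time"
)

// rotationJitter is illustrative only: pick a random delay in [0, 30s)
// so certificate renewals from many instances spread over the window.
func rotationJitter() time.Duration {
	return time.Duration(rand.Int63n(int64(30 * time.Second)))
}

func main() {
	time.Sleep(rotationJitter())
	// ... then request the replacement certificate ...
}
```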
Smearing requests over 30s is sufficient to bring RPC load to a reasonable level
in all but the very largest clusters, but the extra CPU load from cryptographic
operations could impact the server's normal work. To limit that, Consul since
1.4.1 exposes two ways to limit the impact certificate signing has on the
leader: [`csr_max_per_second`](/docs/agent/options.html#ca_csr_max_per_second)
and [`csr_max_concurrent`](/docs/agent/options.html#ca_csr_max_concurrent).

By default we set a limit of 50 per second, which is reasonable on modest
hardware but may be too low and impact rotation times if more than ~1500
service instances are using Connect in the cluster (at 50 CSRs per second, the
roughly 30 second smear window covers 50 × 30 = 1500 certificates).
`csr_max_per_second` is likely the best choice if you have fewer than four
cores available, since signing consuming a whole core is likely to impact
server stability when that is all or a large portion of the cores available.
The downside is that you need to capacity plan: how many service instances will
need Connect certificates? What CSR rate can your server tolerate without
impacting stability? How fast do you want CA rotations to complete?
For larger production deployments, we generally recommend multiple CPU cores for
servers to handle the normal workload. With four or more cores available, it's
simpler to limit signing CPU impact with `csr_max_concurrent` rather than tune
the rate limit. This effectively sets how many CPU cores can be monopolized by
certificate signing work (although it doesn't pin that work to specific cores).
In this case `csr_max_per_second` should be disabled (set to `0`).

For example, if you have an 8-core server, setting `csr_max_concurrent` to `1`
allows CSRs to be processed as fast as a single core can manage (likely
sufficient even for very large clusters) without consuming all available CPU
cores and impacting normal server work or stability.
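For illustration, that 8-core example could be expressed as follows (snippet assumed, not from this commit's docs):

```hcl
connect {
  enabled = true
  ca_config {
    csr_max_per_second = 0 # disable the rate limit...
    csr_max_concurrent = 1 # ...and bound concurrent signing instead
  }
}
```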