Store the Connect CA rate limiter on the server
This fixes a bug where auto_encrypt was operating without utilizing a common rate limiter.
This commit is contained in:
parent
e9e88e4527
commit
5600069d69
|
@ -6,10 +6,8 @@ import (
|
|||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/lib/semaphore"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
|
@ -53,57 +51,6 @@ type ConnectCA struct {
|
|||
srv *Server
|
||||
|
||||
logger hclog.Logger
|
||||
|
||||
// csrRateLimiter limits the rate of signing new certs if configured. Lazily
|
||||
// initialized from current config to support dynamic changes.
|
||||
// csrRateLimiterMu must be held while dereferencing the pointer or storing a
|
||||
// new one, but methods can be called on the limiter object outside of the
|
||||
// locked section. This is done only in the getCSRRateLimiterWithLimit method.
|
||||
csrRateLimiter *rate.Limiter
|
||||
csrRateLimiterMu sync.RWMutex
|
||||
|
||||
// csrConcurrencyLimiter is a dynamically resizable semaphore used to limit
|
||||
// Sign RPC concurrency if configured. The zero value is usable as soon as
|
||||
// SetSize is called which we do dynamically in the RPC handler to avoid
|
||||
// having to hook elaborate synchronization mechanisms through the CA config
|
||||
// endpoint and config reload etc.
|
||||
csrConcurrencyLimiter semaphore.Dynamic
|
||||
}
|
||||
|
||||
// getCSRRateLimiterWithLimit returns a rate.Limiter with the desired limit set.
|
||||
// It uses the shared server-wide limiter unless the limit has been changed in
|
||||
// config or the limiter has not been setup yet in which case it just-in-time
|
||||
// configures the new limiter. We assume that limit changes are relatively rare
|
||||
// and that all callers (there is currently only one) use the same config value
|
||||
// as the limit. There might be some flapping if there are multiple concurrent
|
||||
// requests in flight at the time the config changes where A sees the new value
|
||||
// and updates, B sees the old but then gets this lock second and changes back.
|
||||
// Eventually though and very soon (once all current RPCs are complete) we are
|
||||
// guaranteed to have the correct limit set by the next RPC that comes in so I
|
||||
// assume this is fine. If we observe strange behavior because of it, we could
|
||||
// add hysteresis that prevents changes too soon after a previous change but
|
||||
// that seems unnecessary for now.
|
||||
func (s *ConnectCA) getCSRRateLimiterWithLimit(limit rate.Limit) *rate.Limiter {
|
||||
s.csrRateLimiterMu.RLock()
|
||||
lim := s.csrRateLimiter
|
||||
s.csrRateLimiterMu.RUnlock()
|
||||
|
||||
// If there is a current limiter with the same limit, return it. This should
|
||||
// be the common case.
|
||||
if lim != nil && lim.Limit() == limit {
|
||||
return lim
|
||||
}
|
||||
|
||||
// Need to change limiter, get write lock
|
||||
s.csrRateLimiterMu.Lock()
|
||||
defer s.csrRateLimiterMu.Unlock()
|
||||
// No limiter yet, or limit changed in CA config, reconfigure a new limiter.
|
||||
// We use burst of 1 for a hard limit. Note that either bursting or waiting is
|
||||
// necessary to get expected behavior in fact of random arrival times, but we
|
||||
// don't need both and we use Wait with a small delay to smooth noise. See
|
||||
// https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md.
|
||||
s.csrRateLimiter = rate.NewLimiter(limit, 1)
|
||||
return s.csrRateLimiter
|
||||
}
|
||||
|
||||
// ConfigurationGet returns the configuration for the CA.
|
||||
|
@ -514,7 +461,7 @@ func (s *ConnectCA) Sign(
|
|||
return err
|
||||
}
|
||||
if commonCfg.CSRMaxPerSecond > 0 {
|
||||
lim := s.getCSRRateLimiterWithLimit(rate.Limit(commonCfg.CSRMaxPerSecond))
|
||||
lim := s.srv.caLeafLimiter.getCSRRateLimiterWithLimit(rate.Limit(commonCfg.CSRMaxPerSecond))
|
||||
// Wait up to the small threshold we allow for a token.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait)
|
||||
defer cancel()
|
||||
|
@ -522,13 +469,13 @@ func (s *ConnectCA) Sign(
|
|||
return ErrRateLimited
|
||||
}
|
||||
} else if commonCfg.CSRMaxConcurrent > 0 {
|
||||
s.csrConcurrencyLimiter.SetSize(int64(commonCfg.CSRMaxConcurrent))
|
||||
s.srv.caLeafLimiter.csrConcurrencyLimiter.SetSize(int64(commonCfg.CSRMaxConcurrent))
|
||||
ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait)
|
||||
defer cancel()
|
||||
if err := s.csrConcurrencyLimiter.Acquire(ctx); err != nil {
|
||||
if err := s.srv.caLeafLimiter.csrConcurrencyLimiter.Acquire(ctx); err != nil {
|
||||
return ErrRateLimited
|
||||
}
|
||||
defer s.csrConcurrencyLimiter.Release()
|
||||
defer s.srv.caLeafLimiter.csrConcurrencyLimiter.Release()
|
||||
}
|
||||
|
||||
// All seems to be in order, actually sign it.
|
||||
|
|
|
@ -153,6 +153,9 @@ type Server struct {
|
|||
caProviderRoot *structs.CARoot
|
||||
caProviderLock sync.RWMutex
|
||||
|
||||
// rate limiter to use when signing leaf certificates
|
||||
caLeafLimiter connectSignRateLimiter
|
||||
|
||||
// Consul configuration
|
||||
config *Config
|
||||
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/consul/lib/semaphore"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
type connectSignRateLimiter struct {
|
||||
// csrRateLimiter limits the rate of signing new certs if configured. Lazily
|
||||
// initialized from current config to support dynamic changes.
|
||||
// csrRateLimiterMu must be held while dereferencing the pointer or storing a
|
||||
// new one, but methods can be called on the limiter object outside of the
|
||||
// locked section. This is done only in the getCSRRateLimiterWithLimit method.
|
||||
csrRateLimiter *rate.Limiter
|
||||
csrRateLimiterMu sync.RWMutex
|
||||
|
||||
// csrConcurrencyLimiter is a dynamically resizable semaphore used to limit
|
||||
// Sign RPC concurrency if configured. The zero value is usable as soon as
|
||||
// SetSize is called which we do dynamically in the RPC handler to avoid
|
||||
// having to hook elaborate synchronization mechanisms through the CA config
|
||||
// endpoint and config reload etc.
|
||||
csrConcurrencyLimiter semaphore.Dynamic
|
||||
}
|
||||
|
||||
// getCSRRateLimiterWithLimit returns a rate.Limiter with the desired limit set.
|
||||
// It uses the shared server-wide limiter unless the limit has been changed in
|
||||
// config or the limiter has not been setup yet in which case it just-in-time
|
||||
// configures the new limiter. We assume that limit changes are relatively rare
|
||||
// and that all callers (there is currently only one) use the same config value
|
||||
// as the limit. There might be some flapping if there are multiple concurrent
|
||||
// requests in flight at the time the config changes where A sees the new value
|
||||
// and updates, B sees the old but then gets this lock second and changes back.
|
||||
// Eventually though and very soon (once all current RPCs are complete) we are
|
||||
// guaranteed to have the correct limit set by the next RPC that comes in so I
|
||||
// assume this is fine. If we observe strange behavior because of it, we could
|
||||
// add hysteresis that prevents changes too soon after a previous change but
|
||||
// that seems unnecessary for now.
|
||||
func (l *connectSignRateLimiter) getCSRRateLimiterWithLimit(limit rate.Limit) *rate.Limiter {
|
||||
l.csrRateLimiterMu.RLock()
|
||||
lim := l.csrRateLimiter
|
||||
l.csrRateLimiterMu.RUnlock()
|
||||
|
||||
// If there is a current limiter with the same limit, return it. This should
|
||||
// be the common case.
|
||||
if lim != nil && lim.Limit() == limit {
|
||||
return lim
|
||||
}
|
||||
|
||||
// Need to change limiter, get write lock
|
||||
l.csrRateLimiterMu.Lock()
|
||||
defer l.csrRateLimiterMu.Unlock()
|
||||
// No limiter yet, or limit changed in CA config, reconfigure a new limiter.
|
||||
// We use burst of 1 for a hard limit. Note that either bursting or waiting is
|
||||
// necessary to get expected behavior in fact of random arrival times, but we
|
||||
// don't need both and we use Wait with a small delay to smooth noise. See
|
||||
// https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md.
|
||||
l.csrRateLimiter = rate.NewLimiter(limit, 1)
|
||||
return l.csrRateLimiter
|
||||
}
|
Loading…
Reference in New Issue