From 5600069d696d0a37cadcec13a12bf5cc767fa59f Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Mon, 29 Jun 2020 15:52:47 -0400 Subject: [PATCH] Store the Connect CA rate limiter on the server This fixes a bug where auto_encrypt was operating without utilizing a common rate limiter. --- agent/consul/connect_ca_endpoint.go | 61 ++--------------------------- agent/consul/server.go | 3 ++ agent/consul/server_connect.go | 61 +++++++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 57 deletions(-) create mode 100644 agent/consul/server_connect.go diff --git a/agent/consul/connect_ca_endpoint.go b/agent/consul/connect_ca_endpoint.go index 81a511ea7..bbd2c4591 100644 --- a/agent/consul/connect_ca_endpoint.go +++ b/agent/consul/connect_ca_endpoint.go @@ -6,10 +6,8 @@ import ( "fmt" "reflect" "strings" - "sync" "time" - "github.com/hashicorp/consul/lib/semaphore" "github.com/hashicorp/go-hclog" "golang.org/x/time/rate" @@ -53,57 +51,6 @@ type ConnectCA struct { srv *Server logger hclog.Logger - - // csrRateLimiter limits the rate of signing new certs if configured. Lazily - // initialized from current config to support dynamic changes. - // csrRateLimiterMu must be held while dereferencing the pointer or storing a - // new one, but methods can be called on the limiter object outside of the - // locked section. This is done only in the getCSRRateLimiterWithLimit method. - csrRateLimiter *rate.Limiter - csrRateLimiterMu sync.RWMutex - - // csrConcurrencyLimiter is a dynamically resizable semaphore used to limit - // Sign RPC concurrency if configured. The zero value is usable as soon as - // SetSize is called which we do dynamically in the RPC handler to avoid - // having to hook elaborate synchronization mechanisms through the CA config - // endpoint and config reload etc. - csrConcurrencyLimiter semaphore.Dynamic -} - -// getCSRRateLimiterWithLimit returns a rate.Limiter with the desired limit set. -// It uses the shared server-wide limiter unless the limit has been changed in -// config or the limiter has not been setup yet in which case it just-in-time -// configures the new limiter. We assume that limit changes are relatively rare -// and that all callers (there is currently only one) use the same config value -// as the limit. There might be some flapping if there are multiple concurrent -// requests in flight at the time the config changes where A sees the new value -// and updates, B sees the old but then gets this lock second and changes back. -// Eventually though and very soon (once all current RPCs are complete) we are -// guaranteed to have the correct limit set by the next RPC that comes in so I -// assume this is fine. If we observe strange behavior because of it, we could -// add hysteresis that prevents changes too soon after a previous change but -// that seems unnecessary for now. -func (s *ConnectCA) getCSRRateLimiterWithLimit(limit rate.Limit) *rate.Limiter { - s.csrRateLimiterMu.RLock() - lim := s.csrRateLimiter - s.csrRateLimiterMu.RUnlock() - - // If there is a current limiter with the same limit, return it. This should - // be the common case. - if lim != nil && lim.Limit() == limit { - return lim - } - - // Need to change limiter, get write lock - s.csrRateLimiterMu.Lock() - defer s.csrRateLimiterMu.Unlock() - // No limiter yet, or limit changed in CA config, reconfigure a new limiter. - // We use burst of 1 for a hard limit. Note that either bursting or waiting is - // necessary to get expected behavior in fact of random arrival times, but we - // don't need both and we use Wait with a small delay to smooth noise. See - // https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md. - s.csrRateLimiter = rate.NewLimiter(limit, 1) - return s.csrRateLimiter } // ConfigurationGet returns the configuration for the CA. @@ -514,7 +461,7 @@ func (s *ConnectCA) Sign( return err } if commonCfg.CSRMaxPerSecond > 0 { - lim := s.getCSRRateLimiterWithLimit(rate.Limit(commonCfg.CSRMaxPerSecond)) + lim := s.srv.caLeafLimiter.getCSRRateLimiterWithLimit(rate.Limit(commonCfg.CSRMaxPerSecond)) // Wait up to the small threshold we allow for a token. ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait) defer cancel() @@ -522,13 +469,13 @@ func (s *ConnectCA) Sign( return ErrRateLimited } } else if commonCfg.CSRMaxConcurrent > 0 { - s.csrConcurrencyLimiter.SetSize(int64(commonCfg.CSRMaxConcurrent)) + s.srv.caLeafLimiter.csrConcurrencyLimiter.SetSize(int64(commonCfg.CSRMaxConcurrent)) ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait) defer cancel() - if err := s.csrConcurrencyLimiter.Acquire(ctx); err != nil { + if err := s.srv.caLeafLimiter.csrConcurrencyLimiter.Acquire(ctx); err != nil { return ErrRateLimited } - defer s.csrConcurrencyLimiter.Release() + defer s.srv.caLeafLimiter.csrConcurrencyLimiter.Release() } // All seems to be in order, actually sign it. diff --git a/agent/consul/server.go b/agent/consul/server.go index 3fd83de20..a79783a72 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -153,6 +153,9 @@ type Server struct { caProviderRoot *structs.CARoot caProviderLock sync.RWMutex + // rate limiter to use when signing leaf certificates + caLeafLimiter connectSignRateLimiter + // Consul configuration config *Config diff --git a/agent/consul/server_connect.go b/agent/consul/server_connect.go new file mode 100644 index 000000000..2dc2bed2d --- /dev/null +++ b/agent/consul/server_connect.go @@ -0,0 +1,61 @@ +package consul + +import ( + "sync" + + "github.com/hashicorp/consul/lib/semaphore" + "golang.org/x/time/rate" +) + +type connectSignRateLimiter struct { + // csrRateLimiter limits the rate of signing new certs if configured. Lazily + // initialized from current config to support dynamic changes. + // csrRateLimiterMu must be held while dereferencing the pointer or storing a + // new one, but methods can be called on the limiter object outside of the + // locked section. This is done only in the getCSRRateLimiterWithLimit method. + csrRateLimiter *rate.Limiter + csrRateLimiterMu sync.RWMutex + + // csrConcurrencyLimiter is a dynamically resizable semaphore used to limit + // Sign RPC concurrency if configured. The zero value is usable as soon as + // SetSize is called which we do dynamically in the RPC handler to avoid + // having to hook elaborate synchronization mechanisms through the CA config + // endpoint and config reload etc. + csrConcurrencyLimiter semaphore.Dynamic +} + +// getCSRRateLimiterWithLimit returns a rate.Limiter with the desired limit set. +// It uses the shared server-wide limiter unless the limit has been changed in +// config or the limiter has not been setup yet in which case it just-in-time +// configures the new limiter. We assume that limit changes are relatively rare +// and that all callers (there is currently only one) use the same config value +// as the limit. There might be some flapping if there are multiple concurrent +// requests in flight at the time the config changes where A sees the new value +// and updates, B sees the old but then gets this lock second and changes back. +// Eventually though and very soon (once all current RPCs are complete) we are +// guaranteed to have the correct limit set by the next RPC that comes in so I +// assume this is fine. If we observe strange behavior because of it, we could +// add hysteresis that prevents changes too soon after a previous change but +// that seems unnecessary for now. +func (l *connectSignRateLimiter) getCSRRateLimiterWithLimit(limit rate.Limit) *rate.Limiter { + l.csrRateLimiterMu.RLock() + lim := l.csrRateLimiter + l.csrRateLimiterMu.RUnlock() + + // If there is a current limiter with the same limit, return it. This should + // be the common case. + if lim != nil && lim.Limit() == limit { + return lim + } + + // Need to change limiter, get write lock + l.csrRateLimiterMu.Lock() + defer l.csrRateLimiterMu.Unlock() + // No limiter yet, or limit changed in CA config, reconfigure a new limiter. + // We use burst of 1 for a hard limit. Note that either bursting or waiting is + // necessary to get expected behavior in fact of random arrival times, but we + // don't need both and we use Wait with a small delay to smooth noise. See + // https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md. + l.csrRateLimiter = rate.NewLimiter(limit, 1) + return l.csrRateLimiter +}