diff --git a/agent/consul/leader_connect_ca.go b/agent/consul/leader_connect_ca.go index 5c1b804ca..0f9ce63a0 100644 --- a/agent/consul/leader_connect_ca.go +++ b/agent/consul/leader_connect_ca.go @@ -2,8 +2,10 @@ package consul import ( "context" + "crypto/x509" "errors" "fmt" + "net/url" "reflect" "strings" "sync" @@ -11,6 +13,9 @@ import ( "github.com/hashicorp/go-hclog" uuid "github.com/hashicorp/go-uuid" + "golang.org/x/time/rate" + + "github.com/hashicorp/consul/lib/semaphore" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect/ca" @@ -175,8 +180,8 @@ func (c *CAManager) getPrimaryRoots() structs.IndexedCARoots { // when setting up the CA during establishLeadership. The state should be set to // non-ready before calling this. func (c *CAManager) initializeCAConfig() (*structs.CAConfiguration, error) { - state := c.delegate.State() - _, config, err := state.CAConfig(nil) + st := c.delegate.State() + _, config, err := st.CAConfig(nil) if err != nil { return nil, err } @@ -1313,3 +1318,195 @@ func (c *CAManager) configuredSecondaryCA() bool { defer c.stateLock.Unlock() return c.actingSecondaryCA } + +type connectSignRateLimiter struct { + // csrRateLimiter limits the rate of signing new certs if configured. Lazily + // initialized from current config to support dynamic changes. + // csrRateLimiterMu must be held while dereferencing the pointer or storing a + // new one, but methods can be called on the limiter object outside of the + // locked section. This is done only in the getCSRRateLimiterWithLimit method. + csrRateLimiter *rate.Limiter + csrRateLimiterMu sync.RWMutex + + // csrConcurrencyLimiter is a dynamically resizable semaphore used to limit + // Sign RPC concurrency if configured. The zero value is usable as soon as + // SetSize is called which we do dynamically in the RPC handler to avoid + // having to hook elaborate synchronization mechanisms through the CA config + // endpoint and config reload etc. 
+ csrConcurrencyLimiter semaphore.Dynamic +} + +// getCSRRateLimiterWithLimit returns a rate.Limiter with the desired limit set. +// It uses the shared server-wide limiter unless the limit has been changed in +// config or the limiter has not been set up yet in which case it just-in-time +// configures the new limiter. We assume that limit changes are relatively rare +// and that all callers (there is currently only one) use the same config value +// as the limit. There might be some flapping if there are multiple concurrent +// requests in flight at the time the config changes where A sees the new value +// and updates, B sees the old but then gets this lock second and changes back. +// Eventually though and very soon (once all current RPCs are complete) we are +// guaranteed to have the correct limit set by the next RPC that comes in so I +// assume this is fine. If we observe strange behavior because of it, we could +// add hysteresis that prevents changes too soon after a previous change but +// that seems unnecessary for now. +func (l *connectSignRateLimiter) getCSRRateLimiterWithLimit(limit rate.Limit) *rate.Limiter { + l.csrRateLimiterMu.RLock() + lim := l.csrRateLimiter + l.csrRateLimiterMu.RUnlock() + + // If there is a current limiter with the same limit, return it. This should + // be the common case. + if lim != nil && lim.Limit() == limit { + return lim + } + + // Need to change limiter, get write lock + l.csrRateLimiterMu.Lock() + defer l.csrRateLimiterMu.Unlock() + // No limiter yet, or limit changed in CA config, reconfigure a new limiter. + // We use burst of 1 for a hard limit. Note that either bursting or waiting is + // necessary to get expected behavior in the face of random arrival times, but we + // don't need both and we use Wait with a small delay to smooth noise. See + // https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md. 
+ l.csrRateLimiter = rate.NewLimiter(limit, 1) + return l.csrRateLimiter +} + +func (c *CAManager) SignCertificate(csr *x509.CertificateRequest, spiffeID connect.CertURI) (*structs.IssuedCert, error) { + provider, caRoot := c.getCAProvider() + if provider == nil { + return nil, fmt.Errorf("CA is uninitialized and unable to sign certificates yet: provider is nil") + } else if caRoot == nil { + return nil, fmt.Errorf("CA is uninitialized and unable to sign certificates yet: no root certificate") + } + + // Verify that the CSR entity is in the cluster's trust domain + state := c.delegate.State() + _, config, err := state.CAConfig(nil) + if err != nil { + return nil, err + } + signingID := connect.SpiffeIDSigningForCluster(config) + serviceID, isService := spiffeID.(*connect.SpiffeIDService) + agentID, isAgent := spiffeID.(*connect.SpiffeIDAgent) + if !isService && !isAgent { + return nil, fmt.Errorf("SPIFFE ID in CSR must be a service or agent ID") + } + + var entMeta structs.EnterpriseMeta + if isService { + if !signingID.CanSign(spiffeID) { + return nil, fmt.Errorf("SPIFFE ID in CSR from a different trust domain: %s, "+ + "we are %s", serviceID.Host, signingID.Host()) + } + entMeta.Merge(serviceID.GetEnterpriseMeta()) + } else { + // isAgent - if we support more ID types then this would need to be an else if + // here we are just automatically fixing the trust domain. For auto-encrypt and + // auto-config they make certificate requests before learning about the roots + // so they will have a dummy trust domain in the CSR. 
+ trustDomain := signingID.Host() + if agentID.Host != trustDomain { + originalURI := agentID.URI() + + agentID.Host = trustDomain + + // recreate the URIs list + uris := make([]*url.URL, len(csr.URIs)) + for i, uri := range csr.URIs { + if originalURI.String() == uri.String() { + uris[i] = agentID.URI() + } else { + uris[i] = uri + } + } + + csr.URIs = uris + } + entMeta.Merge(structs.DefaultEnterpriseMeta()) + } + + commonCfg, err := config.GetCommonConfig() + if err != nil { + return nil, err + } + if commonCfg.CSRMaxPerSecond > 0 { + lim := c.caLeafLimiter.getCSRRateLimiterWithLimit(rate.Limit(commonCfg.CSRMaxPerSecond)) + // Wait up to the small threshold we allow for a token. + ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait) + defer cancel() + if lim.Wait(ctx) != nil { + return nil, ErrRateLimited + } + } else if commonCfg.CSRMaxConcurrent > 0 { + c.caLeafLimiter.csrConcurrencyLimiter.SetSize(int64(commonCfg.CSRMaxConcurrent)) + ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait) + defer cancel() + if err := c.caLeafLimiter.csrConcurrencyLimiter.Acquire(ctx); err != nil { + return nil, ErrRateLimited + } + defer c.caLeafLimiter.csrConcurrencyLimiter.Release() + } + + connect.HackSANExtensionForCSR(csr) + + // All seems to be in order, actually sign it. + + pem, err := provider.Sign(csr) + if err == ca.ErrRateLimited { + return nil, ErrRateLimited + } + if err != nil { + return nil, err + } + + // Append any intermediates needed by this root. + for _, p := range caRoot.IntermediateCerts { + pem = pem + ca.EnsureTrailingNewline(p) + } + + // Append our local CA's intermediate if there is one. 
+ inter, err := provider.ActiveIntermediate() + if err != nil { + return nil, err + } + root, err := provider.ActiveRoot() + if err != nil { + return nil, err + } + if inter != root { + pem = pem + ca.EnsureTrailingNewline(inter) + } + + modIdx, err := c.delegate.ApplyCALeafRequest() + if err != nil { + return nil, err + } + + cert, err := connect.ParseCert(pem) + if err != nil { + return nil, err + } + + // Set the response + reply := structs.IssuedCert{ + SerialNumber: connect.EncodeSerialNumber(cert.SerialNumber), + CertPEM: pem, + ValidAfter: cert.NotBefore, + ValidBefore: cert.NotAfter, + EnterpriseMeta: entMeta, + RaftIndex: structs.RaftIndex{ + ModifyIndex: modIdx, + CreateIndex: modIdx, + }, + } + if isService { + reply.Service = serviceID.Service + reply.ServiceURI = cert.URIs[0].String() + } else if isAgent { + reply.Agent = agentID.Agent + reply.AgentURI = cert.URIs[0].String() + } + + return &reply, nil +} diff --git a/agent/consul/server_connect.go b/agent/consul/server_connect.go index 949f6797e..d7ff709b5 100644 --- a/agent/consul/server_connect.go +++ b/agent/consul/server_connect.go @@ -1,76 +1,16 @@ package consul import ( - "context" - "crypto/x509" "fmt" - "net/url" - "sync" - memdb "github.com/hashicorp/go-memdb" - "golang.org/x/time/rate" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect/ca" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/lib/semaphore" ) -// TODO: move to leader_connect_ca.go -type connectSignRateLimiter struct { - // csrRateLimiter limits the rate of signing new certs if configured. Lazily - // initialized from current config to support dynamic changes. - // csrRateLimiterMu must be held while dereferencing the pointer or storing a - // new one, but methods can be called on the limiter object outside of the - // locked section. 
This is done only in the getCSRRateLimiterWithLimit method. - csrRateLimiter *rate.Limiter - csrRateLimiterMu sync.RWMutex - - // csrConcurrencyLimiter is a dynamically resizable semaphore used to limit - // Sign RPC concurrency if configured. The zero value is usable as soon as - // SetSize is called which we do dynamically in the RPC handler to avoid - // having to hook elaborate synchronization mechanisms through the CA config - // endpoint and config reload etc. - csrConcurrencyLimiter semaphore.Dynamic -} - -// getCSRRateLimiterWithLimit returns a rate.Limiter with the desired limit set. -// It uses the shared server-wide limiter unless the limit has been changed in -// config or the limiter has not been setup yet in which case it just-in-time -// configures the new limiter. We assume that limit changes are relatively rare -// and that all callers (there is currently only one) use the same config value -// as the limit. There might be some flapping if there are multiple concurrent -// requests in flight at the time the config changes where A sees the new value -// and updates, B sees the old but then gets this lock second and changes back. -// Eventually though and very soon (once all current RPCs are complete) we are -// guaranteed to have the correct limit set by the next RPC that comes in so I -// assume this is fine. If we observe strange behavior because of it, we could -// add hysteresis that prevents changes too soon after a previous change but -// that seems unnecessary for now. -func (l *connectSignRateLimiter) getCSRRateLimiterWithLimit(limit rate.Limit) *rate.Limiter { - l.csrRateLimiterMu.RLock() - lim := l.csrRateLimiter - l.csrRateLimiterMu.RUnlock() - - // If there is a current limiter with the same limit, return it. This should - // be the common case. 
- if lim != nil && lim.Limit() == limit { - return lim - } - - // Need to change limiter, get write lock - l.csrRateLimiterMu.Lock() - defer l.csrRateLimiterMu.Unlock() - // No limiter yet, or limit changed in CA config, reconfigure a new limiter. - // We use burst of 1 for a hard limit. Note that either bursting or waiting is - // necessary to get expected behavior in fact of random arrival times, but we - // don't need both and we use Wait with a small delay to smooth noise. See - // https://github.com/banks/sim-rate-limit-backoff/blob/master/README.md. - l.csrRateLimiter = rate.NewLimiter(limit, 1) - return l.csrRateLimiter -} - // GetCARoots will retrieve CARoots // TODO: move to autoConfigBackend func (s *Server) GetCARoots() (*structs.IndexedCARoots, error) { @@ -139,143 +79,3 @@ func (s *Server) getCARoots(ws memdb.WatchSet, state *state.Store) (*structs.Ind return indexedRoots, nil } - -// TODO: Move this to leader_connect_ca.go -func (c *CAManager) SignCertificate(csr *x509.CertificateRequest, spiffeID connect.CertURI) (*structs.IssuedCert, error) { - provider, caRoot := c.getCAProvider() - if provider == nil { - return nil, fmt.Errorf("CA is uninitialized and unable to sign certificates yet: provider is nil") - } else if caRoot == nil { - return nil, fmt.Errorf("CA is uninitialized and unable to sign certificates yet: no root certificate") - } - - // Verify that the CSR entity is in the cluster's trust domain - state := c.delegate.State() - _, config, err := state.CAConfig(nil) - if err != nil { - return nil, err - } - signingID := connect.SpiffeIDSigningForCluster(config) - serviceID, isService := spiffeID.(*connect.SpiffeIDService) - agentID, isAgent := spiffeID.(*connect.SpiffeIDAgent) - if !isService && !isAgent { - return nil, fmt.Errorf("SPIFFE ID in CSR must be a service or agent ID") - } - - var entMeta structs.EnterpriseMeta - if isService { - if !signingID.CanSign(spiffeID) { - return nil, fmt.Errorf("SPIFFE ID in CSR from a different trust 
domain: %s, "+ - "we are %s", serviceID.Host, signingID.Host()) - } - entMeta.Merge(serviceID.GetEnterpriseMeta()) - } else { - // isAgent - if we support more ID types then this would need to be an else if - // here we are just automatically fixing the trust domain. For auto-encrypt and - // auto-config they make certificate requests before learning about the roots - // so they will have a dummy trust domain in the CSR. - trustDomain := signingID.Host() - if agentID.Host != trustDomain { - originalURI := agentID.URI() - - agentID.Host = trustDomain - - // recreate the URIs list - uris := make([]*url.URL, len(csr.URIs)) - for i, uri := range csr.URIs { - if originalURI.String() == uri.String() { - uris[i] = agentID.URI() - } else { - uris[i] = uri - } - } - - csr.URIs = uris - } - entMeta.Merge(structs.DefaultEnterpriseMeta()) - } - - commonCfg, err := config.GetCommonConfig() - if err != nil { - return nil, err - } - if commonCfg.CSRMaxPerSecond > 0 { - lim := c.caLeafLimiter.getCSRRateLimiterWithLimit(rate.Limit(commonCfg.CSRMaxPerSecond)) - // Wait up to the small threshold we allow for a token. - ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait) - defer cancel() - if lim.Wait(ctx) != nil { - return nil, ErrRateLimited - } - } else if commonCfg.CSRMaxConcurrent > 0 { - c.caLeafLimiter.csrConcurrencyLimiter.SetSize(int64(commonCfg.CSRMaxConcurrent)) - ctx, cancel := context.WithTimeout(context.Background(), csrLimitWait) - defer cancel() - if err := c.caLeafLimiter.csrConcurrencyLimiter.Acquire(ctx); err != nil { - return nil, ErrRateLimited - } - defer c.caLeafLimiter.csrConcurrencyLimiter.Release() - } - - connect.HackSANExtensionForCSR(csr) - - // All seems to be in order, actually sign it. - - pem, err := provider.Sign(csr) - if err == ca.ErrRateLimited { - return nil, ErrRateLimited - } - if err != nil { - return nil, err - } - - // Append any intermediates needed by this root. 
- for _, p := range caRoot.IntermediateCerts { - pem = pem + ca.EnsureTrailingNewline(p) - } - - // Append our local CA's intermediate if there is one. - inter, err := provider.ActiveIntermediate() - if err != nil { - return nil, err - } - root, err := provider.ActiveRoot() - if err != nil { - return nil, err - } - if inter != root { - pem = pem + ca.EnsureTrailingNewline(inter) - } - - modIdx, err := c.delegate.ApplyCALeafRequest() - if err != nil { - return nil, err - } - - cert, err := connect.ParseCert(pem) - if err != nil { - return nil, err - } - - // Set the response - reply := structs.IssuedCert{ - SerialNumber: connect.EncodeSerialNumber(cert.SerialNumber), - CertPEM: pem, - ValidAfter: cert.NotBefore, - ValidBefore: cert.NotAfter, - EnterpriseMeta: entMeta, - RaftIndex: structs.RaftIndex{ - ModifyIndex: modIdx, - CreateIndex: modIdx, - }, - } - if isService { - reply.Service = serviceID.Service - reply.ServiceURI = cert.URIs[0].String() - } else if isAgent { - reply.Agent = agentID.Agent - reply.AgentURI = cert.URIs[0].String() - } - - return &reply, nil -}