package leafcert import ( "context" "errors" "fmt" "sync" "time" "github.com/armon/go-metrics" "github.com/hashicorp/go-hclog" "golang.org/x/sync/singleflight" "golang.org/x/time/rate" "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib/ttlcache" ) const ( DefaultLastGetTTL = 72 * time.Hour // reasonable default is days // DefaultLeafCertRefreshRate is the default rate at which certs can be refreshed. // This defaults to not being limited DefaultLeafCertRefreshRate = rate.Inf // DefaultLeafCertRefreshMaxBurst is the number of cache entry fetches that can // occur in a burst. DefaultLeafCertRefreshMaxBurst = 2 DefaultLeafCertRefreshBackoffMin = 3 // 3 attempts before backing off DefaultLeafCertRefreshMaxWait = 1 * time.Minute // maximum backoff wait time DefaultQueryTimeout = 10 * time.Minute ) type Config struct { // LastGetTTL is the time that the certs returned by this type remain in // the cache after the last get operation. If a cert isn't accessed within // this duration, the certs is purged and background refreshing will cease. LastGetTTL time.Duration // LeafCertRefreshMaxBurst max burst size of RateLimit for a single cache entry LeafCertRefreshMaxBurst int // LeafCertRefreshRate represents the max calls/sec for a single cache entry LeafCertRefreshRate rate.Limit // LeafCertRefreshBackoffMin is the number of attempts to wait before // backing off. // // Mostly configurable just for testing. LeafCertRefreshBackoffMin uint // LeafCertRefreshMaxWait is the maximum backoff wait time. // // Mostly configurable just for testing. LeafCertRefreshMaxWait time.Duration // TestOverrideCAChangeInitialDelay allows overriding the random jitter // after a root change with a fixed delay. So far ths is only done in // tests. If it's zero the caChangeInitialSpreadDefault maximum jitter will // be used but if set, it overrides and provides a fixed delay. To // essentially disable the delay in tests they can set it to 1 nanosecond. // We may separately allow configuring the jitter limit by users later but // this is different and for tests only since we need to set a // deterministic time delay in order to test the behavior here fully and // determinstically. TestOverrideCAChangeInitialDelay time.Duration } func (c Config) withDefaults() Config { if c.LastGetTTL <= 0 { c.LastGetTTL = DefaultLastGetTTL } if c.LeafCertRefreshRate == 0.0 { c.LeafCertRefreshRate = DefaultLeafCertRefreshRate } if c.LeafCertRefreshMaxBurst == 0 { c.LeafCertRefreshMaxBurst = DefaultLeafCertRefreshMaxBurst } if c.LeafCertRefreshBackoffMin == 0 { c.LeafCertRefreshBackoffMin = DefaultLeafCertRefreshBackoffMin } if c.LeafCertRefreshMaxWait == 0 { c.LeafCertRefreshMaxWait = DefaultLeafCertRefreshMaxWait } return c } type Deps struct { Config Config Logger hclog.Logger // RootsReader is an interface to access connect CA roots. RootsReader RootsReader // CertSigner is an interface to remotely sign certificates. CertSigner CertSigner } type RootsReader interface { Get() (*structs.IndexedCARoots, error) Notify(ctx context.Context, correlationID string, ch chan<- cache.UpdateEvent) error } type CertSigner interface { SignCert(ctx context.Context, args *structs.CASignRequest) (*structs.IssuedCert, error) } func NewManager(deps Deps) *Manager { deps.Config = deps.Config.withDefaults() if deps.Logger == nil { deps.Logger = hclog.NewNullLogger() } if deps.RootsReader == nil { panic("RootsReader is required") } if deps.CertSigner == nil { panic("CertSigner is required") } m := &Manager{ config: deps.Config, logger: deps.Logger, certSigner: deps.CertSigner, rootsReader: deps.RootsReader, // certs: make(map[string]*certData), certsExpiryHeap: ttlcache.NewExpiryHeap(), } m.ctx, m.ctxCancel = context.WithCancel(context.Background()) m.rootWatcher = &rootWatcher{ ctx: m.ctx, rootsReader: m.rootsReader, } // Start the expiry watcher go m.runExpiryLoop() return m } type Manager struct { logger hclog.Logger // config contains agent configuration necessary for the cert manager to operate. config Config // rootsReader is an interface to access connect CA roots. rootsReader RootsReader // certSigner is an interface to remotely sign certificates. certSigner CertSigner // rootWatcher helps let multiple requests for leaf certs to coordinate // sharing a single long-lived watch for the root certs. This allows the // leaf cert requests to notice when the roots rotate and trigger their // reissuance. rootWatcher *rootWatcher // This is the "top-level" internal context. This is used to cancel // background operations. ctx context.Context ctxCancel context.CancelFunc // lock guards access to certs and certsExpiryHeap lock sync.RWMutex certs map[string]*certData certsExpiryHeap *ttlcache.ExpiryHeap // certGroup is a singleflight group keyed identically to the certs map. // When the leaf cert itself needs replacement requests will coalesce // together through this chokepoint. certGroup singleflight.Group } func (m *Manager) getCertData(key string) *certData { m.lock.RLock() cd, ok := m.certs[key] m.lock.RUnlock() if ok { return cd } m.lock.Lock() defer m.lock.Unlock() cd, ok = m.certs[key] if !ok { cd = &certData{ expiry: m.certsExpiryHeap.Add(key, m.config.LastGetTTL), refreshRateLimiter: rate.NewLimiter( m.config.LeafCertRefreshRate, m.config.LeafCertRefreshMaxBurst, ), } m.certs[key] = cd metrics.SetGauge([]string{"leaf-certs", "entries_count"}, float32(len(m.certs))) } return cd } // Stop stops any background work and frees all resources for the manager. // Current fetch requests are allowed to continue to completion and callers may // still access the current leaf cert values so coordination isn't needed with // callers, however no background activity will continue. It's intended to // close the manager at agent shutdown so no further requests should be made, // however concurrent or in-flight ones won't break. func (m *Manager) Stop() { if m.ctxCancel != nil { m.ctxCancel() m.ctxCancel = nil } } // Get returns the leaf cert for the request. If data satisfying the // minimum index is present, it is returned immediately. Otherwise, // this will block until the cert is refreshed or the request timeout is // reached. // // Multiple Get calls for the same logical request will block on a single // network request. // // The timeout specified by the request will be the timeout on the cache // Get, and does not correspond to the timeout of any background data // fetching. If the timeout is reached before data satisfying the minimum // index is retrieved, the last known value (maybe nil) is returned. No // error is returned on timeout. This matches the behavior of Consul blocking // queries. func (m *Manager) Get(ctx context.Context, req *ConnectCALeafRequest) (*structs.IssuedCert, cache.ResultMeta, error) { // Lightweight copy this object so that manipulating req doesn't race. dup := *req req = &dup // We don't want non-blocking queries to return expired leaf certs // or leaf certs not valid under the current CA. So always revalidate // the leaf cert on non-blocking queries (ie when MinQueryIndex == 0) // // NOTE: This conditional was formerly only in the API endpoint. if req.MinQueryIndex == 0 { req.MustRevalidate = true } return m.internalGet(ctx, req) } func (m *Manager) internalGet(ctx context.Context, req *ConnectCALeafRequest) (*structs.IssuedCert, cache.ResultMeta, error) { key := req.Key() if key == "" { return nil, cache.ResultMeta{}, fmt.Errorf("a key is required") } if req.MaxQueryTime <= 0 { req.MaxQueryTime = DefaultQueryTimeout } timeoutTimer := time.NewTimer(req.MaxQueryTime) defer timeoutTimer.Stop() // First time through first := true for { // Get the current value cd := m.getCertData(key) cd.lock.Lock() var ( existing = cd.value existingIndex = cd.index refreshing = cd.refreshing fetchedAt = cd.fetchedAt lastFetchErr = cd.lastFetchErr expiry = cd.expiry ) cd.lock.Unlock() shouldReplaceCert := certNeedsUpdate(req, existingIndex, existing, refreshing) if expiry != nil { // The entry already exists in the TTL heap, touch it to keep it alive since // this Get is still interested in the value. Note that we used to only do // this in the `entryValid` block below but that means that a cache entry // will expire after it's TTL regardless of how many callers are waiting for // updates in this method in a couple of cases: // // 1. If the agent is disconnected from servers for the TTL then the client // will be in backoff getting errors on each call to Get and since an // errored cache entry has Valid = false it won't be touching the TTL. // // 2. If the value is just not changing then the client's current index // will be equal to the entry index and entryValid will be false. This // is a common case! // // But regardless of the state of the entry, assuming it's already in the // TTL heap, we should touch it every time around here since this caller at // least still cares about the value! m.lock.Lock() m.certsExpiryHeap.Update(expiry.Index(), m.config.LastGetTTL) m.lock.Unlock() } if !shouldReplaceCert { meta := cache.ResultMeta{ Index: existingIndex, } if first { meta.Hit = true } // For non-background refresh types, the age is just how long since we // fetched it last. if !fetchedAt.IsZero() { meta.Age = time.Since(fetchedAt) } // We purposely do not return an error here since the cache only works with // fetching values that either have a value or have an error, but not both. // The Error may be non-nil in the entry in the case that an error has // occurred _since_ the last good value, but we still want to return the // good value to clients that are not requesting a specific version. The // effect of this is that blocking clients will all see an error immediately // without waiting a whole timeout to see it, but clients that just look up // cache with an older index than the last valid result will still see the // result and not the error here. I.e. the error is not "cached" without a // new fetch attempt occurring, but the last good value can still be fetched // from cache. return existing, meta, nil } // If this isn't our first time through and our last value has an error, then // we return the error. This has the behavior that we don't sit in a retry // loop getting the same error for the entire duration of the timeout. // Instead, we make one effort to fetch a new value, and if there was an // error, we return. Note that the invariant is that if both entry.Value AND // entry.Error are non-nil, the error _must_ be more recent than the Value. In // other words valid fetches should reset the error. See // https://github.com/hashicorp/consul/issues/4480. if !first && lastFetchErr != nil { return existing, cache.ResultMeta{Index: existingIndex}, lastFetchErr } notifyCh := m.triggerCertRefreshInGroup(req, cd) // No longer our first time through first = false select { case <-ctx.Done(): return nil, cache.ResultMeta{}, ctx.Err() case <-notifyCh: // Our fetch returned, retry the get from the cache. req.MustRevalidate = false case <-timeoutTimer.C: // Timeout on the cache read, just return whatever we have. return existing, cache.ResultMeta{Index: existingIndex}, nil } } } func certNeedsUpdate(req *ConnectCALeafRequest, index uint64, value *structs.IssuedCert, refreshing bool) bool { if value == nil { return true } if req.MinQueryIndex > 0 && req.MinQueryIndex >= index { // MinIndex was given and matches or is higher than current value so we // ignore the cache and fallthrough to blocking on a new value. return true } // Check if re-validate is requested. If so the first time round the // loop is not a hit but subsequent ones should be treated normally. if req.MustRevalidate { // It is important to note that this block ONLY applies when we are not // in indefinite refresh mode (where the underlying goroutine will // continue to re-query for data). // // In this mode goroutines have a 1:1 relationship to RPCs that get // executed, and importantly they DO NOT SLEEP after executing. // // This means that a running goroutine for this cache entry extremely // strongly implies that the RPC has not yet completed, which is why // this check works for the revalidation-avoidance optimization here. if refreshing { // There is an active goroutine performing a blocking query for // this data, which has not returned. // // We can logically deduce that the contents of the cache are // actually current, and we can simply return this while leaving // the blocking query alone. return false } else { return true } } return false } func (m *Manager) triggerCertRefreshInGroup(req *ConnectCALeafRequest, cd *certData) <-chan singleflight.Result { // Lightweight copy this object so that manipulating req doesn't race. dup := *req req = &dup if req.MaxQueryTime == 0 { req.MaxQueryTime = DefaultQueryTimeout } // At this point, we know we either don't have a cert at all or the // cert we have is too old. We need to mint a new one. // // We use a singleflight group to coordinate only one request driving // the async update to the key at once. // // NOTE: this anonymous function only has one goroutine in it per key at all times return m.certGroup.DoChan(req.Key(), func() (any, error) { cd.lock.Lock() var ( shouldReplaceCert = certNeedsUpdate(req, cd.index, cd.value, cd.refreshing) rateLimiter = cd.refreshRateLimiter lastIndex = cd.index ) cd.lock.Unlock() if !shouldReplaceCert { // This handles the case where a fetch succeeded after checking for // its existence in Get. This ensures that we don't miss updates // since we don't hold the lock between the read and then the // refresh trigger. return nil, nil } if err := rateLimiter.Wait(m.ctx); err != nil { // NOTE: this can only happen when the entire cache is being // shutdown and isn't something that can happen normally. return nil, nil } cd.MarkRefreshing(true) defer cd.MarkRefreshing(false) req.MinQueryIndex = lastIndex // Start building the new entry by blocking on the fetch. m.refreshLeafAndUpdate(req, cd) return nil, nil }) } // testGet is a way for the test code to do a get but from the middle of the // logic stack, skipping some of the caching logic. func (m *Manager) testGet(req *ConnectCALeafRequest) (uint64, *structs.IssuedCert, error) { cd := m.getCertData(req.Key()) m.refreshLeafAndUpdate(req, cd) cd.lock.Lock() var ( index = cd.index cert = cd.value err = cd.lastFetchErr ) cd.lock.Unlock() if err != nil { return 0, nil, err } return index, cert, nil } // refreshLeafAndUpdate will try to refresh the leaf and persist the updated // data back to the in-memory store. // // NOTE: this function only has one goroutine in it per key at all times func (m *Manager) refreshLeafAndUpdate(req *ConnectCALeafRequest, cd *certData) { existing, state := cd.GetValueAndState() newCert, updatedState, err := m.attemptLeafRefresh(req, existing, state) cd.Update(newCert, updatedState, err) } // Prepopulate puts a cert in manually. This is useful when the correct initial // value is known and the cache shouldn't refetch the same thing on startup. It // is used to set AgentLeafCert when AutoEncrypt.TLS is turned on. The manager // itself cannot fetch that the first time because it requires a special // RPCType. Subsequent runs are fine though. func (m *Manager) Prepopulate( ctx context.Context, key string, index uint64, value *structs.IssuedCert, authorityKeyID string, ) error { if value == nil { return errors.New("value is required") } cd := m.getCertData(key) cd.lock.Lock() defer cd.lock.Unlock() cd.index = index cd.value = value cd.state = fetchState{ authorityKeyID: authorityKeyID, forceExpireAfter: time.Time{}, consecutiveRateLimitErrs: 0, activeRootRotationStart: time.Time{}, } return nil } // runExpiryLoop is a blocking function that watches the expiration // heap and invalidates cert entries that have expired. func (m *Manager) runExpiryLoop() { for { m.lock.RLock() timer := m.certsExpiryHeap.Next() m.lock.RUnlock() select { case <-m.ctx.Done(): timer.Stop() return case <-m.certsExpiryHeap.NotifyCh: timer.Stop() continue case <-timer.Wait(): m.lock.Lock() entry := timer.Entry // Entry expired! Remove it. delete(m.certs, entry.Key()) m.certsExpiryHeap.Remove(entry.Index()) // Set some metrics metrics.IncrCounter([]string{"leaf-certs", "evict_expired"}, 1) metrics.SetGauge([]string{"leaf-certs", "entries_count"}, float32(len(m.certs))) m.lock.Unlock() } } }