package consul

import (
	"bytes"
	"context"
	"fmt"
	"reflect"
	"strings"
	"time"

	"golang.org/x/time/rate"

	"github.com/hashicorp/consul/agent/connect"
	"github.com/hashicorp/consul/agent/connect/ca"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/logging"
	uuid "github.com/hashicorp/go-uuid"
)

const (
	// loopRateLimit is the maximum rate per second at which we can rerun CA and intention
	// replication watches.
	loopRateLimit rate.Limit = 0.2

	// retryBucketSize is the maximum number of stored rate limit attempts for looped
	// blocking query operations.
	retryBucketSize = 5

	// maxIntentionTxnSize is the maximum size (in bytes) of a transaction used during
	// Intention replication.
	maxIntentionTxnSize = raftWarnSize / 4
)

var (
	// maxRetryBackoff is the maximum number of seconds to wait between failed blocking
	// queries when backing off.
	maxRetryBackoff = 256
)

// initializeCAConfig is used to initialize the CA config if necessary
// when setting up the CA during establishLeadership.
func (s *Server) initializeCAConfig() (*structs.CAConfiguration, error) {
	state := s.fsm.State()
	_, config, err := state.CAConfig(nil)
	if err != nil {
		return nil, err
	}
	if config == nil {
		config = s.config.CAConfig
		if config.ClusterID == "" {
			id, err := uuid.GenerateUUID()
			if err != nil {
				return nil, err
			}
			config.ClusterID = id
		}
	} else if _, ok := config.Config["IntermediateCertTTL"]; !ok {
		// Handle configs persisted before IntermediateCertTTL existed by
		// backfilling the default value without mutating the stored map.
		dup := *config

		copied := make(map[string]interface{})
		for k, v := range dup.Config {
			copied[k] = v
		}
		copied["IntermediateCertTTL"] = connect.DefaultIntermediateCertTTL.String()
		dup.Config = copied
		config = &dup
	} else {
		return config, nil
	}

	req := structs.CARequest{
		Op:     structs.CAOpSetConfig,
		Config: config,
	}
	if resp, err := s.raftApply(structs.ConnectCARequestType, req); err != nil {
		return nil, err
	} else if respErr, ok := resp.(error); ok {
		return nil, respErr
	}

	return config, nil
}

// parseCARoot returns a filled-in structs.CARoot from a raw PEM value.
func parseCARoot(pemValue, provider, clusterID string) (*structs.CARoot, error) {
	id, err := connect.CalculateCertFingerprint(pemValue)
	if err != nil {
		return nil, fmt.Errorf("error parsing root fingerprint: %v", err)
	}
	rootCert, err := connect.ParseCert(pemValue)
	if err != nil {
		return nil, fmt.Errorf("error parsing root cert: %v", err)
	}
	keyType, keyBits, err := connect.KeyInfoFromCert(rootCert)
	if err != nil {
		return nil, fmt.Errorf("error extracting root key info: %v", err)
	}
	return &structs.CARoot{
		ID:                  id,
		Name:                fmt.Sprintf("%s CA Root Cert", strings.Title(provider)),
		SerialNumber:        rootCert.SerialNumber.Uint64(),
		SigningKeyID:        connect.EncodeSigningKeyID(rootCert.SubjectKeyId),
		ExternalTrustDomain: clusterID,
		NotBefore:           rootCert.NotBefore,
		NotAfter:            rootCert.NotAfter,
		RootCert:            pemValue,
		PrivateKeyType:      keyType,
		PrivateKeyBits:      keyBits,
		Active:              true,
	}, nil
}

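// Illustrative sketch only (not used by this file): parseCARoot is typically
// fed the provider's active root PEM, as in initializeRootCA below. Because
// the ID is the certificate fingerprint, parsing the same PEM always yields
// the same CARoot.ID, which is what the replication logic relies on when
// comparing roots across datacenters. The variables here are placeholders:
//
//	rootPEM, _ := provider.ActiveRoot()
//	root, _ := parseCARoot(rootPEM, conf.Provider, conf.ClusterID)
//	// root.ID == the fingerprint returned by connect.CalculateCertFingerprint(rootPEM)
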
// createCAProvider returns a connect CA provider from the given config.
func (s *Server) createCAProvider(conf *structs.CAConfiguration) (ca.Provider, error) {
	var p ca.Provider
	switch conf.Provider {
	case structs.ConsulCAProvider:
		p = &ca.ConsulProvider{Delegate: &consulCADelegate{s}}
	case structs.VaultCAProvider:
		p = ca.NewVaultProvider()
	case structs.AWSCAProvider:
		p = &ca.AWSProvider{}
	default:
		return nil, fmt.Errorf("unknown CA provider %q", conf.Provider)
	}

	// If the provider implements NeedsLogger, we give it our logger.
	if needsLogger, ok := p.(ca.NeedsLogger); ok {
		needsLogger.SetLogger(s.logger)
	}

	return p, nil
}

// getCAProvider is being called while holding caProviderReconfigurationLock
// which means it must never take that lock itself or call anything that does.
func (s *Server) getCAProvider() (ca.Provider, *structs.CARoot) {
	retries := 0
	var result ca.Provider
	var resultRoot *structs.CARoot
	for result == nil {
		s.caProviderLock.RLock()
		result = s.caProvider
		resultRoot = s.caProviderRoot
		s.caProviderLock.RUnlock()

		// In cases where an agent is started with managed proxies, we may ask
		// for the provider before establishLeadership completes. If we're the
		// leader, then wait and get the provider again.
		if result == nil && s.IsLeader() && retries < 10 {
			retries++
			time.Sleep(50 * time.Millisecond)
			continue
		}

		break
	}

	return result, resultRoot
}

// setCAProvider is being called while holding caProviderReconfigurationLock
// which means it must never take that lock itself or call anything that does.
func (s *Server) setCAProvider(newProvider ca.Provider, root *structs.CARoot) {
	s.caProviderLock.Lock()
	defer s.caProviderLock.Unlock()
	s.caProvider = newProvider
	s.caProviderRoot = root
}

// initializeCA sets up the CA provider when gaining leadership, either bootstrapping
// the CA if this is the primary DC or making a remote RPC for intermediate signing
// if this is a secondary DC.
func (s *Server) initializeCA() error {
	connectLogger := s.loggers.Named(logging.Connect)
	// Bail if connect isn't enabled.
	if !s.config.ConnectEnabled {
		return nil
	}

	// Initialize the provider based on the current config.
	conf, err := s.initializeCAConfig()
	if err != nil {
		return err
	}
	provider, err := s.createCAProvider(conf)
	if err != nil {
		return err
	}

	s.caProviderReconfigurationLock.Lock()
	defer s.caProviderReconfigurationLock.Unlock()
	s.setCAProvider(provider, nil)

	// If this isn't the primary DC, run the secondary DC routine if the primary
	// has already been upgraded to at least 1.6.0.
	if s.config.PrimaryDatacenter != s.config.Datacenter {
		versionOk, foundPrimary := ServersInDCMeetMinimumVersion(s, s.config.PrimaryDatacenter, minMultiDCConnectVersion)
		if !foundPrimary {
			connectLogger.Warn("primary datacenter is configured but unreachable - deferring initialization of the secondary datacenter CA")
			// Return nil because we will initialize the secondary CA later.
			return nil
		} else if !versionOk {
			// Return nil because we will initialize the secondary CA later.
			connectLogger.Warn("servers in the primary datacenter are not at least at the minimum version - deferring initialization of the secondary datacenter CA",
				"min_version", minMultiDCConnectVersion.String(),
			)
			return nil
		}

		// Get the root CA to see if we need to refresh our intermediate.
		args := structs.DCSpecificRequest{
			Datacenter: s.config.PrimaryDatacenter,
		}
		var roots structs.IndexedCARoots
		if err := s.forwardDC("ConnectCA.Roots", s.config.PrimaryDatacenter, &args, &roots); err != nil {
			return err
		}

		// Configure the CA provider and initialize the intermediate certificate if necessary.
		if err := s.initializeSecondaryProvider(provider, roots); err != nil {
			return fmt.Errorf("error configuring provider: %v", err)
		}
		if err := s.initializeSecondaryCA(provider, roots); err != nil {
			return err
		}

		connectLogger.Info("initialized secondary datacenter CA with provider", "provider", conf.Provider)
		return nil
	}

	return s.initializeRootCA(provider, conf)
}

// initializeRootCA runs the initialization logic for a root CA.
// It is being called while holding caProviderReconfigurationLock
// which means it must never take that lock itself or call anything that does.
func (s *Server) initializeRootCA(provider ca.Provider, conf *structs.CAConfiguration) error {
	connectLogger := s.loggers.Named(logging.Connect)
	pCfg := ca.ProviderConfig{
		ClusterID:  conf.ClusterID,
		Datacenter: s.config.Datacenter,
		IsPrimary:  true,
		RawConfig:  conf.Config,
		State:      conf.State,
	}
	if err := provider.Configure(pCfg); err != nil {
		return fmt.Errorf("error configuring provider: %v", err)
	}
	if err := provider.GenerateRoot(); err != nil {
		return fmt.Errorf("error generating CA root certificate: %v", err)
	}

	// Get the active root cert from the CA.
	rootPEM, err := provider.ActiveRoot()
	if err != nil {
		return fmt.Errorf("error getting root cert: %v", err)
	}
	rootCA, err := parseCARoot(rootPEM, conf.Provider, conf.ClusterID)
	if err != nil {
		return err
	}

	// Also create the intermediate CA, which is the one that actually signs leaf certs.
	interPEM, err := provider.GenerateIntermediate()
	if err != nil {
		return fmt.Errorf("error generating intermediate cert: %v", err)
	}
	_, err = connect.ParseCert(interPEM)
	if err != nil {
		return fmt.Errorf("error parsing intermediate cert: %v", err)
	}

	// If the provider has state to persist and it's changed or new then update
	// CAConfig.
	pState, err := provider.State()
	if err != nil {
		return fmt.Errorf("error getting provider state: %v", err)
	}
	if !reflect.DeepEqual(conf.State, pState) {
		// Update the CAConfig in raft to persist the provider state.
		conf.State = pState
		req := structs.CARequest{
			Op:     structs.CAOpSetConfig,
			Config: conf,
		}
		if _, err = s.raftApply(structs.ConnectCARequestType, req); err != nil {
			return fmt.Errorf("error persisting provider state: %v", err)
		}
	}

	// Check if the CA root is already initialized and exit if it is,
	// adding on any existing intermediate certs since they aren't directly
	// tied to the provider.
	// Every change to the CA after this initial bootstrapping should
	// be done through the rotation process.
	state := s.fsm.State()
	_, activeRoot, err := state.CARootActive(nil)
	if err != nil {
		return err
	}
	if activeRoot != nil {
		// This state shouldn't be possible to get into because we update the root and
		// CA config in the same FSM operation.
		if activeRoot.ID != rootCA.ID {
			return fmt.Errorf("stored CA root %q is not the active root (%s)", rootCA.ID, activeRoot.ID)
		}

		rootCA.IntermediateCerts = activeRoot.IntermediateCerts
		s.setCAProvider(provider, rootCA)

		return nil
	}

	// Get the highest index.
	idx, _, err := state.CARoots(nil)
	if err != nil {
		return err
	}

	// Store the root cert in raft.
	resp, err := s.raftApply(structs.ConnectCARequestType, &structs.CARequest{
		Op:    structs.CAOpSetRoots,
		Index: idx,
		Roots: []*structs.CARoot{rootCA},
	})
	if err != nil {
		connectLogger.Error("Raft apply failed", "error", err)
		return err
	}
	if respErr, ok := resp.(error); ok {
		return respErr
	}

	s.setCAProvider(provider, rootCA)

	connectLogger.Info("initialized primary datacenter CA with provider", "provider", conf.Provider)

	return nil
}

// initializeSecondaryCA runs the routine for generating an intermediate CA CSR and getting
// it signed by the primary DC if the root CA of the primary DC has changed since the last
// intermediate.
// It is being called while holding caProviderReconfigurationLock
// which means it must never take that lock itself or call anything that does.
func (s *Server) initializeSecondaryCA(provider ca.Provider, primaryRoots structs.IndexedCARoots) error {
	activeIntermediate, err := provider.ActiveIntermediate()
	if err != nil {
		return err
	}

	var (
		storedRootID         string
		expectedSigningKeyID string
		currentSigningKeyID  string
		activeSecondaryRoot  *structs.CARoot
	)
	if activeIntermediate != "" {
		// In the event that we already have an intermediate, we must have
		// already replicated some primary root information locally, so check
		// to see if we're up to date by fetching the rootID and the
		// signingKeyID used in the secondary.
		//
		// Note that for the same rootID the primary representation of the root
		// will have a different SigningKeyID field than the secondary
		// representation of the same root. This is because it's derived from
		// the intermediate which is different in all datacenters.
		storedRoot, err := provider.ActiveRoot()
		if err != nil {
			return err
		}

		storedRootID, err = connect.CalculateCertFingerprint(storedRoot)
		if err != nil {
			return fmt.Errorf("error parsing root fingerprint: %v, %#v", err, storedRoot)
		}

		intermediateCert, err := connect.ParseCert(activeIntermediate)
		if err != nil {
			return fmt.Errorf("error parsing active intermediate cert: %v", err)
		}
		expectedSigningKeyID = connect.EncodeSigningKeyID(intermediateCert.SubjectKeyId)

		// This will fetch the secondary's exact current representation of the
		// active root. Note that this data should only be used if the IDs
		// match, otherwise it's out of date and should be regenerated.
		_, activeSecondaryRoot, err = s.fsm.State().CARootActive(nil)
		if err != nil {
			return err
		}
		if activeSecondaryRoot != nil {
			currentSigningKeyID = activeSecondaryRoot.SigningKeyID
		}
	}

	// Determine which of the provided PRIMARY representations of roots is the
	// active one. We'll use this as a template to generate any new root
	// representations meant for this secondary.
	var newActiveRoot *structs.CARoot
	for _, root := range primaryRoots.Roots {
		if root.ID == primaryRoots.ActiveRootID && root.Active {
			newActiveRoot = root
			break
		}
	}
	if newActiveRoot == nil {
		return fmt.Errorf("primary datacenter does not have an active root CA for Connect")
	}

	// Get a signed intermediate from the primary DC if the provider
	// hasn't been initialized yet or if the primary's root has changed.
	needsNewIntermediate := false
	if activeIntermediate == "" || storedRootID != primaryRoots.ActiveRootID {
		needsNewIntermediate = true
	}

	// Also we take this opportunity to correct an incorrectly persisted SigningKeyID
	// in secondary datacenters (see PR-6513).
	if expectedSigningKeyID != "" && currentSigningKeyID != expectedSigningKeyID {
		needsNewIntermediate = true
	}

	newIntermediate := false
	if needsNewIntermediate {
		if err := s.getIntermediateCASigned(provider, newActiveRoot); err != nil {
			return err
		}
		newIntermediate = true
	} else {
		// Discard the primary's representation since our local one is
		// sufficiently up to date.
		newActiveRoot = activeSecondaryRoot
	}

	// Update the roots list in the state store if there's a new active root.
	state := s.fsm.State()
	_, activeRoot, err := state.CARootActive(nil)
	if err != nil {
		return err
	}
	if activeRoot == nil || activeRoot.ID != newActiveRoot.ID || newIntermediate {
		if err := s.persistNewRoot(provider, newActiveRoot); err != nil {
			return err
		}
	}

	s.setCAProvider(provider, newActiveRoot)
	return nil
}

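// Worked example of the needsNewIntermediate decision above, using hypothetical
// fingerprints for illustration: suppose the primary rotates its root so that
// primaryRoots.ActiveRootID becomes "b2..." while the secondary's stored root
// still fingerprints to storedRootID "a1...". The IDs differ, so a new CSR is
// sent to the primary via getIntermediateCASigned. The SigningKeyID comparison
// covers the separate case where the root is unchanged but the locally
// persisted SigningKeyID does not match the active intermediate's
// SubjectKeyId (the PR-6513 repair mentioned above).
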
// persistNewRoot is being called while holding caProviderReconfigurationLock
// which means it must never take that lock itself or call anything that does.
func (s *Server) persistNewRoot(provider ca.Provider, newActiveRoot *structs.CARoot) error {
	connectLogger := s.loggers.Named(logging.Connect)
	state := s.fsm.State()
	idx, oldRoots, err := state.CARoots(nil)
	if err != nil {
		return err
	}

	_, config, err := state.CAConfig(nil)
	if err != nil {
		return err
	}
	if config == nil {
		return fmt.Errorf("local CA not initialized yet")
	}
	newConf := *config
	newConf.ClusterID = newActiveRoot.ExternalTrustDomain

	// Persist any state the provider needs us to.
	newConf.State, err = provider.State()
	if err != nil {
		return fmt.Errorf("error getting provider state: %v", err)
	}

	// Copy the root list and append the new active root, updating the old root
	// with the time it was rotated out.
	var newRoots structs.CARoots
	for _, r := range oldRoots {
		newRoot := *r
		if newRoot.Active {
			newRoot.Active = false
			newRoot.RotatedOutAt = time.Now()
		}
		if newRoot.ExternalTrustDomain == "" {
			newRoot.ExternalTrustDomain = config.ClusterID
		}
		newRoots = append(newRoots, &newRoot)
	}
	newRoots = append(newRoots, newActiveRoot)

	args := &structs.CARequest{
		Op:     structs.CAOpSetRootsAndConfig,
		Index:  idx,
		Roots:  newRoots,
		Config: &newConf,
	}
	resp, err := s.raftApply(structs.ConnectCARequestType, args)
	if err != nil {
		return err
	}
	if respErr, ok := resp.(error); ok {
		return respErr
	}
	if respOk, ok := resp.(bool); ok && !respOk {
		return fmt.Errorf("could not atomically update roots and config")
	}

	connectLogger.Info("updated root certificates from primary datacenter")
	return nil
}

// getIntermediateCAPrimary regenerates the intermediate cert in the primary datacenter.
// This is only run for CAs that require an intermediary in the primary DC, such as Vault.
// This function is being called while holding caProviderReconfigurationLock
// which means it must never take that lock itself or call anything that does.
func (s *Server) getIntermediateCAPrimary(provider ca.Provider, newActiveRoot *structs.CARoot) error {
	connectLogger := s.loggers.Named(logging.Connect)
	// Generate and sign an intermediate cert using the root CA.
	intermediatePEM, err := provider.GenerateIntermediate()
	if err != nil {
		return fmt.Errorf("error generating new intermediate cert: %v", err)
	}

	intermediateCert, err := connect.ParseCert(intermediatePEM)
	if err != nil {
		return fmt.Errorf("error parsing intermediate cert: %v", err)
	}

	// Append the new intermediate to our local active root entry. This is
	// where the root representations start to diverge.
	newActiveRoot.IntermediateCerts = append(newActiveRoot.IntermediateCerts, intermediatePEM)
	newActiveRoot.SigningKeyID = connect.EncodeSigningKeyID(intermediateCert.SubjectKeyId)

	connectLogger.Info("generated new intermediate certificate for primary datacenter")
	return nil
}

// getIntermediateCASigned is being called while holding caProviderReconfigurationLock
// which means it must never take that lock itself or call anything that does.
func (s *Server) getIntermediateCASigned(provider ca.Provider, newActiveRoot *structs.CARoot) error {
	connectLogger := s.loggers.Named(logging.Connect)
	csr, err := provider.GenerateIntermediateCSR()
	if err != nil {
		return err
	}

	var intermediatePEM string
	if err := s.forwardDC("ConnectCA.SignIntermediate", s.config.PrimaryDatacenter, s.generateCASignRequest(csr), &intermediatePEM); err != nil {
		// This is a failure in the primary and shouldn't be capable of erroring out our establishing leadership.
		connectLogger.Warn("Primary datacenter refused to sign our intermediate CA certificate", "error", err)
		return nil
	}

	if err := provider.SetIntermediate(intermediatePEM, newActiveRoot.RootCert); err != nil {
		return fmt.Errorf("Failed to set the intermediate certificate with the CA provider: %v", err)
	}

	intermediateCert, err := connect.ParseCert(intermediatePEM)
	if err != nil {
		return fmt.Errorf("error parsing intermediate cert: %v", err)
	}

	// Append the new intermediate to our local active root entry. This is
	// where the root representations start to diverge.
	newActiveRoot.IntermediateCerts = append(newActiveRoot.IntermediateCerts, intermediatePEM)
	newActiveRoot.SigningKeyID = connect.EncodeSigningKeyID(intermediateCert.SubjectKeyId)

	connectLogger.Info("received new intermediate certificate from primary datacenter")
	return nil
}

func (s *Server) generateCASignRequest(csr string) *structs.CASignRequest {
	return &structs.CASignRequest{
		Datacenter:   s.config.PrimaryDatacenter,
		CSR:          csr,
		WriteRequest: structs.WriteRequest{Token: s.tokens.ReplicationToken()},
	}
}

// startConnectLeader starts multi-dc connect leader routines.
func (s *Server) startConnectLeader() {
	// Start the Connect secondary DC actions if enabled.
	if s.config.ConnectEnabled && s.config.Datacenter != s.config.PrimaryDatacenter {
		s.leaderRoutineManager.Start(secondaryCARootWatchRoutineName, s.secondaryCARootWatch)
		s.leaderRoutineManager.Start(intentionReplicationRoutineName, s.replicateIntentions)
		s.startConnectLeaderEnterprise()
	}

	s.leaderRoutineManager.Start(intermediateCertRenewWatchRoutineName, s.intermediateCertRenewalWatch)
	s.leaderRoutineManager.Start(caRootPruningRoutineName, s.runCARootPruning)
}

// stopConnectLeader stops connect specific leader functions.
func (s *Server) stopConnectLeader() {
	s.leaderRoutineManager.Stop(secondaryCARootWatchRoutineName)
	s.leaderRoutineManager.Stop(intentionReplicationRoutineName)
	s.leaderRoutineManager.Stop(caRootPruningRoutineName)
	s.stopConnectLeaderEnterprise()

	// If the provider implements NeedsStop, we call Stop to perform any shutdown actions.
	s.caProviderReconfigurationLock.Lock()
	defer s.caProviderReconfigurationLock.Unlock()
	provider, _ := s.getCAProvider()
	if provider != nil {
		if needsStop, ok := provider.(ca.NeedsStop); ok {
			needsStop.Stop()
		}
	}
}

func (s *Server) runCARootPruning(ctx context.Context) error {
	ticker := time.NewTicker(caRootPruneInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			if err := s.pruneCARoots(); err != nil {
				s.loggers.Named(logging.Connect).Error("error pruning CA roots", "error", err)
			}
		}
	}
}

// pruneCARoots looks for any CARoots that have been rotated out and expired.
func (s *Server) pruneCARoots() error {
	if !s.config.ConnectEnabled {
		return nil
	}

	state := s.fsm.State()
	idx, roots, err := state.CARoots(nil)
	if err != nil {
		return err
	}

	_, caConf, err := state.CAConfig(nil)
	if err != nil {
		return err
	}

	common, err := caConf.GetCommonConfig()
	if err != nil {
		return err
	}

	var newRoots structs.CARoots
	for _, r := range roots {
		if !r.Active && !r.RotatedOutAt.IsZero() && time.Since(r.RotatedOutAt) > common.LeafCertTTL*2 {
			s.loggers.Named(logging.Connect).Info("pruning old unused root CA", "id", r.ID)
			continue
		}
		newRoot := *r
		newRoots = append(newRoots, &newRoot)
	}

	// Return early if there's nothing to remove.
	if len(newRoots) == len(roots) {
		return nil
	}

	// Commit the new root state.
	var args structs.CARequest
	args.Op = structs.CAOpSetRoots
	args.Index = idx
	args.Roots = newRoots
	resp, err := s.raftApply(structs.ConnectCARequestType, args)
	if err != nil {
		return err
	}
	if respErr, ok := resp.(error); ok {
		return respErr
	}

	return nil
}

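// Rough pruning timeline as an illustration, assuming a LeafCertTTL of 72h
// (a common default; the actual value comes from the CA common config above):
// a root rotated out at time T stays in the store until T+144h, i.e.
// LeafCertTTL*2, so any leaf certificates signed under it have already
// expired by the time the root is removed.
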
// intermediateCertRenewalWatch checks the intermediate cert for
// expiration. As soon as more than half the time a cert is valid has passed,
// it will try to renew it.
func (s *Server) intermediateCertRenewalWatch(ctx context.Context) error {
	connectLogger := s.loggers.Named(logging.Connect)
	isPrimary := s.config.Datacenter == s.config.PrimaryDatacenter

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(structs.IntermediateCertRenewInterval):
			retryLoopBackoffAbortOnSuccess(ctx, func() error {
				s.caProviderReconfigurationLock.Lock()
				defer s.caProviderReconfigurationLock.Unlock()

				provider, _ := s.getCAProvider()
				if provider == nil {
					// This happens when leadership is being revoked and this goroutine will be stopped.
					return nil
				}

				// If this isn't the primary, make sure the CA has been initialized.
				if !isPrimary && !s.configuredSecondaryCA() {
					return fmt.Errorf("secondary CA is not yet configured")
				}

				state := s.fsm.State()
				_, activeRoot, err := state.CARootActive(nil)
				if err != nil {
					return err
				}

				// If this is the primary, check if this is a provider that uses an intermediate cert. If
				// it isn't, we don't need to check for a renewal.
				if isPrimary {
					_, config, err := state.CAConfig(nil)
					if err != nil {
						return err
					}

					if _, ok := ca.PrimaryIntermediateProviders[config.Provider]; !ok {
						return nil
					}
				}

				activeIntermediate, err := provider.ActiveIntermediate()
				if err != nil {
					return err
				}

				if activeIntermediate == "" {
					return fmt.Errorf("datacenter doesn't have an active intermediate")
				}

				intermediateCert, err := connect.ParseCert(activeIntermediate)
				if err != nil {
					return fmt.Errorf("error parsing active intermediate cert: %v", err)
				}

				if lessThanHalfTimePassed(time.Now(), intermediateCert.NotBefore.Add(ca.CertificateTimeDriftBuffer),
					intermediateCert.NotAfter) {
					return nil
				}

				renewalFunc := s.getIntermediateCAPrimary
				if !isPrimary {
					renewalFunc = s.getIntermediateCASigned
				}
				if err := renewalFunc(provider, activeRoot); err != nil {
					return err
				}

				if err := s.persistNewRoot(provider, activeRoot); err != nil {
					return err
				}

				s.setCAProvider(provider, activeRoot)
				return nil
			}, func(err error) {
				connectLogger.Error("error renewing intermediate certs",
					"routine", intermediateCertRenewWatchRoutineName,
					"error", err,
				)
			})
		}
	}
}

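// Renewal timing sketch for the watch above, with made-up numbers: an
// intermediate with a 30-day lifetime is considered due for renewal once
// roughly 15 days have elapsed (measured from NotBefore shifted by
// ca.CertificateTimeDriftBuffer, see lessThanHalfTimePassed at the bottom of
// this file). The next tick of structs.IntermediateCertRenewInterval then
// calls getIntermediateCAPrimary or getIntermediateCASigned depending on
// whether this server is in the primary DC.
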
// secondaryCARootWatch maintains a blocking query to the primary datacenter's
// ConnectCA.Roots endpoint to monitor when it needs to request a new signed
// intermediate certificate.
func (s *Server) secondaryCARootWatch(ctx context.Context) error {
	connectLogger := s.loggers.Named(logging.Connect)
	args := structs.DCSpecificRequest{
		Datacenter: s.config.PrimaryDatacenter,
		QueryOptions: structs.QueryOptions{
			// The maximum time the primary roots watch query can block before returning.
			MaxQueryTime: s.config.MaxQueryTime,
		},
	}

	connectLogger.Debug("starting Connect CA root replication from primary datacenter", "primary", s.config.PrimaryDatacenter)

	retryLoopBackoff(ctx, func() error {
		var roots structs.IndexedCARoots
		if err := s.forwardDC("ConnectCA.Roots", s.config.PrimaryDatacenter, &args, &roots); err != nil {
			return fmt.Errorf("Error retrieving the primary datacenter's roots: %v", err)
		}

		// Check to see if the primary has been upgraded in case we're waiting to switch to
		// secondary mode.
		provider, _ := s.getCAProvider()
		if provider == nil {
			// This happens when leadership is being revoked and this goroutine will be stopped.
			return nil
		}
		if !s.configuredSecondaryCA() {
			versionOk, primaryFound := ServersInDCMeetMinimumVersion(s, s.config.PrimaryDatacenter, minMultiDCConnectVersion)
			if !primaryFound {
				return fmt.Errorf("Primary datacenter is unreachable - deferring secondary CA initialization")
			}

			if versionOk {
				if err := s.initializeSecondaryProvider(provider, roots); err != nil {
					return fmt.Errorf("Failed to initialize secondary CA provider: %v", err)
				}
			}
		}

		// Run the secondary CA init routine to see if we need to request a new
		// intermediate.
		if s.configuredSecondaryCA() {
			if err := s.initializeSecondaryCA(provider, roots); err != nil {
				return fmt.Errorf("Failed to initialize the secondary CA: %v", err)
			}
		}

		args.QueryOptions.MinQueryIndex = nextIndexVal(args.QueryOptions.MinQueryIndex, roots.QueryMeta.Index)
		return nil
	}, func(err error) {
		connectLogger.Error("CA root replication failed, will retry",
			"routine", secondaryCARootWatchRoutineName,
			"error", err,
		)
	})

	return nil
}

// replicateIntentions executes a blocking query to the primary datacenter to replicate
// the intentions there to the local state.
func (s *Server) replicateIntentions(ctx context.Context) error {
	connectLogger := s.loggers.Named(logging.Connect)
	args := structs.DCSpecificRequest{
		Datacenter: s.config.PrimaryDatacenter,
	}

	connectLogger.Debug("starting Connect intention replication from primary datacenter", "primary", s.config.PrimaryDatacenter)

	retryLoopBackoff(ctx, func() error {
		// Always use the latest replication token value in case it changed while looping.
		args.QueryOptions.Token = s.tokens.ReplicationToken()

		var remote structs.IndexedIntentions
		if err := s.forwardDC("Intention.List", s.config.PrimaryDatacenter, &args, &remote); err != nil {
			return err
		}

		_, local, err := s.fsm.State().Intentions(nil, s.replicationEnterpriseMeta())
		if err != nil {
			return err
		}

		// Compute the diff between the remote and local intentions.
		deletes, updates := diffIntentions(local, remote.Intentions)
		txnOpSets := batchIntentionUpdates(deletes, updates)

		// Apply batched updates to the state store.
		for _, ops := range txnOpSets {
			txnReq := structs.TxnRequest{Ops: ops}

			resp, err := s.raftApply(structs.TxnRequestType, &txnReq)
			if err != nil {
				return err
			}
			if respErr, ok := resp.(error); ok {
				return respErr
			}

			if txnResp, ok := resp.(structs.TxnResponse); ok {
				if len(txnResp.Errors) > 0 {
					return txnResp.Error()
				}
			} else {
				return fmt.Errorf("unexpected return type %T", resp)
			}
		}

		args.QueryOptions.MinQueryIndex = nextIndexVal(args.QueryOptions.MinQueryIndex, remote.QueryMeta.Index)
		return nil
	}, func(err error) {
		connectLogger.Error("error replicating intentions",
			"routine", intentionReplicationRoutineName,
			"error", err,
		)
	})
	return nil
}

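// Both watches above follow the standard Consul blocking-query pattern: the
// response's QueryMeta.Index is fed back in as MinQueryIndex so the next RPC
// blocks until something changes in the primary. A minimal sketch of that
// loop shape (endpoint and result type chosen for illustration only):
//
//	for {
//		var out structs.IndexedCARoots
//		if err := s.forwardDC("ConnectCA.Roots", primaryDC, &args, &out); err != nil {
//			return err
//		}
//		// ... act on out ...
//		args.QueryOptions.MinQueryIndex = nextIndexVal(args.QueryOptions.MinQueryIndex, out.QueryMeta.Index)
//	}
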
// retryLoopBackoff loops a given function indefinitely, backing off exponentially
// upon errors up to a maximum of maxRetryBackoff seconds.
func retryLoopBackoff(ctx context.Context, loopFn func() error, errFn func(error)) {
	retryLoopBackoffHandleSuccess(ctx, loopFn, errFn, false)
}

func retryLoopBackoffAbortOnSuccess(ctx context.Context, loopFn func() error, errFn func(error)) {
	retryLoopBackoffHandleSuccess(ctx, loopFn, errFn, true)
}

func retryLoopBackoffHandleSuccess(ctx context.Context, loopFn func() error, errFn func(error), abortOnSuccess bool) {
	var failedAttempts uint
	limiter := rate.NewLimiter(loopRateLimit, retryBucketSize)
	for {
		// Rate limit how often we run the loop.
		limiter.Wait(ctx)
		select {
		case <-ctx.Done():
			return
		default:
		}

		if (1 << failedAttempts) < maxRetryBackoff {
			failedAttempts++
		}
		retryTime := (1 << failedAttempts) * time.Second

		if err := loopFn(); err != nil {
			errFn(err)

			timer := time.NewTimer(retryTime)
			select {
			case <-ctx.Done():
				timer.Stop()
				return
			case <-timer.C:
				continue
			}
		} else if abortOnSuccess {
			return
		}

		// Reset the failed attempts after a successful run.
		failedAttempts = 0
	}
}

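// Worked example of the backoff above: failedAttempts starts at 0 and is
// incremented while 1<<failedAttempts is still below maxRetryBackoff (256),
// so consecutive failures wait 2s, 4s, 8s, ... up to a plateau of 256s
// (failedAttempts stops at 8 because 1<<8 == 256), and a single successful
// run resets the sequence back to 2s. The rate limiter additionally caps the
// loop at loopRateLimit runs per second with bursts of up to retryBucketSize.
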
// diffIntentions computes the difference between the local and remote intentions
// and returns lists of deletes and updates.
func diffIntentions(local, remote structs.Intentions) (structs.Intentions, structs.Intentions) {
	localIdx := make(map[string][]byte, len(local))
	remoteIdx := make(map[string]struct{}, len(remote))

	var deletes structs.Intentions
	var updates structs.Intentions

	for _, intention := range local {
		localIdx[intention.ID] = intention.Hash
	}
	for _, intention := range remote {
		remoteIdx[intention.ID] = struct{}{}
	}

	for _, intention := range local {
		if _, ok := remoteIdx[intention.ID]; !ok {
			deletes = append(deletes, intention)
		}
	}

	for _, intention := range remote {
		existingHash, ok := localIdx[intention.ID]
		if !ok {
			updates = append(updates, intention)
		} else if !bytes.Equal(existingHash, intention.Hash) {
			updates = append(updates, intention)
		}
	}

	return deletes, updates
}

// batchIntentionUpdates breaks up the given updates into sets of TxnOps based
// on the estimated size of the operations.
func batchIntentionUpdates(deletes, updates structs.Intentions) []structs.TxnOps {
	var txnOps structs.TxnOps
	for _, delete := range deletes {
		deleteOp := &structs.TxnIntentionOp{
			Op:        structs.IntentionOpDelete,
			Intention: delete,
		}
		txnOps = append(txnOps, &structs.TxnOp{Intention: deleteOp})
	}

	for _, update := range updates {
		updateOp := &structs.TxnIntentionOp{
			Op:        structs.IntentionOpUpdate,
			Intention: update,
		}
		txnOps = append(txnOps, &structs.TxnOp{Intention: updateOp})
	}

	// Divide the operations into chunks according to maxIntentionTxnSize.
	var batchedOps []structs.TxnOps
	for batchStart := 0; batchStart < len(txnOps); {
		// The inner loop finds the last element to include in this batch.
		batchSize := 0
		batchEnd := batchStart
		for ; batchEnd < len(txnOps) && batchSize < maxIntentionTxnSize; batchEnd++ {
			batchSize += txnOps[batchEnd].Intention.Intention.EstimateSize()
		}

		batchedOps = append(batchedOps, txnOps[batchStart:batchEnd])

		// txnOps[batchEnd] wasn't included as the slicing doesn't include the element at the stop index.
		batchStart = batchEnd
	}

	return batchedOps
}

// nextIndexVal computes the next index value to query for, resetting to zero
// if the index went backward.
func nextIndexVal(prevIdx, idx uint64) uint64 {
	if prevIdx > idx {
		return 0
	}
	return idx
}

// initializeSecondaryProvider configures the given provider for a secondary, non-root datacenter.
// It is being called while holding caProviderReconfigurationLock which means
// it must never take that lock itself or call anything that does.
func (s *Server) initializeSecondaryProvider(provider ca.Provider, roots structs.IndexedCARoots) error {
	if roots.TrustDomain == "" {
		return fmt.Errorf("trust domain from primary datacenter is not initialized")
	}

	clusterID := strings.Split(roots.TrustDomain, ".")[0]

	_, conf, err := s.fsm.State().CAConfig(nil)
	if err != nil {
		return err
	}

	pCfg := ca.ProviderConfig{
		ClusterID:  clusterID,
		Datacenter: s.config.Datacenter,
		IsPrimary:  false,
		RawConfig:  conf.Config,
		State:      conf.State,
	}
	if err := provider.Configure(pCfg); err != nil {
		return fmt.Errorf("error configuring provider: %v", err)
	}

	s.actingSecondaryLock.Lock()
	s.actingSecondaryCA = true
	s.actingSecondaryLock.Unlock()

	return nil
}

// configuredSecondaryCA is being called while holding caProviderReconfigurationLock
// which means it must never take that lock itself or call anything that does.
func (s *Server) configuredSecondaryCA() bool {
	s.actingSecondaryLock.RLock()
	defer s.actingSecondaryLock.RUnlock()
	return s.actingSecondaryCA
}

// halfTime returns a duration that is half the time between notBefore and
// notAfter.
func halfTime(notBefore, notAfter time.Time) time.Duration {
	interval := notAfter.Sub(notBefore)
	return interval / 2
}

// lessThanHalfTimePassed decides if half the time between notBefore and
// notAfter has passed relative to now.
// lessThanHalfTimePassed is being called while holding caProviderReconfigurationLock
// which means it must never take that lock itself or call anything that does.
func lessThanHalfTimePassed(now, notBefore, notAfter time.Time) bool {
	t := notBefore.Add(halfTime(notBefore, notAfter))
	return t.Sub(now) > 0
}
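
// Quick numeric check of the helpers above (illustrative values only): for a
// certificate with notBefore = t0 and notAfter = t0 + 24h, halfTime returns
// 12h, so lessThanHalfTimePassed reports true at t0+11h (renewal not needed
// yet) and false at t0+13h (more than half the validity period has passed).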