package consul

import (
	"bytes"
	"context"
	"fmt"
	"reflect"
	"strings"
	"time"

	"golang.org/x/time/rate"

	"github.com/hashicorp/consul/agent/connect"
	"github.com/hashicorp/consul/agent/connect/ca"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/logging"
	uuid "github.com/hashicorp/go-uuid"
)
const (
	// loopRateLimit is the maximum rate per second at which we can rerun CA and intention
	// replication watches.
	loopRateLimit rate.Limit = 0.2

	// retryBucketSize is the maximum number of stored rate limit attempts for looped
	// blocking query operations.
	retryBucketSize = 5

	// maxIntentionTxnSize is the maximum size (in bytes) of a transaction used during
	// Intention replication.
	maxIntentionTxnSize = raftWarnSize / 4
)
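
// Illustration (not executed): a limiter built as
// rate.NewLimiter(loopRateLimit, retryBucketSize) allows an initial burst of
// retryBucketSize loop runs, then refills one token every 1/loopRateLimit =
// 5s, so a healthy watch loop reruns at most once every five seconds.
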
var (
	// maxRetryBackoff is the maximum number of seconds to wait between failed blocking
	// queries when backing off.
	maxRetryBackoff = 256
)
// initializeCAConfig is used to initialize the CA config if necessary
// when setting up the CA during establishLeadership.
func (s *Server) initializeCAConfig() (*structs.CAConfiguration, error) {
	state := s.fsm.State()
	_, config, err := state.CAConfig(nil)
	if err != nil {
		return nil, err
	}

	if config == nil {
		config = s.config.CAConfig
		if config.ClusterID == "" {
			id, err := uuid.GenerateUUID()
			if err != nil {
				return nil, err
			}
			config.ClusterID = id
		}
	} else if _, ok := config.Config["IntermediateCertTTL"]; !ok {
		// Shallow-copy the config and deep-copy its Config map so the default
		// IntermediateCertTTL is added without mutating the stored config.
		dup := *config
		copied := make(map[string]interface{})
		for k, v := range dup.Config {
			copied[k] = v
		}
		copied["IntermediateCertTTL"] = connect.DefaultIntermediateCertTTL.String()
		dup.Config = copied
		config = &dup
	} else {
		return config, nil
	}

	req := structs.CARequest{
		Op:     structs.CAOpSetConfig,
		Config: config,
	}
	if resp, err := s.raftApply(structs.ConnectCARequestType, req); err != nil {
		return nil, err
	} else if respErr, ok := resp.(error); ok {
		return nil, respErr
	}

	return config, nil
}
// parseCARoot returns a filled-in structs.CARoot from a raw PEM value.
func parseCARoot(pemValue, provider, clusterID string) (*structs.CARoot, error) {
	id, err := connect.CalculateCertFingerprint(pemValue)
	if err != nil {
		return nil, fmt.Errorf("error parsing root fingerprint: %v", err)
	}
	rootCert, err := connect.ParseCert(pemValue)
	if err != nil {
		return nil, fmt.Errorf("error parsing root cert: %v", err)
	}
	keyType, keyBits, err := connect.KeyInfoFromCert(rootCert)
	if err != nil {
		return nil, fmt.Errorf("error extracting root key info: %v", err)
	}
	return &structs.CARoot{
		ID:                  id,
		Name:                fmt.Sprintf("%s CA Root Cert", strings.Title(provider)),
		SerialNumber:        rootCert.SerialNumber.Uint64(),
		SigningKeyID:        connect.EncodeSigningKeyID(rootCert.SubjectKeyId),
		ExternalTrustDomain: clusterID,
		NotBefore:           rootCert.NotBefore,
		NotAfter:            rootCert.NotAfter,
		RootCert:            pemValue,
		PrivateKeyType:      keyType,
		PrivateKeyBits:      keyBits,
		Active:              true,
	}, nil
}
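
// For illustration (hypothetical inputs): a root PEM issued by the "consul"
// provider parses into a CARoot whose ID is the cert fingerprint and whose
// Name is "Consul CA Root Cert":
//
//	root, err := parseCARoot(pemValue, "consul", clusterID)
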
// createCAProvider returns a connect CA provider from the given config.
func (s *Server) createCAProvider(conf *structs.CAConfiguration) (ca.Provider, error) {
	var p ca.Provider
	switch conf.Provider {
	case structs.ConsulCAProvider:
		p = &ca.ConsulProvider{Delegate: &consulCADelegate{s}}
	case structs.VaultCAProvider:
		p = &ca.VaultProvider{}
	case structs.AWSCAProvider:
		p = &ca.AWSProvider{}
	default:
		return nil, fmt.Errorf("unknown CA provider %q", conf.Provider)
	}

	// If the provider implements NeedsLogger, give it our logger.
	if needsLogger, ok := p.(ca.NeedsLogger); ok {
		needsLogger.SetLogger(s.logger)
	}

	return p, nil
}
// getCAProvider is called while holding caProviderReconfigurationLock,
// which means it must never take that lock itself or call anything that does.
func (s *Server) getCAProvider() (ca.Provider, *structs.CARoot) {
	retries := 0
	var result ca.Provider
	var resultRoot *structs.CARoot
	for result == nil {
		s.caProviderLock.RLock()
		result = s.caProvider
		resultRoot = s.caProviderRoot
		s.caProviderLock.RUnlock()

		// In cases where an agent is started with managed proxies, we may ask
		// for the provider before establishLeadership completes. If we're the
		// leader, wait (up to 10 retries of 50ms each) and get the provider again.
		if result == nil && s.IsLeader() && retries < 10 {
			retries++
			time.Sleep(50 * time.Millisecond)
			continue
		}

		break
	}

	return result, resultRoot
}
// setCAProvider is called while holding caProviderReconfigurationLock,
// which means it must never take that lock itself or call anything that does.
func (s *Server) setCAProvider(newProvider ca.Provider, root *structs.CARoot) {
	s.caProviderLock.Lock()
	defer s.caProviderLock.Unlock()
	s.caProvider = newProvider
	s.caProviderRoot = root
}
// initializeCA sets up the CA provider when gaining leadership, either bootstrapping
// the CA if this is the primary DC or making a remote RPC for intermediate signing
// if this is a secondary DC.
func (s *Server) initializeCA() error {
	connectLogger := s.loggers.Named(logging.Connect)
	// Bail if connect isn't enabled.
	if !s.config.ConnectEnabled {
		return nil
	}

	// Initialize the provider based on the current config.
	conf, err := s.initializeCAConfig()
	if err != nil {
		return err
	}
	provider, err := s.createCAProvider(conf)
	if err != nil {
		return err
	}

	s.caProviderReconfigurationLock.Lock()
	defer s.caProviderReconfigurationLock.Unlock()
	s.setCAProvider(provider, nil)

	// If this isn't the primary DC, run the secondary DC routine if the primary
	// has already been upgraded to at least 1.6.0.
	if s.config.PrimaryDatacenter != s.config.Datacenter {
		versionOk, foundPrimary := ServersInDCMeetMinimumVersion(s, s.config.PrimaryDatacenter, minMultiDCConnectVersion)
		if !foundPrimary {
			connectLogger.Warn("primary datacenter is configured but unreachable - deferring initialization of the secondary datacenter CA")
			// Return nil because we will initialize the secondary CA later.
			return nil
		} else if !versionOk {
			// Return nil because we will initialize the secondary CA later.
			connectLogger.Warn("servers in the primary datacenter are not at least at the minimum version - deferring initialization of the secondary datacenter CA",
				"min_version", minMultiDCConnectVersion.String(),
			)
			return nil
		}

		// Get the root CA to see if we need to refresh our intermediate.
		args := structs.DCSpecificRequest{
			Datacenter: s.config.PrimaryDatacenter,
		}
		var roots structs.IndexedCARoots
		if err := s.forwardDC("ConnectCA.Roots", s.config.PrimaryDatacenter, &args, &roots); err != nil {
			return err
		}

		// Configure the CA provider and initialize the intermediate certificate if necessary.
		if err := s.initializeSecondaryProvider(provider, roots); err != nil {
			return fmt.Errorf("error configuring provider: %v", err)
		}
		if err := s.initializeSecondaryCA(provider, roots); err != nil {
			return err
		}

		connectLogger.Info("initialized secondary datacenter CA with provider", "provider", conf.Provider)
		return nil
	}

	return s.initializeRootCA(provider, conf)
}
// initializeRootCA runs the initialization logic for a root CA. It is called
// while holding caProviderReconfigurationLock, which means it must never take
// that lock itself or call anything that does.
func (s *Server) initializeRootCA(provider ca.Provider, conf *structs.CAConfiguration) error {
	connectLogger := s.loggers.Named(logging.Connect)
	pCfg := ca.ProviderConfig{
		ClusterID:  conf.ClusterID,
		Datacenter: s.config.Datacenter,
		IsPrimary:  true,
		RawConfig:  conf.Config,
		State:      conf.State,
	}
	if err := provider.Configure(pCfg); err != nil {
		return fmt.Errorf("error configuring provider: %v", err)
	}
	if err := provider.GenerateRoot(); err != nil {
		return fmt.Errorf("error generating CA root certificate: %v", err)
	}

	// Get the active root cert from the CA.
	rootPEM, err := provider.ActiveRoot()
	if err != nil {
		return fmt.Errorf("error getting root cert: %v", err)
	}
	rootCA, err := parseCARoot(rootPEM, conf.Provider, conf.ClusterID)
	if err != nil {
		return err
	}

	// Also create the intermediate CA, which is the one that actually signs leaf certs.
	interPEM, err := provider.GenerateIntermediate()
	if err != nil {
		return fmt.Errorf("error generating intermediate cert: %v", err)
	}
	_, err = connect.ParseCert(interPEM)
	if err != nil {
		return fmt.Errorf("error parsing intermediate cert: %v", err)
	}

	// If the provider has state to persist and it's changed or new, then update
	// the CAConfig.
	pState, err := provider.State()
	if err != nil {
		return fmt.Errorf("error getting provider state: %v", err)
	}
	if !reflect.DeepEqual(conf.State, pState) {
		// Update the CAConfig in raft to persist the provider state.
		conf.State = pState
		req := structs.CARequest{
			Op:     structs.CAOpSetConfig,
			Config: conf,
		}
		if _, err = s.raftApply(structs.ConnectCARequestType, req); err != nil {
			return fmt.Errorf("error persisting provider state: %v", err)
		}
	}

	// Check if the CA root is already initialized and exit if it is,
	// adding on any existing intermediate certs since they aren't directly
	// tied to the provider.
	// Every change to the CA after this initial bootstrapping should
	// be done through the rotation process.
	state := s.fsm.State()
	_, activeRoot, err := state.CARootActive(nil)
	if err != nil {
		return err
	}
	if activeRoot != nil {
		// This state shouldn't be possible to get into because we update the root and
		// CA config in the same FSM operation.
		if activeRoot.ID != rootCA.ID {
			return fmt.Errorf("stored CA root %q is not the active root (%s)", rootCA.ID, activeRoot.ID)
		}

		rootCA.IntermediateCerts = activeRoot.IntermediateCerts
		s.setCAProvider(provider, rootCA)
		return nil
	}

	// Get the highest index.
	idx, _, err := state.CARoots(nil)
	if err != nil {
		return err
	}

	// Store the root cert in raft.
	resp, err := s.raftApply(structs.ConnectCARequestType, &structs.CARequest{
		Op:    structs.CAOpSetRoots,
		Index: idx,
		Roots: []*structs.CARoot{rootCA},
	})
	if err != nil {
		connectLogger.Error("raft apply failed", "error", err)
		return err
	}
	if respErr, ok := resp.(error); ok {
		return respErr
	}

	s.setCAProvider(provider, rootCA)
	connectLogger.Info("initialized primary datacenter CA with provider", "provider", conf.Provider)
	return nil
}
// initializeSecondaryCA runs the routine for generating an intermediate CA CSR and getting
// it signed by the primary DC if the root CA of the primary DC has changed since the last
// intermediate. It is called while holding caProviderReconfigurationLock, which means it
// must never take that lock itself or call anything that does.
func (s *Server) initializeSecondaryCA(provider ca.Provider, primaryRoots structs.IndexedCARoots) error {
	activeIntermediate, err := provider.ActiveIntermediate()
	if err != nil {
		return err
	}

	var (
		storedRootID         string
		expectedSigningKeyID string
		currentSigningKeyID  string
		activeSecondaryRoot  *structs.CARoot
	)
	if activeIntermediate != "" {
		// In the event that we already have an intermediate, we must have
		// already replicated some primary root information locally, so check
		// to see if we're up to date by fetching the rootID and the
		// signingKeyID used in the secondary.
		//
		// Note that for the same rootID the primary representation of the root
		// will have a different SigningKeyID field than the secondary
		// representation of the same root. This is because it's derived from
		// the intermediate, which is different in all datacenters.
		storedRoot, err := provider.ActiveRoot()
		if err != nil {
			return err
		}

		storedRootID, err = connect.CalculateCertFingerprint(storedRoot)
		if err != nil {
			return fmt.Errorf("error parsing root fingerprint: %v, %#v", err, storedRoot)
		}

		intermediateCert, err := connect.ParseCert(activeIntermediate)
		if err != nil {
			return fmt.Errorf("error parsing active intermediate cert: %v", err)
		}
		expectedSigningKeyID = connect.EncodeSigningKeyID(intermediateCert.SubjectKeyId)

		// This will fetch the secondary's exact current representation of the
		// active root. Note that this data should only be used if the IDs
		// match; otherwise it's out of date and should be regenerated.
		_, activeSecondaryRoot, err = s.fsm.State().CARootActive(nil)
		if err != nil {
			return err
		}
		if activeSecondaryRoot != nil {
			currentSigningKeyID = activeSecondaryRoot.SigningKeyID
		}
	}

	// Determine which of the provided PRIMARY representations of roots is the
	// active one. We'll use this as a template to generate any new root
	// representations meant for this secondary.
	var newActiveRoot *structs.CARoot
	for _, root := range primaryRoots.Roots {
		if root.ID == primaryRoots.ActiveRootID && root.Active {
			newActiveRoot = root
			break
		}
	}
	if newActiveRoot == nil {
		return fmt.Errorf("primary datacenter does not have an active root CA for Connect")
	}

	// Get a signed intermediate from the primary DC if the provider
	// hasn't been initialized yet or if the primary's root has changed.
	needsNewIntermediate := false
	if activeIntermediate == "" || storedRootID != primaryRoots.ActiveRootID {
		needsNewIntermediate = true
	}

	// We also take this opportunity to correct an incorrectly persisted SigningKeyID
	// in secondary datacenters (see PR-6513).
	if expectedSigningKeyID != "" && currentSigningKeyID != expectedSigningKeyID {
		needsNewIntermediate = true
	}

	newIntermediate := false
	if needsNewIntermediate {
		if err := s.getIntermediateCASigned(provider, newActiveRoot); err != nil {
			return err
		}
		newIntermediate = true
	} else {
		// Discard the primary's representation since our local one is
		// sufficiently up to date.
		newActiveRoot = activeSecondaryRoot
	}

	// Update the roots list in the state store if there's a new active root.
	state := s.fsm.State()
	_, activeRoot, err := state.CARootActive(nil)
	if err != nil {
		return err
	}
	if activeRoot == nil || activeRoot.ID != newActiveRoot.ID || newIntermediate {
		if err := s.persistNewRoot(provider, newActiveRoot); err != nil {
			return err
		}
	}

	s.setCAProvider(provider, newActiveRoot)
	return nil
}
// persistNewRoot is called while holding caProviderReconfigurationLock,
// which means it must never take that lock itself or call anything that does.
func (s *Server) persistNewRoot(provider ca.Provider, newActiveRoot *structs.CARoot) error {
	connectLogger := s.loggers.Named(logging.Connect)
	state := s.fsm.State()
	idx, oldRoots, err := state.CARoots(nil)
	if err != nil {
		return err
	}

	_, config, err := state.CAConfig(nil)
	if err != nil {
		return err
	}
	if config == nil {
		return fmt.Errorf("local CA not initialized yet")
	}
	newConf := *config
	newConf.ClusterID = newActiveRoot.ExternalTrustDomain

	// Persist any state the provider needs us to.
	newConf.State, err = provider.State()
	if err != nil {
		return fmt.Errorf("error getting provider state: %v", err)
	}

	// Copy the root list and append the new active root, updating the old root
	// with the time it was rotated out.
	var newRoots structs.CARoots
	for _, r := range oldRoots {
		newRoot := *r
		if newRoot.Active {
			newRoot.Active = false
			newRoot.RotatedOutAt = time.Now()
		}
		if newRoot.ExternalTrustDomain == "" {
			newRoot.ExternalTrustDomain = config.ClusterID
		}
		newRoots = append(newRoots, &newRoot)
	}
	newRoots = append(newRoots, newActiveRoot)

	args := &structs.CARequest{
		Op:     structs.CAOpSetRootsAndConfig,
		Index:  idx,
		Roots:  newRoots,
		Config: &newConf,
	}
	resp, err := s.raftApply(structs.ConnectCARequestType, args)
	if err != nil {
		return err
	}
	if respErr, ok := resp.(error); ok {
		return respErr
	}
	if respOk, ok := resp.(bool); ok && !respOk {
		return fmt.Errorf("could not atomically update roots and config")
	}

	connectLogger.Info("updated root certificates from primary datacenter")
	return nil
}
// getIntermediateCASigned is called while holding caProviderReconfigurationLock,
// which means it must never take that lock itself or call anything that does.
func (s *Server) getIntermediateCASigned(provider ca.Provider, newActiveRoot *structs.CARoot) error {
	connectLogger := s.loggers.Named(logging.Connect)
	csr, err := provider.GenerateIntermediateCSR()
	if err != nil {
		return err
	}

	var intermediatePEM string
	if err := s.forwardDC("ConnectCA.SignIntermediate", s.config.PrimaryDatacenter, s.generateCASignRequest(csr), &intermediatePEM); err != nil {
		// This is a failure in the primary and shouldn't be capable of erroring
		// out our establishing leadership.
		connectLogger.Warn("primary datacenter refused to sign our intermediate CA certificate", "error", err)
		return nil
	}

	if err := provider.SetIntermediate(intermediatePEM, newActiveRoot.RootCert); err != nil {
		return fmt.Errorf("failed to set the intermediate certificate with the CA provider: %v", err)
	}

	intermediateCert, err := connect.ParseCert(intermediatePEM)
	if err != nil {
		return fmt.Errorf("error parsing intermediate cert: %v", err)
	}

	// Append the new intermediate to our local active root entry. This is
	// where the root representations start to diverge.
	newActiveRoot.IntermediateCerts = append(newActiveRoot.IntermediateCerts, intermediatePEM)
	newActiveRoot.SigningKeyID = connect.EncodeSigningKeyID(intermediateCert.SubjectKeyId)

	connectLogger.Info("received new intermediate certificate from primary datacenter")
	return nil
}

func (s *Server) generateCASignRequest(csr string) *structs.CASignRequest {
	return &structs.CASignRequest{
		Datacenter:   s.config.PrimaryDatacenter,
		CSR:          csr,
		WriteRequest: structs.WriteRequest{Token: s.tokens.ReplicationToken()},
	}
}
// startConnectLeader starts multi-dc connect leader routines.
func (s *Server) startConnectLeader() {
	// Start the Connect secondary DC actions if enabled.
	if s.config.ConnectEnabled && s.config.Datacenter != s.config.PrimaryDatacenter {
		s.leaderRoutineManager.Start(secondaryCARootWatchRoutineName, s.secondaryCARootWatch)
		s.leaderRoutineManager.Start(intentionReplicationRoutineName, s.replicateIntentions)
		s.leaderRoutineManager.Start(secondaryCertRenewWatchRoutineName, s.secondaryIntermediateCertRenewalWatch)
		s.startConnectLeaderEnterprise()
	}

	s.leaderRoutineManager.Start(caRootPruningRoutineName, s.runCARootPruning)
}

// stopConnectLeader stops connect specific leader functions.
func (s *Server) stopConnectLeader() {
	s.leaderRoutineManager.Stop(secondaryCARootWatchRoutineName)
	s.leaderRoutineManager.Stop(intentionReplicationRoutineName)
	s.leaderRoutineManager.Stop(caRootPruningRoutineName)
	s.stopConnectLeaderEnterprise()
}
func (s *Server) runCARootPruning(ctx context.Context) error {
	ticker := time.NewTicker(caRootPruneInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			if err := s.pruneCARoots(); err != nil {
				s.loggers.Named(logging.Connect).Error("error pruning CA roots", "error", err)
			}
		}
	}
}
// pruneCARoots removes any CARoots that have been rotated out and have been
// inactive for longer than twice the leaf cert TTL.
func (s *Server) pruneCARoots() error {
	if !s.config.ConnectEnabled {
		return nil
	}

	state := s.fsm.State()
	idx, roots, err := state.CARoots(nil)
	if err != nil {
		return err
	}

	_, caConf, err := state.CAConfig(nil)
	if err != nil {
		return err
	}

	common, err := caConf.GetCommonConfig()
	if err != nil {
		return err
	}

	var newRoots structs.CARoots
	for _, r := range roots {
		if !r.Active && !r.RotatedOutAt.IsZero() && time.Since(r.RotatedOutAt) > common.LeafCertTTL*2 {
			s.loggers.Named(logging.Connect).Info("pruning old unused root CA", "id", r.ID)
			continue
		}
		newRoot := *r
		newRoots = append(newRoots, &newRoot)
	}

	// Return early if there's nothing to remove.
	if len(newRoots) == len(roots) {
		return nil
	}

	// Commit the new root state.
	var args structs.CARequest
	args.Op = structs.CAOpSetRoots
	args.Index = idx
	args.Roots = newRoots
	resp, err := s.raftApply(structs.ConnectCARequestType, args)
	if err != nil {
		return err
	}
	if respErr, ok := resp.(error); ok {
		return respErr
	}

	return nil
}
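
// For illustration (assuming a LeafCertTTL of 72h): a root rotated out at
// time T becomes prunable once more than 144h have elapsed since T, by which
// point any leaf cert signed under it has long expired.
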
// secondaryIntermediateCertRenewalWatch checks the intermediate cert for
// expiration. As soon as more than half of a cert's validity period has
// passed, it will try to renew it.
func (s *Server) secondaryIntermediateCertRenewalWatch(ctx context.Context) error {
	connectLogger := s.loggers.Named(logging.Connect)

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(structs.IntermediateCertRenewInterval):
			retryLoopBackoffAbortOnSuccess(ctx, func() error {
				s.caProviderReconfigurationLock.Lock()
				defer s.caProviderReconfigurationLock.Unlock()

				provider, _ := s.getCAProvider()
				if provider == nil {
					// This happens when leadership is being revoked and this
					// goroutine is about to be stopped.
					return nil
				}
				if !s.configuredSecondaryCA() {
					return fmt.Errorf("secondary CA is not yet configured")
				}

				state := s.fsm.State()
				_, activeRoot, err := state.CARootActive(nil)
				if err != nil {
					return err
				}

				activeIntermediate, err := provider.ActiveIntermediate()
				if err != nil {
					return err
				}
				if activeIntermediate == "" {
					return fmt.Errorf("secondary datacenter doesn't have an active intermediate")
				}

				intermediateCert, err := connect.ParseCert(activeIntermediate)
				if err != nil {
					return fmt.Errorf("error parsing active intermediate cert: %v", err)
				}

				if lessThanHalfTimePassed(time.Now(), intermediateCert.NotBefore,
					intermediateCert.NotAfter) {
					return nil
				}

				if err := s.getIntermediateCASigned(provider, activeRoot); err != nil {
					return err
				}
				if err := s.persistNewRoot(provider, activeRoot); err != nil {
					return err
				}

				s.setCAProvider(provider, activeRoot)
				return nil
			}, func(err error) {
				connectLogger.Error("error renewing intermediate certs",
					"routine", secondaryCertRenewWatchRoutineName,
					"error", err,
				)
			})
		}
	}
}
// secondaryCARootWatch maintains a blocking query to the primary datacenter's
// ConnectCA.Roots endpoint to monitor when it needs to request a new signed
// intermediate certificate.
func (s *Server) secondaryCARootWatch(ctx context.Context) error {
	connectLogger := s.loggers.Named(logging.Connect)
	args := structs.DCSpecificRequest{
		Datacenter: s.config.PrimaryDatacenter,
		QueryOptions: structs.QueryOptions{
			// The maximum time the primary roots watch query can block before returning.
			MaxQueryTime: s.config.MaxQueryTime,
		},
	}

	connectLogger.Debug("starting Connect CA root replication from primary datacenter", "primary", s.config.PrimaryDatacenter)

	retryLoopBackoff(ctx, func() error {
		var roots structs.IndexedCARoots
		if err := s.forwardDC("ConnectCA.Roots", s.config.PrimaryDatacenter, &args, &roots); err != nil {
			return fmt.Errorf("error retrieving the primary datacenter's roots: %v", err)
		}

		// Check to see if the primary has been upgraded in case we're waiting to switch to
		// secondary mode.
		provider, _ := s.getCAProvider()
		if provider == nil {
			// This happens when leadership is being revoked and this
			// goroutine is about to be stopped.
			return nil
		}
		if !s.configuredSecondaryCA() {
			versionOk, primaryFound := ServersInDCMeetMinimumVersion(s, s.config.PrimaryDatacenter, minMultiDCConnectVersion)
			if !primaryFound {
				return fmt.Errorf("primary datacenter is unreachable - deferring secondary CA initialization")
			}

			if versionOk {
				if err := s.initializeSecondaryProvider(provider, roots); err != nil {
					return fmt.Errorf("failed to initialize secondary CA provider: %v", err)
				}
			}
		}

		// Run the secondary CA init routine to see if we need to request a new
		// intermediate.
		if s.configuredSecondaryCA() {
			if err := s.initializeSecondaryCA(provider, roots); err != nil {
				return fmt.Errorf("failed to initialize the secondary CA: %v", err)
			}
		}

		args.QueryOptions.MinQueryIndex = nextIndexVal(args.QueryOptions.MinQueryIndex, roots.QueryMeta.Index)
		return nil
	}, func(err error) {
		connectLogger.Error("CA root replication failed, will retry",
			"routine", secondaryCARootWatchRoutineName,
			"error", err,
		)
	})

	return nil
}
// replicateIntentions executes a blocking query to the primary datacenter to replicate
// the intentions there to the local state.
func (s *Server) replicateIntentions(ctx context.Context) error {
	connectLogger := s.loggers.Named(logging.Connect)
	args := structs.DCSpecificRequest{
		Datacenter: s.config.PrimaryDatacenter,
	}

	connectLogger.Debug("starting Connect intention replication from primary datacenter", "primary", s.config.PrimaryDatacenter)

	retryLoopBackoff(ctx, func() error {
		// Always use the latest replication token value in case it changed while looping.
		args.QueryOptions.Token = s.tokens.ReplicationToken()

		var remote structs.IndexedIntentions
		if err := s.forwardDC("Intention.List", s.config.PrimaryDatacenter, &args, &remote); err != nil {
			return err
		}

		_, local, err := s.fsm.State().Intentions(nil, s.replicationEnterpriseMeta())
		if err != nil {
			return err
		}

		// Compute the diff between the remote and local intentions.
		deletes, updates := diffIntentions(local, remote.Intentions)
		txnOpSets := batchIntentionUpdates(deletes, updates)

		// Apply batched updates to the state store.
		for _, ops := range txnOpSets {
			txnReq := structs.TxnRequest{Ops: ops}
			resp, err := s.raftApply(structs.TxnRequestType, &txnReq)
			if err != nil {
				return err
			}
			if respErr, ok := resp.(error); ok {
				return respErr
			}
			if txnResp, ok := resp.(structs.TxnResponse); ok {
				if len(txnResp.Errors) > 0 {
					return txnResp.Error()
				}
			} else {
				return fmt.Errorf("unexpected return type %T", resp)
			}
		}

		args.QueryOptions.MinQueryIndex = nextIndexVal(args.QueryOptions.MinQueryIndex, remote.QueryMeta.Index)
		return nil
	}, func(err error) {
		connectLogger.Error("error replicating intentions",
			"routine", intentionReplicationRoutineName,
			"error", err,
		)
	})

	return nil
}
// retryLoopBackoff loops a given function indefinitely, backing off exponentially
// upon errors up to a maximum of maxRetryBackoff seconds.
func retryLoopBackoff(ctx context.Context, loopFn func() error, errFn func(error)) {
	retryLoopBackoffHandleSuccess(ctx, loopFn, errFn, false)
}

// retryLoopBackoffAbortOnSuccess is like retryLoopBackoff but returns as soon
// as loopFn completes without error.
func retryLoopBackoffAbortOnSuccess(ctx context.Context, loopFn func() error, errFn func(error)) {
	retryLoopBackoffHandleSuccess(ctx, loopFn, errFn, true)
}

func retryLoopBackoffHandleSuccess(ctx context.Context, loopFn func() error, errFn func(error), abortOnSuccess bool) {
	var failedAttempts uint
	limiter := rate.NewLimiter(loopRateLimit, retryBucketSize)
	for {
		// Rate limit how often we run the loop. Cancellation is handled by
		// the select below, so the Wait error can be ignored.
		limiter.Wait(ctx)
		select {
		case <-ctx.Done():
			return
		default:
		}
		if (1 << failedAttempts) < maxRetryBackoff {
			failedAttempts++
		}
		retryTime := (1 << failedAttempts) * time.Second
		if err := loopFn(); err != nil {
			errFn(err)

			timer := time.NewTimer(retryTime)
			select {
			case <-ctx.Done():
				timer.Stop()
				return
			case <-timer.C:
				continue
			}
		} else if abortOnSuccess {
			return
		}

		// Reset the failed attempts after a successful run.
		failedAttempts = 0
	}
}
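
// For illustration (not executed): on consecutive failures the wait between
// attempts doubles - 2s, 4s, 8s, ... - and stops growing once
// 1<<failedAttempts reaches maxRetryBackoff, capping the sleep at 256s.
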
// diffIntentions computes the difference between the local and remote intentions
// and returns lists of deletes and updates.
func diffIntentions(local, remote structs.Intentions) (structs.Intentions, structs.Intentions) {
	localIdx := make(map[string][]byte, len(local))
	remoteIdx := make(map[string]struct{}, len(remote))

	var deletes structs.Intentions
	var updates structs.Intentions

	for _, intention := range local {
		localIdx[intention.ID] = intention.Hash
	}
	for _, intention := range remote {
		remoteIdx[intention.ID] = struct{}{}
	}

	for _, intention := range local {
		if _, ok := remoteIdx[intention.ID]; !ok {
			deletes = append(deletes, intention)
		}
	}

	for _, intention := range remote {
		existingHash, ok := localIdx[intention.ID]
		if !ok {
			updates = append(updates, intention)
		} else if !bytes.Equal(existingHash, intention.Hash) {
			updates = append(updates, intention)
		}
	}

	return deletes, updates
}
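
// For illustration (hypothetical IDs): with local = {A(hash1), B} and
// remote = {A(hash2), C}, diffIntentions returns deletes = {B} (present
// locally only) and updates = {A, C} (changed hash, or missing locally).
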
// batchIntentionUpdates breaks up the given updates into sets of TxnOps based
// on the estimated size of the operations.
func batchIntentionUpdates(deletes, updates structs.Intentions) []structs.TxnOps {
	var txnOps structs.TxnOps
	for _, d := range deletes {
		deleteOp := &structs.TxnIntentionOp{
			Op:        structs.IntentionOpDelete,
			Intention: d,
		}
		txnOps = append(txnOps, &structs.TxnOp{Intention: deleteOp})
	}

	for _, update := range updates {
		updateOp := &structs.TxnIntentionOp{
			Op:        structs.IntentionOpUpdate,
			Intention: update,
		}
		txnOps = append(txnOps, &structs.TxnOp{Intention: updateOp})
	}

	// Divide the operations into chunks according to maxIntentionTxnSize.
	var batchedOps []structs.TxnOps
	for batchStart := 0; batchStart < len(txnOps); {
		// The inner loop finds the last element to include in this batch.
		batchSize := 0
		batchEnd := batchStart
		for ; batchEnd < len(txnOps) && batchSize < maxIntentionTxnSize; batchEnd++ {
			batchSize += txnOps[batchEnd].Intention.Intention.EstimateSize()
		}

		batchedOps = append(batchedOps, txnOps[batchStart:batchEnd])

		// txnOps[batchEnd] wasn't included, since slicing excludes the element
		// at the stop index.
		batchStart = batchEnd
	}

	return batchedOps
}
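
// For illustration (not executed): every batch contains at least one op,
// because the inner loop admits txnOps[batchEnd] before re-checking
// batchSize; a single op larger than maxIntentionTxnSize therefore still
// ships in its own transaction.
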
// nextIndexVal computes the next index value to query for, resetting to zero
// if the index went backward.
func nextIndexVal(prevIdx, idx uint64) uint64 {
	if prevIdx > idx {
		return 0
	}
	return idx
}
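
// For illustration (hypothetical values): nextIndexVal(3, 5) == 5 advances
// the blocking-query watermark, while nextIndexVal(5, 3) == 0 resets it so
// the next query re-fetches everything after the remote index moved backward.
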
// initializeSecondaryProvider configures the given provider for a secondary, non-root
// datacenter. It is called while holding caProviderReconfigurationLock, which means
// it must never take that lock itself or call anything that does.
func (s *Server) initializeSecondaryProvider(provider ca.Provider, roots structs.IndexedCARoots) error {
	if roots.TrustDomain == "" {
		return fmt.Errorf("trust domain from primary datacenter is not initialized")
	}

	clusterID := strings.Split(roots.TrustDomain, ".")[0]

	_, conf, err := s.fsm.State().CAConfig(nil)
	if err != nil {
		return err
	}

	pCfg := ca.ProviderConfig{
		ClusterID:  clusterID,
		Datacenter: s.config.Datacenter,
		IsPrimary:  false,
		RawConfig:  conf.Config,
		State:      conf.State,
	}
	if err := provider.Configure(pCfg); err != nil {
		return fmt.Errorf("error configuring provider: %v", err)
	}

	s.actingSecondaryLock.Lock()
	s.actingSecondaryCA = true
	s.actingSecondaryLock.Unlock()

	return nil
}
// configuredSecondaryCA is called while holding caProviderReconfigurationLock,
// which means it must never take that lock itself or call anything that does.
func (s *Server) configuredSecondaryCA() bool {
	s.actingSecondaryLock.RLock()
	defer s.actingSecondaryLock.RUnlock()
	return s.actingSecondaryCA
}
// halfTime returns a duration that is half the time between notBefore and
// notAfter.
func halfTime(notBefore, notAfter time.Time) time.Duration {
	interval := notAfter.Sub(notBefore)
	return interval / 2
}

// lessThanHalfTimePassed decides if less than half the time between notBefore
// and notAfter has passed relative to now. It is called while holding
// caProviderReconfigurationLock, which means it must never take that lock
// itself or call anything that does.
func lessThanHalfTimePassed(now, notBefore, notAfter time.Time) bool {
	t := notBefore.Add(halfTime(notBefore, notAfter))
	return t.Sub(now) > 0
}
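
// For illustration (hypothetical times): for a cert valid from 00:00 to
// 10:00 the halfway point is 05:00, so lessThanHalfTimePassed reports true
// at 04:00 (no renewal needed yet) and false at 06:00 (time to renew).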