diff --git a/agent/connect/ca/provider_consul_test.go b/agent/connect/ca/provider_consul_test.go index 759769049..8733293d8 100644 --- a/agent/connect/ca/provider_consul_test.go +++ b/agent/connect/ca/provider_consul_test.go @@ -22,30 +22,7 @@ func (c *consulCAMockDelegate) State() *state.Store { } func (c *consulCAMockDelegate) ApplyCARequest(req *structs.CARequest) (interface{}, error) { - idx, _, err := c.state.CAConfig(nil) - if err != nil { - return nil, err - } - - switch req.Op { - case structs.CAOpSetProviderState: - _, err := c.state.CASetProviderState(idx+1, req.ProviderState) - if err != nil { - return nil, err - } - - return true, nil - case structs.CAOpDeleteProviderState: - if err := c.state.CADeleteProviderState(idx+1, req.ProviderState.ID); err != nil { - return nil, err - } - - return true, nil - case structs.CAOpIncrementProviderSerialNumber: - return uint64(2), nil - default: - return nil, fmt.Errorf("Invalid CA operation '%s'", req.Op) - } + return ApplyCARequestToStore(c.state, req) } func newMockDelegate(t *testing.T, conf *structs.CAConfiguration) *consulCAMockDelegate { diff --git a/agent/connect/ca/testing.go b/agent/connect/ca/testing.go index 25533f8dd..0bd837f7f 100644 --- a/agent/connect/ca/testing.go +++ b/agent/connect/ca/testing.go @@ -8,6 +8,8 @@ import ( "sync" "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/sdk/freeport" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/go-hclog" @@ -234,3 +236,30 @@ func (v *TestVaultServer) Stop() error { return nil } + +func ApplyCARequestToStore(store *state.Store, req *structs.CARequest) (interface{}, error) { + idx, _, err := store.CAConfig(nil) + if err != nil { + return nil, err + } + + switch req.Op { + case structs.CAOpSetProviderState: + _, err := store.CASetProviderState(idx+1, req.ProviderState) + if err != nil { + return nil, err + } + + return true, nil + case structs.CAOpDeleteProviderState: + if err := store.CADeleteProviderState(idx+1, req.ProviderState.ID); err != nil { + return nil, err + } + + return true, nil + case structs.CAOpIncrementProviderSerialNumber: + return uint64(2), nil + default: + return nil, fmt.Errorf("Invalid CA operation '%s'", req.Op) + } +} diff --git a/agent/connect/testing_ca.go b/agent/connect/testing_ca.go index e623c1872..4851b9db5 100644 --- a/agent/connect/testing_ca.go +++ b/agent/connect/testing_ca.go @@ -56,7 +56,7 @@ func ValidateLeaf(caPEM string, leafPEM string, intermediatePEMs []string) error return err } -func testCA(t testing.T, xc *structs.CARoot, keyType string, keyBits int) *structs.CARoot { +func testCA(t testing.T, xc *structs.CARoot, keyType string, keyBits int, ttl time.Duration) *structs.CARoot { var result structs.CARoot result.Active = true result.Name = fmt.Sprintf("Test CA %d", atomic.AddUint64(&testCACounter, 1)) @@ -76,6 +76,14 @@ func testCA(t testing.T, xc *structs.CARoot, keyType string, keyBits int) *struc id := &SpiffeIDSigning{ClusterID: TestClusterID, Domain: "consul"} // Create the CA cert + now := time.Now() + before := now + after := now + if ttl != 0 { + after = after.Add(ttl) + } else { + after = after.AddDate(10, 0, 0) + } template := x509.Certificate{ SerialNumber: sn, Subject: pkix.Name{CommonName: result.Name}, @@ -85,8 +93,8 @@ func testCA(t testing.T, xc *structs.CARoot, keyType string, keyBits int) *struc x509.KeyUsageCRLSign | x509.KeyUsageDigitalSignature, IsCA: true, - NotAfter: time.Now().AddDate(10, 0, 0), - NotBefore: time.Now(), + NotAfter: after, + NotBefore: before, AuthorityKeyId: testKeyID(t, signer.Public()), SubjectKeyId: testKeyID(t, signer.Public()), } @@ -159,13 +167,19 @@ func testCA(t testing.T, xc *structs.CARoot, keyType string, keyBits int) *struc // that is cross-signed with the previous cert, and this will be set as // SigningCert. func TestCA(t testing.T, xc *structs.CARoot) *structs.CARoot { - return testCA(t, xc, DefaultPrivateKeyType, DefaultPrivateKeyBits) + return testCA(t, xc, DefaultPrivateKeyType, DefaultPrivateKeyBits, 0) +} + +// TestCAWithTTL is similar to TestCA, except that it +// takes a custom duration for the lifetime of the certificate. +func TestCAWithTTL(t testing.T, xc *structs.CARoot, ttl time.Duration) *structs.CARoot { + return testCA(t, xc, DefaultPrivateKeyType, DefaultPrivateKeyBits, ttl) } // TestCAWithKeyType is similar to TestCA, except that it // takes two additional arguments to override the default private key type and size. func TestCAWithKeyType(t testing.T, xc *structs.CARoot, keyType string, keyBits int) *structs.CARoot { - return testCA(t, xc, keyType, keyBits) + return testCA(t, xc, keyType, keyBits, 0) } // testCertID is an interface to be implemented the various spiffe ID / CertURI types diff --git a/agent/consul/leader.go b/agent/consul/leader.go index b4b200c80..797ffeae8 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -375,6 +375,7 @@ func (s *Server) revokeLeadership() { s.stopConnectLeader() s.caManager.setCAProvider(nil, nil) + s.caManager.setState(CAStateUninitialized, false) s.stopACLTokenReaping() diff --git a/agent/consul/leader_connect_ca.go b/agent/consul/leader_connect_ca.go index d6b94978d..5e42b5f9d 100644 --- a/agent/consul/leader_connect_ca.go +++ b/agent/consul/leader_connect_ca.go @@ -11,8 +11,8 @@ import ( "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect/ca" + "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/logging" "github.com/hashicorp/go-hclog" uuid "github.com/hashicorp/go-uuid" ) @@ -27,12 +27,27 @@ const ( CAStateReconfig = "RECONFIGURING" ) +// caServerDelegate is an interface for server operations for facilitating +// easier testing. +type caServerDelegate interface { + State() *state.Store + IsLeader() bool + + createCAProvider(conf *structs.CAConfiguration) (ca.Provider, error) + forwardDC(method, dc string, args interface{}, reply interface{}) error + generateCASignRequest(csr string) *structs.CASignRequest + raftApply(t structs.MessageType, msg interface{}) (interface{}, error) + + checkServersProvider +} + // CAManager is a wrapper around CA operations such as updating roots, an intermediate // or the configuration. All operations should go through the CAManager in order to // avoid data races. type CAManager struct { - srv *Server - logger hclog.Logger + delegate caServerDelegate + serverConf *Config + logger hclog.Logger providerLock sync.RWMutex // provider is the current CA provider in use for Connect. This is @@ -51,14 +66,30 @@ type CAManager struct { actingSecondaryCA bool // True if this datacenter has been initialized as a secondary CA. } -func NewCAManager(srv *Server) *CAManager { +type caDelegateWithState struct { + *Server +} + +func (c *caDelegateWithState) State() *state.Store { + return c.fsm.State() +} + +func NewCAManager(delegate caServerDelegate, logger hclog.Logger, config *Config) *CAManager { return &CAManager{ - srv: srv, - logger: srv.loggers.Named(logging.Connect), - state: CAStateUninitialized, + delegate: delegate, + logger: logger, + serverConf: config, + state: CAStateUninitialized, } } +func (c *CAManager) reset() { + c.state = CAStateUninitialized + c.primaryRoots = structs.IndexedCARoots{} + c.actingSecondaryCA = false + c.setCAProvider(nil, nil) +} + // setState attempts to update the CA state to the given state. // If the current state is not READY, this will fail. The only exception is when // the current state is UNINITIALIZED, and the function is called with CAStateInitializing. @@ -98,13 +129,13 @@ func (c *CAManager) getPrimaryRoots() structs.IndexedCARoots { // when setting up the CA during establishLeadership. The state should be set to // non-ready before calling this. func (c *CAManager) initializeCAConfig() (*structs.CAConfiguration, error) { - state := c.srv.fsm.State() + state := c.delegate.State() _, config, err := state.CAConfig(nil) if err != nil { return nil, err } if config == nil { - config = c.srv.config.CAConfig + config = c.serverConf.CAConfig if config.ClusterID == "" { id, err := uuid.GenerateUUID() if err != nil { @@ -129,7 +160,7 @@ func (c *CAManager) initializeCAConfig() (*structs.CAConfiguration, error) { Op: structs.CAOpSetConfig, Config: config, } - if resp, err := c.srv.raftApply(structs.ConnectCARequestType, req); err != nil { + if resp, err := c.delegate.raftApply(structs.ConnectCARequestType, req); err != nil { return nil, err } else if respErr, ok := resp.(error); ok { return nil, respErr @@ -182,7 +213,7 @@ func (c *CAManager) getCAProvider() (ca.Provider, *structs.CARoot) { // In cases where an agent is started with managed proxies, we may ask // for the provider before establishLeadership completes. If we're the // leader, then wait and get the provider again - if result == nil && c.srv.IsLeader() && retries < 10 { + if result == nil && c.delegate.IsLeader() && retries < 10 { retries++ time.Sleep(50 * time.Millisecond) continue @@ -208,7 +239,7 @@ func (c *CAManager) setCAProvider(newProvider ca.Provider, root *structs.CARoot) // if this is a secondary DC. func (c *CAManager) InitializeCA() error { // Bail if connect isn't enabled. - if !c.srv.config.ConnectEnabled { + if !c.serverConf.ConnectEnabled { return nil } @@ -224,7 +255,7 @@ func (c *CAManager) InitializeCA() error { if err != nil { return err } - provider, err := c.srv.createCAProvider(conf) + provider, err := c.delegate.createCAProvider(conf) if err != nil { return err } @@ -232,12 +263,12 @@ func (c *CAManager) InitializeCA() error { c.setCAProvider(provider, nil) // Run the root CA initialization if this is the primary DC. - if c.srv.config.PrimaryDatacenter == c.srv.config.Datacenter { + if c.serverConf.PrimaryDatacenter == c.serverConf.Datacenter { return c.initializeRootCA(provider, conf) } // If this isn't the primary DC, run the secondary DC routine if the primary has already been upgraded to at least 1.6.0 - versionOk, foundPrimary := ServersInDCMeetMinimumVersion(c.srv, c.srv.config.PrimaryDatacenter, minMultiDCConnectVersion) + versionOk, foundPrimary := ServersInDCMeetMinimumVersion(c.delegate, c.serverConf.PrimaryDatacenter, minMultiDCConnectVersion) if !foundPrimary { c.logger.Warn("primary datacenter is configured but unreachable - deferring initialization of the secondary datacenter CA") // return nil because we will initialize the secondary CA later @@ -252,10 +283,10 @@ func (c *CAManager) InitializeCA() error { // Get the root CA to see if we need to refresh our intermediate. args := structs.DCSpecificRequest{ - Datacenter: c.srv.config.PrimaryDatacenter, + Datacenter: c.serverConf.PrimaryDatacenter, } var roots structs.IndexedCARoots - if err := c.srv.forwardDC("ConnectCA.Roots", c.srv.config.PrimaryDatacenter, &args, &roots); err != nil { + if err := c.delegate.forwardDC("ConnectCA.Roots", c.serverConf.PrimaryDatacenter, &args, &roots); err != nil { return err } if err := c.setPrimaryRoots(roots); err != nil { @@ -279,7 +310,7 @@ func (c *CAManager) InitializeCA() error { func (c *CAManager) initializeRootCA(provider ca.Provider, conf *structs.CAConfiguration) error { pCfg := ca.ProviderConfig{ ClusterID: conf.ClusterID, - Datacenter: c.srv.config.Datacenter, + Datacenter: c.serverConf.Datacenter, IsPrimary: true, RawConfig: conf.Config, State: conf.State, @@ -324,7 +355,7 @@ func (c *CAManager) initializeRootCA(provider ca.Provider, conf *structs.CAConfi Op: structs.CAOpSetConfig, Config: conf, } - if _, err = c.srv.raftApply(structs.ConnectCARequestType, req); err != nil { + if _, err = c.delegate.raftApply(structs.ConnectCARequestType, req); err != nil { return fmt.Errorf("error persisting provider state: %v", err) } } @@ -334,7 +365,7 @@ func (c *CAManager) initializeRootCA(provider ca.Provider, conf *structs.CAConfi // tied to the provider. // Every change to the CA after this initial bootstrapping should // be done through the rotation process. - state := c.srv.fsm.State() + state := c.delegate.State() _, activeRoot, err := state.CARootActive(nil) if err != nil { return err @@ -359,7 +390,7 @@ func (c *CAManager) initializeRootCA(provider ca.Provider, conf *structs.CAConfi } // Store the root cert in raft - resp, err := c.srv.raftApply(structs.ConnectCARequestType, &structs.CARequest{ + resp, err := c.delegate.raftApply(structs.ConnectCARequestType, &structs.CARequest{ Op: structs.CAOpSetRoots, Index: idx, Roots: []*structs.CARoot{rootCA}, @@ -424,7 +455,7 @@ func (c *CAManager) initializeSecondaryCA(provider ca.Provider, config *structs. // This will fetch the secondary's exact current representation of the // active root. Note that this data should only be used if the IDs // match, otherwise it's out of date and should be regenerated. - _, activeSecondaryRoot, err = c.srv.fsm.State().CARootActive(nil) + _, activeSecondaryRoot, err = c.delegate.State().CARootActive(nil) if err != nil { return err } @@ -474,7 +505,7 @@ func (c *CAManager) initializeSecondaryCA(provider ca.Provider, config *structs. } // Update the roots list in the state store if there's a new active root. - state := c.srv.fsm.State() + state := c.delegate.State() _, activeRoot, err := state.CARootActive(nil) if err != nil { return err @@ -498,7 +529,7 @@ func (c *CAManager) initializeSecondaryCA(provider ca.Provider, config *structs. // If newActiveRoot is non-nil, it will be appended to the current roots list. // If config is non-nil, it will be used to overwrite the existing config. func (c *CAManager) persistNewRootAndConfig(provider ca.Provider, newActiveRoot *structs.CARoot, config *structs.CAConfiguration) error { - state := c.srv.fsm.State() + state := c.delegate.State() idx, oldRoots, err := state.CARoots(nil) if err != nil { return err @@ -554,7 +585,7 @@ func (c *CAManager) persistNewRootAndConfig(provider ca.Provider, newActiveRoot Roots: newRoots, Config: &newConf, } - resp, err := c.srv.raftApply(structs.ConnectCARequestType, &args) + resp, err := c.delegate.raftApply(structs.ConnectCARequestType, args) if err != nil { return err } @@ -577,7 +608,7 @@ func (c *CAManager) UpdateConfiguration(args *structs.CARequest) error { defer c.setState(CAStateReady, false) // Exit early if it's a no-op change - state := c.srv.fsm.State() + state := c.delegate.State() confIdx, config, err := state.CAConfig(nil) if err != nil { return err @@ -610,15 +641,15 @@ func (c *CAManager) UpdateConfiguration(args *structs.CARequest) error { // and get the current active root CA. This acts as a good validation // of the config and makes sure the provider is functioning correctly // before we commit any changes to Raft. - newProvider, err := c.srv.createCAProvider(args.Config) + newProvider, err := c.delegate.createCAProvider(args.Config) if err != nil { return fmt.Errorf("could not initialize provider: %v", err) } pCfg := ca.ProviderConfig{ ClusterID: args.Config.ClusterID, - Datacenter: c.srv.config.Datacenter, + Datacenter: c.serverConf.Datacenter, // This endpoint can be called in a secondary DC too so set this correctly. - IsPrimary: c.srv.config.Datacenter == c.srv.config.PrimaryDatacenter, + IsPrimary: c.serverConf.Datacenter == c.serverConf.PrimaryDatacenter, RawConfig: args.Config.Config, State: args.Config.State, } @@ -637,8 +668,8 @@ func (c *CAManager) UpdateConfiguration(args *structs.CARequest) error { }() // If this is a secondary, just check if the intermediate needs to be regenerated. - if c.srv.config.Datacenter != c.srv.config.PrimaryDatacenter { - if err := c.srv.caManager.initializeSecondaryCA(newProvider, args.Config); err != nil { + if c.serverConf.Datacenter != c.serverConf.PrimaryDatacenter { + if err := c.initializeSecondaryCA(newProvider, args.Config); err != nil { return fmt.Errorf("Error updating secondary datacenter CA config: %v", err) } cleanupNewProvider = false @@ -678,7 +709,7 @@ func (c *CAManager) UpdateConfiguration(args *structs.CARequest) error { // If the root didn't change, just update the config and return. if root != nil && root.ID == newActiveRoot.ID { args.Op = structs.CAOpSetConfig - resp, err := c.srv.raftApply(structs.ConnectCARequestType, args) + resp, err := c.delegate.raftApply(structs.ConnectCARequestType, args) if err != nil { return err } @@ -771,7 +802,7 @@ func (c *CAManager) UpdateConfiguration(args *structs.CARequest) error { args.Index = idx args.Config.ModifyIndex = confIdx args.Roots = newRoots - resp, err := c.srv.raftApply(structs.ConnectCARequestType, args) + resp, err := c.delegate.raftApply(structs.ConnectCARequestType, args) if err != nil { return err } @@ -829,7 +860,7 @@ func (c *CAManager) getIntermediateCASigned(provider ca.Provider, newActiveRoot } var intermediatePEM string - if err := c.srv.forwardDC("ConnectCA.SignIntermediate", c.srv.config.PrimaryDatacenter, c.srv.generateCASignRequest(csr), &intermediatePEM); err != nil { + if err := c.delegate.forwardDC("ConnectCA.SignIntermediate", c.serverConf.PrimaryDatacenter, c.delegate.generateCASignRequest(csr), &intermediatePEM); err != nil { // this is a failure in the primary and shouldn't be capable of erroring out our establishing leadership c.logger.Warn("Primary datacenter refused to sign our intermediate CA certificate", "error", err) return nil @@ -855,7 +886,7 @@ func (c *CAManager) getIntermediateCASigned(provider ca.Provider, newActiveRoot // intermediateCertRenewalWatch periodically attempts to renew the intermediate cert. func (c *CAManager) intermediateCertRenewalWatch(ctx context.Context) error { - isPrimary := c.srv.config.Datacenter == c.srv.config.PrimaryDatacenter + isPrimary := c.serverConf.Datacenter == c.serverConf.PrimaryDatacenter for { select { @@ -863,7 +894,7 @@ func (c *CAManager) intermediateCertRenewalWatch(ctx context.Context) error { return nil case <-time.After(structs.IntermediateCertRenewInterval): retryLoopBackoffAbortOnSuccess(ctx, func() error { - return c.RenewIntermediate(isPrimary) + return c.RenewIntermediate(ctx, isPrimary) }, func(err error) { c.logger.Error("error renewing intermediate certs", "routine", intermediateCertRenewWatchRoutineName, @@ -877,7 +908,7 @@ func (c *CAManager) intermediateCertRenewalWatch(ctx context.Context) error { // RenewIntermediate checks the intermediate cert for // expiration. If more than half the time a cert is valid has passed, // it will try to renew it. -func (c *CAManager) RenewIntermediate(isPrimary bool) error { +func (c *CAManager) RenewIntermediate(ctx context.Context, isPrimary bool) error { // Grab the 'lock' right away so the provider/config can't be changed out while we check // the intermediate. if err := c.setState(CAStateRenewIntermediate, true); err != nil { @@ -895,7 +926,7 @@ func (c *CAManager) RenewIntermediate(isPrimary bool) error { return fmt.Errorf("secondary CA is not yet configured.") } - state := c.srv.fsm.State() + state := c.delegate.State() _, root, err := state.CARootActive(nil) if err != nil { return err @@ -939,8 +970,19 @@ func (c *CAManager) RenewIntermediate(isPrimary bool) error { if !isPrimary { renewalFunc = c.getIntermediateCASigned } - if err := renewalFunc(provider, activeRoot); err != nil { - return err + errCh := make(chan error) + go func() { + errCh <- renewalFunc(provider, activeRoot) + }() + + // Wait for the renewal func to return or for the context to be canceled. + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errCh: + if err != nil { + return err + } } if err := c.persistNewRootAndConfig(provider, activeRoot, nil); err != nil { @@ -956,18 +998,18 @@ func (c *CAManager) RenewIntermediate(isPrimary bool) error { // intermediate certificate. func (c *CAManager) secondaryCARootWatch(ctx context.Context) error { args := structs.DCSpecificRequest{ - Datacenter: c.srv.config.PrimaryDatacenter, + Datacenter: c.serverConf.PrimaryDatacenter, QueryOptions: structs.QueryOptions{ // the maximum time the primary roots watch query can block before returning - MaxQueryTime: c.srv.config.MaxQueryTime, + MaxQueryTime: c.serverConf.MaxQueryTime, }, } - c.logger.Debug("starting Connect CA root replication from primary datacenter", "primary", c.srv.config.PrimaryDatacenter) + c.logger.Debug("starting Connect CA root replication from primary datacenter", "primary", c.serverConf.PrimaryDatacenter) retryLoopBackoff(ctx, func() error { var roots structs.IndexedCARoots - if err := c.srv.forwardDC("ConnectCA.Roots", c.srv.config.PrimaryDatacenter, &args, &roots); err != nil { + if err := c.delegate.forwardDC("ConnectCA.Roots", c.serverConf.PrimaryDatacenter, &args, &roots); err != nil { return fmt.Errorf("Error retrieving the primary datacenter's roots: %v", err) } @@ -1016,7 +1058,7 @@ func (c *CAManager) UpdateRoots(roots structs.IndexedCARoots) error { return nil } if !c.configuredSecondaryCA() { - versionOk, primaryFound := ServersInDCMeetMinimumVersion(c.srv, c.srv.config.PrimaryDatacenter, minMultiDCConnectVersion) + versionOk, primaryFound := ServersInDCMeetMinimumVersion(c.delegate, c.serverConf.PrimaryDatacenter, minMultiDCConnectVersion) if !primaryFound { return fmt.Errorf("Primary datacenter is unreachable - deferring secondary CA initialization") } @@ -1046,14 +1088,14 @@ func (c *CAManager) initializeSecondaryProvider(provider ca.Provider, roots stru } clusterID := strings.Split(roots.TrustDomain, ".")[0] - _, conf, err := c.srv.fsm.State().CAConfig(nil) + _, conf, err := c.delegate.State().CAConfig(nil) if err != nil { return err } pCfg := ca.ProviderConfig{ ClusterID: clusterID, - Datacenter: c.srv.config.Datacenter, + Datacenter: c.serverConf.Datacenter, IsPrimary: false, RawConfig: conf.Config, State: conf.State, diff --git a/agent/consul/leader_connect_ca_test.go b/agent/consul/leader_connect_ca_test.go new file mode 100644 index 000000000..a582f90ba --- /dev/null +++ b/agent/consul/leader_connect_ca_test.go @@ -0,0 +1,255 @@ +package consul + +import ( + "context" + "crypto/x509" + "errors" + "fmt" + "testing" + "time" + + "github.com/hashicorp/consul/agent/connect" + ca "github.com/hashicorp/consul/agent/connect/ca" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/go-version" + "github.com/hashicorp/serf/serf" + "github.com/stretchr/testify/require" +) + +type mockCAServerDelegate struct { + t *testing.T + config *Config + store *state.Store + primaryRoot *structs.CARoot + callbackCh chan string +} + +func NewMockCAServerDelegate(t *testing.T, config *Config) *mockCAServerDelegate { + delegate := &mockCAServerDelegate{ + t: t, + config: config, + store: state.NewStateStore(nil), + primaryRoot: connect.TestCAWithTTL(t, nil, 1*time.Second), + callbackCh: make(chan string, 0), + } + delegate.store.CASetConfig(1, testCAConfig()) + + return delegate +} + +func (m *mockCAServerDelegate) State() *state.Store { + return m.store +} + +func (m *mockCAServerDelegate) IsLeader() bool { + return true +} + +func (m *mockCAServerDelegate) CheckServers(datacenter string, fn func(*metadata.Server) bool) { + ver, _ := version.NewVersion("1.6.0") + fn(&metadata.Server{ + Status: serf.StatusAlive, + Build: *ver, + }) +} + +func (m *mockCAServerDelegate) ApplyCARequest(req *structs.CARequest) (interface{}, error) { + return ca.ApplyCARequestToStore(m.store, req) +} + +func (m *mockCAServerDelegate) createCAProvider(conf *structs.CAConfiguration) (ca.Provider, error) { + return &mockCAProvider{ + callbackCh: m.callbackCh, + rootPEM: m.primaryRoot.RootCert, + }, nil +} + +func (m *mockCAServerDelegate) forwardDC(method, dc string, args interface{}, reply interface{}) error { + switch method { + case "ConnectCA.Roots": + roots := reply.(*structs.IndexedCARoots) + roots.TrustDomain = connect.TestClusterID + roots.Roots = []*structs.CARoot{m.primaryRoot} + roots.ActiveRootID = m.primaryRoot.ID + case "ConnectCA.SignIntermediate": + r := reply.(*string) + *r = m.primaryRoot.RootCert + } + + m.callbackCh <- fmt.Sprintf("forwardDC/%s", method) + + return nil +} + +func (m *mockCAServerDelegate) generateCASignRequest(csr string) *structs.CASignRequest { + return &structs.CASignRequest{ + Datacenter: m.config.PrimaryDatacenter, + CSR: csr, + } +} + +func (m *mockCAServerDelegate) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) { + if t == structs.ConnectCARequestType { + req := msg.(*structs.CARequest) + act, err := m.store.CARootSetCAS(1, req.Index, req.Roots) + require.NoError(m.t, err) + require.True(m.t, act) + + act, err = m.store.CACheckAndSetConfig(1, req.Config.ModifyIndex, req.Config) + require.NoError(m.t, err) + } + m.callbackCh <- fmt.Sprintf("raftApply/%s", t) + return nil, nil +} + +// mockCAProvider mocks an empty provider implementation with a channel in order to coordinate +// waiting for certain methods to be called. +type mockCAProvider struct { + callbackCh chan string + rootPEM string +} + +func (m *mockCAProvider) Configure(cfg ca.ProviderConfig) error { return nil } +func (m *mockCAProvider) State() (map[string]string, error) { return nil, nil } +func (m *mockCAProvider) GenerateRoot() error { return nil } +func (m *mockCAProvider) ActiveRoot() (string, error) { return m.rootPEM, nil } +func (m *mockCAProvider) GenerateIntermediateCSR() (string, error) { + m.callbackCh <- "provider/GenerateIntermediateCSR" + return "", nil +} +func (m *mockCAProvider) SetIntermediate(intermediatePEM, rootPEM string) error { + m.callbackCh <- "provider/SetIntermediate" + return nil +} +func (m *mockCAProvider) ActiveIntermediate() (string, error) { return m.rootPEM, nil } +func (m *mockCAProvider) GenerateIntermediate() (string, error) { return "", nil } +func (m *mockCAProvider) Sign(*x509.CertificateRequest) (string, error) { return "", nil } +func (m *mockCAProvider) SignIntermediate(*x509.CertificateRequest) (string, error) { return "", nil } +func (m *mockCAProvider) CrossSignCA(*x509.Certificate) (string, error) { return "", nil } +func (m *mockCAProvider) SupportsCrossSigning() (bool, error) { return false, nil } +func (m *mockCAProvider) Cleanup() error { return nil } + +func waitForCh(t *testing.T, ch chan string, expected string) { + select { + case op := <-ch: + if op != expected { + t.Fatalf("got unexpected op %q, wanted %q", op, expected) + } + case <-time.After(3 * time.Second): + t.Fatalf("never got op %q", expected) + } +} + +func waitForEmptyCh(t *testing.T, ch chan string) { + select { + case op := <-ch: + t.Fatalf("got unexpected op %q", op) + case <-time.After(1 * time.Second): + } +} + +func testCAConfig() *structs.CAConfiguration { + return &structs.CAConfiguration{ + ClusterID: connect.TestClusterID, + Provider: "mock", + Config: map[string]interface{}{ + "LeafCertTTL": "72h", + "IntermediateCertTTL": "2160h", + }, + } +} + +// initTestManager initializes a CAManager with a mockCAServerDelegate, consuming +// the ops that come through the channels and returning when initialization has finished. +func initTestManager(t *testing.T, manager *CAManager, delegate *mockCAServerDelegate) { + initCh := make(chan struct{}) + go func() { + require.NoError(t, manager.InitializeCA()) + close(initCh) + }() + for i := 0; i < 5; i++ { + select { + case <-delegate.callbackCh: + case <-time.After(3 * time.Second): + t.Fatal("failed waiting for initialization events") + } + } + select { + case <-initCh: + case <-time.After(3 * time.Second): + t.Fatal("failed waiting for initialization") + } +} + +func TestCAManager_Initialize(t *testing.T) { + conf := DefaultConfig() + conf.ConnectEnabled = true + conf.PrimaryDatacenter = "dc1" + conf.Datacenter = "dc2" + delegate := NewMockCAServerDelegate(t, conf) + manager := NewCAManager(delegate, testutil.Logger(t), conf) + + // Call InitializeCA and then confirm the RPCs and provider calls + // happen in the expected order. + go func() { + require.EqualValues(t, CAStateUninitialized, manager.state) + require.NoError(t, manager.InitializeCA()) + require.EqualValues(t, CAStateReady, manager.state) + }() + + waitForCh(t, delegate.callbackCh, "forwardDC/ConnectCA.Roots") + require.EqualValues(t, CAStateInitializing, manager.state) + waitForCh(t, delegate.callbackCh, "provider/GenerateIntermediateCSR") + waitForCh(t, delegate.callbackCh, "forwardDC/ConnectCA.SignIntermediate") + waitForCh(t, delegate.callbackCh, "provider/SetIntermediate") + waitForCh(t, delegate.callbackCh, "raftApply/ConnectCA") + waitForEmptyCh(t, delegate.callbackCh) +} + +func TestCAManager_UpdateConfigWhileRenewIntermediate(t *testing.T) { + // No parallel execution because we change globals + // Set the interval and drift buffer low for renewing the cert. + origInterval := structs.IntermediateCertRenewInterval + origDriftBuffer := ca.CertificateTimeDriftBuffer + defer func() { + structs.IntermediateCertRenewInterval = origInterval + ca.CertificateTimeDriftBuffer = origDriftBuffer + }() + structs.IntermediateCertRenewInterval = time.Millisecond + ca.CertificateTimeDriftBuffer = 0 + + conf := DefaultConfig() + conf.ConnectEnabled = true + conf.PrimaryDatacenter = "dc1" + conf.Datacenter = "dc2" + delegate := NewMockCAServerDelegate(t, conf) + manager := NewCAManager(delegate, testutil.Logger(t), conf) + initTestManager(t, manager, delegate) + + // Wait half the TTL for the cert to need renewing. + time.Sleep(500 * time.Millisecond) + + // Call RenewIntermediate and then confirm the RPCs and provider calls + // happen in the expected order. + go func() { + require.NoError(t, manager.RenewIntermediate(context.TODO(), false)) + require.EqualValues(t, CAStateReady, manager.state) + }() + + waitForCh(t, delegate.callbackCh, "provider/GenerateIntermediateCSR") + + // Call UpdateConfiguration while RenewIntermediate is still in-flight to + // make sure we get an error about the state being occupied. + go func() { + require.EqualValues(t, CAStateRenewIntermediate, manager.state) + require.Error(t, errors.New("already in state"), manager.UpdateConfiguration(&structs.CARequest{})) + }() + + waitForCh(t, delegate.callbackCh, "forwardDC/ConnectCA.SignIntermediate") + waitForCh(t, delegate.callbackCh, "provider/SetIntermediate") + waitForCh(t, delegate.callbackCh, "raftApply/ConnectCA") + waitForEmptyCh(t, delegate.callbackCh) +} diff --git a/agent/consul/server.go b/agent/consul/server.go index 0f34f25ba..d5884a869 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -466,7 +466,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) { return nil, fmt.Errorf("Failed to start Raft: %v", err) } - s.caManager = NewCAManager(s) + s.caManager = NewCAManager(&caDelegateWithState{s}, s.loggers.Named(logging.Connect), s.config) if s.config.ConnectEnabled && (s.config.AutoEncryptAllowTLS || s.config.AutoConfigAuthzEnabled) { go s.connectCARootsMonitor(&lib.StopChannelContext{StopCh: s.shutdownCh}) } diff --git a/agent/structs/acl.go b/agent/structs/acl.go index de01c6e2f..fef8d0875 100644 --- a/agent/structs/acl.go +++ b/agent/structs/acl.go @@ -151,7 +151,7 @@ type ACLServiceIdentity struct { func (s *ACLServiceIdentity) Clone() *ACLServiceIdentity { s2 := *s - s2.Datacenters = cloneStringSlice(s.Datacenters) + s2.Datacenters = CloneStringSlice(s.Datacenters) return &s2 } @@ -666,7 +666,7 @@ func (t *ACLPolicy) UnmarshalJSON(data []byte) error { func (p *ACLPolicy) Clone() *ACLPolicy { p2 := *p - p2.Datacenters = cloneStringSlice(p.Datacenters) + p2.Datacenters = CloneStringSlice(p.Datacenters) return &p2 } @@ -1460,7 +1460,7 @@ type ACLPolicyBatchDeleteRequest struct { PolicyIDs []string } -func cloneStringSlice(s []string) []string { +func CloneStringSlice(s []string) []string { if len(s) == 0 { return nil } diff --git a/agent/structs/config_entry_intentions.go b/agent/structs/config_entry_intentions.go index 7a737d67d..21e55224c 100644 --- a/agent/structs/config_entry_intentions.go +++ b/agent/structs/config_entry_intentions.go @@ -300,7 +300,7 @@ func (p *IntentionHTTPPermission) Clone() *IntentionHTTPPermission { } } - p2.Methods = cloneStringSlice(p.Methods) + p2.Methods = CloneStringSlice(p.Methods) return &p2 } diff --git a/agent/structs/connect_ca.go b/agent/structs/connect_ca.go index dc96334f8..54f156141 100644 --- a/agent/structs/connect_ca.go +++ b/agent/structs/connect_ca.go @@ -128,7 +128,7 @@ func (c *CARoot) Clone() *CARoot { } newCopy := *c - copy(c.IntermediateCerts, newCopy.IntermediateCerts) + newCopy.IntermediateCerts = CloneStringSlice(c.IntermediateCerts) return &newCopy }