Add CA server delegate interface for testing
This commit is contained in:
parent
c8d4a40a87
commit
26a9c985c5
|
@ -22,30 +22,7 @@ func (c *consulCAMockDelegate) State() *state.Store {
|
|||
}
|
||||
|
||||
func (c *consulCAMockDelegate) ApplyCARequest(req *structs.CARequest) (interface{}, error) {
|
||||
idx, _, err := c.state.CAConfig(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch req.Op {
|
||||
case structs.CAOpSetProviderState:
|
||||
_, err := c.state.CASetProviderState(idx+1, req.ProviderState)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
case structs.CAOpDeleteProviderState:
|
||||
if err := c.state.CADeleteProviderState(idx+1, req.ProviderState.ID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
case structs.CAOpIncrementProviderSerialNumber:
|
||||
return uint64(2), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("Invalid CA operation '%s'", req.Op)
|
||||
}
|
||||
return ApplyCARequestToStore(c.state, req)
|
||||
}
|
||||
|
||||
func newMockDelegate(t *testing.T, conf *structs.CAConfiguration) *consulCAMockDelegate {
|
||||
|
|
|
@ -8,6 +8,8 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/sdk/freeport"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
@ -234,3 +236,30 @@ func (v *TestVaultServer) Stop() error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func ApplyCARequestToStore(store *state.Store, req *structs.CARequest) (interface{}, error) {
|
||||
idx, _, err := store.CAConfig(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch req.Op {
|
||||
case structs.CAOpSetProviderState:
|
||||
_, err := store.CASetProviderState(idx+1, req.ProviderState)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
case structs.CAOpDeleteProviderState:
|
||||
if err := store.CADeleteProviderState(idx+1, req.ProviderState.ID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
case structs.CAOpIncrementProviderSerialNumber:
|
||||
return uint64(2), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("Invalid CA operation '%s'", req.Op)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,7 +56,7 @@ func ValidateLeaf(caPEM string, leafPEM string, intermediatePEMs []string) error
|
|||
return err
|
||||
}
|
||||
|
||||
func testCA(t testing.T, xc *structs.CARoot, keyType string, keyBits int) *structs.CARoot {
|
||||
func testCA(t testing.T, xc *structs.CARoot, keyType string, keyBits int, ttl time.Duration) *structs.CARoot {
|
||||
var result structs.CARoot
|
||||
result.Active = true
|
||||
result.Name = fmt.Sprintf("Test CA %d", atomic.AddUint64(&testCACounter, 1))
|
||||
|
@ -76,6 +76,14 @@ func testCA(t testing.T, xc *structs.CARoot, keyType string, keyBits int) *struc
|
|||
id := &SpiffeIDSigning{ClusterID: TestClusterID, Domain: "consul"}
|
||||
|
||||
// Create the CA cert
|
||||
now := time.Now()
|
||||
before := now
|
||||
after := now
|
||||
if ttl != 0 {
|
||||
after = after.Add(ttl)
|
||||
} else {
|
||||
after = after.AddDate(10, 0, 0)
|
||||
}
|
||||
template := x509.Certificate{
|
||||
SerialNumber: sn,
|
||||
Subject: pkix.Name{CommonName: result.Name},
|
||||
|
@ -85,8 +93,8 @@ func testCA(t testing.T, xc *structs.CARoot, keyType string, keyBits int) *struc
|
|||
x509.KeyUsageCRLSign |
|
||||
x509.KeyUsageDigitalSignature,
|
||||
IsCA: true,
|
||||
NotAfter: time.Now().AddDate(10, 0, 0),
|
||||
NotBefore: time.Now(),
|
||||
NotAfter: after,
|
||||
NotBefore: before,
|
||||
AuthorityKeyId: testKeyID(t, signer.Public()),
|
||||
SubjectKeyId: testKeyID(t, signer.Public()),
|
||||
}
|
||||
|
@ -159,13 +167,19 @@ func testCA(t testing.T, xc *structs.CARoot, keyType string, keyBits int) *struc
|
|||
// that is cross-signed with the previous cert, and this will be set as
|
||||
// SigningCert.
|
||||
func TestCA(t testing.T, xc *structs.CARoot) *structs.CARoot {
|
||||
return testCA(t, xc, DefaultPrivateKeyType, DefaultPrivateKeyBits)
|
||||
return testCA(t, xc, DefaultPrivateKeyType, DefaultPrivateKeyBits, 0)
|
||||
}
|
||||
|
||||
// TestCAWithTTL is similar to TestCA, except that it
|
||||
// takes a custom duration for the lifetime of the certificate.
|
||||
func TestCAWithTTL(t testing.T, xc *structs.CARoot, ttl time.Duration) *structs.CARoot {
|
||||
return testCA(t, xc, DefaultPrivateKeyType, DefaultPrivateKeyBits, ttl)
|
||||
}
|
||||
|
||||
// TestCAWithKeyType is similar to TestCA, except that it
|
||||
// takes two additional arguments to override the default private key type and size.
|
||||
func TestCAWithKeyType(t testing.T, xc *structs.CARoot, keyType string, keyBits int) *structs.CARoot {
|
||||
return testCA(t, xc, keyType, keyBits)
|
||||
return testCA(t, xc, keyType, keyBits, 0)
|
||||
}
|
||||
|
||||
// testCertID is an interface to be implemented the various spiffe ID / CertURI types
|
||||
|
|
|
@ -375,6 +375,7 @@ func (s *Server) revokeLeadership() {
|
|||
s.stopConnectLeader()
|
||||
|
||||
s.caManager.setCAProvider(nil, nil)
|
||||
s.caManager.setState(CAStateUninitialized, false)
|
||||
|
||||
s.stopACLTokenReaping()
|
||||
|
||||
|
|
|
@ -11,8 +11,8 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/connect/ca"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
uuid "github.com/hashicorp/go-uuid"
|
||||
)
|
||||
|
@ -27,11 +27,26 @@ const (
|
|||
CAStateReconfig = "RECONFIGURING"
|
||||
)
|
||||
|
||||
// caServerDelegate is an interface for server operations for facilitating
|
||||
// easier testing.
|
||||
type caServerDelegate interface {
|
||||
State() *state.Store
|
||||
IsLeader() bool
|
||||
|
||||
createCAProvider(conf *structs.CAConfiguration) (ca.Provider, error)
|
||||
forwardDC(method, dc string, args interface{}, reply interface{}) error
|
||||
generateCASignRequest(csr string) *structs.CASignRequest
|
||||
raftApply(t structs.MessageType, msg interface{}) (interface{}, error)
|
||||
|
||||
checkServersProvider
|
||||
}
|
||||
|
||||
// CAManager is a wrapper around CA operations such as updating roots, an intermediate
|
||||
// or the configuration. All operations should go through the CAManager in order to
|
||||
// avoid data races.
|
||||
type CAManager struct {
|
||||
srv *Server
|
||||
delegate caServerDelegate
|
||||
serverConf *Config
|
||||
logger hclog.Logger
|
||||
|
||||
providerLock sync.RWMutex
|
||||
|
@ -51,14 +66,30 @@ type CAManager struct {
|
|||
actingSecondaryCA bool // True if this datacenter has been initialized as a secondary CA.
|
||||
}
|
||||
|
||||
func NewCAManager(srv *Server) *CAManager {
|
||||
type caDelegateWithState struct {
|
||||
*Server
|
||||
}
|
||||
|
||||
func (c *caDelegateWithState) State() *state.Store {
|
||||
return c.fsm.State()
|
||||
}
|
||||
|
||||
func NewCAManager(delegate caServerDelegate, logger hclog.Logger, config *Config) *CAManager {
|
||||
return &CAManager{
|
||||
srv: srv,
|
||||
logger: srv.loggers.Named(logging.Connect),
|
||||
delegate: delegate,
|
||||
logger: logger,
|
||||
serverConf: config,
|
||||
state: CAStateUninitialized,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CAManager) reset() {
|
||||
c.state = CAStateUninitialized
|
||||
c.primaryRoots = structs.IndexedCARoots{}
|
||||
c.actingSecondaryCA = false
|
||||
c.setCAProvider(nil, nil)
|
||||
}
|
||||
|
||||
// setState attempts to update the CA state to the given state.
|
||||
// If the current state is not READY, this will fail. The only exception is when
|
||||
// the current state is UNINITIALIZED, and the function is called with CAStateInitializing.
|
||||
|
@ -98,13 +129,13 @@ func (c *CAManager) getPrimaryRoots() structs.IndexedCARoots {
|
|||
// when setting up the CA during establishLeadership. The state should be set to
|
||||
// non-ready before calling this.
|
||||
func (c *CAManager) initializeCAConfig() (*structs.CAConfiguration, error) {
|
||||
state := c.srv.fsm.State()
|
||||
state := c.delegate.State()
|
||||
_, config, err := state.CAConfig(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if config == nil {
|
||||
config = c.srv.config.CAConfig
|
||||
config = c.serverConf.CAConfig
|
||||
if config.ClusterID == "" {
|
||||
id, err := uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
|
@ -129,7 +160,7 @@ func (c *CAManager) initializeCAConfig() (*structs.CAConfiguration, error) {
|
|||
Op: structs.CAOpSetConfig,
|
||||
Config: config,
|
||||
}
|
||||
if resp, err := c.srv.raftApply(structs.ConnectCARequestType, req); err != nil {
|
||||
if resp, err := c.delegate.raftApply(structs.ConnectCARequestType, req); err != nil {
|
||||
return nil, err
|
||||
} else if respErr, ok := resp.(error); ok {
|
||||
return nil, respErr
|
||||
|
@ -182,7 +213,7 @@ func (c *CAManager) getCAProvider() (ca.Provider, *structs.CARoot) {
|
|||
// In cases where an agent is started with managed proxies, we may ask
|
||||
// for the provider before establishLeadership completes. If we're the
|
||||
// leader, then wait and get the provider again
|
||||
if result == nil && c.srv.IsLeader() && retries < 10 {
|
||||
if result == nil && c.delegate.IsLeader() && retries < 10 {
|
||||
retries++
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
continue
|
||||
|
@ -208,7 +239,7 @@ func (c *CAManager) setCAProvider(newProvider ca.Provider, root *structs.CARoot)
|
|||
// if this is a secondary DC.
|
||||
func (c *CAManager) InitializeCA() error {
|
||||
// Bail if connect isn't enabled.
|
||||
if !c.srv.config.ConnectEnabled {
|
||||
if !c.serverConf.ConnectEnabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -224,7 +255,7 @@ func (c *CAManager) InitializeCA() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
provider, err := c.srv.createCAProvider(conf)
|
||||
provider, err := c.delegate.createCAProvider(conf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -232,12 +263,12 @@ func (c *CAManager) InitializeCA() error {
|
|||
c.setCAProvider(provider, nil)
|
||||
|
||||
// Run the root CA initialization if this is the primary DC.
|
||||
if c.srv.config.PrimaryDatacenter == c.srv.config.Datacenter {
|
||||
if c.serverConf.PrimaryDatacenter == c.serverConf.Datacenter {
|
||||
return c.initializeRootCA(provider, conf)
|
||||
}
|
||||
|
||||
// If this isn't the primary DC, run the secondary DC routine if the primary has already been upgraded to at least 1.6.0
|
||||
versionOk, foundPrimary := ServersInDCMeetMinimumVersion(c.srv, c.srv.config.PrimaryDatacenter, minMultiDCConnectVersion)
|
||||
versionOk, foundPrimary := ServersInDCMeetMinimumVersion(c.delegate, c.serverConf.PrimaryDatacenter, minMultiDCConnectVersion)
|
||||
if !foundPrimary {
|
||||
c.logger.Warn("primary datacenter is configured but unreachable - deferring initialization of the secondary datacenter CA")
|
||||
// return nil because we will initialize the secondary CA later
|
||||
|
@ -252,10 +283,10 @@ func (c *CAManager) InitializeCA() error {
|
|||
|
||||
// Get the root CA to see if we need to refresh our intermediate.
|
||||
args := structs.DCSpecificRequest{
|
||||
Datacenter: c.srv.config.PrimaryDatacenter,
|
||||
Datacenter: c.serverConf.PrimaryDatacenter,
|
||||
}
|
||||
var roots structs.IndexedCARoots
|
||||
if err := c.srv.forwardDC("ConnectCA.Roots", c.srv.config.PrimaryDatacenter, &args, &roots); err != nil {
|
||||
if err := c.delegate.forwardDC("ConnectCA.Roots", c.serverConf.PrimaryDatacenter, &args, &roots); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.setPrimaryRoots(roots); err != nil {
|
||||
|
@ -279,7 +310,7 @@ func (c *CAManager) InitializeCA() error {
|
|||
func (c *CAManager) initializeRootCA(provider ca.Provider, conf *structs.CAConfiguration) error {
|
||||
pCfg := ca.ProviderConfig{
|
||||
ClusterID: conf.ClusterID,
|
||||
Datacenter: c.srv.config.Datacenter,
|
||||
Datacenter: c.serverConf.Datacenter,
|
||||
IsPrimary: true,
|
||||
RawConfig: conf.Config,
|
||||
State: conf.State,
|
||||
|
@ -324,7 +355,7 @@ func (c *CAManager) initializeRootCA(provider ca.Provider, conf *structs.CAConfi
|
|||
Op: structs.CAOpSetConfig,
|
||||
Config: conf,
|
||||
}
|
||||
if _, err = c.srv.raftApply(structs.ConnectCARequestType, req); err != nil {
|
||||
if _, err = c.delegate.raftApply(structs.ConnectCARequestType, req); err != nil {
|
||||
return fmt.Errorf("error persisting provider state: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -334,7 +365,7 @@ func (c *CAManager) initializeRootCA(provider ca.Provider, conf *structs.CAConfi
|
|||
// tied to the provider.
|
||||
// Every change to the CA after this initial bootstrapping should
|
||||
// be done through the rotation process.
|
||||
state := c.srv.fsm.State()
|
||||
state := c.delegate.State()
|
||||
_, activeRoot, err := state.CARootActive(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -359,7 +390,7 @@ func (c *CAManager) initializeRootCA(provider ca.Provider, conf *structs.CAConfi
|
|||
}
|
||||
|
||||
// Store the root cert in raft
|
||||
resp, err := c.srv.raftApply(structs.ConnectCARequestType, &structs.CARequest{
|
||||
resp, err := c.delegate.raftApply(structs.ConnectCARequestType, &structs.CARequest{
|
||||
Op: structs.CAOpSetRoots,
|
||||
Index: idx,
|
||||
Roots: []*structs.CARoot{rootCA},
|
||||
|
@ -424,7 +455,7 @@ func (c *CAManager) initializeSecondaryCA(provider ca.Provider, config *structs.
|
|||
// This will fetch the secondary's exact current representation of the
|
||||
// active root. Note that this data should only be used if the IDs
|
||||
// match, otherwise it's out of date and should be regenerated.
|
||||
_, activeSecondaryRoot, err = c.srv.fsm.State().CARootActive(nil)
|
||||
_, activeSecondaryRoot, err = c.delegate.State().CARootActive(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -474,7 +505,7 @@ func (c *CAManager) initializeSecondaryCA(provider ca.Provider, config *structs.
|
|||
}
|
||||
|
||||
// Update the roots list in the state store if there's a new active root.
|
||||
state := c.srv.fsm.State()
|
||||
state := c.delegate.State()
|
||||
_, activeRoot, err := state.CARootActive(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -498,7 +529,7 @@ func (c *CAManager) initializeSecondaryCA(provider ca.Provider, config *structs.
|
|||
// If newActiveRoot is non-nil, it will be appended to the current roots list.
|
||||
// If config is non-nil, it will be used to overwrite the existing config.
|
||||
func (c *CAManager) persistNewRootAndConfig(provider ca.Provider, newActiveRoot *structs.CARoot, config *structs.CAConfiguration) error {
|
||||
state := c.srv.fsm.State()
|
||||
state := c.delegate.State()
|
||||
idx, oldRoots, err := state.CARoots(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -554,7 +585,7 @@ func (c *CAManager) persistNewRootAndConfig(provider ca.Provider, newActiveRoot
|
|||
Roots: newRoots,
|
||||
Config: &newConf,
|
||||
}
|
||||
resp, err := c.srv.raftApply(structs.ConnectCARequestType, &args)
|
||||
resp, err := c.delegate.raftApply(structs.ConnectCARequestType, args)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -577,7 +608,7 @@ func (c *CAManager) UpdateConfiguration(args *structs.CARequest) error {
|
|||
defer c.setState(CAStateReady, false)
|
||||
|
||||
// Exit early if it's a no-op change
|
||||
state := c.srv.fsm.State()
|
||||
state := c.delegate.State()
|
||||
confIdx, config, err := state.CAConfig(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -610,15 +641,15 @@ func (c *CAManager) UpdateConfiguration(args *structs.CARequest) error {
|
|||
// and get the current active root CA. This acts as a good validation
|
||||
// of the config and makes sure the provider is functioning correctly
|
||||
// before we commit any changes to Raft.
|
||||
newProvider, err := c.srv.createCAProvider(args.Config)
|
||||
newProvider, err := c.delegate.createCAProvider(args.Config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not initialize provider: %v", err)
|
||||
}
|
||||
pCfg := ca.ProviderConfig{
|
||||
ClusterID: args.Config.ClusterID,
|
||||
Datacenter: c.srv.config.Datacenter,
|
||||
Datacenter: c.serverConf.Datacenter,
|
||||
// This endpoint can be called in a secondary DC too so set this correctly.
|
||||
IsPrimary: c.srv.config.Datacenter == c.srv.config.PrimaryDatacenter,
|
||||
IsPrimary: c.serverConf.Datacenter == c.serverConf.PrimaryDatacenter,
|
||||
RawConfig: args.Config.Config,
|
||||
State: args.Config.State,
|
||||
}
|
||||
|
@ -637,8 +668,8 @@ func (c *CAManager) UpdateConfiguration(args *structs.CARequest) error {
|
|||
}()
|
||||
|
||||
// If this is a secondary, just check if the intermediate needs to be regenerated.
|
||||
if c.srv.config.Datacenter != c.srv.config.PrimaryDatacenter {
|
||||
if err := c.srv.caManager.initializeSecondaryCA(newProvider, args.Config); err != nil {
|
||||
if c.serverConf.Datacenter != c.serverConf.PrimaryDatacenter {
|
||||
if err := c.initializeSecondaryCA(newProvider, args.Config); err != nil {
|
||||
return fmt.Errorf("Error updating secondary datacenter CA config: %v", err)
|
||||
}
|
||||
cleanupNewProvider = false
|
||||
|
@ -678,7 +709,7 @@ func (c *CAManager) UpdateConfiguration(args *structs.CARequest) error {
|
|||
// If the root didn't change, just update the config and return.
|
||||
if root != nil && root.ID == newActiveRoot.ID {
|
||||
args.Op = structs.CAOpSetConfig
|
||||
resp, err := c.srv.raftApply(structs.ConnectCARequestType, args)
|
||||
resp, err := c.delegate.raftApply(structs.ConnectCARequestType, args)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -771,7 +802,7 @@ func (c *CAManager) UpdateConfiguration(args *structs.CARequest) error {
|
|||
args.Index = idx
|
||||
args.Config.ModifyIndex = confIdx
|
||||
args.Roots = newRoots
|
||||
resp, err := c.srv.raftApply(structs.ConnectCARequestType, args)
|
||||
resp, err := c.delegate.raftApply(structs.ConnectCARequestType, args)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -829,7 +860,7 @@ func (c *CAManager) getIntermediateCASigned(provider ca.Provider, newActiveRoot
|
|||
}
|
||||
|
||||
var intermediatePEM string
|
||||
if err := c.srv.forwardDC("ConnectCA.SignIntermediate", c.srv.config.PrimaryDatacenter, c.srv.generateCASignRequest(csr), &intermediatePEM); err != nil {
|
||||
if err := c.delegate.forwardDC("ConnectCA.SignIntermediate", c.serverConf.PrimaryDatacenter, c.delegate.generateCASignRequest(csr), &intermediatePEM); err != nil {
|
||||
// this is a failure in the primary and shouldn't be capable of erroring out our establishing leadership
|
||||
c.logger.Warn("Primary datacenter refused to sign our intermediate CA certificate", "error", err)
|
||||
return nil
|
||||
|
@ -855,7 +886,7 @@ func (c *CAManager) getIntermediateCASigned(provider ca.Provider, newActiveRoot
|
|||
|
||||
// intermediateCertRenewalWatch periodically attempts to renew the intermediate cert.
|
||||
func (c *CAManager) intermediateCertRenewalWatch(ctx context.Context) error {
|
||||
isPrimary := c.srv.config.Datacenter == c.srv.config.PrimaryDatacenter
|
||||
isPrimary := c.serverConf.Datacenter == c.serverConf.PrimaryDatacenter
|
||||
|
||||
for {
|
||||
select {
|
||||
|
@ -863,7 +894,7 @@ func (c *CAManager) intermediateCertRenewalWatch(ctx context.Context) error {
|
|||
return nil
|
||||
case <-time.After(structs.IntermediateCertRenewInterval):
|
||||
retryLoopBackoffAbortOnSuccess(ctx, func() error {
|
||||
return c.RenewIntermediate(isPrimary)
|
||||
return c.RenewIntermediate(ctx, isPrimary)
|
||||
}, func(err error) {
|
||||
c.logger.Error("error renewing intermediate certs",
|
||||
"routine", intermediateCertRenewWatchRoutineName,
|
||||
|
@ -877,7 +908,7 @@ func (c *CAManager) intermediateCertRenewalWatch(ctx context.Context) error {
|
|||
// RenewIntermediate checks the intermediate cert for
|
||||
// expiration. If more than half the time a cert is valid has passed,
|
||||
// it will try to renew it.
|
||||
func (c *CAManager) RenewIntermediate(isPrimary bool) error {
|
||||
func (c *CAManager) RenewIntermediate(ctx context.Context, isPrimary bool) error {
|
||||
// Grab the 'lock' right away so the provider/config can't be changed out while we check
|
||||
// the intermediate.
|
||||
if err := c.setState(CAStateRenewIntermediate, true); err != nil {
|
||||
|
@ -895,7 +926,7 @@ func (c *CAManager) RenewIntermediate(isPrimary bool) error {
|
|||
return fmt.Errorf("secondary CA is not yet configured.")
|
||||
}
|
||||
|
||||
state := c.srv.fsm.State()
|
||||
state := c.delegate.State()
|
||||
_, root, err := state.CARootActive(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -939,9 +970,20 @@ func (c *CAManager) RenewIntermediate(isPrimary bool) error {
|
|||
if !isPrimary {
|
||||
renewalFunc = c.getIntermediateCASigned
|
||||
}
|
||||
if err := renewalFunc(provider, activeRoot); err != nil {
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
errCh <- renewalFunc(provider, activeRoot)
|
||||
}()
|
||||
|
||||
// Wait for the renewal func to return or for the context to be canceled.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case err := <-errCh:
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.persistNewRootAndConfig(provider, activeRoot, nil); err != nil {
|
||||
return err
|
||||
|
@ -956,18 +998,18 @@ func (c *CAManager) RenewIntermediate(isPrimary bool) error {
|
|||
// intermediate certificate.
|
||||
func (c *CAManager) secondaryCARootWatch(ctx context.Context) error {
|
||||
args := structs.DCSpecificRequest{
|
||||
Datacenter: c.srv.config.PrimaryDatacenter,
|
||||
Datacenter: c.serverConf.PrimaryDatacenter,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
// the maximum time the primary roots watch query can block before returning
|
||||
MaxQueryTime: c.srv.config.MaxQueryTime,
|
||||
MaxQueryTime: c.serverConf.MaxQueryTime,
|
||||
},
|
||||
}
|
||||
|
||||
c.logger.Debug("starting Connect CA root replication from primary datacenter", "primary", c.srv.config.PrimaryDatacenter)
|
||||
c.logger.Debug("starting Connect CA root replication from primary datacenter", "primary", c.serverConf.PrimaryDatacenter)
|
||||
|
||||
retryLoopBackoff(ctx, func() error {
|
||||
var roots structs.IndexedCARoots
|
||||
if err := c.srv.forwardDC("ConnectCA.Roots", c.srv.config.PrimaryDatacenter, &args, &roots); err != nil {
|
||||
if err := c.delegate.forwardDC("ConnectCA.Roots", c.serverConf.PrimaryDatacenter, &args, &roots); err != nil {
|
||||
return fmt.Errorf("Error retrieving the primary datacenter's roots: %v", err)
|
||||
}
|
||||
|
||||
|
@ -1016,7 +1058,7 @@ func (c *CAManager) UpdateRoots(roots structs.IndexedCARoots) error {
|
|||
return nil
|
||||
}
|
||||
if !c.configuredSecondaryCA() {
|
||||
versionOk, primaryFound := ServersInDCMeetMinimumVersion(c.srv, c.srv.config.PrimaryDatacenter, minMultiDCConnectVersion)
|
||||
versionOk, primaryFound := ServersInDCMeetMinimumVersion(c.delegate, c.serverConf.PrimaryDatacenter, minMultiDCConnectVersion)
|
||||
if !primaryFound {
|
||||
return fmt.Errorf("Primary datacenter is unreachable - deferring secondary CA initialization")
|
||||
}
|
||||
|
@ -1046,14 +1088,14 @@ func (c *CAManager) initializeSecondaryProvider(provider ca.Provider, roots stru
|
|||
}
|
||||
|
||||
clusterID := strings.Split(roots.TrustDomain, ".")[0]
|
||||
_, conf, err := c.srv.fsm.State().CAConfig(nil)
|
||||
_, conf, err := c.delegate.State().CAConfig(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pCfg := ca.ProviderConfig{
|
||||
ClusterID: clusterID,
|
||||
Datacenter: c.srv.config.Datacenter,
|
||||
Datacenter: c.serverConf.Datacenter,
|
||||
IsPrimary: false,
|
||||
RawConfig: conf.Config,
|
||||
State: conf.State,
|
||||
|
|
|
@ -0,0 +1,255 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
ca "github.com/hashicorp/consul/agent/connect/ca"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/go-version"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type mockCAServerDelegate struct {
|
||||
t *testing.T
|
||||
config *Config
|
||||
store *state.Store
|
||||
primaryRoot *structs.CARoot
|
||||
callbackCh chan string
|
||||
}
|
||||
|
||||
func NewMockCAServerDelegate(t *testing.T, config *Config) *mockCAServerDelegate {
|
||||
delegate := &mockCAServerDelegate{
|
||||
t: t,
|
||||
config: config,
|
||||
store: state.NewStateStore(nil),
|
||||
primaryRoot: connect.TestCAWithTTL(t, nil, 1*time.Second),
|
||||
callbackCh: make(chan string, 0),
|
||||
}
|
||||
delegate.store.CASetConfig(1, testCAConfig())
|
||||
|
||||
return delegate
|
||||
}
|
||||
|
||||
func (m *mockCAServerDelegate) State() *state.Store {
|
||||
return m.store
|
||||
}
|
||||
|
||||
func (m *mockCAServerDelegate) IsLeader() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *mockCAServerDelegate) CheckServers(datacenter string, fn func(*metadata.Server) bool) {
|
||||
ver, _ := version.NewVersion("1.6.0")
|
||||
fn(&metadata.Server{
|
||||
Status: serf.StatusAlive,
|
||||
Build: *ver,
|
||||
})
|
||||
}
|
||||
|
||||
func (m *mockCAServerDelegate) ApplyCARequest(req *structs.CARequest) (interface{}, error) {
|
||||
return ca.ApplyCARequestToStore(m.store, req)
|
||||
}
|
||||
|
||||
func (m *mockCAServerDelegate) createCAProvider(conf *structs.CAConfiguration) (ca.Provider, error) {
|
||||
return &mockCAProvider{
|
||||
callbackCh: m.callbackCh,
|
||||
rootPEM: m.primaryRoot.RootCert,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *mockCAServerDelegate) forwardDC(method, dc string, args interface{}, reply interface{}) error {
|
||||
switch method {
|
||||
case "ConnectCA.Roots":
|
||||
roots := reply.(*structs.IndexedCARoots)
|
||||
roots.TrustDomain = connect.TestClusterID
|
||||
roots.Roots = []*structs.CARoot{m.primaryRoot}
|
||||
roots.ActiveRootID = m.primaryRoot.ID
|
||||
case "ConnectCA.SignIntermediate":
|
||||
r := reply.(*string)
|
||||
*r = m.primaryRoot.RootCert
|
||||
}
|
||||
|
||||
m.callbackCh <- fmt.Sprintf("forwardDC/%s", method)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mockCAServerDelegate) generateCASignRequest(csr string) *structs.CASignRequest {
|
||||
return &structs.CASignRequest{
|
||||
Datacenter: m.config.PrimaryDatacenter,
|
||||
CSR: csr,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockCAServerDelegate) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) {
|
||||
if t == structs.ConnectCARequestType {
|
||||
req := msg.(*structs.CARequest)
|
||||
act, err := m.store.CARootSetCAS(1, req.Index, req.Roots)
|
||||
require.NoError(m.t, err)
|
||||
require.True(m.t, act)
|
||||
|
||||
act, err = m.store.CACheckAndSetConfig(1, req.Config.ModifyIndex, req.Config)
|
||||
require.NoError(m.t, err)
|
||||
}
|
||||
m.callbackCh <- fmt.Sprintf("raftApply/%s", t)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// mockCAProvider mocks an empty provider implementation with a channel in order to coordinate
|
||||
// waiting for certain methods to be called.
|
||||
type mockCAProvider struct {
|
||||
callbackCh chan string
|
||||
rootPEM string
|
||||
}
|
||||
|
||||
func (m *mockCAProvider) Configure(cfg ca.ProviderConfig) error { return nil }
|
||||
func (m *mockCAProvider) State() (map[string]string, error) { return nil, nil }
|
||||
func (m *mockCAProvider) GenerateRoot() error { return nil }
|
||||
func (m *mockCAProvider) ActiveRoot() (string, error) { return m.rootPEM, nil }
|
||||
func (m *mockCAProvider) GenerateIntermediateCSR() (string, error) {
|
||||
m.callbackCh <- "provider/GenerateIntermediateCSR"
|
||||
return "", nil
|
||||
}
|
||||
func (m *mockCAProvider) SetIntermediate(intermediatePEM, rootPEM string) error {
|
||||
m.callbackCh <- "provider/SetIntermediate"
|
||||
return nil
|
||||
}
|
||||
func (m *mockCAProvider) ActiveIntermediate() (string, error) { return m.rootPEM, nil }
|
||||
func (m *mockCAProvider) GenerateIntermediate() (string, error) { return "", nil }
|
||||
func (m *mockCAProvider) Sign(*x509.CertificateRequest) (string, error) { return "", nil }
|
||||
func (m *mockCAProvider) SignIntermediate(*x509.CertificateRequest) (string, error) { return "", nil }
|
||||
func (m *mockCAProvider) CrossSignCA(*x509.Certificate) (string, error) { return "", nil }
|
||||
func (m *mockCAProvider) SupportsCrossSigning() (bool, error) { return false, nil }
|
||||
func (m *mockCAProvider) Cleanup() error { return nil }
|
||||
|
||||
func waitForCh(t *testing.T, ch chan string, expected string) {
|
||||
select {
|
||||
case op := <-ch:
|
||||
if op != expected {
|
||||
t.Fatalf("got unexpected op %q, wanted %q", op, expected)
|
||||
}
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("never got op %q", expected)
|
||||
}
|
||||
}
|
||||
|
||||
func waitForEmptyCh(t *testing.T, ch chan string) {
|
||||
select {
|
||||
case op := <-ch:
|
||||
t.Fatalf("got unexpected op %q", op)
|
||||
case <-time.After(1 * time.Second):
|
||||
}
|
||||
}
|
||||
|
||||
func testCAConfig() *structs.CAConfiguration {
|
||||
return &structs.CAConfiguration{
|
||||
ClusterID: connect.TestClusterID,
|
||||
Provider: "mock",
|
||||
Config: map[string]interface{}{
|
||||
"LeafCertTTL": "72h",
|
||||
"IntermediateCertTTL": "2160h",
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// initTestManager initializes a CAManager with a mockCAServerDelegate, consuming
|
||||
// the ops that come through the channels and returning when initialization has finished.
|
||||
func initTestManager(t *testing.T, manager *CAManager, delegate *mockCAServerDelegate) {
|
||||
initCh := make(chan struct{})
|
||||
go func() {
|
||||
require.NoError(t, manager.InitializeCA())
|
||||
close(initCh)
|
||||
}()
|
||||
for i := 0; i < 5; i++ {
|
||||
select {
|
||||
case <-delegate.callbackCh:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("failed waiting for initialization events")
|
||||
}
|
||||
}
|
||||
select {
|
||||
case <-initCh:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("failed waiting for initialization")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCAManager_Initialize(t *testing.T) {
|
||||
conf := DefaultConfig()
|
||||
conf.ConnectEnabled = true
|
||||
conf.PrimaryDatacenter = "dc1"
|
||||
conf.Datacenter = "dc2"
|
||||
delegate := NewMockCAServerDelegate(t, conf)
|
||||
manager := NewCAManager(delegate, testutil.Logger(t), conf)
|
||||
|
||||
// Call InitializeCA and then confirm the RPCs and provider calls
|
||||
// happen in the expected order.
|
||||
go func() {
|
||||
require.EqualValues(t, CAStateUninitialized, manager.state)
|
||||
require.NoError(t, manager.InitializeCA())
|
||||
require.EqualValues(t, CAStateReady, manager.state)
|
||||
}()
|
||||
|
||||
waitForCh(t, delegate.callbackCh, "forwardDC/ConnectCA.Roots")
|
||||
require.EqualValues(t, CAStateInitializing, manager.state)
|
||||
waitForCh(t, delegate.callbackCh, "provider/GenerateIntermediateCSR")
|
||||
waitForCh(t, delegate.callbackCh, "forwardDC/ConnectCA.SignIntermediate")
|
||||
waitForCh(t, delegate.callbackCh, "provider/SetIntermediate")
|
||||
waitForCh(t, delegate.callbackCh, "raftApply/ConnectCA")
|
||||
waitForEmptyCh(t, delegate.callbackCh)
|
||||
}
|
||||
|
||||
func TestCAManager_UpdateConfigWhileRenewIntermediate(t *testing.T) {
|
||||
// No parallel execution because we change globals
|
||||
// Set the interval and drift buffer low for renewing the cert.
|
||||
origInterval := structs.IntermediateCertRenewInterval
|
||||
origDriftBuffer := ca.CertificateTimeDriftBuffer
|
||||
defer func() {
|
||||
structs.IntermediateCertRenewInterval = origInterval
|
||||
ca.CertificateTimeDriftBuffer = origDriftBuffer
|
||||
}()
|
||||
structs.IntermediateCertRenewInterval = time.Millisecond
|
||||
ca.CertificateTimeDriftBuffer = 0
|
||||
|
||||
conf := DefaultConfig()
|
||||
conf.ConnectEnabled = true
|
||||
conf.PrimaryDatacenter = "dc1"
|
||||
conf.Datacenter = "dc2"
|
||||
delegate := NewMockCAServerDelegate(t, conf)
|
||||
manager := NewCAManager(delegate, testutil.Logger(t), conf)
|
||||
initTestManager(t, manager, delegate)
|
||||
|
||||
// Wait half the TTL for the cert to need renewing.
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
// Call RenewIntermediate and then confirm the RPCs and provider calls
|
||||
// happen in the expected order.
|
||||
go func() {
|
||||
require.NoError(t, manager.RenewIntermediate(context.TODO(), false))
|
||||
require.EqualValues(t, CAStateReady, manager.state)
|
||||
}()
|
||||
|
||||
waitForCh(t, delegate.callbackCh, "provider/GenerateIntermediateCSR")
|
||||
|
||||
// Call UpdateConfiguration while RenewIntermediate is still in-flight to
|
||||
// make sure we get an error about the state being occupied.
|
||||
go func() {
|
||||
require.EqualValues(t, CAStateRenewIntermediate, manager.state)
|
||||
require.Error(t, errors.New("already in state"), manager.UpdateConfiguration(&structs.CARequest{}))
|
||||
}()
|
||||
|
||||
waitForCh(t, delegate.callbackCh, "forwardDC/ConnectCA.SignIntermediate")
|
||||
waitForCh(t, delegate.callbackCh, "provider/SetIntermediate")
|
||||
waitForCh(t, delegate.callbackCh, "raftApply/ConnectCA")
|
||||
waitForEmptyCh(t, delegate.callbackCh)
|
||||
}
|
|
@ -466,7 +466,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
|
|||
return nil, fmt.Errorf("Failed to start Raft: %v", err)
|
||||
}
|
||||
|
||||
s.caManager = NewCAManager(s)
|
||||
s.caManager = NewCAManager(&caDelegateWithState{s}, s.loggers.Named(logging.Connect), s.config)
|
||||
if s.config.ConnectEnabled && (s.config.AutoEncryptAllowTLS || s.config.AutoConfigAuthzEnabled) {
|
||||
go s.connectCARootsMonitor(&lib.StopChannelContext{StopCh: s.shutdownCh})
|
||||
}
|
||||
|
|
|
@ -151,7 +151,7 @@ type ACLServiceIdentity struct {
|
|||
|
||||
func (s *ACLServiceIdentity) Clone() *ACLServiceIdentity {
|
||||
s2 := *s
|
||||
s2.Datacenters = cloneStringSlice(s.Datacenters)
|
||||
s2.Datacenters = CloneStringSlice(s.Datacenters)
|
||||
return &s2
|
||||
}
|
||||
|
||||
|
@ -666,7 +666,7 @@ func (t *ACLPolicy) UnmarshalJSON(data []byte) error {
|
|||
|
||||
func (p *ACLPolicy) Clone() *ACLPolicy {
|
||||
p2 := *p
|
||||
p2.Datacenters = cloneStringSlice(p.Datacenters)
|
||||
p2.Datacenters = CloneStringSlice(p.Datacenters)
|
||||
return &p2
|
||||
}
|
||||
|
||||
|
@ -1460,7 +1460,7 @@ type ACLPolicyBatchDeleteRequest struct {
|
|||
PolicyIDs []string
|
||||
}
|
||||
|
||||
func cloneStringSlice(s []string) []string {
|
||||
func CloneStringSlice(s []string) []string {
|
||||
if len(s) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -300,7 +300,7 @@ func (p *IntentionHTTPPermission) Clone() *IntentionHTTPPermission {
|
|||
}
|
||||
}
|
||||
|
||||
p2.Methods = cloneStringSlice(p.Methods)
|
||||
p2.Methods = CloneStringSlice(p.Methods)
|
||||
|
||||
return &p2
|
||||
}
|
||||
|
|
|
@ -128,7 +128,7 @@ func (c *CARoot) Clone() *CARoot {
|
|||
}
|
||||
|
||||
newCopy := *c
|
||||
copy(c.IntermediateCerts, newCopy.IntermediateCerts)
|
||||
newCopy.IntermediateCerts = CloneStringSlice(c.IntermediateCerts)
|
||||
return &newCopy
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue