Merge pull request #10445 from hashicorp/dnephin/ca-provider-explore
ca: isolate more of the CA logic in CAManager
This commit is contained in:
commit
a22bdb2ac9
|
@ -3,8 +3,6 @@ package ca
|
|||
import (
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
//go:generate mockery -name Provider -inpkg
|
||||
|
@ -171,13 +169,6 @@ type Provider interface {
|
|||
Cleanup(providerTypeChange bool, otherConfig map[string]interface{}) error
|
||||
}
|
||||
|
||||
// NeedsLogger is an optional interface that allows a CA provider to use the
|
||||
// Consul logger to output diagnostic messages.
|
||||
type NeedsLogger interface {
|
||||
// SetLogger will pass a configured Logger to the provider.
|
||||
SetLogger(logger hclog.Logger)
|
||||
}
|
||||
|
||||
// NeedsStop is an optional interface that allows a CA to define a function
|
||||
// to be called when the CA instance is no longer in use. This is different
|
||||
// from Cleanup(), as only the local provider instance is being shut down
|
||||
|
|
|
@ -18,7 +18,6 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -78,12 +77,9 @@ type AWSProvider struct {
|
|||
logger hclog.Logger
|
||||
}
|
||||
|
||||
// SetLogger implements NeedsLogger
|
||||
func (a *AWSProvider) SetLogger(logger hclog.Logger) {
|
||||
a.logger = logger.
|
||||
ResetNamed(logging.Connect).
|
||||
Named(logging.CA).
|
||||
Named(logging.AWS)
|
||||
// NewAWSProvider returns a new AWSProvider
|
||||
func NewAWSProvider(logger hclog.Logger) *AWSProvider {
|
||||
return &AWSProvider{logger: logger}
|
||||
}
|
||||
|
||||
// Configure implements Provider
|
||||
|
|
|
@ -7,9 +7,10 @@ import (
|
|||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/acmpca"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// skipIfAWSNotConfigured skips the test unless ENABLE_AWS_PCA_TESTS=true.
|
||||
|
@ -375,9 +376,7 @@ func TestAWSProvider_Cleanup(t *testing.T) {
|
|||
}
|
||||
|
||||
func testAWSProvider(t *testing.T, cfg ProviderConfig) *AWSProvider {
|
||||
p := &AWSProvider{}
|
||||
logger := testutil.Logger(t)
|
||||
p.SetLogger(logger)
|
||||
p := NewAWSProvider(testutil.Logger(t))
|
||||
require.NoError(t, p.Configure(cfg))
|
||||
return p
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@ import (
|
|||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -51,6 +50,11 @@ type ConsulProvider struct {
|
|||
sync.RWMutex
|
||||
}
|
||||
|
||||
// NewConsulProvider returns a new ConsulProvider that is ready to be used.
|
||||
func NewConsulProvider(delegate ConsulProviderStateDelegate, logger hclog.Logger) *ConsulProvider {
|
||||
return &ConsulProvider{Delegate: delegate, logger: logger}
|
||||
}
|
||||
|
||||
type ConsulProviderStateDelegate interface {
|
||||
State() *state.Store
|
||||
ApplyCARequest(*structs.CARequest) (interface{}, error)
|
||||
|
@ -114,23 +118,15 @@ func (c *ConsulProvider) Configure(cfg ProviderConfig) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Write the provider state to the state store.
|
||||
newState := structs.CAConsulProviderState{
|
||||
ID: c.id,
|
||||
}
|
||||
|
||||
args := &structs.CARequest{
|
||||
Op: structs.CAOpSetProviderState,
|
||||
ProviderState: &newState,
|
||||
ProviderState: &structs.CAConsulProviderState{ID: c.id},
|
||||
}
|
||||
if _, err := c.Delegate.ApplyCARequest(args); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.logger.Debug("consul CA provider configured",
|
||||
"id", c.id,
|
||||
"is_primary", c.isPrimary,
|
||||
)
|
||||
c.logger.Debug("consul CA provider configured", "id", c.id, "is_primary", c.isPrimary)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -667,14 +663,6 @@ func (c *ConsulProvider) generateCA(privateKey string, sn uint64) (string, error
|
|||
return buf.String(), nil
|
||||
}
|
||||
|
||||
// SetLogger implements the NeedsLogger interface so the provider can log important messages.
|
||||
func (c *ConsulProvider) SetLogger(logger hclog.Logger) {
|
||||
c.logger = logger.
|
||||
ResetNamed(logging.Connect).
|
||||
Named(logging.CA).
|
||||
Named(logging.Consul)
|
||||
}
|
||||
|
||||
func (c *ConsulProvider) parseTestState(rawConfig map[string]interface{}, state map[string]string) {
|
||||
c.testState = nil
|
||||
if rawTestState, ok := rawConfig["test_state"]; ok {
|
||||
|
|
|
@ -17,7 +17,6 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
)
|
||||
|
||||
const VaultCALeafCertRole = "leaf-cert"
|
||||
|
@ -38,8 +37,11 @@ type VaultProvider struct {
|
|||
logger hclog.Logger
|
||||
}
|
||||
|
||||
func NewVaultProvider() *VaultProvider {
|
||||
return &VaultProvider{shutdown: func() {}}
|
||||
func NewVaultProvider(logger hclog.Logger) *VaultProvider {
|
||||
return &VaultProvider{
|
||||
shutdown: func() {},
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func vaultTLSConfig(config *structs.VaultCAProviderConfig) *vaultapi.TLSConfig {
|
||||
|
@ -143,14 +145,6 @@ func (v *VaultProvider) renewToken(ctx context.Context, watcher *vaultapi.Lifeti
|
|||
}
|
||||
}
|
||||
|
||||
// SetLogger implements the NeedsLogger interface so the provider can log important messages.
|
||||
func (v *VaultProvider) SetLogger(logger hclog.Logger) {
|
||||
v.logger = logger.
|
||||
ResetNamed(logging.Connect).
|
||||
Named(logging.CA).
|
||||
Named(logging.Vault)
|
||||
}
|
||||
|
||||
// State implements Provider. Vault provider needs no state other than the
|
||||
// user-provided config currently.
|
||||
func (v *VaultProvider) State() (map[string]string, error) {
|
||||
|
|
|
@ -8,12 +8,13 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
vaultapi "github.com/hashicorp/vault/api"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
)
|
||||
|
||||
func TestVaultCAProvider_VaultTLSConfig(t *testing.T) {
|
||||
|
@ -485,7 +486,7 @@ func createVaultProvider(t *testing.T, isPrimary bool, addr, token string, rawCo
|
|||
conf[k] = v
|
||||
}
|
||||
|
||||
provider := NewVaultProvider()
|
||||
provider := NewVaultProvider(hclog.New(nil))
|
||||
|
||||
cfg := ProviderConfig{
|
||||
ClusterID: connect.TestClusterID,
|
||||
|
@ -494,11 +495,6 @@ func createVaultProvider(t *testing.T, isPrimary bool, addr, token string, rawCo
|
|||
RawConfig: conf,
|
||||
}
|
||||
|
||||
logger := hclog.New(&hclog.LoggerOptions{
|
||||
Output: ioutil.Discard,
|
||||
})
|
||||
provider.SetLogger(logger)
|
||||
|
||||
if !isPrimary {
|
||||
cfg.IsPrimary = false
|
||||
cfg.Datacenter = "dc2"
|
||||
|
|
|
@ -7,14 +7,15 @@ import (
|
|||
"os/exec"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
vaultapi "github.com/hashicorp/vault/api"
|
||||
"github.com/mitchellh/go-testing-interface"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/sdk/freeport"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
vaultapi "github.com/hashicorp/vault/api"
|
||||
"github.com/mitchellh/go-testing-interface"
|
||||
)
|
||||
|
||||
// KeyTestCases is a list of the important CA key types that we should test
|
||||
|
@ -75,13 +76,9 @@ func CASigningKeyTypeCases() []CASigningKeyTypes {
|
|||
|
||||
// TestConsulProvider creates a new ConsulProvider, taking care to stub out it's
|
||||
// Logger so that logging calls don't panic. If logging output is important
|
||||
// SetLogger can be called again with another logger to capture logs.
|
||||
func TestConsulProvider(t testing.T, d ConsulProviderStateDelegate) *ConsulProvider {
|
||||
provider := &ConsulProvider{Delegate: d}
|
||||
logger := hclog.New(&hclog.LoggerOptions{
|
||||
Output: ioutil.Discard,
|
||||
})
|
||||
provider.SetLogger(logger)
|
||||
logger := hclog.New(&hclog.LoggerOptions{Output: ioutil.Discard})
|
||||
provider := &ConsulProvider{Delegate: d, logger: logger}
|
||||
return provider
|
||||
}
|
||||
|
||||
|
|
|
@ -1,20 +0,0 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
// consulCADelegate providers callbacks for the Consul CA provider
|
||||
// to use the state store for its operations.
|
||||
type consulCADelegate struct {
|
||||
srv *Server
|
||||
}
|
||||
|
||||
func (c *consulCADelegate) State() *state.Store {
|
||||
return c.srv.fsm.State()
|
||||
}
|
||||
|
||||
func (c *consulCADelegate) ApplyCARequest(req *structs.CARequest) (interface{}, error) {
|
||||
return c.srv.raftApply(structs.ConnectCARequestType, req)
|
||||
}
|
|
@ -387,9 +387,6 @@ func (s *Server) revokeLeadership() {
|
|||
|
||||
s.stopConnectLeader()
|
||||
|
||||
s.caManager.setCAProvider(nil, nil)
|
||||
s.caManager.setState(caStateUninitialized, false)
|
||||
|
||||
s.stopACLTokenReaping()
|
||||
|
||||
s.stopACLUpgrade()
|
||||
|
|
|
@ -2,12 +2,10 @@ package consul
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect/ca"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
)
|
||||
|
@ -49,36 +47,6 @@ func (s *Server) stopConnectLeader() {
|
|||
s.leaderRoutineManager.Stop(caRootPruningRoutineName)
|
||||
s.leaderRoutineManager.Stop(caRootMetricRoutineName)
|
||||
s.leaderRoutineManager.Stop(caSigningMetricRoutineName)
|
||||
|
||||
// If the provider implements NeedsStop, we call Stop to perform any shutdown actions.
|
||||
provider, _ := s.caManager.getCAProvider()
|
||||
if provider != nil {
|
||||
if needsStop, ok := provider.(ca.NeedsStop); ok {
|
||||
needsStop.Stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// createProvider returns a connect CA provider from the given config.
|
||||
func (s *Server) createCAProvider(conf *structs.CAConfiguration) (ca.Provider, error) {
|
||||
var p ca.Provider
|
||||
switch conf.Provider {
|
||||
case structs.ConsulCAProvider:
|
||||
p = &ca.ConsulProvider{Delegate: &consulCADelegate{s}}
|
||||
case structs.VaultCAProvider:
|
||||
p = ca.NewVaultProvider()
|
||||
case structs.AWSCAProvider:
|
||||
p = &ca.AWSProvider{}
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown CA provider %q", conf.Provider)
|
||||
}
|
||||
|
||||
// If the provider implements NeedsLogger, we give it our logger.
|
||||
if needsLogger, ok := p.(ca.NeedsLogger); ok {
|
||||
needsLogger.SetLogger(s.logger)
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (s *Server) runCARootPruning(ctx context.Context) error {
|
||||
|
@ -213,11 +181,3 @@ func lessThanHalfTimePassed(now, notBefore, notAfter time.Time) bool {
|
|||
t := notBefore.Add(halfTime(notBefore, notAfter))
|
||||
return t.Sub(now) > 0
|
||||
}
|
||||
|
||||
func (s *Server) generateCASignRequest(csr string) *structs.CASignRequest {
|
||||
return &structs.CASignRequest{
|
||||
Datacenter: s.config.PrimaryDatacenter,
|
||||
CSR: csr,
|
||||
WriteRequest: structs.WriteRequest{Token: s.tokens.ReplicationToken()},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,22 +23,20 @@ type caState string
|
|||
|
||||
const (
|
||||
caStateUninitialized caState = "UNINITIALIZED"
|
||||
caStateInitializing = "INITIALIZING"
|
||||
caStateInitialized = "INITIALIZED"
|
||||
caStateRenewIntermediate = "RENEWING"
|
||||
caStateReconfig = "RECONFIGURING"
|
||||
caStateInitializing caState = "INITIALIZING"
|
||||
caStateInitialized caState = "INITIALIZED"
|
||||
caStateRenewIntermediate caState = "RENEWING"
|
||||
caStateReconfig caState = "RECONFIGURING"
|
||||
)
|
||||
|
||||
// caServerDelegate is an interface for server operations for facilitating
|
||||
// easier testing.
|
||||
type caServerDelegate interface {
|
||||
State() *state.Store
|
||||
ca.ConsulProviderStateDelegate
|
||||
IsLeader() bool
|
||||
|
||||
createCAProvider(conf *structs.CAConfiguration) (ca.Provider, error)
|
||||
forwardDC(method, dc string, args interface{}, reply interface{}) error
|
||||
generateCASignRequest(csr string) *structs.CASignRequest
|
||||
raftApply(t structs.MessageType, msg interface{}) (interface{}, error)
|
||||
|
||||
checkServersProvider
|
||||
}
|
||||
|
@ -68,6 +66,8 @@ type CAManager struct {
|
|||
actingSecondaryCA bool // True if this datacenter has been initialized as a secondary CA.
|
||||
|
||||
leaderRoutineManager *routine.Manager
|
||||
// providerShim is used to test CAManager with a fake provider.
|
||||
providerShim ca.Provider
|
||||
}
|
||||
|
||||
type caDelegateWithState struct {
|
||||
|
@ -78,6 +78,18 @@ func (c *caDelegateWithState) State() *state.Store {
|
|||
return c.fsm.State()
|
||||
}
|
||||
|
||||
func (c *caDelegateWithState) ApplyCARequest(req *structs.CARequest) (interface{}, error) {
|
||||
return c.Server.raftApplyMsgpack(structs.ConnectCARequestType, req)
|
||||
}
|
||||
|
||||
func (c *caDelegateWithState) generateCASignRequest(csr string) *structs.CASignRequest {
|
||||
return &structs.CASignRequest{
|
||||
Datacenter: c.Server.config.PrimaryDatacenter,
|
||||
CSR: csr,
|
||||
WriteRequest: structs.WriteRequest{Token: c.Server.tokens.ReplicationToken()},
|
||||
}
|
||||
}
|
||||
|
||||
func NewCAManager(delegate caServerDelegate, leaderRoutineManager *routine.Manager, logger hclog.Logger, config *Config) *CAManager {
|
||||
return &CAManager{
|
||||
delegate: delegate,
|
||||
|
@ -88,13 +100,6 @@ func NewCAManager(delegate caServerDelegate, leaderRoutineManager *routine.Manag
|
|||
}
|
||||
}
|
||||
|
||||
func (c *CAManager) reset() {
|
||||
c.state = caStateUninitialized
|
||||
c.primaryRoots = structs.IndexedCARoots{}
|
||||
c.actingSecondaryCA = false
|
||||
c.setCAProvider(nil, nil)
|
||||
}
|
||||
|
||||
// setState attempts to update the CA state to the given state.
|
||||
// Valid state transitions are:
|
||||
//
|
||||
|
@ -175,7 +180,7 @@ func (c *CAManager) initializeCAConfig() (*structs.CAConfiguration, error) {
|
|||
Op: structs.CAOpSetConfig,
|
||||
Config: config,
|
||||
}
|
||||
if resp, err := c.delegate.raftApply(structs.ConnectCARequestType, req); err != nil {
|
||||
if resp, err := c.delegate.ApplyCARequest(&req); err != nil {
|
||||
return nil, err
|
||||
} else if respErr, ok := resp.(error); ok {
|
||||
return nil, respErr
|
||||
|
@ -217,12 +222,10 @@ func parseCARoot(pemValue, provider, clusterID string) (*structs.CARoot, error)
|
|||
// as well as the active root.
|
||||
func (c *CAManager) getCAProvider() (ca.Provider, *structs.CARoot) {
|
||||
retries := 0
|
||||
var result ca.Provider
|
||||
var resultRoot *structs.CARoot
|
||||
for result == nil {
|
||||
for {
|
||||
c.providerLock.RLock()
|
||||
result = c.provider
|
||||
resultRoot = c.providerRoot
|
||||
result := c.provider
|
||||
resultRoot := c.providerRoot
|
||||
c.providerLock.RUnlock()
|
||||
|
||||
// In cases where an agent is started with managed proxies, we may ask
|
||||
|
@ -234,10 +237,8 @@ func (c *CAManager) getCAProvider() (ca.Provider, *structs.CARoot) {
|
|||
continue
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
return result, resultRoot
|
||||
}
|
||||
}
|
||||
|
||||
// setCAProvider is being called while holding the stateLock
|
||||
|
@ -271,6 +272,17 @@ func (c *CAManager) Stop() {
|
|||
c.leaderRoutineManager.Stop(secondaryCARootWatchRoutineName)
|
||||
c.leaderRoutineManager.Stop(intermediateCertRenewWatchRoutineName)
|
||||
c.leaderRoutineManager.Stop(backgroundCAInitializationRoutineName)
|
||||
|
||||
if provider, _ := c.getCAProvider(); provider != nil {
|
||||
if needsStop, ok := provider.(ca.NeedsStop); ok {
|
||||
needsStop.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
c.setState(caStateUninitialized, false)
|
||||
c.primaryRoots = structs.IndexedCARoots{}
|
||||
c.actingSecondaryCA = false
|
||||
c.setCAProvider(nil, nil)
|
||||
}
|
||||
|
||||
func (c *CAManager) startPostInitializeRoutines(ctx context.Context) {
|
||||
|
@ -336,7 +348,7 @@ func (c *CAManager) InitializeCA() (reterr error) {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
provider, err := c.delegate.createCAProvider(conf)
|
||||
provider, err := c.newProvider(conf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -386,6 +398,24 @@ func (c *CAManager) InitializeCA() (reterr error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
// createProvider returns a connect CA provider from the given config.
|
||||
func (c *CAManager) newProvider(conf *structs.CAConfiguration) (ca.Provider, error) {
|
||||
logger := c.logger.Named(conf.Provider)
|
||||
switch conf.Provider {
|
||||
case structs.ConsulCAProvider:
|
||||
return ca.NewConsulProvider(c.delegate, logger), nil
|
||||
case structs.VaultCAProvider:
|
||||
return ca.NewVaultProvider(logger), nil
|
||||
case structs.AWSCAProvider:
|
||||
return ca.NewAWSProvider(logger), nil
|
||||
default:
|
||||
if c.providerShim != nil {
|
||||
return c.providerShim, nil
|
||||
}
|
||||
return nil, fmt.Errorf("unknown CA provider %q", conf.Provider)
|
||||
}
|
||||
}
|
||||
|
||||
// initializeRootCA runs the initialization logic for a root CA. It should only
|
||||
// be called while the state lock is held by setting the state to non-ready.
|
||||
func (c *CAManager) initializeRootCA(provider ca.Provider, conf *structs.CAConfiguration) error {
|
||||
|
@ -436,7 +466,7 @@ func (c *CAManager) initializeRootCA(provider ca.Provider, conf *structs.CAConfi
|
|||
Op: structs.CAOpSetConfig,
|
||||
Config: conf,
|
||||
}
|
||||
if _, err = c.delegate.raftApply(structs.ConnectCARequestType, req); err != nil {
|
||||
if _, err = c.delegate.ApplyCARequest(&req); err != nil {
|
||||
return fmt.Errorf("error persisting provider state: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -485,7 +515,7 @@ func (c *CAManager) initializeRootCA(provider ca.Provider, conf *structs.CAConfi
|
|||
}
|
||||
|
||||
// Store the root cert in raft
|
||||
resp, err := c.delegate.raftApply(structs.ConnectCARequestType, &structs.CARequest{
|
||||
resp, err := c.delegate.ApplyCARequest(&structs.CARequest{
|
||||
Op: structs.CAOpSetRoots,
|
||||
Index: idx,
|
||||
Roots: []*structs.CARoot{rootCA},
|
||||
|
@ -693,7 +723,7 @@ func (c *CAManager) persistNewRootAndConfig(provider ca.Provider, newActiveRoot
|
|||
Roots: newRoots,
|
||||
Config: &newConf,
|
||||
}
|
||||
resp, err := c.delegate.raftApply(structs.ConnectCARequestType, args)
|
||||
resp, err := c.delegate.ApplyCARequest(args)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -766,7 +796,7 @@ func (c *CAManager) UpdateConfiguration(args *structs.CARequest) (reterr error)
|
|||
// and get the current active root CA. This acts as a good validation
|
||||
// of the config and makes sure the provider is functioning correctly
|
||||
// before we commit any changes to Raft.
|
||||
newProvider, err := c.delegate.createCAProvider(args.Config)
|
||||
newProvider, err := c.newProvider(args.Config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not initialize provider: %v", err)
|
||||
}
|
||||
|
@ -834,7 +864,7 @@ func (c *CAManager) UpdateConfiguration(args *structs.CARequest) (reterr error)
|
|||
// If the root didn't change, just update the config and return.
|
||||
if root != nil && root.ID == newActiveRoot.ID {
|
||||
args.Op = structs.CAOpSetConfig
|
||||
resp, err := c.delegate.raftApply(structs.ConnectCARequestType, args)
|
||||
resp, err := c.delegate.ApplyCARequest(args)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -937,7 +967,7 @@ func (c *CAManager) UpdateConfiguration(args *structs.CARequest) (reterr error)
|
|||
args.Index = idx
|
||||
args.Config.ModifyIndex = confIdx
|
||||
args.Roots = newRoots
|
||||
resp, err := c.delegate.raftApply(structs.ConnectCARequestType, args)
|
||||
resp, err := c.delegate.ApplyCARequest(args)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -8,15 +8,18 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-version"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
ca "github.com/hashicorp/consul/agent/connect/ca"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/go-version"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TODO(kyhavlov): replace with t.Deadline()
|
||||
|
@ -59,15 +62,57 @@ func (m *mockCAServerDelegate) CheckServers(datacenter string, fn func(*metadata
|
|||
})
|
||||
}
|
||||
|
||||
// ApplyCARequest mirrors FSM.applyConnectCAOperation because that functionality
|
||||
// is not exported.
|
||||
func (m *mockCAServerDelegate) ApplyCARequest(req *structs.CARequest) (interface{}, error) {
|
||||
return ca.ApplyCARequestToStore(m.store, req)
|
||||
}
|
||||
idx, _, err := m.store.CAConfig(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (m *mockCAServerDelegate) createCAProvider(conf *structs.CAConfiguration) (ca.Provider, error) {
|
||||
return &mockCAProvider{
|
||||
callbackCh: m.callbackCh,
|
||||
rootPEM: m.primaryRoot.RootCert,
|
||||
}, nil
|
||||
m.callbackCh <- fmt.Sprintf("raftApply/ConnectCA")
|
||||
|
||||
switch req.Op {
|
||||
case structs.CAOpSetConfig:
|
||||
if req.Config.ModifyIndex != 0 {
|
||||
act, err := m.store.CACheckAndSetConfig(idx+1, req.Config.ModifyIndex, req.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return act, nil
|
||||
}
|
||||
|
||||
return nil, m.store.CASetConfig(idx+1, req.Config)
|
||||
case structs.CAOpSetRootsAndConfig:
|
||||
act, err := m.store.CARootSetCAS(idx, req.Index, req.Roots)
|
||||
if err != nil || !act {
|
||||
return act, err
|
||||
}
|
||||
|
||||
act, err = m.store.CACheckAndSetConfig(idx+1, req.Config.ModifyIndex, req.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return act, nil
|
||||
case structs.CAOpSetProviderState:
|
||||
_, err := m.store.CASetProviderState(idx+1, req.ProviderState)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
case structs.CAOpDeleteProviderState:
|
||||
if err := m.store.CADeleteProviderState(idx+1, req.ProviderState.ID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
case structs.CAOpIncrementProviderSerialNumber:
|
||||
return uint64(2), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("Invalid CA operation '%s'", req.Op)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockCAServerDelegate) forwardDC(method, dc string, args interface{}, reply interface{}) error {
|
||||
|
@ -96,23 +141,6 @@ func (m *mockCAServerDelegate) generateCASignRequest(csr string) *structs.CASign
|
|||
}
|
||||
}
|
||||
|
||||
func (m *mockCAServerDelegate) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) {
|
||||
if t == structs.ConnectCARequestType {
|
||||
req := msg.(*structs.CARequest)
|
||||
act, err := m.store.CARootSetCAS(1, req.Index, req.Roots)
|
||||
require.NoError(m.t, err)
|
||||
require.True(m.t, act)
|
||||
|
||||
act, err = m.store.CACheckAndSetConfig(1, req.Config.ModifyIndex, req.Config)
|
||||
require.NoError(m.t, err)
|
||||
require.True(m.t, act)
|
||||
} else {
|
||||
return nil, fmt.Errorf("got invalid MessageType %v", t)
|
||||
}
|
||||
m.callbackCh <- fmt.Sprintf("raftApply/%s", t)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// mockCAProvider mocks an empty provider implementation with a channel in order to coordinate
|
||||
// waiting for certain methods to be called.
|
||||
type mockCAProvider struct {
|
||||
|
@ -145,6 +173,7 @@ func (m *mockCAProvider) SupportsCrossSigning() (bool, error)
|
|||
func (m *mockCAProvider) Cleanup(_ bool, _ map[string]interface{}) error { return nil }
|
||||
|
||||
func waitForCh(t *testing.T, ch chan string, expected string) {
|
||||
t.Helper()
|
||||
select {
|
||||
case op := <-ch:
|
||||
if op != expected {
|
||||
|
@ -177,6 +206,7 @@ func testCAConfig() *structs.CAConfiguration {
|
|||
// initTestManager initializes a CAManager with a mockCAServerDelegate, consuming
|
||||
// the ops that come through the channels and returning when initialization has finished.
|
||||
func initTestManager(t *testing.T, manager *CAManager, delegate *mockCAServerDelegate) {
|
||||
t.Helper()
|
||||
initCh := make(chan struct{})
|
||||
go func() {
|
||||
require.NoError(t, manager.InitializeCA())
|
||||
|
@ -207,13 +237,19 @@ func TestCAManager_Initialize(t *testing.T) {
|
|||
conf.Datacenter = "dc2"
|
||||
delegate := NewMockCAServerDelegate(t, conf)
|
||||
manager := NewCAManager(delegate, nil, testutil.Logger(t), conf)
|
||||
manager.providerShim = &mockCAProvider{
|
||||
callbackCh: delegate.callbackCh,
|
||||
rootPEM: delegate.primaryRoot.RootCert,
|
||||
}
|
||||
|
||||
// Call InitializeCA and then confirm the RPCs and provider calls
|
||||
// happen in the expected order.
|
||||
require.EqualValues(t, caStateUninitialized, manager.state)
|
||||
require.Equal(t, caStateUninitialized, manager.state)
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
errCh <- manager.InitializeCA()
|
||||
err := manager.InitializeCA()
|
||||
assert.NoError(t, err)
|
||||
errCh <- err
|
||||
}()
|
||||
|
||||
waitForCh(t, delegate.callbackCh, "forwardDC/ConnectCA.Roots")
|
||||
|
@ -232,7 +268,7 @@ func TestCAManager_Initialize(t *testing.T) {
|
|||
t.Fatal("never got result from errCh")
|
||||
}
|
||||
|
||||
require.EqualValues(t, caStateInitialized, manager.state)
|
||||
require.Equal(t, caStateInitialized, manager.state)
|
||||
}
|
||||
|
||||
func TestCAManager_UpdateConfigWhileRenewIntermediate(t *testing.T) {
|
||||
|
@ -257,6 +293,10 @@ func TestCAManager_UpdateConfigWhileRenewIntermediate(t *testing.T) {
|
|||
conf.Datacenter = "dc2"
|
||||
delegate := NewMockCAServerDelegate(t, conf)
|
||||
manager := NewCAManager(delegate, nil, testutil.Logger(t), conf)
|
||||
manager.providerShim = &mockCAProvider{
|
||||
callbackCh: delegate.callbackCh,
|
||||
rootPEM: delegate.primaryRoot.RootCert,
|
||||
}
|
||||
initTestManager(t, manager, delegate)
|
||||
|
||||
// Wait half the TTL for the cert to need renewing.
|
||||
|
@ -293,3 +333,10 @@ func TestCAManager_UpdateConfigWhileRenewIntermediate(t *testing.T) {
|
|||
|
||||
require.EqualValues(t, caStateInitialized, manager.state)
|
||||
}
|
||||
|
||||
func TestCADelegateWithState_GenerateCASignRequest(t *testing.T) {
|
||||
s := Server{config: &Config{PrimaryDatacenter: "east"}, tokens: new(token.Store)}
|
||||
d := &caDelegateWithState{Server: &s}
|
||||
req := d.generateCASignRequest("A")
|
||||
require.Equal(t, "east", req.RequestDatacenter())
|
||||
}
|
||||
|
|
|
@ -14,7 +14,6 @@ import (
|
|||
|
||||
uuid "github.com/hashicorp/go-uuid"
|
||||
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
|
@ -1009,13 +1008,6 @@ func getTestRoots(s *Server, datacenter string) (*structs.IndexedCARoots, *struc
|
|||
return &rootList, active, nil
|
||||
}
|
||||
|
||||
func TestLeader_GenerateCASignRequest(t *testing.T) {
|
||||
csr := "A"
|
||||
s := Server{config: &Config{PrimaryDatacenter: "east"}, tokens: new(token.Store)}
|
||||
req := s.generateCASignRequest(csr)
|
||||
assert.Equal(t, "east", req.RequestDatacenter())
|
||||
}
|
||||
|
||||
func TestLeader_CARootPruning(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
|
|
|
@ -468,7 +468,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
|
|||
return nil, fmt.Errorf("Failed to start Raft: %v", err)
|
||||
}
|
||||
|
||||
s.caManager = NewCAManager(&caDelegateWithState{s}, s.leaderRoutineManager, s.loggers.Named(logging.Connect), s.config)
|
||||
s.caManager = NewCAManager(&caDelegateWithState{s}, s.leaderRoutineManager, s.logger.ResetNamed("connect.ca"), s.config)
|
||||
if s.config.ConnectEnabled && (s.config.AutoEncryptAllowTLS || s.config.AutoConfigAuthzEnabled) {
|
||||
go s.connectCARootsMonitor(&lib.StopChannelContext{StopCh: s.shutdownCh})
|
||||
}
|
||||
|
|
|
@ -138,6 +138,7 @@ func (s *Server) getCARoots(ws memdb.WatchSet, state *state.Store) (*structs.Ind
|
|||
return indexedRoots, nil
|
||||
}
|
||||
|
||||
// TODO: Move this off Server. This is only called by RPC endpoints.
|
||||
func (s *Server) SignCertificate(csr *x509.CertificateRequest, spiffeID connect.CertURI) (*structs.IssuedCert, error) {
|
||||
provider, caRoot := s.caManager.getCAProvider()
|
||||
if provider == nil {
|
||||
|
|
|
@ -16,7 +16,6 @@ import (
|
|||
"github.com/hashicorp/memberlist"
|
||||
"github.com/hashicorp/raft"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect/ca"
|
||||
"github.com/hashicorp/consul/ipaddr"
|
||||
|
||||
"github.com/hashicorp/go-uuid"
|
||||
|
@ -1645,10 +1644,6 @@ func TestServer_CALogging(t *testing.T) {
|
|||
defer s1.Shutdown()
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
if _, ok := s1.caManager.provider.(ca.NeedsLogger); !ok {
|
||||
t.Fatalf("provider does not implement NeedsLogger")
|
||||
}
|
||||
|
||||
// Wait til CA root is setup
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
var out structs.IndexedCARoots
|
||||
|
|
Loading…
Reference in New Issue