Add server certificate manager

This certificate manager will request a leaf certificate for server agents and then keep it up to date.

parent 13dc01c553
commit 0c3853a2d0
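In outline, a server agent builds the manager from a small set of dependencies and starts it with a context tied to agent shutdown; it only runs when both peering and Connect are enabled. The snippet below is a condensed restatement of the agent.go hunk further down, with explanatory comments added; it introduces no behavior beyond what the diff shows.

    // Condensed from the agent.go hunk below: construct and start the manager
    // on server agents when peering and Connect are both enabled.
    if a.config.PeeringEnabled && a.config.ConnectEnabled {
        d := servercert.Deps{
            Logger: a.logger.Named("server.cert-manager"),
            Config: servercert.Config{
                Datacenter:  a.config.Datacenter,
                ACLsEnabled: a.config.ACLsEnabled,
            },
            Cache:           a.cache,                                                 // issues and renews the leaf cert via internal RPC
            GetStore:        func() servercert.Store { return server.FSM().State() }, // read-only access to the state store
            TLSConfigurator: a.tlsConfigurator,                                       // receives the cert and the peering server name
        }
        a.certManager = servercert.NewCertManager(d)
        if err := a.certManager.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil {
            return fmt.Errorf("failed to start server cert manager: %w", err)
        }
    }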
@@ -37,6 +37,7 @@ import (
     "github.com/hashicorp/consul/agent/checks"
     "github.com/hashicorp/consul/agent/config"
     "github.com/hashicorp/consul/agent/consul"
+    "github.com/hashicorp/consul/agent/consul/servercert"
     "github.com/hashicorp/consul/agent/dns"
     external "github.com/hashicorp/consul/agent/grpc-external"
     "github.com/hashicorp/consul/agent/local"

@@ -353,6 +354,9 @@ type Agent struct {
     // based on the current consul configuration.
     tlsConfigurator *tlsutil.Configurator

+    // certManager manages the lifecycle of the internally-managed server certificate.
+    certManager *servercert.CertManager
+
     // httpConnLimiter is used to limit connections to the HTTP server by client
     // IP.
     httpConnLimiter connlimit.Limiter

@@ -583,6 +587,24 @@ func (a *Agent) Start(ctx context.Context) error {
             return fmt.Errorf("Failed to start Consul server: %v", err)
         }
         a.delegate = server
+
+        if a.config.PeeringEnabled && a.config.ConnectEnabled {
+            d := servercert.Deps{
+                Logger: a.logger.Named("server.cert-manager"),
+                Config: servercert.Config{
+                    Datacenter:  a.config.Datacenter,
+                    ACLsEnabled: a.config.ACLsEnabled,
+                },
+                Cache:           a.cache,
+                GetStore:        func() servercert.Store { return server.FSM().State() },
+                TLSConfigurator: a.tlsConfigurator,
+            }
+            a.certManager = servercert.NewCertManager(d)
+            if err := a.certManager.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil {
+                return fmt.Errorf("failed to start server cert manager: %w", err)
+            }
+        }
+
     } else {
         client, err := consul.NewClient(consulCfg, a.baseDeps.Deps)
         if err != nil {
@@ -5984,6 +5984,71 @@ func TestAgent_startListeners(t *testing.T) {

 }

+func TestAgent_ServerCertificate(t *testing.T) {
+    if testing.Short() {
+        t.Skip("too slow for testing.Short")
+    }
+
+    const expectURI = "spiffe://11111111-2222-3333-4444-555555555555.consul/agent/server/dc/dc1"
+
+    // Leader should acquire a server cert after bootstrapping.
+    a1 := NewTestAgent(t, `
+node_name = "a1"
+acl {
+    enabled = true
+    tokens {
+        initial_management = "root"
+        default = "root"
+    }
+}
+connect {
+    enabled = true
+}
+peering {
+    enabled = true
+}`)
+    defer a1.Shutdown()
+    testrpc.WaitForTestAgent(t, a1.RPC, "dc1")
+
+    retry.Run(t, func(r *retry.R) {
+        cert := a1.tlsConfigurator.AutoEncryptCert()
+        require.NotNil(r, cert)
+        require.Len(r, cert.URIs, 1)
+        require.Equal(r, expectURI, cert.URIs[0].String())
+    })
+
+    // Join a follower, and it should be able to acquire a server cert as well.
+    a2 := NewTestAgent(t, `
+node_name = "a2"
+bootstrap = false
+acl {
+    enabled = true
+    tokens {
+        initial_management = "root"
+        default = "root"
+    }
+}
+connect {
+    enabled = true
+}
+peering {
+    enabled = true
+}`)
+    defer a2.Shutdown()
+
+    _, err := a2.JoinLAN([]string{fmt.Sprintf("127.0.0.1:%d", a1.Config.SerfPortLAN)}, nil)
+    require.NoError(t, err)
+
+    testrpc.WaitForTestAgent(t, a2.RPC, "dc1")
+
+    retry.Run(t, func(r *retry.R) {
+        cert := a2.tlsConfigurator.AutoEncryptCert()
+        require.NotNil(r, cert)
+        require.Len(r, cert.URIs, 1)
+        require.Equal(r, expectURI, cert.URIs[0].String())
+    })
+}
+
 func getExpectedCaPoolByFile(t *testing.T) *x509.CertPool {
     pool := x509.NewCertPool()
     data, err := ioutil.ReadFile("../test/ca/root.cer")
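The URI asserted above follows the server SPIFFE identity shape used by this commit. A hypothetical helper, purely to spell out the format (serverSpiffeURI is not part of the change):

    // Hypothetical helper showing the URI shape asserted in the test above:
    //   spiffe://<trust-domain>/agent/server/dc/<datacenter>
    func serverSpiffeURI(trustDomain, datacenter string) string {
        return fmt.Sprintf("spiffe://%s/agent/server/dc/%s", trustDomain, datacenter)
    }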
@@ -688,19 +688,21 @@ func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest,
 // since this is only used for cache-related requests and not forwarded
 // directly to any Consul servers.
 type ConnectCALeafRequest struct {
     Token          string
     Datacenter     string
-    Service        string // Service name, not ID
-    Agent          string // Agent name, not ID
-    Kind           structs.ServiceKind // only mesh-gateway for now
-    Server         bool
-    DNSSAN         []string
-    IPSAN          []net.IP
-    MinQueryIndex  uint64
-    MaxQueryTime   time.Duration
+    DNSSAN         []string
+    IPSAN          []net.IP
+    MinQueryIndex  uint64
+    MaxQueryTime   time.Duration
+    acl.EnterpriseMeta
     MustRevalidate bool

-    acl.EnterpriseMeta
+    // The following flags indicate the entity we are requesting a cert for.
+    // Only one of these must be specified.
+    Service string              // Given a Service name, not ID, the request is for a SpiffeIDService.
+    Agent   string              // Given an Agent name, not ID, the request is for a SpiffeIDAgent.
+    Kind    structs.ServiceKind // Given "mesh-gateway", the request is for a SpiffeIDMeshGateway. No other kinds supported.
+    Server  bool                // If true, the request is for a SpiffeIDServer.
 }

 func (r *ConnectCALeafRequest) Key() string {
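The new Server flag is what lets the certificate manager added later in this commit request a server identity rather than a service or agent one. A minimal sketch of how that watch is registered, mirroring the manager's watchLeafCert/watchServerToken code; the watchServerLeaf wrapper itself is hypothetical and only for illustration:

    // Sketch: register a cache watch for a server leaf certificate. Setting
    // only the Server flag yields a cert with a server SPIFFE identity for
    // this datacenter.
    func watchServerLeaf(ctx context.Context, c *cache.Cache, dc, token string, ch chan cache.UpdateEvent) error {
        req := cachetype.ConnectCALeafRequest{
            Datacenter: dc,
            Token:      token, // the server management token, when ACLs are enabled
            Server:     true,
        }
        return c.Notify(ctx, cachetype.ConnectCALeafName, &req, "leaf", ch)
    }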
@@ -110,7 +110,7 @@ type serverACLResolverBackend struct {
 }

 func (s *serverACLResolverBackend) IsServerManagementToken(token string) bool {
-    mgmt, err := s.getSystemMetadata(structs.ServerManagementToken)
+    mgmt, err := s.getSystemMetadata(structs.ServerManagementTokenAccessorID)
     if err != nil {
         s.logger.Debug("failed to fetch server management token: %w", err)
         return false
@@ -2209,7 +2209,7 @@ func TestACLResolver_ServerManagementToken(t *testing.T) {
     authz, err := r.ResolveToken(testToken)
     require.NoError(t, err)
     require.NotNil(t, authz.ACLIdentity)
-    require.Equal(t, structs.ServerManagementToken, authz.ACLIdentity.ID())
+    require.Equal(t, structs.ServerManagementTokenAccessorID, authz.ACLIdentity.ID())
     require.NotNil(t, authz.Authorizer)
     require.Equal(t, acl.ManageAll(), authz.Authorizer)
 }
@@ -538,7 +538,7 @@ func (s *Server) initializeACLs(ctx context.Context) error {
         if err != nil {
             return fmt.Errorf("failed to generate the secret ID for the server management token: %w", err)
         }
-        if err := s.setSystemMetadataKey(structs.ServerManagementToken, secretID); err != nil {
+        if err := s.setSystemMetadataKey(structs.ServerManagementTokenAccessorID, secretID); err != nil {
             return fmt.Errorf("failed to persist server management token: %w", err)
         }

@@ -1297,7 +1297,7 @@ func TestLeader_ACL_Initialization(t *testing.T) {
     require.NoError(t, err)
     require.NotNil(t, policy)

-    serverToken, err := s1.getSystemMetadata(structs.ServerManagementToken)
+    serverToken, err := s1.getSystemMetadata(structs.ServerManagementTokenAccessorID)
     require.NoError(t, err)
     require.NotEmpty(t, serverToken)

@@ -0,0 +1,267 @@
package servercert

import (
    "context"
    "fmt"
    "time"

    "github.com/hashicorp/consul/agent/cache"
    cachetype "github.com/hashicorp/consul/agent/cache-types"
    "github.com/hashicorp/consul/agent/connect"
    "github.com/hashicorp/consul/agent/structs"
    "github.com/hashicorp/consul/lib/retry"
    "github.com/hashicorp/go-hclog"
    "github.com/hashicorp/go-memdb"
)

// Correlation ID for leaf cert watches.
const leafWatchID = "leaf"

// Cache is an interface to represent the necessary methods of the agent/cache.Cache.
// It is used to request and renew the server leaf certificate.
type Cache interface {
    Notify(ctx context.Context, t string, r cache.Request, correlationID string, ch chan<- cache.UpdateEvent) error
}

// TLSConfigurator is an interface to represent the necessary methods of the tlsutil.Configurator.
// It is used to apply the server leaf certificate and server name.
type TLSConfigurator interface {
    UpdateAutoTLSCert(pub, priv string) error
    UpdateAutoTLSPeeringServerName(name string)
}

// Store is an interface to represent the necessary methods of the state.Store.
// It is used to fetch the CA config to set the trust domain in the TLSConfigurator.
type Store interface {
    CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error)
    SystemMetadataGet(ws memdb.WatchSet, key string) (uint64, *structs.SystemMetadataEntry, error)
    AbandonCh() <-chan struct{}
}

type Config struct {
    // Datacenter is the datacenter name the server is configured with.
    Datacenter string

    // ACLsEnabled indicates whether the ACL system is enabled on this server.
    ACLsEnabled bool
}

type Deps struct {
    Config          Config
    Logger          hclog.Logger
    Cache           Cache
    GetStore        func() Store
    TLSConfigurator TLSConfigurator
    waiter          retry.Waiter
}

// CertManager is responsible for requesting and renewing the leaf cert for server agents.
// The server certificate is managed internally and used for peering control-plane traffic
// to the TLS-enabled external gRPC port.
type CertManager struct {
    logger hclog.Logger

    // config contains agent configuration necessary for the cert manager to operate.
    config Config

    // cache provides an API to issue internal RPC requests and receive notifications
    // when there are changes.
    cache Cache

    // cacheUpdateCh receives notifications of cache update events for resources watched.
    cacheUpdateCh chan cache.UpdateEvent

    // getStore returns the server state store for read-only access.
    getStore func() Store

    // tlsConfigurator receives the leaf cert and peering server name updates from the cert manager.
    tlsConfigurator TLSConfigurator

    // waiter contains the waiter for exponential backoff between retries.
    waiter retry.Waiter
}

func NewCertManager(deps Deps) *CertManager {
    return &CertManager{
        config:          deps.Config,
        logger:          deps.Logger,
        cache:           deps.Cache,
        cacheUpdateCh:   make(chan cache.UpdateEvent, 1),
        getStore:        deps.GetStore,
        tlsConfigurator: deps.TLSConfigurator,
        waiter: retry.Waiter{
            MinFailures: 1,
            MinWait:     1 * time.Second,
            MaxWait:     5 * time.Minute,
            Jitter:      retry.NewJitter(20),
        },
    }
}

func (m *CertManager) Start(ctx context.Context) error {
    if err := m.initializeWatches(ctx); err != nil {
        return fmt.Errorf("failed to set up certificate watches: %w", err)
    }
    go m.handleUpdates(ctx)

    m.logger.Info("initialized server certificate management")
    return nil
}

func (m *CertManager) initializeWatches(ctx context.Context) error {
    if m.config.ACLsEnabled {
        // If ACLs are enabled we need to watch for server token updates and set/reset
        // leaf cert updates as token updates arrive.
        go m.watchServerToken(ctx)
    } else {
        // If ACLs are disabled we set up a single cache notification for leaf certs.
        if err := m.watchLeafCert(ctx); err != nil {
            return fmt.Errorf("failed to watch leaf: %w", err)
        }
    }
    go m.watchCAConfig(ctx)

    return nil
}

func (m *CertManager) watchServerToken(ctx context.Context) {
    // We keep the last iteration's cancel function to reset watches.
    var (
        notifyCtx context.Context
        cancel    context.CancelFunc = func() {}
    )
    retryLoopBackoff(ctx, m.waiter, func() error {
        ws := memdb.NewWatchSet()
        ws.Add(m.getStore().AbandonCh())

        _, token, err := m.getStore().SystemMetadataGet(ws, structs.ServerManagementTokenAccessorID)
        if err != nil {
            return err
        }
        if token == nil {
            m.logger.Debug("ACLs have not finished initializing")
            return nil
        }
        if token.Value == "" {
            // This should never happen. If the leader stored a token with this key it will not be empty.
            return fmt.Errorf("empty token")
        }
        m.logger.Debug("server management token watch fired - resetting leaf cert watch")

        // Cancel the existing leaf cert watch and spin up a new one any time the server token changes.
        // The watch needs the current token as set by the leader since certificate signing requests go to the leader.
        cancel()
        notifyCtx, cancel = context.WithCancel(ctx)

        req := cachetype.ConnectCALeafRequest{
            Datacenter: m.config.Datacenter,
            Token:      token.Value,
            Server:     true,
        }
        if err := m.cache.Notify(notifyCtx, cachetype.ConnectCALeafName, &req, leafWatchID, m.cacheUpdateCh); err != nil {
            return fmt.Errorf("failed to setup leaf cert notifications: %w", err)
        }

        ws.WatchCtx(ctx)
        return nil

    }, func(err error) {
        m.logger.Error("failed to watch server management token", "error", err)
    })
}

func (m *CertManager) watchLeafCert(ctx context.Context) error {
    req := cachetype.ConnectCALeafRequest{
        Datacenter: m.config.Datacenter,
        Server:     true,
    }
    if err := m.cache.Notify(ctx, cachetype.ConnectCALeafName, &req, leafWatchID, m.cacheUpdateCh); err != nil {
        return fmt.Errorf("failed to setup leaf cert notifications: %w", err)
    }

    return nil
}

func (m *CertManager) watchCAConfig(ctx context.Context) {
    retryLoopBackoff(ctx, m.waiter, func() error {
        ws := memdb.NewWatchSet()
        ws.Add(m.getStore().AbandonCh())

        _, conf, err := m.getStore().CAConfig(ws)
        if err != nil {
            return fmt.Errorf("failed to fetch CA configuration from the state store: %w", err)
        }
        if conf == nil || conf.ClusterID == "" {
            m.logger.Debug("CA has not finished initializing")
            return nil
        }

        id := connect.SpiffeIDSigningForCluster(conf.ClusterID)
        name := connect.PeeringServerSAN(m.config.Datacenter, id.Host())

        m.logger.Debug("CA config watch fired - updating auto TLS server name", "name", name)
        m.tlsConfigurator.UpdateAutoTLSPeeringServerName(name)

        ws.WatchCtx(ctx)
        return nil

    }, func(err error) {
        m.logger.Error("failed to watch CA config", "error", err)
    })
}

func retryLoopBackoff(ctx context.Context, waiter retry.Waiter, loopFn func() error, errorFn func(error)) {
    for {
        if err := waiter.Wait(ctx); err != nil {
            // The error will only be non-nil if the context is canceled.
            return
        }

        if err := loopFn(); err != nil {
            errorFn(err)
            continue
        }

        // Reset the failure count seen by the waiter if there was no error.
        waiter.Reset()
    }
}

func (m *CertManager) handleUpdates(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            m.logger.Debug("context canceled")
            return

        case event := <-m.cacheUpdateCh:
            m.logger.Debug("got cache update event", "correlationID", event.CorrelationID, "error", event.Err)

            if err := m.handleLeafUpdate(event); err != nil {
                m.logger.Error("failed to handle cache update event", "error", err)
            }
        }
    }
}

func (m *CertManager) handleLeafUpdate(event cache.UpdateEvent) error {
    if event.Err != nil {
        return fmt.Errorf("leaf cert watch returned an error: %w", event.Err)
    }
    if event.CorrelationID != leafWatchID {
        return fmt.Errorf("got unexpected update correlation ID %q while expecting %q", event.CorrelationID, leafWatchID)
    }

    leaf, ok := event.Result.(*structs.IssuedCert)
    if !ok {
        return fmt.Errorf("got invalid type in leaf cert watch response: %T", event.Result)
    }

    m.logger.Debug("leaf certificate watch fired - updating auto TLS certificate", "uri", leaf.ServerURI)

    if err := m.tlsConfigurator.UpdateAutoTLSCert(leaf.CertPEM, leaf.PrivateKeyPEM); err != nil {
        return fmt.Errorf("failed to store the server leaf cert: %w", err)
    }
    return nil
}
@@ -0,0 +1,296 @@
package servercert

import (
    "context"
    "testing"
    "time"

    "github.com/hashicorp/consul/agent/cache"
    "github.com/hashicorp/consul/agent/connect"
    "github.com/hashicorp/consul/agent/structs"
    "github.com/hashicorp/consul/lib/retry"
    "github.com/hashicorp/consul/sdk/testutil"
    "github.com/hashicorp/go-memdb"
    "github.com/stretchr/testify/require"
)

type fakeStore struct {
    // conf is the current CA configuration stored in the fakeStore.
    conf chan *structs.CAConfiguration

    // tokenEntry is the current server token entry stored in the fakeStore.
    tokenEntry chan *structs.SystemMetadataEntry

    // tokenCanceler will unblock the WatchSet for the token entry.
    tokenCanceler <-chan struct{}
}

func (s *fakeStore) CAConfig(_ memdb.WatchSet) (uint64, *structs.CAConfiguration, error) {
    select {
    case conf := <-s.conf:
        return 0, conf, nil
    default:
        return 0, nil, nil
    }
}

func (s *fakeStore) setCAConfig() {
    s.conf <- &structs.CAConfiguration{
        ClusterID: connect.TestClusterID,
    }
}

func (s *fakeStore) SystemMetadataGet(ws memdb.WatchSet, _ string) (uint64, *structs.SystemMetadataEntry, error) {
    select {
    case entry := <-s.tokenEntry:
        ws.Add(s.tokenCanceler)
        return 0, entry, nil
    default:
        return 0, nil, nil
    }
}

func (s *fakeStore) setServerToken(token string, canceler <-chan struct{}) {
    s.tokenCanceler = canceler
    s.tokenEntry <- &structs.SystemMetadataEntry{
        Key:   structs.ServerManagementTokenAccessorID,
        Value: token,
    }
}

func (s *fakeStore) AbandonCh() <-chan struct{} {
    return make(<-chan struct{})
}

type testCert struct {
    pub  string
    priv string
}

type fakeTLSConfigurator struct {
    cert              testCert
    peeringServerName string

    // syncCh is used to signal that an update was handled.
    // It synchronizes readers and writers in different goroutines.
    syncCh chan struct{}
}

func (u *fakeTLSConfigurator) UpdateAutoTLSCert(pub, priv string) error {
    u.cert = testCert{
        pub:  pub,
        priv: priv,
    }
    u.syncCh <- struct{}{}
    return nil
}

func (u *fakeTLSConfigurator) UpdateAutoTLSPeeringServerName(name string) {
    u.peeringServerName = name
    u.syncCh <- struct{}{}
}

func (u *fakeTLSConfigurator) timeoutIfNotUpdated(t *testing.T) error {
    t.Helper()

    select {
    case <-u.syncCh:
    case <-time.After(100 * time.Millisecond):
        t.Fatalf("timed out")
    }
    return nil
}

type watchInfo struct {
    ctx   context.Context
    token string
}

type fakeCache struct {
    updateCh chan<- cache.UpdateEvent

    // watched is a map of watched correlation IDs to the ACL token of the request.
    watched map[string]watchInfo

    // syncCh is used to signal that Notify was called.
    // It synchronizes readers and writers in different goroutines.
    syncCh chan struct{}
}

func (c *fakeCache) triggerLeafUpdate() {
    c.updateCh <- cache.UpdateEvent{
        CorrelationID: leafWatchID,
        Result: &structs.IssuedCert{
            CertPEM:       "cert-pem",
            PrivateKeyPEM: "key-pem",
            ServerURI:     "test-uri",
        },
    }
}

func (c *fakeCache) Notify(ctx context.Context, t string, r cache.Request, correlationID string, ch chan<- cache.UpdateEvent) error {
    c.watched[correlationID] = watchInfo{ctx: ctx, token: r.CacheInfo().Token}
    c.updateCh = ch
    c.syncCh <- struct{}{}
    return nil
}

func (c *fakeCache) timeoutIfNotUpdated(t *testing.T) error {
    t.Helper()

    select {
    case <-c.syncCh:
    case <-time.After(100 * time.Millisecond):
        t.Fatalf("timed out")
    }
    return nil
}

func testWaiter() retry.Waiter {
    return retry.Waiter{
        MinFailures: 1,
        MinWait:     20 * time.Millisecond,
        MaxWait:     20 * time.Millisecond,
    }
}

func TestCertManager_ACLsDisabled(t *testing.T) {
    tlsConfigurator := fakeTLSConfigurator{syncCh: make(chan struct{}, 1)}
    cache := fakeCache{watched: make(map[string]watchInfo), syncCh: make(chan struct{}, 1)}
    store := fakeStore{
        conf:       make(chan *structs.CAConfiguration, 1),
        tokenEntry: make(chan *structs.SystemMetadataEntry, 1),
    }

    mgr := NewCertManager(Deps{
        Logger: testutil.Logger(t),
        Config: Config{
            Datacenter:  "my-dc",
            ACLsEnabled: false,
        },
        TLSConfigurator: &tlsConfigurator,
        Cache:           &cache,
        GetStore:        func() Store { return &store },
    })

    // Override the default waiter to reduce time between retries.
    mgr.waiter = testWaiter()

    require.NoError(t, mgr.Start(context.Background()))

    testutil.RunStep(t, "initial empty state", func(t *testing.T) {
        require.Empty(t, tlsConfigurator.cert)
        require.Empty(t, tlsConfigurator.peeringServerName)

        require.Contains(t, cache.watched, leafWatchID)
    })

    testutil.RunStep(t, "leaf cert update", func(t *testing.T) {
        cache.triggerLeafUpdate()

        // Wait for the update to arrive.
        require.NoError(t, tlsConfigurator.timeoutIfNotUpdated(t))

        expect := testCert{
            pub:  "cert-pem",
            priv: "key-pem",
        }
        require.Equal(t, expect, tlsConfigurator.cert)
    })

    testutil.RunStep(t, "ca config update", func(t *testing.T) {
        store.setCAConfig()

        // Wait for the update to arrive.
        require.NoError(t, tlsConfigurator.timeoutIfNotUpdated(t))

        expect := connect.PeeringServerSAN(mgr.config.Datacenter, connect.TestTrustDomain)
        require.Equal(t, expect, tlsConfigurator.peeringServerName)
    })
}

func TestCertManager_ACLsEnabled(t *testing.T) {
    tlsConfigurator := fakeTLSConfigurator{syncCh: make(chan struct{}, 1)}
    cache := fakeCache{watched: make(map[string]watchInfo), syncCh: make(chan struct{}, 1)}
    store := fakeStore{
        conf:       make(chan *structs.CAConfiguration, 1),
        tokenEntry: make(chan *structs.SystemMetadataEntry, 1),
    }

    mgr := NewCertManager(Deps{
        Logger: testutil.Logger(t),
        Config: Config{
            Datacenter:  "my-dc",
            ACLsEnabled: true,
        },
        TLSConfigurator: &tlsConfigurator,
        Cache:           &cache,
        GetStore:        func() Store { return &store },
    })

    // Override the default waiter to reduce time between retries.
    mgr.waiter = testWaiter()

    require.NoError(t, mgr.Start(context.Background()))

    testutil.RunStep(t, "initial empty state", func(t *testing.T) {
        require.Empty(t, tlsConfigurator.cert)
        require.Empty(t, tlsConfigurator.peeringServerName)

        require.Empty(t, cache.watched)
    })

    var leafCtx context.Context
    tokenCanceler := make(chan struct{})

    testutil.RunStep(t, "server token update", func(t *testing.T) {
        store.setServerToken("first-secret", tokenCanceler)

        require.NoError(t, cache.timeoutIfNotUpdated(t))

        require.Contains(t, cache.watched, leafWatchID)
        require.Equal(t, "first-secret", cache.watched[leafWatchID].token)

        leafCtx = cache.watched[leafWatchID].ctx
    })

    testutil.RunStep(t, "leaf cert update", func(t *testing.T) {
        cache.triggerLeafUpdate()

        // Wait for the update to arrive.
        require.NoError(t, tlsConfigurator.timeoutIfNotUpdated(t))

        expect := testCert{
            pub:  "cert-pem",
            priv: "key-pem",
        }
        require.Equal(t, expect, tlsConfigurator.cert)
    })

    testutil.RunStep(t, "another server token update", func(t *testing.T) {
        store.setServerToken("second-secret", nil)

        // Fire the existing WatchSet to simulate a state store update.
        tokenCanceler <- struct{}{}

        // The leaf watch in the cache should have been reset.
        require.NoError(t, cache.timeoutIfNotUpdated(t))

        // The original leaf watch context should have been canceled.
        require.Error(t, leafCtx.Err())

        // A new leaf watch is expected with the new token.
        require.Contains(t, cache.watched, leafWatchID)
        require.Equal(t, "second-secret", cache.watched[leafWatchID].token)
    })

    testutil.RunStep(t, "ca config update", func(t *testing.T) {
        store.setCAConfig()

        // Wait for the update to arrive.
        require.NoError(t, tlsConfigurator.timeoutIfNotUpdated(t))

        expect := connect.PeeringServerSAN(mgr.config.Datacenter, connect.TestTrustDomain)
        require.Equal(t, expect, tlsConfigurator.peeringServerName)
    })
}
@@ -1840,7 +1840,7 @@ func (id *AgentRecoveryTokenIdentity) EnterpriseMetadata() *acl.EnterpriseMeta {
     return nil
 }

-const ServerManagementToken = "server-management-token"
+const ServerManagementTokenAccessorID = "server-management-token"

 type ACLServerIdentity struct {
     secretID string

@@ -1853,7 +1853,7 @@ func NewACLServerIdentity(secretID string) *ACLServerIdentity {
 }

 func (i *ACLServerIdentity) ID() string {
-    return ServerManagementToken
+    return ServerManagementTokenAccessorID
 }

 func (i *ACLServerIdentity) SecretToken() string {
@@ -79,6 +79,7 @@ func (w *Waiter) delay() time.Duration {
 }

 // Reset the failure count to 0.
+// Reset must be called if the operation done after Wait did not fail.
 func (w *Waiter) Reset() {
     w.failures = 0
 }

@@ -88,9 +89,13 @@ func (w *Waiter) Failures() int {
     return int(w.failures)
 }

-// Wait increase the number of failures by one, and then blocks until the context
+// Wait increases the number of failures by one, and then blocks until the context
 // is cancelled, or until the wait time is reached.
+//
 // The wait time increases exponentially as the number of failures increases.
+// Every call to Wait increments the failures count, so Reset must be called
+// after Wait when there wasn't a failure.
+//
 // Wait will return ctx.Err() if the context is cancelled.
 func (w *Waiter) Wait(ctx context.Context) error {
     w.failures++
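The Wait/Reset contract documented above is exactly what the retryLoopBackoff helper added earlier in this commit relies on. A minimal usage sketch, where doWork and logger are hypothetical placeholders:

    // Sketch of the documented contract: every Wait call counts as a failure,
    // so Reset is called after each successful iteration to keep the backoff
    // from growing.
    for {
        if err := waiter.Wait(ctx); err != nil {
            return // the context was cancelled
        }
        if err := doWork(); err != nil {
            logger.Error("operation failed", "error", err)
            continue // failure count stays incremented; the next Wait backs off longer
        }
        waiter.Reset()
    }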