Implement intention replication and secondary CA initialization
This commit is contained in:
parent
24749bc7e5
commit
0fc4da6861
|
@ -30,8 +30,9 @@ var (
|
|||
// variable points to. Clients need to compare using `err.Error() ==
|
||||
// consul.ErrRateLimited.Error()` which is very sad. Short of replacing our
|
||||
// RPC mechanism it's hard to know how to make that much better though.
|
||||
ErrConnectNotEnabled = errors.New("Connect must be enabled in order to use this endpoint")
|
||||
ErrRateLimited = errors.New("Rate limit reached, try again later")
|
||||
ErrConnectNotEnabled = errors.New("Connect must be enabled in order to use this endpoint")
|
||||
ErrRateLimited = errors.New("Rate limit reached, try again later")
|
||||
ErrNotPrimaryDatacenter = errors.New("not the primary datacenter")
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -556,3 +557,50 @@ func (s *ConnectCA) Sign(
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SignIntermediate signs an intermediate certificate for a remote datacenter.
|
||||
func (s *ConnectCA) SignIntermediate(
|
||||
args *structs.CASignRequest,
|
||||
reply *string) error {
|
||||
// Exit early if Connect hasn't been enabled.
|
||||
if !s.srv.config.ConnectEnabled {
|
||||
return ErrConnectNotEnabled
|
||||
}
|
||||
|
||||
if done, err := s.srv.forward("ConnectCA.SignIntermediate", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
// Verify we are allowed to serve this request
|
||||
if s.srv.config.PrimaryDatacenter != s.srv.config.Datacenter {
|
||||
return ErrNotPrimaryDatacenter
|
||||
}
|
||||
|
||||
// This action requires operator write access.
|
||||
rule, err := s.srv.ResolveToken(args.Token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if rule != nil && !rule.OperatorWrite() {
|
||||
return acl.ErrPermissionDenied
|
||||
}
|
||||
|
||||
provider, _ := s.srv.getCAProvider()
|
||||
if provider == nil {
|
||||
return fmt.Errorf("internal error: CA provider is nil")
|
||||
}
|
||||
|
||||
csr, err := connect.ParseCSR(args.CSR)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cert, err := provider.SignIntermediate(csr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*reply = cert
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -30,7 +30,3 @@ func (s *Server) handleEnterpriseRPCConn(rtype pool.RPCType, conn net.Conn, isTL
|
|||
func (s *Server) enterpriseStats() map[string]map[string]string {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) intentionReplicationEnabled() bool {
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -327,7 +327,7 @@ func (s *Server) establishLeadership() error {
|
|||
|
||||
s.startConfigReplication()
|
||||
|
||||
s.startEnterpriseLeader()
|
||||
s.startConnectLeader()
|
||||
|
||||
s.startCARootPruning()
|
||||
|
||||
|
@ -347,7 +347,7 @@ func (s *Server) revokeLeadership() {
|
|||
|
||||
s.stopConfigReplication()
|
||||
|
||||
s.stopEnterpriseLeader()
|
||||
s.stopConnectLeader()
|
||||
|
||||
s.stopCARootPruning()
|
||||
|
||||
|
|
567
agent/consul/leader_connect.go
Normal file
567
agent/consul/leader_connect.go
Normal file
|
@ -0,0 +1,567 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/connect/ca"
|
||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/go-version"
|
||||
)
|
||||
|
||||
const (
|
||||
// loopRateLimit is the maximum rate per second at which we can rerun CA and intention
|
||||
// replication watches.
|
||||
loopRateLimit rate.Limit = 0.2
|
||||
|
||||
// retryBucketSize is the maximum number of stored rate limit attempts for looped
|
||||
// blocking query operations.
|
||||
retryBucketSize = 5
|
||||
|
||||
// maxIntentionTxnSize is the maximum size (in bytes) of a transaction used during
|
||||
// Intention replication.
|
||||
maxIntentionTxnSize = raftWarnSize / 4
|
||||
)
|
||||
|
||||
var (
|
||||
// maxRetryBackoff is the maximum number of seconds to wait between failed blocking
|
||||
// queries when backing off.
|
||||
maxRetryBackoff = 256
|
||||
|
||||
// minMultiDCConnectVersion is the minimum version in order to support multi-DC Connect
|
||||
// features.
|
||||
minMultiDCConnectVersion = version.Must(version.NewVersion("1.4.0"))
|
||||
|
||||
// maxRootsQueryTime is the maximum time the primary roots watch query can block before
|
||||
// returning.
|
||||
maxRootsQueryTime = maxQueryTime
|
||||
|
||||
errEmptyVersion = errors.New("version string is empty")
|
||||
)
|
||||
|
||||
// initializeCA sets up the CA provider when gaining leadership, either bootstrapping
|
||||
// the CA if this is the primary DC or making a remote RPC for intermediate signing
|
||||
// if this is a secondary DC.
|
||||
func (s *Server) initializeCA() error {
|
||||
// Bail if connect isn't enabled.
|
||||
if !s.config.ConnectEnabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Initialize the provider based on the current config.
|
||||
conf, err := s.initializeCAConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
provider, err := s.createCAProvider(conf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.setCAProvider(provider, nil)
|
||||
|
||||
// Check whether the primary DC has been upgraded to support multi-DC Connect.
|
||||
// If it hasn't, we skip the secondary initialization routine and continue acting
|
||||
// as a primary DC. This is periodically re-checked in the goroutine watching the
|
||||
// primary's CA roots so that we can transition to a secondary DC when it has
|
||||
// been upgraded.
|
||||
var primaryHasVersion bool
|
||||
if s.config.PrimaryDatacenter != s.config.Datacenter {
|
||||
primaryHasVersion, err = s.datacentersMeetMinVersion(minMultiDCConnectVersion)
|
||||
if err == errEmptyVersion {
|
||||
s.logger.Printf("[WARN] connect: primary datacenter %q is reachable but not yet initialized", s.config.PrimaryDatacenter)
|
||||
return nil
|
||||
} else if err != nil {
|
||||
s.logger.Printf("[ERR] connect: error initializing CA: could not query primary datacenter: %v", err)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// If this isn't the primary DC, run the secondary DC routine if the primary has already
|
||||
// been upgraded to at least 1.4.0.
|
||||
if s.config.PrimaryDatacenter != s.config.Datacenter && primaryHasVersion {
|
||||
// Get the root CA to see if we need to refresh our intermediate.
|
||||
args := structs.DCSpecificRequest{
|
||||
Datacenter: s.config.PrimaryDatacenter,
|
||||
}
|
||||
var roots structs.IndexedCARoots
|
||||
if err := s.forwardDC("ConnectCA.Roots", s.config.PrimaryDatacenter, &args, &roots); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Configure the CA provider and initialize the intermediate certificate if necessary.
|
||||
if err := s.initializeSecondaryProvider(provider, roots); err != nil {
|
||||
return fmt.Errorf("error configuring provider: %v", err)
|
||||
}
|
||||
if err := s.initializeSecondaryCA(provider, roots); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.logger.Printf("[INFO] connect: initialized secondary datacenter CA with provider %q", conf.Provider)
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.initializeRootCA(provider, conf)
|
||||
}
|
||||
|
||||
// initializeSecondaryCA runs the routine for generating an intermediate CA CSR and getting
|
||||
// it signed by the primary DC if the root CA of the primary DC has changed since the last
|
||||
// intermediate.
|
||||
func (s *Server) initializeSecondaryCA(provider ca.Provider, roots structs.IndexedCARoots) error {
|
||||
activeIntermediate, err := provider.ActiveIntermediate()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var storedRootID string
|
||||
if activeIntermediate != "" {
|
||||
storedRoot, err := provider.ActiveRoot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
storedRootID, err = connect.CalculateCertFingerprint(storedRoot)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error parsing root fingerprint: %v, %#v", err, roots)
|
||||
}
|
||||
}
|
||||
|
||||
var newActiveRoot *structs.CARoot
|
||||
for _, root := range roots.Roots {
|
||||
if root.ID == roots.ActiveRootID && root.Active {
|
||||
newActiveRoot = root
|
||||
break
|
||||
}
|
||||
}
|
||||
if newActiveRoot == nil {
|
||||
return fmt.Errorf("primary datacenter does not have an active root CA for Connect")
|
||||
}
|
||||
|
||||
// Update the roots list in the state store if there's a new active root.
|
||||
state := s.fsm.State()
|
||||
_, activeRoot, err := state.CARootActive(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if activeRoot == nil || activeRoot.ID != newActiveRoot.ID {
|
||||
idx, oldRoots, err := state.CARoots(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, config, err := state.CAConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if config == nil {
|
||||
return fmt.Errorf("local CA not initialized yet")
|
||||
}
|
||||
newConf := *config
|
||||
newConf.ClusterID = newActiveRoot.ExternalTrustDomain
|
||||
|
||||
// Copy the root list and append the new active root, updating the old root
|
||||
// with the time it was rotated out.
|
||||
var newRoots structs.CARoots
|
||||
for _, r := range oldRoots {
|
||||
newRoot := *r
|
||||
if newRoot.Active {
|
||||
newRoot.Active = false
|
||||
newRoot.RotatedOutAt = time.Now()
|
||||
}
|
||||
if newRoot.ExternalTrustDomain == "" {
|
||||
newRoot.ExternalTrustDomain = config.ClusterID
|
||||
}
|
||||
newRoots = append(newRoots, &newRoot)
|
||||
}
|
||||
newRoots = append(newRoots, newActiveRoot)
|
||||
|
||||
args := &structs.CARequest{
|
||||
Op: structs.CAOpSetRootsAndConfig,
|
||||
Index: idx,
|
||||
Roots: newRoots,
|
||||
Config: &newConf,
|
||||
}
|
||||
resp, err := s.raftApply(structs.ConnectCARequestType, &args)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
if respOk, ok := resp.(bool); ok && !respOk {
|
||||
return fmt.Errorf("could not atomically update roots and config")
|
||||
}
|
||||
|
||||
s.logger.Printf("[INFO] connect: updated root certificates from primary datacenter")
|
||||
}
|
||||
|
||||
// Get a signed intermediate from the primary DC if the provider
|
||||
// hasn't been initialized yet or if the primary's root has changed.
|
||||
if activeIntermediate == "" || storedRootID != roots.ActiveRootID {
|
||||
csr, err := provider.GenerateIntermediateCSR()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var intermediatePEM string
|
||||
if err := s.forwardDC("ConnectCA.SignIntermediate", s.config.PrimaryDatacenter, s.generateCASignRequest(csr), &intermediatePEM); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := provider.SetIntermediate(intermediatePEM, newActiveRoot.RootCert); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Append the new intermediate to our local active root entry.
|
||||
newActiveRoot.IntermediateCerts = append(newActiveRoot.IntermediateCerts, intermediatePEM)
|
||||
|
||||
s.logger.Printf("[INFO] connect: received new intermediate certificate from primary datacenter")
|
||||
}
|
||||
|
||||
s.setCAProvider(provider, newActiveRoot)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) generateCASignRequest(csr string) *structs.CASignRequest {
|
||||
return &structs.CASignRequest{
|
||||
Datacenter: s.config.PrimaryDatacenter,
|
||||
CSR: csr,
|
||||
WriteRequest: structs.WriteRequest{Token: s.tokens.ReplicationToken()},
|
||||
}
|
||||
}
|
||||
|
||||
// startConnectLeader starts multi-dc connect leader routines.
|
||||
func (s *Server) startConnectLeader() {
|
||||
s.connectLock.Lock()
|
||||
defer s.connectLock.Unlock()
|
||||
|
||||
if s.connectEnabled {
|
||||
return
|
||||
}
|
||||
|
||||
s.connectCh = make(chan struct{})
|
||||
|
||||
// Start the Connect secondary DC actions if enabled.
|
||||
if s.config.ConnectEnabled && s.config.Datacenter != s.config.PrimaryDatacenter {
|
||||
go s.secondaryCARootWatch(s.connectCh)
|
||||
go s.replicateIntentions(s.connectCh)
|
||||
}
|
||||
|
||||
s.connectEnabled = true
|
||||
}
|
||||
|
||||
// stopConnectLeader stops connect specific leader functions.
|
||||
func (s *Server) stopConnectLeader() {
|
||||
s.connectLock.Lock()
|
||||
defer s.connectLock.Unlock()
|
||||
|
||||
if !s.connectEnabled {
|
||||
return
|
||||
}
|
||||
|
||||
s.actingSecondaryLock.Lock()
|
||||
s.actingSecondaryCA = false
|
||||
s.actingSecondaryLock.Unlock()
|
||||
|
||||
close(s.connectCh)
|
||||
s.connectEnabled = false
|
||||
}
|
||||
|
||||
// secondaryCARootWatch maintains a blocking query to the primary datacenter's
|
||||
// ConnectCA.Roots endpoint to monitor when it needs to request a new signed
|
||||
// intermediate certificate.
|
||||
func (s *Server) secondaryCARootWatch(stopCh <-chan struct{}) {
|
||||
args := structs.DCSpecificRequest{
|
||||
Datacenter: s.config.PrimaryDatacenter,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
MaxQueryTime: maxRootsQueryTime,
|
||||
},
|
||||
}
|
||||
|
||||
s.logger.Printf("[DEBUG] connect: starting Connect CA root replication from primary datacenter %q", s.config.PrimaryDatacenter)
|
||||
|
||||
retryLoopBackoff(stopCh, func() error {
|
||||
var roots structs.IndexedCARoots
|
||||
if err := s.forwardDC("ConnectCA.Roots", s.config.PrimaryDatacenter, &args, &roots); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check to see if the primary has been upgraded in case we're waiting to switch to
|
||||
// secondary mode.
|
||||
provider, _ := s.getCAProvider()
|
||||
if !s.configuredSecondaryCA() {
|
||||
primaryHasVersion, err := s.datacentersMeetMinVersion(minMultiDCConnectVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if primaryHasVersion {
|
||||
if err := s.initializeSecondaryProvider(provider, roots); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Run the secondary CA init routine to see if we need to request a new
|
||||
// intermediate.
|
||||
if s.configuredSecondaryCA() {
|
||||
if err := s.initializeSecondaryCA(provider, roots); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
args.QueryOptions.MinQueryIndex = nextIndexVal(args.QueryOptions.MinQueryIndex, roots.QueryMeta.Index)
|
||||
return nil
|
||||
}, func(err error) {
|
||||
// Don't log the error if it's a result of the primary still starting up.
|
||||
if err != errEmptyVersion {
|
||||
s.logger.Printf("[ERR] connect: error watching primary datacenter roots: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// replicateIntentions executes a blocking query to the primary datacenter to replicate
|
||||
// the intentions there to the local state.
|
||||
func (s *Server) replicateIntentions(stopCh <-chan struct{}) {
|
||||
args := structs.DCSpecificRequest{
|
||||
Datacenter: s.config.PrimaryDatacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.tokens.ReplicationToken()},
|
||||
}
|
||||
|
||||
s.logger.Printf("[DEBUG] connect: starting Connect intention replication from primary datacenter %q", s.config.PrimaryDatacenter)
|
||||
|
||||
retryLoopBackoff(stopCh, func() error {
|
||||
var remote structs.IndexedIntentions
|
||||
if err := s.forwardDC("Intention.List", s.config.PrimaryDatacenter, &args, &remote); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, local, err := s.fsm.State().Intentions(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Compute the diff between the remote and local intentions.
|
||||
deletes, updates := diffIntentions(local, remote.Intentions)
|
||||
txnOpSets := batchIntentionUpdates(deletes, updates)
|
||||
|
||||
// Apply batched updates to the state store.
|
||||
for _, ops := range txnOpSets {
|
||||
txnReq := structs.TxnRequest{Ops: ops}
|
||||
|
||||
resp, err := s.raftApply(structs.TxnRequestType, &txnReq)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if respErr, ok := resp.(error); ok {
|
||||
return respErr
|
||||
}
|
||||
|
||||
if txnResp, ok := resp.(structs.TxnResponse); ok {
|
||||
if len(txnResp.Errors) > 0 {
|
||||
return txnResp.Error()
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("unexpected return type %T", resp)
|
||||
}
|
||||
}
|
||||
|
||||
args.QueryOptions.MinQueryIndex = nextIndexVal(args.QueryOptions.MinQueryIndex, remote.QueryMeta.Index)
|
||||
return nil
|
||||
}, func(err error) {
|
||||
s.logger.Printf("[ERR] connect: error replicating intentions: %v", err)
|
||||
})
|
||||
}
|
||||
|
||||
// retryLoopBackoff loops a given function indefinitely, backing off exponentially
|
||||
// upon errors up to a maximum of maxRetryBackoff seconds.
|
||||
func retryLoopBackoff(stopCh <-chan struct{}, loopFn func() error, errFn func(error)) {
|
||||
var failedAttempts uint
|
||||
limiter := rate.NewLimiter(loopRateLimit, retryBucketSize)
|
||||
for {
|
||||
// Rate limit how often we run the loop
|
||||
limiter.Wait(context.Background())
|
||||
select {
|
||||
case <-stopCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
if (1 << failedAttempts) < maxRetryBackoff {
|
||||
failedAttempts++
|
||||
}
|
||||
retryTime := (1 << failedAttempts) * time.Second
|
||||
|
||||
if err := loopFn(); err != nil {
|
||||
errFn(err)
|
||||
time.Sleep(retryTime)
|
||||
continue
|
||||
}
|
||||
|
||||
// Reset the failed attempts after a successful run.
|
||||
failedAttempts = 0
|
||||
}
|
||||
}
|
||||
|
||||
// diffIntentions computes the difference between the local and remote intentions
|
||||
// and returns lists of deletes and updates.
|
||||
func diffIntentions(local, remote structs.Intentions) (structs.Intentions, structs.Intentions) {
|
||||
localIdx := make(map[string]uint64, len(local))
|
||||
remoteIdx := make(map[string]struct{}, len(remote))
|
||||
|
||||
var deletes structs.Intentions
|
||||
var updates structs.Intentions
|
||||
|
||||
for _, intention := range local {
|
||||
localIdx[intention.ID] = intention.ModifyIndex
|
||||
}
|
||||
for _, intention := range remote {
|
||||
remoteIdx[intention.ID] = struct{}{}
|
||||
}
|
||||
|
||||
for _, intention := range local {
|
||||
if _, ok := remoteIdx[intention.ID]; !ok {
|
||||
deletes = append(deletes, intention)
|
||||
}
|
||||
}
|
||||
|
||||
for _, intention := range remote {
|
||||
existingIdx, ok := localIdx[intention.ID]
|
||||
if !ok {
|
||||
updates = append(updates, intention)
|
||||
} else if existingIdx < intention.ModifyIndex {
|
||||
updates = append(updates, intention)
|
||||
}
|
||||
}
|
||||
|
||||
return deletes, updates
|
||||
}
|
||||
|
||||
// batchIntentionUpdates breaks up the given updates into sets of TxnOps based
|
||||
// on the estimated size of the operations.
|
||||
func batchIntentionUpdates(deletes, updates structs.Intentions) []structs.TxnOps {
|
||||
var txnOps structs.TxnOps
|
||||
for _, delete := range deletes {
|
||||
deleteOp := &structs.TxnIntentionOp{
|
||||
Op: structs.IntentionOpDelete,
|
||||
Intention: delete,
|
||||
}
|
||||
txnOps = append(txnOps, &structs.TxnOp{Intention: deleteOp})
|
||||
}
|
||||
|
||||
for _, update := range updates {
|
||||
updateOp := &structs.TxnIntentionOp{
|
||||
Op: structs.IntentionOpUpdate,
|
||||
Intention: update,
|
||||
}
|
||||
txnOps = append(txnOps, &structs.TxnOp{Intention: updateOp})
|
||||
}
|
||||
|
||||
// Divide the operations into chunks according to maxIntentionTxnSize.
|
||||
var batchedOps []structs.TxnOps
|
||||
for batchStart := 0; batchStart < len(txnOps); {
|
||||
// inner loop finds the last element to include in this batch.
|
||||
batchSize := 0
|
||||
batchEnd := batchStart
|
||||
for ; batchEnd < len(txnOps) && batchSize < maxIntentionTxnSize; batchEnd += 1 {
|
||||
batchSize += txnOps[batchEnd].Intention.Intention.EstimateSize()
|
||||
}
|
||||
|
||||
batchedOps = append(batchedOps, txnOps[batchStart:batchEnd])
|
||||
|
||||
// txnOps[batchEnd] wasn't included as the slicing doesn't include the element at the stop index
|
||||
batchStart = batchEnd
|
||||
}
|
||||
|
||||
return batchedOps
|
||||
}
|
||||
|
||||
// nextIndexVal computes the next index value to query for, resetting to zero
|
||||
// if the index went backward.
|
||||
func nextIndexVal(prevIdx, idx uint64) uint64 {
|
||||
if prevIdx > idx {
|
||||
return 0
|
||||
}
|
||||
return idx
|
||||
}
|
||||
|
||||
// datacentersMeetMinVersion returns whether this datacenter and the primary
|
||||
// are ready and have upgraded to at least the given version.
|
||||
func (s *Server) datacentersMeetMinVersion(minVersion *version.Version) (bool, error) {
|
||||
localAutopilotHealth := s.autopilot.GetClusterHealth()
|
||||
localServersMeetVersion, err := autopilotServersMeetMinimumVersion(localAutopilotHealth.Servers, minVersion)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !localServersMeetVersion {
|
||||
return false, err
|
||||
}
|
||||
|
||||
args := structs.DCSpecificRequest{
|
||||
Datacenter: s.config.PrimaryDatacenter,
|
||||
}
|
||||
var reply autopilot.OperatorHealthReply
|
||||
if err := s.forwardDC("Operator.ServerHealth", s.config.PrimaryDatacenter, &args, &reply); err != nil {
|
||||
return false, err
|
||||
}
|
||||
remoteServersMeetVersion, err := autopilotServersMeetMinimumVersion(reply.Servers, minVersion)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return localServersMeetVersion && remoteServersMeetVersion, nil
|
||||
}
|
||||
|
||||
// autopilotServersMeetMinimumVersion returns whether the given slice of servers
|
||||
// meets a minimum version.
|
||||
func autopilotServersMeetMinimumVersion(servers []autopilot.ServerHealth, minVersion *version.Version) (bool, error) {
|
||||
for _, server := range servers {
|
||||
if server.Version == "" {
|
||||
return false, errEmptyVersion
|
||||
}
|
||||
version, err := version.NewVersion(server.Version)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error parsing remote server version: %v", err)
|
||||
}
|
||||
|
||||
if version.LessThan(minVersion) {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// initializeSecondaryProvider configures the given provider for a secondary, non-root datacenter.
|
||||
func (s *Server) initializeSecondaryProvider(provider ca.Provider, roots structs.IndexedCARoots) error {
|
||||
if roots.TrustDomain == "" {
|
||||
return fmt.Errorf("trust domain from primary datacenter is not initialized")
|
||||
}
|
||||
|
||||
clusterID := strings.Split(roots.TrustDomain, ".")[0]
|
||||
_, conf, err := s.fsm.State().CAConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := provider.Configure(clusterID, false, conf.Config); err != nil {
|
||||
return fmt.Errorf("error configuring provider: %v", err)
|
||||
}
|
||||
|
||||
s.actingSecondaryLock.Lock()
|
||||
s.actingSecondaryCA = true
|
||||
s.actingSecondaryLock.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) configuredSecondaryCA() bool {
|
||||
s.actingSecondaryLock.RLock()
|
||||
defer s.actingSecondaryLock.RUnlock()
|
||||
return s.actingSecondaryCA
|
||||
}
|
842
agent/consul/leader_connect_test.go
Normal file
842
agent/consul/leader_connect_test.go
Normal file
|
@ -0,0 +1,842 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"crypto/x509"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
uuid "github.com/hashicorp/go-uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestLeader_SecondaryCA_Initialize(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
// Initialize primary as the primary DC
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "primary"
|
||||
c.PrimaryDatacenter = "primary"
|
||||
c.Build = "1.4.0"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "primary")
|
||||
|
||||
// secondary as a secondary DC
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "secondary"
|
||||
c.PrimaryDatacenter = "primary"
|
||||
c.Build = "1.4.0"
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Create the WAN link
|
||||
joinWAN(t, s2, s1)
|
||||
testrpc.WaitForLeader(t, s2.RPC, "secondary")
|
||||
|
||||
_, caRoot := s1.getCAProvider()
|
||||
secondaryProvider, _ := s2.getCAProvider()
|
||||
intermediatePEM, err := secondaryProvider.ActiveIntermediate()
|
||||
require.NoError(err)
|
||||
|
||||
// Verify the root lists are equal in each DC's state store.
|
||||
state1 := s1.fsm.State()
|
||||
_, roots1, err := state1.CARoots(nil)
|
||||
require.NoError(err)
|
||||
|
||||
state2 := s2.fsm.State()
|
||||
_, roots2, err := state2.CARoots(nil)
|
||||
require.NoError(err)
|
||||
require.Equal(roots1[0].ID, roots2[0].ID)
|
||||
require.Equal(roots1[0].RootCert, roots2[0].RootCert)
|
||||
require.Equal(1, len(roots1))
|
||||
require.Equal(len(roots1), len(roots2))
|
||||
|
||||
// Have secondary sign a leaf cert and make sure the chain is correct.
|
||||
spiffeService := &connect.SpiffeIDService{
|
||||
Host: "node1",
|
||||
Namespace: "default",
|
||||
Datacenter: "primary",
|
||||
Service: "foo",
|
||||
}
|
||||
raw, _ := connect.TestCSR(t, spiffeService)
|
||||
|
||||
leafCsr, err := connect.ParseCSR(raw)
|
||||
require.NoError(err)
|
||||
|
||||
leafPEM, err := secondaryProvider.Sign(leafCsr)
|
||||
require.NoError(err)
|
||||
|
||||
cert, err := connect.ParseCert(leafPEM)
|
||||
require.NoError(err)
|
||||
|
||||
// Check that the leaf signed by the new cert can be verified using the
|
||||
// returned cert chain (signed intermediate + remote root).
|
||||
intermediatePool := x509.NewCertPool()
|
||||
intermediatePool.AppendCertsFromPEM([]byte(intermediatePEM))
|
||||
rootPool := x509.NewCertPool()
|
||||
rootPool.AppendCertsFromPEM([]byte(caRoot.RootCert))
|
||||
|
||||
_, err = cert.Verify(x509.VerifyOptions{
|
||||
Intermediates: intermediatePool,
|
||||
Roots: rootPool,
|
||||
})
|
||||
require.NoError(err)
|
||||
}
|
||||
|
||||
func TestLeader_SecondaryCA_IntermediateRefresh(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Build = "1.4.0"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// dc2 as a secondary DC
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
c.Build = "1.4.0"
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Create the WAN link
|
||||
joinWAN(t, s2, s1)
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
||||
|
||||
// Get the original intermediate
|
||||
secondaryProvider, _ := s2.getCAProvider()
|
||||
oldIntermediatePEM, err := secondaryProvider.ActiveIntermediate()
|
||||
require.NoError(err)
|
||||
require.NotEmpty(oldIntermediatePEM)
|
||||
|
||||
// Store the current root
|
||||
rootReq := &structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
var rootList structs.IndexedCARoots
|
||||
require.NoError(s1.RPC("ConnectCA.Roots", rootReq, &rootList))
|
||||
require.Len(rootList.Roots, 1)
|
||||
|
||||
// Update the provider config to use a new private key, which should
|
||||
// cause a rotation.
|
||||
_, newKey, err := connect.GeneratePrivateKey()
|
||||
require.NoError(err)
|
||||
newConfig := &structs.CAConfiguration{
|
||||
Provider: "consul",
|
||||
Config: map[string]interface{}{
|
||||
"PrivateKey": newKey,
|
||||
"RootCert": "",
|
||||
"RotationPeriod": 90 * 24 * time.Hour,
|
||||
},
|
||||
}
|
||||
{
|
||||
args := &structs.CARequest{
|
||||
Datacenter: "dc1",
|
||||
Config: newConfig,
|
||||
}
|
||||
var reply interface{}
|
||||
|
||||
require.NoError(s1.RPC("ConnectCA.ConfigurationSet", args, &reply))
|
||||
}
|
||||
|
||||
// Wait for dc2's intermediate to be refreshed.
|
||||
var intermediatePEM string
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
intermediatePEM, err = secondaryProvider.ActiveIntermediate()
|
||||
r.Check(err)
|
||||
if intermediatePEM == oldIntermediatePEM {
|
||||
r.Fatal("not a new intermediate")
|
||||
}
|
||||
})
|
||||
require.NoError(err)
|
||||
|
||||
// Verify the root lists have been rotated in each DC's state store.
|
||||
state1 := s1.fsm.State()
|
||||
_, primaryRoot, err := state1.CARootActive(nil)
|
||||
require.NoError(err)
|
||||
|
||||
state2 := s2.fsm.State()
|
||||
_, roots2, err := state2.CARoots(nil)
|
||||
require.NoError(err)
|
||||
require.Equal(2, len(roots2))
|
||||
|
||||
newRoot := roots2[0]
|
||||
oldRoot := roots2[1]
|
||||
if roots2[1].Active {
|
||||
newRoot = roots2[1]
|
||||
oldRoot = roots2[0]
|
||||
}
|
||||
require.False(oldRoot.Active)
|
||||
require.True(newRoot.Active)
|
||||
require.Equal(primaryRoot.ID, newRoot.ID)
|
||||
require.Equal(primaryRoot.RootCert, newRoot.RootCert)
|
||||
|
||||
// Get the new root from dc1 and validate a chain of:
|
||||
// dc2 leaf -> dc2 intermediate -> dc1 root
|
||||
_, caRoot := s1.getCAProvider()
|
||||
|
||||
// Have dc2 sign a leaf cert and make sure the chain is correct.
|
||||
spiffeService := &connect.SpiffeIDService{
|
||||
Host: "node1",
|
||||
Namespace: "default",
|
||||
Datacenter: "dc1",
|
||||
Service: "foo",
|
||||
}
|
||||
raw, _ := connect.TestCSR(t, spiffeService)
|
||||
|
||||
leafCsr, err := connect.ParseCSR(raw)
|
||||
require.NoError(err)
|
||||
|
||||
leafPEM, err := secondaryProvider.Sign(leafCsr)
|
||||
require.NoError(err)
|
||||
|
||||
cert, err := connect.ParseCert(leafPEM)
|
||||
require.NoError(err)
|
||||
|
||||
// Check that the leaf signed by the new intermediate can be verified using the
|
||||
// returned cert chain (signed intermediate + remote root).
|
||||
intermediatePool := x509.NewCertPool()
|
||||
intermediatePool.AppendCertsFromPEM([]byte(intermediatePEM))
|
||||
rootPool := x509.NewCertPool()
|
||||
rootPool.AppendCertsFromPEM([]byte(caRoot.RootCert))
|
||||
|
||||
_, err = cert.Verify(x509.VerifyOptions{
|
||||
Intermediates: intermediatePool,
|
||||
Roots: rootPool,
|
||||
})
|
||||
require.NoError(err)
|
||||
}
|
||||
|
||||
func TestLeader_SecondaryCA_TransitionFromPrimary(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
// Initialize dc1 as the primary DC
|
||||
id1, err := uuid.GenerateUUID()
|
||||
require.NoError(err)
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
c.CAConfig.ClusterID = id1
|
||||
c.Build = "1.4.0"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// dc2 as a primary DC initially
|
||||
id2, err := uuid.GenerateUUID()
|
||||
require.NoError(err)
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.PrimaryDatacenter = "dc2"
|
||||
c.CAConfig.ClusterID = id2
|
||||
c.Build = "1.4.0"
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Get the initial (primary) roots state for the secondary
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
||||
args := structs.DCSpecificRequest{Datacenter: "dc2"}
|
||||
var dc2PrimaryRoots structs.IndexedCARoots
|
||||
require.NoError(s2.RPC("ConnectCA.Roots", &args, &dc2PrimaryRoots))
|
||||
require.Len(dc2PrimaryRoots.Roots, 1)
|
||||
|
||||
// Set the ExternalTrustDomain to a blank string to simulate an old version (pre-1.4.0)
|
||||
// it's fine to change the roots struct directly here because the RPC endpoint already
|
||||
// makes a copy to return.
|
||||
dc2PrimaryRoots.Roots[0].ExternalTrustDomain = ""
|
||||
rootSetArgs := structs.CARequest{
|
||||
Op: structs.CAOpSetRoots,
|
||||
Datacenter: "dc2",
|
||||
Index: dc2PrimaryRoots.Index,
|
||||
Roots: dc2PrimaryRoots.Roots,
|
||||
}
|
||||
resp, err := s2.raftApply(structs.ConnectCARequestType, rootSetArgs)
|
||||
require.NoError(err)
|
||||
if respErr, ok := resp.(error); ok {
|
||||
t.Fatal(respErr)
|
||||
}
|
||||
|
||||
// Shutdown s2 and restart it with the dc1 as the primary
|
||||
s2.Shutdown()
|
||||
dir3, s3 := testServerWithConfig(t, func(c *Config) {
|
||||
c.DataDir = s2.config.DataDir
|
||||
c.Datacenter = "dc2"
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
c.NodeName = s2.config.NodeName
|
||||
c.NodeID = s2.config.NodeID
|
||||
})
|
||||
defer os.RemoveAll(dir3)
|
||||
defer s3.Shutdown()
|
||||
|
||||
// Create the WAN link
|
||||
joinWAN(t, s3, s1)
|
||||
testrpc.WaitForLeader(t, s3.RPC, "dc2")
|
||||
|
||||
// Verify the secondary has migrated its TrustDomain and added the new primary's root.
|
||||
args = structs.DCSpecificRequest{Datacenter: "dc1"}
|
||||
var dc1Roots structs.IndexedCARoots
|
||||
require.NoError(s1.RPC("ConnectCA.Roots", &args, &dc1Roots))
|
||||
require.Len(dc1Roots.Roots, 1)
|
||||
|
||||
args = structs.DCSpecificRequest{Datacenter: "dc2"}
|
||||
var dc2SecondaryRoots structs.IndexedCARoots
|
||||
require.NoError(s3.RPC("ConnectCA.Roots", &args, &dc2SecondaryRoots))
|
||||
|
||||
// dc2's TrustDomain should have changed to the primary's
|
||||
require.Equal(dc2SecondaryRoots.TrustDomain, dc1Roots.TrustDomain)
|
||||
require.NotEqual(dc2SecondaryRoots.TrustDomain, dc2PrimaryRoots.TrustDomain)
|
||||
|
||||
// Both roots should be present and correct
|
||||
require.Len(dc2SecondaryRoots.Roots, 2)
|
||||
var oldSecondaryRoot *structs.CARoot
|
||||
var newSecondaryRoot *structs.CARoot
|
||||
if dc2SecondaryRoots.Roots[0].ID == dc2PrimaryRoots.Roots[0].ID {
|
||||
oldSecondaryRoot = dc2SecondaryRoots.Roots[0]
|
||||
newSecondaryRoot = dc2SecondaryRoots.Roots[1]
|
||||
} else {
|
||||
oldSecondaryRoot = dc2SecondaryRoots.Roots[1]
|
||||
newSecondaryRoot = dc2SecondaryRoots.Roots[0]
|
||||
}
|
||||
|
||||
// The old root should have its TrustDomain filled in as the old domain.
|
||||
require.Equal(oldSecondaryRoot.ExternalTrustDomain, strings.TrimSuffix(dc2PrimaryRoots.TrustDomain, ".consul"))
|
||||
|
||||
require.Equal(oldSecondaryRoot.ID, dc2PrimaryRoots.Roots[0].ID)
|
||||
require.Equal(oldSecondaryRoot.RootCert, dc2PrimaryRoots.Roots[0].RootCert)
|
||||
require.Equal(newSecondaryRoot.ID, dc1Roots.Roots[0].ID)
|
||||
require.Equal(newSecondaryRoot.RootCert, dc1Roots.Roots[0].RootCert)
|
||||
}
|
||||
|
||||
func TestLeader_SecondaryCA_UpgradeBeforePrimary(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
maxRootsQueryTime = 500 * time.Millisecond
|
||||
|
||||
// Initialize dc1 as the primary DC
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
c.Build = "1.3.0"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// dc2 as a secondary DC
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
c.Build = "1.4.0"
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Create the WAN link
|
||||
joinWAN(t, s2, s1)
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
||||
|
||||
// Verify the root lists are different in each DC's state store.
|
||||
var oldSecondaryRootID string
|
||||
{
|
||||
state1 := s1.fsm.State()
|
||||
_, roots1, err := state1.CARoots(nil)
|
||||
require.NoError(err)
|
||||
|
||||
state2 := s2.fsm.State()
|
||||
_, roots2, err := state2.CARoots(nil)
|
||||
require.NoError(err)
|
||||
require.Equal(1, len(roots1))
|
||||
require.Equal(1, len(roots2))
|
||||
require.NotEqual(roots1[0].ID, roots2[0].ID)
|
||||
require.NotEqual(roots1[0].RootCert, roots2[0].RootCert)
|
||||
oldSecondaryRootID = roots2[0].ID
|
||||
}
|
||||
|
||||
// Update the version on the fly so s2 kicks off the secondary DC transition.
|
||||
tags := s1.config.SerfLANConfig.Tags
|
||||
tags["build"] = "1.4.0"
|
||||
s1.serfLAN.SetTags(tags)
|
||||
|
||||
// Wait for the secondary transition to happen and then verify the secondary DC
|
||||
// has both roots present.
|
||||
secondaryProvider, _ := s2.getCAProvider()
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
state := s2.fsm.State()
|
||||
_, roots, err := state.CARoots(nil)
|
||||
r.Check(err)
|
||||
if len(roots) != 2 {
|
||||
r.Fatalf("should have 2 roots: %v", roots)
|
||||
}
|
||||
inter, err := secondaryProvider.ActiveIntermediate()
|
||||
r.Check(err)
|
||||
if inter == "" {
|
||||
r.Fatal("should have valid intermediate")
|
||||
}
|
||||
})
|
||||
{
|
||||
state1 := s1.fsm.State()
|
||||
_, roots1, err := state1.CARoots(nil)
|
||||
require.NoError(err)
|
||||
|
||||
state2 := s2.fsm.State()
|
||||
_, roots2, err := state2.CARoots(nil)
|
||||
require.NoError(err)
|
||||
require.Equal(1, len(roots1))
|
||||
require.Equal(2, len(roots2))
|
||||
var oldSecondaryRoot *structs.CARoot
|
||||
var newSecondaryRoot *structs.CARoot
|
||||
if roots2[0].ID == oldSecondaryRootID {
|
||||
oldSecondaryRoot = roots2[0]
|
||||
newSecondaryRoot = roots2[1]
|
||||
} else {
|
||||
oldSecondaryRoot = roots2[1]
|
||||
newSecondaryRoot = roots2[0]
|
||||
}
|
||||
require.Equal(roots1[0].ID, newSecondaryRoot.ID)
|
||||
require.Equal(roots1[0].RootCert, newSecondaryRoot.RootCert)
|
||||
require.NotEqual(newSecondaryRoot.ID, oldSecondaryRoot.ID)
|
||||
require.NotEqual(newSecondaryRoot.RootCert, oldSecondaryRoot.RootCert)
|
||||
}
|
||||
|
||||
_, caRoot := s1.getCAProvider()
|
||||
intermediatePEM, err := secondaryProvider.ActiveIntermediate()
|
||||
require.NoError(err)
|
||||
|
||||
// Have dc2 sign a leaf cert and make sure the chain is correct.
|
||||
spiffeService := &connect.SpiffeIDService{
|
||||
Host: "node1",
|
||||
Namespace: "default",
|
||||
Datacenter: "dc1",
|
||||
Service: "foo",
|
||||
}
|
||||
raw, _ := connect.TestCSR(t, spiffeService)
|
||||
|
||||
leafCsr, err := connect.ParseCSR(raw)
|
||||
require.NoError(err)
|
||||
|
||||
leafPEM, err := secondaryProvider.Sign(leafCsr)
|
||||
require.NoError(err)
|
||||
|
||||
cert, err := connect.ParseCert(leafPEM)
|
||||
require.NoError(err)
|
||||
|
||||
// Check that the leaf signed by the new cert can be verified using the
|
||||
// returned cert chain (signed intermediate + remote root).
|
||||
intermediatePool := x509.NewCertPool()
|
||||
intermediatePool.AppendCertsFromPEM([]byte(intermediatePEM))
|
||||
rootPool := x509.NewCertPool()
|
||||
rootPool.AppendCertsFromPEM([]byte(caRoot.RootCert))
|
||||
|
||||
_, err = cert.Verify(x509.VerifyOptions{
|
||||
Intermediates: intermediatePool,
|
||||
Roots: rootPool,
|
||||
})
|
||||
require.NoError(err)
|
||||
}
|
||||
|
||||
func TestLeader_ReplicateIntentions(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
assert := assert.New(t)
|
||||
require := require.New(t)
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// dc2 as a secondary DC
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Create the WAN link
|
||||
joinWAN(t, s2, s1)
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
||||
|
||||
// Create an intention in dc1
|
||||
ixn := structs.IntentionRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.IntentionOpCreate,
|
||||
Intention: &structs.Intention{
|
||||
SourceNS: structs.IntentionDefaultNamespace,
|
||||
SourceName: "test",
|
||||
DestinationNS: structs.IntentionDefaultNamespace,
|
||||
DestinationName: "test",
|
||||
Action: structs.IntentionActionAllow,
|
||||
SourceType: structs.IntentionSourceConsul,
|
||||
Meta: map[string]string{},
|
||||
},
|
||||
}
|
||||
var reply string
|
||||
require.NoError(s1.RPC("Intention.Apply", &ixn, &reply))
|
||||
require.NotEmpty(reply)
|
||||
|
||||
// Wait for it to get replicated to dc2
|
||||
var createdAt time.Time
|
||||
ixn.Intention.ID = reply
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
req := &structs.IntentionQueryRequest{
|
||||
Datacenter: "dc2",
|
||||
IntentionID: ixn.Intention.ID,
|
||||
}
|
||||
var resp structs.IndexedIntentions
|
||||
r.Check(s2.RPC("Intention.Get", req, &resp))
|
||||
if len(resp.Intentions) != 1 {
|
||||
r.Fatalf("bad: %v", resp.Intentions)
|
||||
}
|
||||
actual := resp.Intentions[0]
|
||||
createdAt = actual.CreatedAt
|
||||
})
|
||||
|
||||
// Sleep a bit so that the UpdatedAt field will definitely be different
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
|
||||
// Update the intention in dc1
|
||||
ixn.Op = structs.IntentionOpUpdate
|
||||
ixn.Intention.ID = reply
|
||||
ixn.Intention.SourceName = "*"
|
||||
require.NoError(s1.RPC("Intention.Apply", &ixn, &reply))
|
||||
|
||||
// Wait for dc2 to get the update
|
||||
ixn.Intention.ID = reply
|
||||
var resp structs.IndexedIntentions
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
req := &structs.IntentionQueryRequest{
|
||||
Datacenter: "dc2",
|
||||
IntentionID: ixn.Intention.ID,
|
||||
}
|
||||
r.Check(s2.RPC("Intention.Get", req, &resp))
|
||||
if len(resp.Intentions) != 1 {
|
||||
r.Fatalf("bad: %v", resp.Intentions)
|
||||
}
|
||||
if resp.Intentions[0].SourceName != "*" {
|
||||
r.Fatalf("bad: %v", resp.Intentions[0])
|
||||
}
|
||||
})
|
||||
|
||||
actual := resp.Intentions[0]
|
||||
assert.Equal(createdAt, actual.CreatedAt)
|
||||
assert.WithinDuration(time.Now(), actual.UpdatedAt, 5*time.Second)
|
||||
|
||||
actual.CreateIndex, actual.ModifyIndex = 0, 0
|
||||
actual.CreatedAt = ixn.Intention.CreatedAt
|
||||
actual.UpdatedAt = ixn.Intention.UpdatedAt
|
||||
ixn.Intention.UpdatePrecedence()
|
||||
assert.Equal(ixn.Intention, actual)
|
||||
|
||||
// Delete
|
||||
ixn.Op = structs.IntentionOpDelete
|
||||
require.NoError(s1.RPC("Intention.Apply", &ixn, &reply))
|
||||
|
||||
// Wait for the delete to be replicated
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
req := &structs.IntentionQueryRequest{
|
||||
Datacenter: "dc2",
|
||||
IntentionID: ixn.Intention.ID,
|
||||
}
|
||||
var resp structs.IndexedIntentions
|
||||
err := s2.RPC("Intention.Get", req, &resp)
|
||||
if err == nil || !strings.Contains(err.Error(), ErrIntentionNotFound.Error()) {
|
||||
r.Fatalf("expected intention not found")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestLeader_ReplicateIntentions_forwardToPrimary(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
assert := assert.New(t)
|
||||
require := require.New(t)
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// dc2 as a secondary DC
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Create the WAN link
|
||||
joinWAN(t, s2, s1)
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
||||
|
||||
// Create an intention in dc2
|
||||
ixn := structs.IntentionRequest{
|
||||
Datacenter: "dc2",
|
||||
Op: structs.IntentionOpCreate,
|
||||
Intention: &structs.Intention{
|
||||
SourceNS: structs.IntentionDefaultNamespace,
|
||||
SourceName: "test",
|
||||
DestinationNS: structs.IntentionDefaultNamespace,
|
||||
DestinationName: "test",
|
||||
Action: structs.IntentionActionAllow,
|
||||
SourceType: structs.IntentionSourceConsul,
|
||||
Meta: map[string]string{},
|
||||
},
|
||||
}
|
||||
var reply string
|
||||
require.NoError(s1.RPC("Intention.Apply", &ixn, &reply))
|
||||
require.NotEmpty(reply)
|
||||
|
||||
// Make sure it exists in both DCs
|
||||
var createdAt time.Time
|
||||
ixn.Intention.ID = reply
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
for _, server := range []*Server{s1, s2} {
|
||||
req := &structs.IntentionQueryRequest{
|
||||
Datacenter: server.config.Datacenter,
|
||||
IntentionID: ixn.Intention.ID,
|
||||
}
|
||||
var resp structs.IndexedIntentions
|
||||
r.Check(server.RPC("Intention.Get", req, &resp))
|
||||
if len(resp.Intentions) != 1 {
|
||||
r.Fatalf("bad: %v", resp.Intentions)
|
||||
}
|
||||
actual := resp.Intentions[0]
|
||||
createdAt = actual.CreatedAt
|
||||
}
|
||||
})
|
||||
|
||||
// Sleep a bit so that the UpdatedAt field will definitely be different
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
|
||||
// Update the intention in dc1
|
||||
ixn.Op = structs.IntentionOpUpdate
|
||||
ixn.Intention.ID = reply
|
||||
ixn.Intention.SourceName = "*"
|
||||
require.NoError(s1.RPC("Intention.Apply", &ixn, &reply))
|
||||
|
||||
// Wait for dc2 to get the update
|
||||
ixn.Intention.ID = reply
|
||||
var resp structs.IndexedIntentions
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
for _, server := range []*Server{s1, s2} {
|
||||
req := &structs.IntentionQueryRequest{
|
||||
Datacenter: server.config.Datacenter,
|
||||
IntentionID: ixn.Intention.ID,
|
||||
}
|
||||
r.Check(server.RPC("Intention.Get", req, &resp))
|
||||
if len(resp.Intentions) != 1 {
|
||||
r.Fatalf("bad: %v", resp.Intentions)
|
||||
}
|
||||
if resp.Intentions[0].SourceName != "*" {
|
||||
r.Fatalf("bad: %v", resp.Intentions[0])
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
actual := resp.Intentions[0]
|
||||
assert.Equal(createdAt, actual.CreatedAt)
|
||||
assert.WithinDuration(time.Now(), actual.UpdatedAt, 5*time.Second)
|
||||
|
||||
actual.CreateIndex, actual.ModifyIndex = 0, 0
|
||||
actual.CreatedAt = ixn.Intention.CreatedAt
|
||||
actual.UpdatedAt = ixn.Intention.UpdatedAt
|
||||
ixn.Intention.UpdatePrecedence()
|
||||
assert.Equal(ixn.Intention, actual)
|
||||
|
||||
// Delete
|
||||
ixn.Op = structs.IntentionOpDelete
|
||||
require.NoError(s1.RPC("Intention.Apply", &ixn, &reply))
|
||||
|
||||
// Wait for the delete to be replicated
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
for _, server := range []*Server{s1, s2} {
|
||||
req := &structs.IntentionQueryRequest{
|
||||
Datacenter: server.config.Datacenter,
|
||||
IntentionID: ixn.Intention.ID,
|
||||
}
|
||||
var resp structs.IndexedIntentions
|
||||
err := server.RPC("Intention.Get", req, &resp)
|
||||
if err == nil || !strings.Contains(err.Error(), ErrIntentionNotFound.Error()) {
|
||||
r.Fatalf("expected intention not found")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestLeader_batchIntentionUpdates(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
assert := assert.New(t)
|
||||
ixn1 := structs.TestIntention(t)
|
||||
ixn1.ID = "ixn1"
|
||||
ixn2 := structs.TestIntention(t)
|
||||
ixn2.ID = "ixn2"
|
||||
ixnLarge := structs.TestIntention(t)
|
||||
ixnLarge.ID = "ixnLarge"
|
||||
ixnLarge.Description = strings.Repeat("x", maxIntentionTxnSize-1)
|
||||
|
||||
cases := []struct {
|
||||
deletes structs.Intentions
|
||||
updates structs.Intentions
|
||||
expected []structs.TxnOps
|
||||
}{
|
||||
// 1 deletes, 0 updates
|
||||
{
|
||||
deletes: structs.Intentions{ixn1},
|
||||
expected: []structs.TxnOps{
|
||||
structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
Intention: &structs.TxnIntentionOp{
|
||||
Op: structs.IntentionOpDelete,
|
||||
Intention: ixn1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
// 0 deletes, 1 updates
|
||||
{
|
||||
updates: structs.Intentions{ixn1},
|
||||
expected: []structs.TxnOps{
|
||||
structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
Intention: &structs.TxnIntentionOp{
|
||||
Op: structs.IntentionOpUpdate,
|
||||
Intention: ixn1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
// 1 deletes, 1 updates
|
||||
{
|
||||
deletes: structs.Intentions{ixn1},
|
||||
updates: structs.Intentions{ixn2},
|
||||
expected: []structs.TxnOps{
|
||||
structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
Intention: &structs.TxnIntentionOp{
|
||||
Op: structs.IntentionOpDelete,
|
||||
Intention: ixn1,
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
Intention: &structs.TxnIntentionOp{
|
||||
Op: structs.IntentionOpUpdate,
|
||||
Intention: ixn2,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
// 1 large intention update
|
||||
{
|
||||
updates: structs.Intentions{ixnLarge},
|
||||
expected: []structs.TxnOps{
|
||||
structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
Intention: &structs.TxnIntentionOp{
|
||||
Op: structs.IntentionOpUpdate,
|
||||
Intention: ixnLarge,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
// 2 deletes (w/ a large intention), 1 updates
|
||||
{
|
||||
deletes: structs.Intentions{ixn1, ixnLarge},
|
||||
updates: structs.Intentions{ixn2},
|
||||
expected: []structs.TxnOps{
|
||||
structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
Intention: &structs.TxnIntentionOp{
|
||||
Op: structs.IntentionOpDelete,
|
||||
Intention: ixn1,
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
Intention: &structs.TxnIntentionOp{
|
||||
Op: structs.IntentionOpDelete,
|
||||
Intention: ixnLarge,
|
||||
},
|
||||
},
|
||||
},
|
||||
structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
Intention: &structs.TxnIntentionOp{
|
||||
Op: structs.IntentionOpUpdate,
|
||||
Intention: ixn2,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
// 1 deletes , 2 updates (w/ a large intention)
|
||||
{
|
||||
deletes: structs.Intentions{ixn1},
|
||||
updates: structs.Intentions{ixnLarge, ixn2},
|
||||
expected: []structs.TxnOps{
|
||||
structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
Intention: &structs.TxnIntentionOp{
|
||||
Op: structs.IntentionOpDelete,
|
||||
Intention: ixn1,
|
||||
},
|
||||
},
|
||||
&structs.TxnOp{
|
||||
Intention: &structs.TxnIntentionOp{
|
||||
Op: structs.IntentionOpUpdate,
|
||||
Intention: ixnLarge,
|
||||
},
|
||||
},
|
||||
},
|
||||
structs.TxnOps{
|
||||
&structs.TxnOp{
|
||||
Intention: &structs.TxnIntentionOp{
|
||||
Op: structs.IntentionOpUpdate,
|
||||
Intention: ixn2,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
actual := batchIntentionUpdates(tc.deletes, tc.updates)
|
||||
assert.Equal(tc.expected, actual)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeader_GenerateCASignRequest(t *testing.T) {
|
||||
csr := "A"
|
||||
s := Server{config: &Config{PrimaryDatacenter: "east"}, tokens: new(token.Store)}
|
||||
req := s.generateCASignRequest(csr)
|
||||
assert.Equal(t, "east", req.RequestDatacenter())
|
||||
}
|
|
@ -1,29 +0,0 @@
|
|||
// +build !consulent
|
||||
|
||||
package consul
|
||||
|
||||
// initializeCA sets up the CA provider when gaining leadership, bootstrapping
|
||||
// the root in the state store if necessary.
|
||||
func (s *Server) initializeCA() error {
|
||||
// Bail if connect isn't enabled.
|
||||
if !s.config.ConnectEnabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
conf, err := s.initializeCAConfig()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Initialize the provider based on the current config.
|
||||
provider, err := s.createCAProvider(conf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.initializeRootCA(provider, conf)
|
||||
}
|
||||
|
||||
// Stub methods, only present in Consul Enterprise.
|
||||
func (s *Server) startEnterpriseLeader() {}
|
||||
func (s *Server) stopEnterpriseLeader() {}
|
|
@ -277,6 +277,15 @@ type Server struct {
|
|||
shutdownCh chan struct{}
|
||||
shutdownLock sync.Mutex
|
||||
|
||||
// State for enterprise leader logic
|
||||
connectLock sync.RWMutex
|
||||
connectEnabled bool
|
||||
connectCh chan struct{}
|
||||
|
||||
// State for whether this datacenter is acting as a secondary CA.
|
||||
actingSecondaryCA bool
|
||||
actingSecondaryLock sync.RWMutex
|
||||
|
||||
// embedded struct to hold all the enterprise specific data
|
||||
EnterpriseServer
|
||||
}
|
||||
|
@ -1258,6 +1267,10 @@ func (s *Server) isReadyForConsistentReads() bool {
|
|||
return atomic.LoadInt32(&s.readyForConsistentReads) == 1
|
||||
}
|
||||
|
||||
func (s *Server) intentionReplicationEnabled() bool {
|
||||
return s.config.ConnectEnabled && s.config.Datacenter != s.config.PrimaryDatacenter
|
||||
}
|
||||
|
||||
// peersInfoContent is used to help operators understand what happened to the
|
||||
// peers.json file. This is written to a file called peers.info in the same
|
||||
// location.
|
||||
|
|
Loading…
Reference in a new issue