From 0fc4da6861b85d3bec0d63746d0ea05da6ce974e Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Mon, 24 Jun 2019 14:21:51 -0400 Subject: [PATCH] Implement intention replication and secondary CA initialization --- agent/consul/connect_ca_endpoint.go | 52 +- agent/consul/enterprise_server_oss.go | 4 - agent/consul/leader.go | 4 +- agent/consul/leader_connect.go | 567 +++++++++++++++++ agent/consul/leader_connect_test.go | 842 ++++++++++++++++++++++++++ agent/consul/leader_oss.go | 29 - agent/consul/server.go | 13 + 7 files changed, 1474 insertions(+), 37 deletions(-) create mode 100644 agent/consul/leader_connect.go create mode 100644 agent/consul/leader_connect_test.go delete mode 100644 agent/consul/leader_oss.go diff --git a/agent/consul/connect_ca_endpoint.go b/agent/consul/connect_ca_endpoint.go index b617179c1..749a64ac4 100644 --- a/agent/consul/connect_ca_endpoint.go +++ b/agent/consul/connect_ca_endpoint.go @@ -30,8 +30,9 @@ var ( // variable points to. Clients need to compare using `err.Error() == // consul.ErrRateLimited.Error()` which is very sad. Short of replacing our // RPC mechanism it's hard to know how to make that much better though. - ErrConnectNotEnabled = errors.New("Connect must be enabled in order to use this endpoint") - ErrRateLimited = errors.New("Rate limit reached, try again later") + ErrConnectNotEnabled = errors.New("Connect must be enabled in order to use this endpoint") + ErrRateLimited = errors.New("Rate limit reached, try again later") + ErrNotPrimaryDatacenter = errors.New("not the primary datacenter") ) const ( @@ -556,3 +557,50 @@ func (s *ConnectCA) Sign( return nil } + +// SignIntermediate signs an intermediate certificate for a remote datacenter. +func (s *ConnectCA) SignIntermediate( + args *structs.CASignRequest, + reply *string) error { + // Exit early if Connect hasn't been enabled. + if !s.srv.config.ConnectEnabled { + return ErrConnectNotEnabled + } + + if done, err := s.srv.forward("ConnectCA.SignIntermediate", args, args, reply); done { + return err + } + + // Verify we are allowed to serve this request + if s.srv.config.PrimaryDatacenter != s.srv.config.Datacenter { + return ErrNotPrimaryDatacenter + } + + // This action requires operator write access. + rule, err := s.srv.ResolveToken(args.Token) + if err != nil { + return err + } + if rule != nil && !rule.OperatorWrite() { + return acl.ErrPermissionDenied + } + + provider, _ := s.srv.getCAProvider() + if provider == nil { + return fmt.Errorf("internal error: CA provider is nil") + } + + csr, err := connect.ParseCSR(args.CSR) + if err != nil { + return err + } + + cert, err := provider.SignIntermediate(csr) + if err != nil { + return err + } + + *reply = cert + + return nil +} diff --git a/agent/consul/enterprise_server_oss.go b/agent/consul/enterprise_server_oss.go index 4dc3ed2a5..ca604ecaf 100644 --- a/agent/consul/enterprise_server_oss.go +++ b/agent/consul/enterprise_server_oss.go @@ -30,7 +30,3 @@ func (s *Server) handleEnterpriseRPCConn(rtype pool.RPCType, conn net.Conn, isTL func (s *Server) enterpriseStats() map[string]map[string]string { return nil } - -func (s *Server) intentionReplicationEnabled() bool { - return false -} diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 21d96c360..93a1e0947 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -327,7 +327,7 @@ func (s *Server) establishLeadership() error { s.startConfigReplication() - s.startEnterpriseLeader() + s.startConnectLeader() s.startCARootPruning() @@ -347,7 +347,7 @@ func (s *Server) revokeLeadership() { s.stopConfigReplication() - s.stopEnterpriseLeader() + s.stopConnectLeader() s.stopCARootPruning() diff --git a/agent/consul/leader_connect.go b/agent/consul/leader_connect.go new file mode 100644 index 000000000..e15431646 --- /dev/null +++ b/agent/consul/leader_connect.go @@ -0,0 +1,567 @@ +package consul + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "golang.org/x/time/rate" + + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/connect/ca" + "github.com/hashicorp/consul/agent/consul/autopilot" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/go-version" +) + +const ( + // loopRateLimit is the maximum rate per second at which we can rerun CA and intention + // replication watches. + loopRateLimit rate.Limit = 0.2 + + // retryBucketSize is the maximum number of stored rate limit attempts for looped + // blocking query operations. + retryBucketSize = 5 + + // maxIntentionTxnSize is the maximum size (in bytes) of a transaction used during + // Intention replication. + maxIntentionTxnSize = raftWarnSize / 4 +) + +var ( + // maxRetryBackoff is the maximum number of seconds to wait between failed blocking + // queries when backing off. + maxRetryBackoff = 256 + + // minMultiDCConnectVersion is the minimum version in order to support multi-DC Connect + // features. + minMultiDCConnectVersion = version.Must(version.NewVersion("1.4.0")) + + // maxRootsQueryTime is the maximum time the primary roots watch query can block before + // returning. + maxRootsQueryTime = maxQueryTime + + errEmptyVersion = errors.New("version string is empty") +) + +// initializeCA sets up the CA provider when gaining leadership, either bootstrapping +// the CA if this is the primary DC or making a remote RPC for intermediate signing +// if this is a secondary DC. +func (s *Server) initializeCA() error { + // Bail if connect isn't enabled. + if !s.config.ConnectEnabled { + return nil + } + + // Initialize the provider based on the current config. + conf, err := s.initializeCAConfig() + if err != nil { + return err + } + provider, err := s.createCAProvider(conf) + if err != nil { + return err + } + s.setCAProvider(provider, nil) + + // Check whether the primary DC has been upgraded to support multi-DC Connect. + // If it hasn't, we skip the secondary initialization routine and continue acting + // as a primary DC. This is periodically re-checked in the goroutine watching the + // primary's CA roots so that we can transition to a secondary DC when it has + // been upgraded. + var primaryHasVersion bool + if s.config.PrimaryDatacenter != s.config.Datacenter { + primaryHasVersion, err = s.datacentersMeetMinVersion(minMultiDCConnectVersion) + if err == errEmptyVersion { + s.logger.Printf("[WARN] connect: primary datacenter %q is reachable but not yet initialized", s.config.PrimaryDatacenter) + return nil + } else if err != nil { + s.logger.Printf("[ERR] connect: error initializing CA: could not query primary datacenter: %v", err) + return nil + } + } + + // If this isn't the primary DC, run the secondary DC routine if the primary has already + // been upgraded to at least 1.4.0. + if s.config.PrimaryDatacenter != s.config.Datacenter && primaryHasVersion { + // Get the root CA to see if we need to refresh our intermediate. + args := structs.DCSpecificRequest{ + Datacenter: s.config.PrimaryDatacenter, + } + var roots structs.IndexedCARoots + if err := s.forwardDC("ConnectCA.Roots", s.config.PrimaryDatacenter, &args, &roots); err != nil { + return err + } + + // Configure the CA provider and initialize the intermediate certificate if necessary. + if err := s.initializeSecondaryProvider(provider, roots); err != nil { + return fmt.Errorf("error configuring provider: %v", err) + } + if err := s.initializeSecondaryCA(provider, roots); err != nil { + return err + } + + s.logger.Printf("[INFO] connect: initialized secondary datacenter CA with provider %q", conf.Provider) + return nil + } + + return s.initializeRootCA(provider, conf) +} + +// initializeSecondaryCA runs the routine for generating an intermediate CA CSR and getting +// it signed by the primary DC if the root CA of the primary DC has changed since the last +// intermediate. +func (s *Server) initializeSecondaryCA(provider ca.Provider, roots structs.IndexedCARoots) error { + activeIntermediate, err := provider.ActiveIntermediate() + if err != nil { + return err + } + + var storedRootID string + if activeIntermediate != "" { + storedRoot, err := provider.ActiveRoot() + if err != nil { + return err + } + + storedRootID, err = connect.CalculateCertFingerprint(storedRoot) + if err != nil { + return fmt.Errorf("error parsing root fingerprint: %v, %#v", err, roots) + } + } + + var newActiveRoot *structs.CARoot + for _, root := range roots.Roots { + if root.ID == roots.ActiveRootID && root.Active { + newActiveRoot = root + break + } + } + if newActiveRoot == nil { + return fmt.Errorf("primary datacenter does not have an active root CA for Connect") + } + + // Update the roots list in the state store if there's a new active root. + state := s.fsm.State() + _, activeRoot, err := state.CARootActive(nil) + if err != nil { + return err + } + if activeRoot == nil || activeRoot.ID != newActiveRoot.ID { + idx, oldRoots, err := state.CARoots(nil) + if err != nil { + return err + } + + _, config, err := state.CAConfig() + if err != nil { + return err + } + if config == nil { + return fmt.Errorf("local CA not initialized yet") + } + newConf := *config + newConf.ClusterID = newActiveRoot.ExternalTrustDomain + + // Copy the root list and append the new active root, updating the old root + // with the time it was rotated out. + var newRoots structs.CARoots + for _, r := range oldRoots { + newRoot := *r + if newRoot.Active { + newRoot.Active = false + newRoot.RotatedOutAt = time.Now() + } + if newRoot.ExternalTrustDomain == "" { + newRoot.ExternalTrustDomain = config.ClusterID + } + newRoots = append(newRoots, &newRoot) + } + newRoots = append(newRoots, newActiveRoot) + + args := &structs.CARequest{ + Op: structs.CAOpSetRootsAndConfig, + Index: idx, + Roots: newRoots, + Config: &newConf, + } + resp, err := s.raftApply(structs.ConnectCARequestType, &args) + if err != nil { + return err + } + if respErr, ok := resp.(error); ok { + return respErr + } + if respOk, ok := resp.(bool); ok && !respOk { + return fmt.Errorf("could not atomically update roots and config") + } + + s.logger.Printf("[INFO] connect: updated root certificates from primary datacenter") + } + + // Get a signed intermediate from the primary DC if the provider + // hasn't been initialized yet or if the primary's root has changed. + if activeIntermediate == "" || storedRootID != roots.ActiveRootID { + csr, err := provider.GenerateIntermediateCSR() + if err != nil { + return err + } + + var intermediatePEM string + if err := s.forwardDC("ConnectCA.SignIntermediate", s.config.PrimaryDatacenter, s.generateCASignRequest(csr), &intermediatePEM); err != nil { + return err + } + + if err := provider.SetIntermediate(intermediatePEM, newActiveRoot.RootCert); err != nil { + return err + } + + // Append the new intermediate to our local active root entry. + newActiveRoot.IntermediateCerts = append(newActiveRoot.IntermediateCerts, intermediatePEM) + + s.logger.Printf("[INFO] connect: received new intermediate certificate from primary datacenter") + } + + s.setCAProvider(provider, newActiveRoot) + return nil +} + +func (s *Server) generateCASignRequest(csr string) *structs.CASignRequest { + return &structs.CASignRequest{ + Datacenter: s.config.PrimaryDatacenter, + CSR: csr, + WriteRequest: structs.WriteRequest{Token: s.tokens.ReplicationToken()}, + } +} + +// startConnectLeader starts multi-dc connect leader routines. +func (s *Server) startConnectLeader() { + s.connectLock.Lock() + defer s.connectLock.Unlock() + + if s.connectEnabled { + return + } + + s.connectCh = make(chan struct{}) + + // Start the Connect secondary DC actions if enabled. + if s.config.ConnectEnabled && s.config.Datacenter != s.config.PrimaryDatacenter { + go s.secondaryCARootWatch(s.connectCh) + go s.replicateIntentions(s.connectCh) + } + + s.connectEnabled = true +} + +// stopConnectLeader stops connect specific leader functions. +func (s *Server) stopConnectLeader() { + s.connectLock.Lock() + defer s.connectLock.Unlock() + + if !s.connectEnabled { + return + } + + s.actingSecondaryLock.Lock() + s.actingSecondaryCA = false + s.actingSecondaryLock.Unlock() + + close(s.connectCh) + s.connectEnabled = false +} + +// secondaryCARootWatch maintains a blocking query to the primary datacenter's +// ConnectCA.Roots endpoint to monitor when it needs to request a new signed +// intermediate certificate. +func (s *Server) secondaryCARootWatch(stopCh <-chan struct{}) { + args := structs.DCSpecificRequest{ + Datacenter: s.config.PrimaryDatacenter, + QueryOptions: structs.QueryOptions{ + MaxQueryTime: maxRootsQueryTime, + }, + } + + s.logger.Printf("[DEBUG] connect: starting Connect CA root replication from primary datacenter %q", s.config.PrimaryDatacenter) + + retryLoopBackoff(stopCh, func() error { + var roots structs.IndexedCARoots + if err := s.forwardDC("ConnectCA.Roots", s.config.PrimaryDatacenter, &args, &roots); err != nil { + return err + } + + // Check to see if the primary has been upgraded in case we're waiting to switch to + // secondary mode. + provider, _ := s.getCAProvider() + if !s.configuredSecondaryCA() { + primaryHasVersion, err := s.datacentersMeetMinVersion(minMultiDCConnectVersion) + if err != nil { + return err + } + + if primaryHasVersion { + if err := s.initializeSecondaryProvider(provider, roots); err != nil { + return err + } + } + } + + // Run the secondary CA init routine to see if we need to request a new + // intermediate. + if s.configuredSecondaryCA() { + if err := s.initializeSecondaryCA(provider, roots); err != nil { + return err + } + } + + args.QueryOptions.MinQueryIndex = nextIndexVal(args.QueryOptions.MinQueryIndex, roots.QueryMeta.Index) + return nil + }, func(err error) { + // Don't log the error if it's a result of the primary still starting up. + if err != errEmptyVersion { + s.logger.Printf("[ERR] connect: error watching primary datacenter roots: %v", err) + } + }) +} + +// replicateIntentions executes a blocking query to the primary datacenter to replicate +// the intentions there to the local state. +func (s *Server) replicateIntentions(stopCh <-chan struct{}) { + args := structs.DCSpecificRequest{ + Datacenter: s.config.PrimaryDatacenter, + QueryOptions: structs.QueryOptions{Token: s.tokens.ReplicationToken()}, + } + + s.logger.Printf("[DEBUG] connect: starting Connect intention replication from primary datacenter %q", s.config.PrimaryDatacenter) + + retryLoopBackoff(stopCh, func() error { + var remote structs.IndexedIntentions + if err := s.forwardDC("Intention.List", s.config.PrimaryDatacenter, &args, &remote); err != nil { + return err + } + + _, local, err := s.fsm.State().Intentions(nil) + if err != nil { + return err + } + + // Compute the diff between the remote and local intentions. + deletes, updates := diffIntentions(local, remote.Intentions) + txnOpSets := batchIntentionUpdates(deletes, updates) + + // Apply batched updates to the state store. + for _, ops := range txnOpSets { + txnReq := structs.TxnRequest{Ops: ops} + + resp, err := s.raftApply(structs.TxnRequestType, &txnReq) + if err != nil { + return err + } + if respErr, ok := resp.(error); ok { + return respErr + } + + if txnResp, ok := resp.(structs.TxnResponse); ok { + if len(txnResp.Errors) > 0 { + return txnResp.Error() + } + } else { + return fmt.Errorf("unexpected return type %T", resp) + } + } + + args.QueryOptions.MinQueryIndex = nextIndexVal(args.QueryOptions.MinQueryIndex, remote.QueryMeta.Index) + return nil + }, func(err error) { + s.logger.Printf("[ERR] connect: error replicating intentions: %v", err) + }) +} + +// retryLoopBackoff loops a given function indefinitely, backing off exponentially +// upon errors up to a maximum of maxRetryBackoff seconds. +func retryLoopBackoff(stopCh <-chan struct{}, loopFn func() error, errFn func(error)) { + var failedAttempts uint + limiter := rate.NewLimiter(loopRateLimit, retryBucketSize) + for { + // Rate limit how often we run the loop + limiter.Wait(context.Background()) + select { + case <-stopCh: + return + default: + } + if (1 << failedAttempts) < maxRetryBackoff { + failedAttempts++ + } + retryTime := (1 << failedAttempts) * time.Second + + if err := loopFn(); err != nil { + errFn(err) + time.Sleep(retryTime) + continue + } + + // Reset the failed attempts after a successful run. + failedAttempts = 0 + } +} + +// diffIntentions computes the difference between the local and remote intentions +// and returns lists of deletes and updates. +func diffIntentions(local, remote structs.Intentions) (structs.Intentions, structs.Intentions) { + localIdx := make(map[string]uint64, len(local)) + remoteIdx := make(map[string]struct{}, len(remote)) + + var deletes structs.Intentions + var updates structs.Intentions + + for _, intention := range local { + localIdx[intention.ID] = intention.ModifyIndex + } + for _, intention := range remote { + remoteIdx[intention.ID] = struct{}{} + } + + for _, intention := range local { + if _, ok := remoteIdx[intention.ID]; !ok { + deletes = append(deletes, intention) + } + } + + for _, intention := range remote { + existingIdx, ok := localIdx[intention.ID] + if !ok { + updates = append(updates, intention) + } else if existingIdx < intention.ModifyIndex { + updates = append(updates, intention) + } + } + + return deletes, updates +} + +// batchIntentionUpdates breaks up the given updates into sets of TxnOps based +// on the estimated size of the operations. +func batchIntentionUpdates(deletes, updates structs.Intentions) []structs.TxnOps { + var txnOps structs.TxnOps + for _, delete := range deletes { + deleteOp := &structs.TxnIntentionOp{ + Op: structs.IntentionOpDelete, + Intention: delete, + } + txnOps = append(txnOps, &structs.TxnOp{Intention: deleteOp}) + } + + for _, update := range updates { + updateOp := &structs.TxnIntentionOp{ + Op: structs.IntentionOpUpdate, + Intention: update, + } + txnOps = append(txnOps, &structs.TxnOp{Intention: updateOp}) + } + + // Divide the operations into chunks according to maxIntentionTxnSize. + var batchedOps []structs.TxnOps + for batchStart := 0; batchStart < len(txnOps); { + // inner loop finds the last element to include in this batch. + batchSize := 0 + batchEnd := batchStart + for ; batchEnd < len(txnOps) && batchSize < maxIntentionTxnSize; batchEnd += 1 { + batchSize += txnOps[batchEnd].Intention.Intention.EstimateSize() + } + + batchedOps = append(batchedOps, txnOps[batchStart:batchEnd]) + + // txnOps[batchEnd] wasn't included as the slicing doesn't include the element at the stop index + batchStart = batchEnd + } + + return batchedOps +} + +// nextIndexVal computes the next index value to query for, resetting to zero +// if the index went backward. +func nextIndexVal(prevIdx, idx uint64) uint64 { + if prevIdx > idx { + return 0 + } + return idx +} + +// datacentersMeetMinVersion returns whether this datacenter and the primary +// are ready and have upgraded to at least the given version. +func (s *Server) datacentersMeetMinVersion(minVersion *version.Version) (bool, error) { + localAutopilotHealth := s.autopilot.GetClusterHealth() + localServersMeetVersion, err := autopilotServersMeetMinimumVersion(localAutopilotHealth.Servers, minVersion) + if err != nil { + return false, err + } + if !localServersMeetVersion { + return false, err + } + + args := structs.DCSpecificRequest{ + Datacenter: s.config.PrimaryDatacenter, + } + var reply autopilot.OperatorHealthReply + if err := s.forwardDC("Operator.ServerHealth", s.config.PrimaryDatacenter, &args, &reply); err != nil { + return false, err + } + remoteServersMeetVersion, err := autopilotServersMeetMinimumVersion(reply.Servers, minVersion) + if err != nil { + return false, err + } + + return localServersMeetVersion && remoteServersMeetVersion, nil +} + +// autopilotServersMeetMinimumVersion returns whether the given slice of servers +// meets a minimum version. +func autopilotServersMeetMinimumVersion(servers []autopilot.ServerHealth, minVersion *version.Version) (bool, error) { + for _, server := range servers { + if server.Version == "" { + return false, errEmptyVersion + } + version, err := version.NewVersion(server.Version) + if err != nil { + return false, fmt.Errorf("error parsing remote server version: %v", err) + } + + if version.LessThan(minVersion) { + return false, nil + } + } + + return true, nil +} + +// initializeSecondaryProvider configures the given provider for a secondary, non-root datacenter. +func (s *Server) initializeSecondaryProvider(provider ca.Provider, roots structs.IndexedCARoots) error { + if roots.TrustDomain == "" { + return fmt.Errorf("trust domain from primary datacenter is not initialized") + } + + clusterID := strings.Split(roots.TrustDomain, ".")[0] + _, conf, err := s.fsm.State().CAConfig() + if err != nil { + return err + } + + if err := provider.Configure(clusterID, false, conf.Config); err != nil { + return fmt.Errorf("error configuring provider: %v", err) + } + + s.actingSecondaryLock.Lock() + s.actingSecondaryCA = true + s.actingSecondaryLock.Unlock() + + return nil +} + +func (s *Server) configuredSecondaryCA() bool { + s.actingSecondaryLock.RLock() + defer s.actingSecondaryLock.RUnlock() + return s.actingSecondaryCA +} diff --git a/agent/consul/leader_connect_test.go b/agent/consul/leader_connect_test.go new file mode 100644 index 000000000..2cc70304c --- /dev/null +++ b/agent/consul/leader_connect_test.go @@ -0,0 +1,842 @@ +package consul + +import ( + "crypto/x509" + "os" + "strings" + "testing" + "time" + + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/token" + "github.com/hashicorp/consul/sdk/testutil/retry" + "github.com/hashicorp/consul/testrpc" + uuid "github.com/hashicorp/go-uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLeader_SecondaryCA_Initialize(t *testing.T) { + t.Parallel() + + require := require.New(t) + + // Initialize primary as the primary DC + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "primary" + c.PrimaryDatacenter = "primary" + c.Build = "1.4.0" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + testrpc.WaitForLeader(t, s1.RPC, "primary") + + // secondary as a secondary DC + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "secondary" + c.PrimaryDatacenter = "primary" + c.Build = "1.4.0" + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + // Create the WAN link + joinWAN(t, s2, s1) + testrpc.WaitForLeader(t, s2.RPC, "secondary") + + _, caRoot := s1.getCAProvider() + secondaryProvider, _ := s2.getCAProvider() + intermediatePEM, err := secondaryProvider.ActiveIntermediate() + require.NoError(err) + + // Verify the root lists are equal in each DC's state store. + state1 := s1.fsm.State() + _, roots1, err := state1.CARoots(nil) + require.NoError(err) + + state2 := s2.fsm.State() + _, roots2, err := state2.CARoots(nil) + require.NoError(err) + require.Equal(roots1[0].ID, roots2[0].ID) + require.Equal(roots1[0].RootCert, roots2[0].RootCert) + require.Equal(1, len(roots1)) + require.Equal(len(roots1), len(roots2)) + + // Have secondary sign a leaf cert and make sure the chain is correct. + spiffeService := &connect.SpiffeIDService{ + Host: "node1", + Namespace: "default", + Datacenter: "primary", + Service: "foo", + } + raw, _ := connect.TestCSR(t, spiffeService) + + leafCsr, err := connect.ParseCSR(raw) + require.NoError(err) + + leafPEM, err := secondaryProvider.Sign(leafCsr) + require.NoError(err) + + cert, err := connect.ParseCert(leafPEM) + require.NoError(err) + + // Check that the leaf signed by the new cert can be verified using the + // returned cert chain (signed intermediate + remote root). + intermediatePool := x509.NewCertPool() + intermediatePool.AppendCertsFromPEM([]byte(intermediatePEM)) + rootPool := x509.NewCertPool() + rootPool.AppendCertsFromPEM([]byte(caRoot.RootCert)) + + _, err = cert.Verify(x509.VerifyOptions{ + Intermediates: intermediatePool, + Roots: rootPool, + }) + require.NoError(err) +} + +func TestLeader_SecondaryCA_IntermediateRefresh(t *testing.T) { + t.Parallel() + + require := require.New(t) + + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.Build = "1.4.0" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // dc2 as a secondary DC + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + c.Build = "1.4.0" + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + // Create the WAN link + joinWAN(t, s2, s1) + testrpc.WaitForLeader(t, s2.RPC, "dc2") + + // Get the original intermediate + secondaryProvider, _ := s2.getCAProvider() + oldIntermediatePEM, err := secondaryProvider.ActiveIntermediate() + require.NoError(err) + require.NotEmpty(oldIntermediatePEM) + + // Store the current root + rootReq := &structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var rootList structs.IndexedCARoots + require.NoError(s1.RPC("ConnectCA.Roots", rootReq, &rootList)) + require.Len(rootList.Roots, 1) + + // Update the provider config to use a new private key, which should + // cause a rotation. + _, newKey, err := connect.GeneratePrivateKey() + require.NoError(err) + newConfig := &structs.CAConfiguration{ + Provider: "consul", + Config: map[string]interface{}{ + "PrivateKey": newKey, + "RootCert": "", + "RotationPeriod": 90 * 24 * time.Hour, + }, + } + { + args := &structs.CARequest{ + Datacenter: "dc1", + Config: newConfig, + } + var reply interface{} + + require.NoError(s1.RPC("ConnectCA.ConfigurationSet", args, &reply)) + } + + // Wait for dc2's intermediate to be refreshed. + var intermediatePEM string + retry.Run(t, func(r *retry.R) { + intermediatePEM, err = secondaryProvider.ActiveIntermediate() + r.Check(err) + if intermediatePEM == oldIntermediatePEM { + r.Fatal("not a new intermediate") + } + }) + require.NoError(err) + + // Verify the root lists have been rotated in each DC's state store. + state1 := s1.fsm.State() + _, primaryRoot, err := state1.CARootActive(nil) + require.NoError(err) + + state2 := s2.fsm.State() + _, roots2, err := state2.CARoots(nil) + require.NoError(err) + require.Equal(2, len(roots2)) + + newRoot := roots2[0] + oldRoot := roots2[1] + if roots2[1].Active { + newRoot = roots2[1] + oldRoot = roots2[0] + } + require.False(oldRoot.Active) + require.True(newRoot.Active) + require.Equal(primaryRoot.ID, newRoot.ID) + require.Equal(primaryRoot.RootCert, newRoot.RootCert) + + // Get the new root from dc1 and validate a chain of: + // dc2 leaf -> dc2 intermediate -> dc1 root + _, caRoot := s1.getCAProvider() + + // Have dc2 sign a leaf cert and make sure the chain is correct. + spiffeService := &connect.SpiffeIDService{ + Host: "node1", + Namespace: "default", + Datacenter: "dc1", + Service: "foo", + } + raw, _ := connect.TestCSR(t, spiffeService) + + leafCsr, err := connect.ParseCSR(raw) + require.NoError(err) + + leafPEM, err := secondaryProvider.Sign(leafCsr) + require.NoError(err) + + cert, err := connect.ParseCert(leafPEM) + require.NoError(err) + + // Check that the leaf signed by the new intermediate can be verified using the + // returned cert chain (signed intermediate + remote root). + intermediatePool := x509.NewCertPool() + intermediatePool.AppendCertsFromPEM([]byte(intermediatePEM)) + rootPool := x509.NewCertPool() + rootPool.AppendCertsFromPEM([]byte(caRoot.RootCert)) + + _, err = cert.Verify(x509.VerifyOptions{ + Intermediates: intermediatePool, + Roots: rootPool, + }) + require.NoError(err) +} + +func TestLeader_SecondaryCA_TransitionFromPrimary(t *testing.T) { + t.Parallel() + + require := require.New(t) + + // Initialize dc1 as the primary DC + id1, err := uuid.GenerateUUID() + require.NoError(err) + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + c.CAConfig.ClusterID = id1 + c.Build = "1.4.0" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // dc2 as a primary DC initially + id2, err := uuid.GenerateUUID() + require.NoError(err) + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc2" + c.CAConfig.ClusterID = id2 + c.Build = "1.4.0" + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + // Get the initial (primary) roots state for the secondary + testrpc.WaitForLeader(t, s2.RPC, "dc2") + args := structs.DCSpecificRequest{Datacenter: "dc2"} + var dc2PrimaryRoots structs.IndexedCARoots + require.NoError(s2.RPC("ConnectCA.Roots", &args, &dc2PrimaryRoots)) + require.Len(dc2PrimaryRoots.Roots, 1) + + // Set the ExternalTrustDomain to a blank string to simulate an old version (pre-1.4.0) + // it's fine to change the roots struct directly here because the RPC endpoint already + // makes a copy to return. + dc2PrimaryRoots.Roots[0].ExternalTrustDomain = "" + rootSetArgs := structs.CARequest{ + Op: structs.CAOpSetRoots, + Datacenter: "dc2", + Index: dc2PrimaryRoots.Index, + Roots: dc2PrimaryRoots.Roots, + } + resp, err := s2.raftApply(structs.ConnectCARequestType, rootSetArgs) + require.NoError(err) + if respErr, ok := resp.(error); ok { + t.Fatal(respErr) + } + + // Shutdown s2 and restart it with the dc1 as the primary + s2.Shutdown() + dir3, s3 := testServerWithConfig(t, func(c *Config) { + c.DataDir = s2.config.DataDir + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + c.NodeName = s2.config.NodeName + c.NodeID = s2.config.NodeID + }) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + + // Create the WAN link + joinWAN(t, s3, s1) + testrpc.WaitForLeader(t, s3.RPC, "dc2") + + // Verify the secondary has migrated its TrustDomain and added the new primary's root. + args = structs.DCSpecificRequest{Datacenter: "dc1"} + var dc1Roots structs.IndexedCARoots + require.NoError(s1.RPC("ConnectCA.Roots", &args, &dc1Roots)) + require.Len(dc1Roots.Roots, 1) + + args = structs.DCSpecificRequest{Datacenter: "dc2"} + var dc2SecondaryRoots structs.IndexedCARoots + require.NoError(s3.RPC("ConnectCA.Roots", &args, &dc2SecondaryRoots)) + + // dc2's TrustDomain should have changed to the primary's + require.Equal(dc2SecondaryRoots.TrustDomain, dc1Roots.TrustDomain) + require.NotEqual(dc2SecondaryRoots.TrustDomain, dc2PrimaryRoots.TrustDomain) + + // Both roots should be present and correct + require.Len(dc2SecondaryRoots.Roots, 2) + var oldSecondaryRoot *structs.CARoot + var newSecondaryRoot *structs.CARoot + if dc2SecondaryRoots.Roots[0].ID == dc2PrimaryRoots.Roots[0].ID { + oldSecondaryRoot = dc2SecondaryRoots.Roots[0] + newSecondaryRoot = dc2SecondaryRoots.Roots[1] + } else { + oldSecondaryRoot = dc2SecondaryRoots.Roots[1] + newSecondaryRoot = dc2SecondaryRoots.Roots[0] + } + + // The old root should have its TrustDomain filled in as the old domain. + require.Equal(oldSecondaryRoot.ExternalTrustDomain, strings.TrimSuffix(dc2PrimaryRoots.TrustDomain, ".consul")) + + require.Equal(oldSecondaryRoot.ID, dc2PrimaryRoots.Roots[0].ID) + require.Equal(oldSecondaryRoot.RootCert, dc2PrimaryRoots.Roots[0].RootCert) + require.Equal(newSecondaryRoot.ID, dc1Roots.Roots[0].ID) + require.Equal(newSecondaryRoot.RootCert, dc1Roots.Roots[0].RootCert) +} + +func TestLeader_SecondaryCA_UpgradeBeforePrimary(t *testing.T) { + t.Parallel() + + require := require.New(t) + maxRootsQueryTime = 500 * time.Millisecond + + // Initialize dc1 as the primary DC + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + c.Build = "1.3.0" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // dc2 as a secondary DC + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + c.Build = "1.4.0" + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + // Create the WAN link + joinWAN(t, s2, s1) + testrpc.WaitForLeader(t, s2.RPC, "dc2") + + // Verify the root lists are different in each DC's state store. + var oldSecondaryRootID string + { + state1 := s1.fsm.State() + _, roots1, err := state1.CARoots(nil) + require.NoError(err) + + state2 := s2.fsm.State() + _, roots2, err := state2.CARoots(nil) + require.NoError(err) + require.Equal(1, len(roots1)) + require.Equal(1, len(roots2)) + require.NotEqual(roots1[0].ID, roots2[0].ID) + require.NotEqual(roots1[0].RootCert, roots2[0].RootCert) + oldSecondaryRootID = roots2[0].ID + } + + // Update the version on the fly so s2 kicks off the secondary DC transition. + tags := s1.config.SerfLANConfig.Tags + tags["build"] = "1.4.0" + s1.serfLAN.SetTags(tags) + + // Wait for the secondary transition to happen and then verify the secondary DC + // has both roots present. + secondaryProvider, _ := s2.getCAProvider() + retry.Run(t, func(r *retry.R) { + state := s2.fsm.State() + _, roots, err := state.CARoots(nil) + r.Check(err) + if len(roots) != 2 { + r.Fatalf("should have 2 roots: %v", roots) + } + inter, err := secondaryProvider.ActiveIntermediate() + r.Check(err) + if inter == "" { + r.Fatal("should have valid intermediate") + } + }) + { + state1 := s1.fsm.State() + _, roots1, err := state1.CARoots(nil) + require.NoError(err) + + state2 := s2.fsm.State() + _, roots2, err := state2.CARoots(nil) + require.NoError(err) + require.Equal(1, len(roots1)) + require.Equal(2, len(roots2)) + var oldSecondaryRoot *structs.CARoot + var newSecondaryRoot *structs.CARoot + if roots2[0].ID == oldSecondaryRootID { + oldSecondaryRoot = roots2[0] + newSecondaryRoot = roots2[1] + } else { + oldSecondaryRoot = roots2[1] + newSecondaryRoot = roots2[0] + } + require.Equal(roots1[0].ID, newSecondaryRoot.ID) + require.Equal(roots1[0].RootCert, newSecondaryRoot.RootCert) + require.NotEqual(newSecondaryRoot.ID, oldSecondaryRoot.ID) + require.NotEqual(newSecondaryRoot.RootCert, oldSecondaryRoot.RootCert) + } + + _, caRoot := s1.getCAProvider() + intermediatePEM, err := secondaryProvider.ActiveIntermediate() + require.NoError(err) + + // Have dc2 sign a leaf cert and make sure the chain is correct. + spiffeService := &connect.SpiffeIDService{ + Host: "node1", + Namespace: "default", + Datacenter: "dc1", + Service: "foo", + } + raw, _ := connect.TestCSR(t, spiffeService) + + leafCsr, err := connect.ParseCSR(raw) + require.NoError(err) + + leafPEM, err := secondaryProvider.Sign(leafCsr) + require.NoError(err) + + cert, err := connect.ParseCert(leafPEM) + require.NoError(err) + + // Check that the leaf signed by the new cert can be verified using the + // returned cert chain (signed intermediate + remote root). + intermediatePool := x509.NewCertPool() + intermediatePool.AppendCertsFromPEM([]byte(intermediatePEM)) + rootPool := x509.NewCertPool() + rootPool.AppendCertsFromPEM([]byte(caRoot.RootCert)) + + _, err = cert.Verify(x509.VerifyOptions{ + Intermediates: intermediatePool, + Roots: rootPool, + }) + require.NoError(err) +} + +func TestLeader_ReplicateIntentions(t *testing.T) { + t.Parallel() + + assert := assert.New(t) + require := require.New(t) + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // dc2 as a secondary DC + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + // Create the WAN link + joinWAN(t, s2, s1) + testrpc.WaitForLeader(t, s2.RPC, "dc2") + + // Create an intention in dc1 + ixn := structs.IntentionRequest{ + Datacenter: "dc1", + Op: structs.IntentionOpCreate, + Intention: &structs.Intention{ + SourceNS: structs.IntentionDefaultNamespace, + SourceName: "test", + DestinationNS: structs.IntentionDefaultNamespace, + DestinationName: "test", + Action: structs.IntentionActionAllow, + SourceType: structs.IntentionSourceConsul, + Meta: map[string]string{}, + }, + } + var reply string + require.NoError(s1.RPC("Intention.Apply", &ixn, &reply)) + require.NotEmpty(reply) + + // Wait for it to get replicated to dc2 + var createdAt time.Time + ixn.Intention.ID = reply + retry.Run(t, func(r *retry.R) { + req := &structs.IntentionQueryRequest{ + Datacenter: "dc2", + IntentionID: ixn.Intention.ID, + } + var resp structs.IndexedIntentions + r.Check(s2.RPC("Intention.Get", req, &resp)) + if len(resp.Intentions) != 1 { + r.Fatalf("bad: %v", resp.Intentions) + } + actual := resp.Intentions[0] + createdAt = actual.CreatedAt + }) + + // Sleep a bit so that the UpdatedAt field will definitely be different + time.Sleep(1 * time.Millisecond) + + // Update the intention in dc1 + ixn.Op = structs.IntentionOpUpdate + ixn.Intention.ID = reply + ixn.Intention.SourceName = "*" + require.NoError(s1.RPC("Intention.Apply", &ixn, &reply)) + + // Wait for dc2 to get the update + ixn.Intention.ID = reply + var resp structs.IndexedIntentions + retry.Run(t, func(r *retry.R) { + req := &structs.IntentionQueryRequest{ + Datacenter: "dc2", + IntentionID: ixn.Intention.ID, + } + r.Check(s2.RPC("Intention.Get", req, &resp)) + if len(resp.Intentions) != 1 { + r.Fatalf("bad: %v", resp.Intentions) + } + if resp.Intentions[0].SourceName != "*" { + r.Fatalf("bad: %v", resp.Intentions[0]) + } + }) + + actual := resp.Intentions[0] + assert.Equal(createdAt, actual.CreatedAt) + assert.WithinDuration(time.Now(), actual.UpdatedAt, 5*time.Second) + + actual.CreateIndex, actual.ModifyIndex = 0, 0 + actual.CreatedAt = ixn.Intention.CreatedAt + actual.UpdatedAt = ixn.Intention.UpdatedAt + ixn.Intention.UpdatePrecedence() + assert.Equal(ixn.Intention, actual) + + // Delete + ixn.Op = structs.IntentionOpDelete + require.NoError(s1.RPC("Intention.Apply", &ixn, &reply)) + + // Wait for the delete to be replicated + retry.Run(t, func(r *retry.R) { + req := &structs.IntentionQueryRequest{ + Datacenter: "dc2", + IntentionID: ixn.Intention.ID, + } + var resp structs.IndexedIntentions + err := s2.RPC("Intention.Get", req, &resp) + if err == nil || !strings.Contains(err.Error(), ErrIntentionNotFound.Error()) { + r.Fatalf("expected intention not found") + } + }) +} + +func TestLeader_ReplicateIntentions_forwardToPrimary(t *testing.T) { + t.Parallel() + + assert := assert.New(t) + require := require.New(t) + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // dc2 as a secondary DC + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc1" + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + // Create the WAN link + joinWAN(t, s2, s1) + testrpc.WaitForLeader(t, s2.RPC, "dc2") + + // Create an intention in dc2 + ixn := structs.IntentionRequest{ + Datacenter: "dc2", + Op: structs.IntentionOpCreate, + Intention: &structs.Intention{ + SourceNS: structs.IntentionDefaultNamespace, + SourceName: "test", + DestinationNS: structs.IntentionDefaultNamespace, + DestinationName: "test", + Action: structs.IntentionActionAllow, + SourceType: structs.IntentionSourceConsul, + Meta: map[string]string{}, + }, + } + var reply string + require.NoError(s1.RPC("Intention.Apply", &ixn, &reply)) + require.NotEmpty(reply) + + // Make sure it exists in both DCs + var createdAt time.Time + ixn.Intention.ID = reply + retry.Run(t, func(r *retry.R) { + for _, server := range []*Server{s1, s2} { + req := &structs.IntentionQueryRequest{ + Datacenter: server.config.Datacenter, + IntentionID: ixn.Intention.ID, + } + var resp structs.IndexedIntentions + r.Check(server.RPC("Intention.Get", req, &resp)) + if len(resp.Intentions) != 1 { + r.Fatalf("bad: %v", resp.Intentions) + } + actual := resp.Intentions[0] + createdAt = actual.CreatedAt + } + }) + + // Sleep a bit so that the UpdatedAt field will definitely be different + time.Sleep(1 * time.Millisecond) + + // Update the intention in dc1 + ixn.Op = structs.IntentionOpUpdate + ixn.Intention.ID = reply + ixn.Intention.SourceName = "*" + require.NoError(s1.RPC("Intention.Apply", &ixn, &reply)) + + // Wait for dc2 to get the update + ixn.Intention.ID = reply + var resp structs.IndexedIntentions + retry.Run(t, func(r *retry.R) { + for _, server := range []*Server{s1, s2} { + req := &structs.IntentionQueryRequest{ + Datacenter: server.config.Datacenter, + IntentionID: ixn.Intention.ID, + } + r.Check(server.RPC("Intention.Get", req, &resp)) + if len(resp.Intentions) != 1 { + r.Fatalf("bad: %v", resp.Intentions) + } + if resp.Intentions[0].SourceName != "*" { + r.Fatalf("bad: %v", resp.Intentions[0]) + } + } + }) + + actual := resp.Intentions[0] + assert.Equal(createdAt, actual.CreatedAt) + assert.WithinDuration(time.Now(), actual.UpdatedAt, 5*time.Second) + + actual.CreateIndex, actual.ModifyIndex = 0, 0 + actual.CreatedAt = ixn.Intention.CreatedAt + actual.UpdatedAt = ixn.Intention.UpdatedAt + ixn.Intention.UpdatePrecedence() + assert.Equal(ixn.Intention, actual) + + // Delete + ixn.Op = structs.IntentionOpDelete + require.NoError(s1.RPC("Intention.Apply", &ixn, &reply)) + + // Wait for the delete to be replicated + retry.Run(t, func(r *retry.R) { + for _, server := range []*Server{s1, s2} { + req := &structs.IntentionQueryRequest{ + Datacenter: server.config.Datacenter, + IntentionID: ixn.Intention.ID, + } + var resp structs.IndexedIntentions + err := server.RPC("Intention.Get", req, &resp) + if err == nil || !strings.Contains(err.Error(), ErrIntentionNotFound.Error()) { + r.Fatalf("expected intention not found") + } + } + }) +} + +func TestLeader_batchIntentionUpdates(t *testing.T) { + t.Parallel() + + assert := assert.New(t) + ixn1 := structs.TestIntention(t) + ixn1.ID = "ixn1" + ixn2 := structs.TestIntention(t) + ixn2.ID = "ixn2" + ixnLarge := structs.TestIntention(t) + ixnLarge.ID = "ixnLarge" + ixnLarge.Description = strings.Repeat("x", maxIntentionTxnSize-1) + + cases := []struct { + deletes structs.Intentions + updates structs.Intentions + expected []structs.TxnOps + }{ + // 1 deletes, 0 updates + { + deletes: structs.Intentions{ixn1}, + expected: []structs.TxnOps{ + structs.TxnOps{ + &structs.TxnOp{ + Intention: &structs.TxnIntentionOp{ + Op: structs.IntentionOpDelete, + Intention: ixn1, + }, + }, + }, + }, + }, + // 0 deletes, 1 updates + { + updates: structs.Intentions{ixn1}, + expected: []structs.TxnOps{ + structs.TxnOps{ + &structs.TxnOp{ + Intention: &structs.TxnIntentionOp{ + Op: structs.IntentionOpUpdate, + Intention: ixn1, + }, + }, + }, + }, + }, + // 1 deletes, 1 updates + { + deletes: structs.Intentions{ixn1}, + updates: structs.Intentions{ixn2}, + expected: []structs.TxnOps{ + structs.TxnOps{ + &structs.TxnOp{ + Intention: &structs.TxnIntentionOp{ + Op: structs.IntentionOpDelete, + Intention: ixn1, + }, + }, + &structs.TxnOp{ + Intention: &structs.TxnIntentionOp{ + Op: structs.IntentionOpUpdate, + Intention: ixn2, + }, + }, + }, + }, + }, + // 1 large intention update + { + updates: structs.Intentions{ixnLarge}, + expected: []structs.TxnOps{ + structs.TxnOps{ + &structs.TxnOp{ + Intention: &structs.TxnIntentionOp{ + Op: structs.IntentionOpUpdate, + Intention: ixnLarge, + }, + }, + }, + }, + }, + // 2 deletes (w/ a large intention), 1 updates + { + deletes: structs.Intentions{ixn1, ixnLarge}, + updates: structs.Intentions{ixn2}, + expected: []structs.TxnOps{ + structs.TxnOps{ + &structs.TxnOp{ + Intention: &structs.TxnIntentionOp{ + Op: structs.IntentionOpDelete, + Intention: ixn1, + }, + }, + &structs.TxnOp{ + Intention: &structs.TxnIntentionOp{ + Op: structs.IntentionOpDelete, + Intention: ixnLarge, + }, + }, + }, + structs.TxnOps{ + &structs.TxnOp{ + Intention: &structs.TxnIntentionOp{ + Op: structs.IntentionOpUpdate, + Intention: ixn2, + }, + }, + }, + }, + }, + // 1 deletes , 2 updates (w/ a large intention) + { + deletes: structs.Intentions{ixn1}, + updates: structs.Intentions{ixnLarge, ixn2}, + expected: []structs.TxnOps{ + structs.TxnOps{ + &structs.TxnOp{ + Intention: &structs.TxnIntentionOp{ + Op: structs.IntentionOpDelete, + Intention: ixn1, + }, + }, + &structs.TxnOp{ + Intention: &structs.TxnIntentionOp{ + Op: structs.IntentionOpUpdate, + Intention: ixnLarge, + }, + }, + }, + structs.TxnOps{ + &structs.TxnOp{ + Intention: &structs.TxnIntentionOp{ + Op: structs.IntentionOpUpdate, + Intention: ixn2, + }, + }, + }, + }, + }, + } + + for _, tc := range cases { + actual := batchIntentionUpdates(tc.deletes, tc.updates) + assert.Equal(tc.expected, actual) + } +} + +func TestLeader_GenerateCASignRequest(t *testing.T) { + csr := "A" + s := Server{config: &Config{PrimaryDatacenter: "east"}, tokens: new(token.Store)} + req := s.generateCASignRequest(csr) + assert.Equal(t, "east", req.RequestDatacenter()) +} diff --git a/agent/consul/leader_oss.go b/agent/consul/leader_oss.go deleted file mode 100644 index 23993bd97..000000000 --- a/agent/consul/leader_oss.go +++ /dev/null @@ -1,29 +0,0 @@ -// +build !consulent - -package consul - -// initializeCA sets up the CA provider when gaining leadership, bootstrapping -// the root in the state store if necessary. -func (s *Server) initializeCA() error { - // Bail if connect isn't enabled. - if !s.config.ConnectEnabled { - return nil - } - - conf, err := s.initializeCAConfig() - if err != nil { - return err - } - - // Initialize the provider based on the current config. - provider, err := s.createCAProvider(conf) - if err != nil { - return err - } - - return s.initializeRootCA(provider, conf) -} - -// Stub methods, only present in Consul Enterprise. -func (s *Server) startEnterpriseLeader() {} -func (s *Server) stopEnterpriseLeader() {} diff --git a/agent/consul/server.go b/agent/consul/server.go index f9c20dc1d..8a34e0567 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -277,6 +277,15 @@ type Server struct { shutdownCh chan struct{} shutdownLock sync.Mutex + // State for enterprise leader logic + connectLock sync.RWMutex + connectEnabled bool + connectCh chan struct{} + + // State for whether this datacenter is acting as a secondary CA. + actingSecondaryCA bool + actingSecondaryLock sync.RWMutex + // embedded struct to hold all the enterprise specific data EnterpriseServer } @@ -1258,6 +1267,10 @@ func (s *Server) isReadyForConsistentReads() bool { return atomic.LoadInt32(&s.readyForConsistentReads) == 1 } +func (s *Server) intentionReplicationEnabled() bool { + return s.config.ConnectEnabled && s.config.Datacenter != s.config.PrimaryDatacenter +} + // peersInfoContent is used to help operators understand what happened to the // peers.json file. This is written to a file called peers.info in the same // location.