diff --git a/agent/consul/enterprise_server_oss.go b/agent/consul/enterprise_server_oss.go index ca604ecaf..6461e06ac 100644 --- a/agent/consul/enterprise_server_oss.go +++ b/agent/consul/enterprise_server_oss.go @@ -6,9 +6,16 @@ import ( "net" "github.com/hashicorp/consul/agent/pool" + "github.com/hashicorp/go-version" "github.com/hashicorp/serf/serf" ) +var ( + // minMultiDCConnectVersion is the minimum version in order to support multi-DC Connect + // features. + minMultiDCConnectVersion = version.Must(version.NewVersion("1.6.0")) +) + type EnterpriseServer struct{} func (s *Server) initEnterprise() error { diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 4e66b39d0..a791cb948 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -5,15 +5,12 @@ import ( "fmt" "net" "strconv" - "strings" "sync" "sync/atomic" "time" metrics "github.com/armon/go-metrics" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/connect" - ca "github.com/hashicorp/consul/agent/connect/ca" "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/structs" @@ -329,8 +326,6 @@ func (s *Server) establishLeadership() error { s.startConnectLeader() - s.startCARootPruning() - s.setConsistentReadReady() return nil } @@ -349,8 +344,6 @@ func (s *Server) revokeLeadership() { s.stopConnectLeader() - s.stopCARootPruning() - s.setCAProvider(nil, nil) s.stopACLTokenReaping() @@ -968,272 +961,6 @@ func (s *Server) bootstrapConfigEntries(entries []structs.ConfigEntry) error { return nil } -// initializeCAConfig is used to initialize the CA config if necessary -// when setting up the CA during establishLeadership -func (s *Server) initializeCAConfig() (*structs.CAConfiguration, error) { - state := s.fsm.State() - _, config, err := state.CAConfig() - if err != nil { - return nil, err - } - if config != nil { - return config, nil - } - - config = s.config.CAConfig - if 
config.ClusterID == "" { - id, err := uuid.GenerateUUID() - if err != nil { - return nil, err - } - config.ClusterID = id - } - - req := structs.CARequest{ - Op: structs.CAOpSetConfig, - Config: config, - } - if _, err = s.raftApply(structs.ConnectCARequestType, req); err != nil { - return nil, err - } - - return config, nil -} - -// initializeRootCA runs the initialization logic for a root CA. -func (s *Server) initializeRootCA(provider ca.Provider, conf *structs.CAConfiguration) error { - if err := provider.Configure(conf.ClusterID, true, conf.Config); err != nil { - return fmt.Errorf("error configuring provider: %v", err) - } - if err := provider.GenerateRoot(); err != nil { - return fmt.Errorf("error generating CA root certificate: %v", err) - } - - // Get the active root cert from the CA - rootPEM, err := provider.ActiveRoot() - if err != nil { - return fmt.Errorf("error getting root cert: %v", err) - } - - rootCA, err := parseCARoot(rootPEM, conf.Provider, conf.ClusterID) - if err != nil { - return err - } - - // Check if the CA root is already initialized and exit if it is, - // adding on any existing intermediate certs since they aren't directly - // tied to the provider. - // Every change to the CA after this initial bootstrapping should - // be done through the rotation process. - state := s.fsm.State() - _, activeRoot, err := state.CARootActive(nil) - if err != nil { - return err - } - if activeRoot != nil { - // This state shouldn't be possible to get into because we update the root and - // CA config in the same FSM operation. 
- if activeRoot.ID != rootCA.ID { - return fmt.Errorf("stored CA root %q is not the active root (%s)", rootCA.ID, activeRoot.ID) - } - - rootCA.IntermediateCerts = activeRoot.IntermediateCerts - s.setCAProvider(provider, rootCA) - - return nil - } - - // Get the highest index - idx, _, err := state.CARoots(nil) - if err != nil { - return err - } - - // Store the root cert in raft - resp, err := s.raftApply(structs.ConnectCARequestType, &structs.CARequest{ - Op: structs.CAOpSetRoots, - Index: idx, - Roots: []*structs.CARoot{rootCA}, - }) - if err != nil { - s.logger.Printf("[ERR] connect: Apply failed %v", err) - return err - } - if respErr, ok := resp.(error); ok { - return respErr - } - - s.setCAProvider(provider, rootCA) - - s.logger.Printf("[INFO] connect: initialized primary datacenter CA with provider %q", conf.Provider) - - return nil -} - -// parseCARoot returns a filled-in structs.CARoot from a raw PEM value. -func parseCARoot(pemValue, provider, clusterID string) (*structs.CARoot, error) { - id, err := connect.CalculateCertFingerprint(pemValue) - if err != nil { - return nil, fmt.Errorf("error parsing root fingerprint: %v", err) - } - rootCert, err := connect.ParseCert(pemValue) - if err != nil { - return nil, fmt.Errorf("error parsing root cert: %v", err) - } - return &structs.CARoot{ - ID: id, - Name: fmt.Sprintf("%s CA Root Cert", strings.Title(provider)), - SerialNumber: rootCert.SerialNumber.Uint64(), - SigningKeyID: connect.HexString(rootCert.SubjectKeyId), - ExternalTrustDomain: clusterID, - NotBefore: rootCert.NotBefore, - NotAfter: rootCert.NotAfter, - RootCert: pemValue, - Active: true, - }, nil -} - -// createProvider returns a connect CA provider from the given config. 
-func (s *Server) createCAProvider(conf *structs.CAConfiguration) (ca.Provider, error) { - switch conf.Provider { - case structs.ConsulCAProvider: - return &ca.ConsulProvider{Delegate: &consulCADelegate{s}}, nil - case structs.VaultCAProvider: - return &ca.VaultProvider{}, nil - default: - return nil, fmt.Errorf("unknown CA provider %q", conf.Provider) - } -} - -func (s *Server) getCAProvider() (ca.Provider, *structs.CARoot) { - retries := 0 - var result ca.Provider - var resultRoot *structs.CARoot - for result == nil { - s.caProviderLock.RLock() - result = s.caProvider - resultRoot = s.caProviderRoot - s.caProviderLock.RUnlock() - - // In cases where an agent is started with managed proxies, we may ask - // for the provider before establishLeadership completes. If we're the - // leader, then wait and get the provider again - if result == nil && s.IsLeader() && retries < 10 { - retries++ - time.Sleep(50 * time.Millisecond) - continue - } - - break - } - - return result, resultRoot -} - -func (s *Server) setCAProvider(newProvider ca.Provider, root *structs.CARoot) { - s.caProviderLock.Lock() - defer s.caProviderLock.Unlock() - s.caProvider = newProvider - s.caProviderRoot = root -} - -// startCARootPruning starts a goroutine that looks for stale CARoots -// and removes them from the state store. -func (s *Server) startCARootPruning() { - s.caPruningLock.Lock() - defer s.caPruningLock.Unlock() - - if s.caPruningEnabled { - return - } - - s.caPruningCh = make(chan struct{}) - - go func() { - ticker := time.NewTicker(caRootPruneInterval) - defer ticker.Stop() - - for { - select { - case <-s.caPruningCh: - return - case <-ticker.C: - if err := s.pruneCARoots(); err != nil { - s.logger.Printf("[ERR] connect: error pruning CA roots: %v", err) - } - } - } - }() - - s.caPruningEnabled = true -} - -// pruneCARoots looks for any CARoots that have been rotated out and expired. 
-func (s *Server) pruneCARoots() error { - if !s.config.ConnectEnabled { - return nil - } - - state := s.fsm.State() - idx, roots, err := state.CARoots(nil) - if err != nil { - return err - } - - _, caConf, err := state.CAConfig() - if err != nil { - return err - } - - common, err := caConf.GetCommonConfig() - if err != nil { - return err - } - - var newRoots structs.CARoots - for _, r := range roots { - if !r.Active && !r.RotatedOutAt.IsZero() && time.Since(r.RotatedOutAt) > common.LeafCertTTL*2 { - s.logger.Printf("[INFO] connect: pruning old unused root CA (ID: %s)", r.ID) - continue - } - newRoot := *r - newRoots = append(newRoots, &newRoot) - } - - // Return early if there's nothing to remove. - if len(newRoots) == len(roots) { - return nil - } - - // Commit the new root state. - var args structs.CARequest - args.Op = structs.CAOpSetRoots - args.Index = idx - args.Roots = newRoots - resp, err := s.raftApply(structs.ConnectCARequestType, args) - if err != nil { - return err - } - if respErr, ok := resp.(error); ok { - return respErr - } - - return nil -} - -// stopCARootPruning stops the CARoot pruning process. -func (s *Server) stopCARootPruning() { - s.caPruningLock.Lock() - defer s.caPruningLock.Unlock() - - if !s.caPruningEnabled { - return - } - - close(s.caPruningCh) - s.caPruningEnabled = false -} - // reconcileReaped is used to reconcile nodes that have failed and been reaped // from Serf but remain in the catalog. This is done by looking for unknown nodes with serfHealth checks registered. // We generate a "reap" event to cause the node to be cleaned up. 
diff --git a/agent/consul/leader_connect.go b/agent/consul/leader_connect.go index c990fa0f9..61978ab88 100644 --- a/agent/consul/leader_connect.go +++ b/agent/consul/leader_connect.go @@ -3,7 +3,6 @@ package consul import ( "bytes" "context" - "errors" "fmt" "strings" "time" @@ -12,9 +11,8 @@ import ( "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect/ca" - "github.com/hashicorp/consul/agent/consul/autopilot" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/go-version" + uuid "github.com/hashicorp/go-uuid" ) const ( @@ -36,17 +34,110 @@ var ( // queries when backing off. maxRetryBackoff = 256 - // minMultiDCConnectVersion is the minimum version in order to support multi-DC Connect - // features. - minMultiDCConnectVersion = version.Must(version.NewVersion("1.4.0")) - // maxRootsQueryTime is the maximum time the primary roots watch query can block before // returning. maxRootsQueryTime = maxQueryTime - - errEmptyVersion = errors.New("version string is empty") ) +// initializeCAConfig is used to initialize the CA config if necessary +// when setting up the CA during establishLeadership +func (s *Server) initializeCAConfig() (*structs.CAConfiguration, error) { + state := s.fsm.State() + _, config, err := state.CAConfig() + if err != nil { + return nil, err + } + if config != nil { + return config, nil + } + + config = s.config.CAConfig + if config.ClusterID == "" { + id, err := uuid.GenerateUUID() + if err != nil { + return nil, err + } + config.ClusterID = id + } + + req := structs.CARequest{ + Op: structs.CAOpSetConfig, + Config: config, + } + if _, err = s.raftApply(structs.ConnectCARequestType, req); err != nil { + return nil, err + } + + return config, nil +} + +// parseCARoot returns a filled-in structs.CARoot from a raw PEM value. 
+func parseCARoot(pemValue, provider, clusterID string) (*structs.CARoot, error) { + id, err := connect.CalculateCertFingerprint(pemValue) + if err != nil { + return nil, fmt.Errorf("error parsing root fingerprint: %v", err) + } + rootCert, err := connect.ParseCert(pemValue) + if err != nil { + return nil, fmt.Errorf("error parsing root cert: %v", err) + } + return &structs.CARoot{ + ID: id, + Name: fmt.Sprintf("%s CA Root Cert", strings.Title(provider)), + SerialNumber: rootCert.SerialNumber.Uint64(), + SigningKeyID: connect.HexString(rootCert.SubjectKeyId), + ExternalTrustDomain: clusterID, + NotBefore: rootCert.NotBefore, + NotAfter: rootCert.NotAfter, + RootCert: pemValue, + Active: true, + }, nil +} + +// createProvider returns a connect CA provider from the given config. +func (s *Server) createCAProvider(conf *structs.CAConfiguration) (ca.Provider, error) { + switch conf.Provider { + case structs.ConsulCAProvider: + return &ca.ConsulProvider{Delegate: &consulCADelegate{s}}, nil + case structs.VaultCAProvider: + return &ca.VaultProvider{}, nil + default: + return nil, fmt.Errorf("unknown CA provider %q", conf.Provider) + } +} + +func (s *Server) getCAProvider() (ca.Provider, *structs.CARoot) { + retries := 0 + var result ca.Provider + var resultRoot *structs.CARoot + for result == nil { + s.caProviderLock.RLock() + result = s.caProvider + resultRoot = s.caProviderRoot + s.caProviderLock.RUnlock() + + // In cases where an agent is started with managed proxies, we may ask + // for the provider before establishLeadership completes. 
If we're the + // leader, then wait and get the provider again + if result == nil && s.IsLeader() && retries < 10 { + retries++ + time.Sleep(50 * time.Millisecond) + continue + } + + break + } + + return result, resultRoot +} + +func (s *Server) setCAProvider(newProvider ca.Provider, root *structs.CARoot) { + s.caProviderLock.Lock() + defer s.caProviderLock.Unlock() + s.caProvider = newProvider + s.caProviderRoot = root +} + // initializeCA sets up the CA provider when gaining leadership, either bootstrapping // the CA if this is the primary DC or making a remote RPC for intermediate signing // if this is a secondary DC. @@ -67,26 +158,19 @@ func (s *Server) initializeCA() error { } s.setCAProvider(provider, nil) - // Check whether the primary DC has been upgraded to support multi-DC Connect. - // If it hasn't, we skip the secondary initialization routine and continue acting - // as a primary DC. This is periodically re-checked in the goroutine watching the - // primary's CA roots so that we can transition to a secondary DC when it has - // been upgraded. 
- var primaryHasVersion bool + // If this isn't the primary DC, run the secondary DC routine if the primary has already been upgraded to at least 1.6.0 if s.config.PrimaryDatacenter != s.config.Datacenter { - primaryHasVersion, err = s.datacentersMeetMinVersion(minMultiDCConnectVersion) - if err == errEmptyVersion { - s.logger.Printf("[WARN] connect: primary datacenter %q is reachable but not yet initialized", s.config.PrimaryDatacenter) + versionOk, foundPrimary := ServersInDCMeetMinimumVersion(s.WANMembers(), s.config.PrimaryDatacenter, minMultiDCConnectVersion) + if !foundPrimary { + s.logger.Printf("[WARN] connect: primary datacenter is configured but unreachable - deferring initialization of the secondary datacenter CA") + // return nil because we will initialize the secondary CA later return nil - } else if err != nil { - s.logger.Printf("[ERR] connect: error initializing CA: could not query primary datacenter: %v", err) + } else if !versionOk { + // return nil because we will initialize the secondary CA later + s.logger.Printf("[WARN] connect: servers in the primary datacenter are not at least at version %s - deferring initialization of the secondary datacenter CA", minMultiDCConnectVersion) return nil } - } - // If this isn't the primary DC, run the secondary DC routine if the primary has already - // been upgraded to at least 1.4.0. - if s.config.PrimaryDatacenter != s.config.Datacenter && primaryHasVersion { // Get the root CA to see if we need to refresh our intermediate. args := structs.DCSpecificRequest{ Datacenter: s.config.PrimaryDatacenter, @@ -111,6 +195,76 @@ func (s *Server) initializeCA() error { return s.initializeRootCA(provider, conf) } +// initializeRootCA runs the initialization logic for a root CA. 
+func (s *Server) initializeRootCA(provider ca.Provider, conf *structs.CAConfiguration) error { + if err := provider.Configure(conf.ClusterID, true, conf.Config); err != nil { + return fmt.Errorf("error configuring provider: %v", err) + } + if err := provider.GenerateRoot(); err != nil { + return fmt.Errorf("error generating CA root certificate: %v", err) + } + + // Get the active root cert from the CA + rootPEM, err := provider.ActiveRoot() + if err != nil { + return fmt.Errorf("error getting root cert: %v", err) + } + + rootCA, err := parseCARoot(rootPEM, conf.Provider, conf.ClusterID) + if err != nil { + return err + } + + // Check if the CA root is already initialized and exit if it is, + // adding on any existing intermediate certs since they aren't directly + // tied to the provider. + // Every change to the CA after this initial bootstrapping should + // be done through the rotation process. + state := s.fsm.State() + _, activeRoot, err := state.CARootActive(nil) + if err != nil { + return err + } + if activeRoot != nil { + // This state shouldn't be possible to get into because we update the root and + // CA config in the same FSM operation. 
+ if activeRoot.ID != rootCA.ID { + return fmt.Errorf("stored CA root %q is not the active root (%s)", rootCA.ID, activeRoot.ID) + } + + rootCA.IntermediateCerts = activeRoot.IntermediateCerts + s.setCAProvider(provider, rootCA) + + return nil + } + + // Get the highest index + idx, _, err := state.CARoots(nil) + if err != nil { + return err + } + + // Store the root cert in raft + resp, err := s.raftApply(structs.ConnectCARequestType, &structs.CARequest{ + Op: structs.CAOpSetRoots, + Index: idx, + Roots: []*structs.CARoot{rootCA}, + }) + if err != nil { + s.logger.Printf("[ERR] connect: Apply failed %v", err) + return err + } + if respErr, ok := resp.(error); ok { + return respErr + } + + s.setCAProvider(provider, rootCA) + + s.logger.Printf("[INFO] connect: initialized primary datacenter CA with provider %q", conf.Provider) + + return nil +} + // initializeSecondaryCA runs the routine for generating an intermediate CA CSR and getting // it signed by the primary DC if the root CA of the primary DC has changed since the last // intermediate. @@ -252,8 +406,11 @@ func (s *Server) startConnectLeader() { if s.config.ConnectEnabled && s.config.Datacenter != s.config.PrimaryDatacenter { go s.secondaryCARootWatch(s.connectCh) go s.replicateIntentions(s.connectCh) + } + go s.runCARootPruning(s.connectCh) + s.connectEnabled = true } @@ -274,6 +431,75 @@ func (s *Server) stopConnectLeader() { s.connectEnabled = false } +func (s *Server) runCARootPruning(stopCh <-chan struct{}) { + ticker := time.NewTicker(caRootPruneInterval) + defer ticker.Stop() + + for { + select { + case <-stopCh: + return + case <-ticker.C: + if err := s.pruneCARoots(); err != nil { + s.logger.Printf("[ERR] connect: error pruning CA roots: %v", err) + } + } + } +} + +// pruneCARoots looks for any CARoots that have been rotated out and expired. 
+func (s *Server) pruneCARoots() error { + if !s.config.ConnectEnabled { + return nil + } + + state := s.fsm.State() + idx, roots, err := state.CARoots(nil) + if err != nil { + return err + } + + _, caConf, err := state.CAConfig() + if err != nil { + return err + } + + common, err := caConf.GetCommonConfig() + if err != nil { + return err + } + + var newRoots structs.CARoots + for _, r := range roots { + if !r.Active && !r.RotatedOutAt.IsZero() && time.Now().Sub(r.RotatedOutAt) > common.LeafCertTTL*2 { + s.logger.Printf("[INFO] connect: pruning old unused root CA (ID: %s)", r.ID) + continue + } + newRoot := *r + newRoots = append(newRoots, &newRoot) + } + + // Return early if there's nothing to remove. + if len(newRoots) == len(roots) { + return nil + } + + // Commit the new root state. + var args structs.CARequest + args.Op = structs.CAOpSetRoots + args.Index = idx + args.Roots = newRoots + resp, err := s.raftApply(structs.ConnectCARequestType, args) + if err != nil { + return err + } + if respErr, ok := resp.(error); ok { + return respErr + } + + return nil +} + // secondaryCARootWatch maintains a blocking query to the primary datacenter's // ConnectCA.Roots endpoint to monitor when it needs to request a new signed // intermediate certificate. @@ -290,21 +516,25 @@ func (s *Server) secondaryCARootWatch(stopCh <-chan struct{}) { retryLoopBackoff(stopCh, func() error { var roots structs.IndexedCARoots if err := s.forwardDC("ConnectCA.Roots", s.config.PrimaryDatacenter, &args, &roots); err != nil { - return err + return fmt.Errorf("Error retrieving the primary datacenter's roots: %v", err) } // Check to see if the primary has been upgraded in case we're waiting to switch to // secondary mode. 
provider, _ := s.getCAProvider() + if provider == nil { + // this happens when leadership is being revoked and this go routine will be stopped + return nil + } if !s.configuredSecondaryCA() { - primaryHasVersion, err := s.datacentersMeetMinVersion(minMultiDCConnectVersion) - if err != nil { - return err + versionOk, primaryFound := ServersInDCMeetMinimumVersion(s.WANMembers(), s.config.PrimaryDatacenter, minMultiDCConnectVersion) + if !primaryFound { + return fmt.Errorf("Primary datacenter is unreachable - deferring secondary CA initialization") } - if primaryHasVersion { + if versionOk { if err := s.initializeSecondaryProvider(provider, roots); err != nil { - return err + return fmt.Errorf("Failed to initialize secondary CA provider: %v", err) } } } @@ -313,17 +543,14 @@ func (s *Server) secondaryCARootWatch(stopCh <-chan struct{}) { // intermediate. if s.configuredSecondaryCA() { if err := s.initializeSecondaryCA(provider, roots); err != nil { - return err + return fmt.Errorf("Failed to initialize the secondary CA: %v", err) } } args.QueryOptions.MinQueryIndex = nextIndexVal(args.QueryOptions.MinQueryIndex, roots.QueryMeta.Index) return nil }, func(err error) { - // Don't log the error if it's a result of the primary still starting up. - if err != errEmptyVersion { - s.logger.Printf("[ERR] connect: error watching primary datacenter roots: %v", err) - } + s.logger.Printf("[ERR] connect: %v", err) }) } @@ -491,53 +718,6 @@ func nextIndexVal(prevIdx, idx uint64) uint64 { return idx } -// datacentersMeetMinVersion returns whether this datacenter and the primary -// are ready and have upgraded to at least the given version. 
-func (s *Server) datacentersMeetMinVersion(minVersion *version.Version) (bool, error) { - localAutopilotHealth := s.autopilot.GetClusterHealth() - localServersMeetVersion, err := autopilotServersMeetMinimumVersion(localAutopilotHealth.Servers, minVersion) - if err != nil { - return false, err - } - if !localServersMeetVersion { - return false, err - } - - args := structs.DCSpecificRequest{ - Datacenter: s.config.PrimaryDatacenter, - } - var reply autopilot.OperatorHealthReply - if err := s.forwardDC("Operator.ServerHealth", s.config.PrimaryDatacenter, &args, &reply); err != nil { - return false, err - } - remoteServersMeetVersion, err := autopilotServersMeetMinimumVersion(reply.Servers, minVersion) - if err != nil { - return false, err - } - - return localServersMeetVersion && remoteServersMeetVersion, nil -} - -// autopilotServersMeetMinimumVersion returns whether the given slice of servers -// meets a minimum version. -func autopilotServersMeetMinimumVersion(servers []autopilot.ServerHealth, minVersion *version.Version) (bool, error) { - for _, server := range servers { - if server.Version == "" { - return false, errEmptyVersion - } - version, err := version.NewVersion(server.Version) - if err != nil { - return false, fmt.Errorf("error parsing remote server version: %v", err) - } - - if version.LessThan(minVersion) { - return false, nil - } - } - - return true, nil -} - // initializeSecondaryProvider configures the given provider for a secondary, non-root datacenter. 
func (s *Server) initializeSecondaryProvider(provider ca.Provider, roots structs.IndexedCARoots) error { if roots.TrustDomain == "" { diff --git a/agent/consul/leader_connect_test.go b/agent/consul/leader_connect_test.go index ea952340c..fde0622f8 100644 --- a/agent/consul/leader_connect_test.go +++ b/agent/consul/leader_connect_test.go @@ -3,6 +3,7 @@ package consul import ( "crypto/x509" "os" + "reflect" "strings" "testing" "time" @@ -13,6 +14,7 @@ import ( "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" uuid "github.com/hashicorp/go-uuid" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -22,26 +24,38 @@ func TestLeader_SecondaryCA_Initialize(t *testing.T) { require := require.New(t) + masterToken := "8a85f086-dd95-4178-b128-e10902767c5c" + // Initialize primary as the primary DC dir1, s1 := testServerWithConfig(t, func(c *Config) { c.Datacenter = "primary" c.PrimaryDatacenter = "primary" - c.Build = "1.4.0" + c.Build = "1.6.0" + c.ACLsEnabled = true + c.ACLMasterToken = masterToken + c.ACLDefaultPolicy = "deny" }) defer os.RemoveAll(dir1) defer s1.Shutdown() + s1.tokens.UpdateAgentToken(masterToken, token.TokenSourceConfig) + testrpc.WaitForLeader(t, s1.RPC, "primary") // secondary as a secondary DC dir2, s2 := testServerWithConfig(t, func(c *Config) { c.Datacenter = "secondary" c.PrimaryDatacenter = "primary" - c.Build = "1.4.0" + c.Build = "1.6.0" + c.ACLsEnabled = true + c.ACLDefaultPolicy = "deny" }) defer os.RemoveAll(dir2) defer s2.Shutdown() + s2.tokens.UpdateAgentToken(masterToken, token.TokenSourceConfig) + s2.tokens.UpdateReplicationToken(masterToken, token.TokenSourceConfig) + // Create the WAN link joinWAN(t, s2, s1) testrpc.WaitForLeader(t, s2.RPC, "secondary") @@ -102,7 +116,7 @@ func TestLeader_SecondaryCA_IntermediateRefresh(t *testing.T) { require := require.New(t) dir1, s1 := testServerWithConfig(t, func(c *Config) { - 
c.Build = "1.4.0" + c.Build = "1.6.0" }) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -113,7 +127,7 @@ func TestLeader_SecondaryCA_IntermediateRefresh(t *testing.T) { dir2, s2 := testServerWithConfig(t, func(c *Config) { c.Datacenter = "dc2" c.PrimaryDatacenter = "dc1" - c.Build = "1.4.0" + c.Build = "1.6.0" }) defer os.RemoveAll(dir2) defer s2.Shutdown() @@ -235,7 +249,7 @@ func TestLeader_SecondaryCA_TransitionFromPrimary(t *testing.T) { dir1, s1 := testServerWithConfig(t, func(c *Config) { c.PrimaryDatacenter = "dc1" c.CAConfig.ClusterID = id1 - c.Build = "1.4.0" + c.Build = "1.6.0" }) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -249,7 +263,7 @@ func TestLeader_SecondaryCA_TransitionFromPrimary(t *testing.T) { c.Datacenter = "dc2" c.PrimaryDatacenter = "dc2" c.CAConfig.ClusterID = id2 - c.Build = "1.4.0" + c.Build = "1.6.0" }) defer os.RemoveAll(dir2) defer s2.Shutdown() @@ -333,7 +347,6 @@ func TestLeader_SecondaryCA_TransitionFromPrimary(t *testing.T) { func TestLeader_SecondaryCA_UpgradeBeforePrimary(t *testing.T) { t.Parallel() - require := require.New(t) maxRootsQueryTime = 500 * time.Millisecond // Initialize dc1 as the primary DC @@ -350,7 +363,7 @@ func TestLeader_SecondaryCA_UpgradeBeforePrimary(t *testing.T) { dir2, s2 := testServerWithConfig(t, func(c *Config) { c.Datacenter = "dc2" c.PrimaryDatacenter = "dc1" - c.Build = "1.4.0" + c.Build = "1.6.0" }) defer os.RemoveAll(dir2) defer s2.Shutdown() @@ -359,72 +372,59 @@ func TestLeader_SecondaryCA_UpgradeBeforePrimary(t *testing.T) { joinWAN(t, s2, s1) testrpc.WaitForLeader(t, s2.RPC, "dc2") - // Verify the root lists are different in each DC's state store. 
- var oldSecondaryRootID string - { + // ensure all the CA initialization stuff would have already been done + // this is necessary to ensure that not only has a leader been elected + // but that it has also finished its establishLeadership call + retry.Run(t, func(r *retry.R) { + require.True(r, s1.isReadyForConsistentReads()) + require.True(r, s2.isReadyForConsistentReads()) + }) + + // Verify the primary has a root (we faked its version too low but since its the primary it ignores any version checks) + retry.Run(t, func(r *retry.R) { state1 := s1.fsm.State() _, roots1, err := state1.CARoots(nil) - require.NoError(err) + require.NoError(r, err) + require.Len(r, roots1, 1) + }) - state2 := s2.fsm.State() - _, roots2, err := state2.CARoots(nil) - require.NoError(err) - require.Equal(1, len(roots1)) - require.Equal(1, len(roots2)) - require.NotEqual(roots1[0].ID, roots2[0].ID) - require.NotEqual(roots1[0].RootCert, roots2[0].RootCert) - oldSecondaryRootID = roots2[0].ID - } + // Verify the secondary does not have a root - defers initialization until the primary has been upgraded. + state2 := s2.fsm.State() + _, roots2, err := state2.CARoots(nil) + require.NoError(t, err) + require.Empty(t, roots2) // Update the version on the fly so s2 kicks off the secondary DC transition. - tags := s1.config.SerfLANConfig.Tags - tags["build"] = "1.4.0" - s1.serfLAN.SetTags(tags) + tags := s1.config.SerfWANConfig.Tags + tags["build"] = "1.6.0" + s1.serfWAN.SetTags(tags) // Wait for the secondary transition to happen and then verify the secondary DC // has both roots present. 
secondaryProvider, _ := s2.getCAProvider() retry.Run(t, func(r *retry.R) { - state := s2.fsm.State() - _, roots, err := state.CARoots(nil) - r.Check(err) - if len(roots) != 2 { - r.Fatalf("should have 2 roots: %v", roots) - } - inter, err := secondaryProvider.ActiveIntermediate() - r.Check(err) - if inter == "" { - r.Fatal("should have valid intermediate") - } - }) - { state1 := s1.fsm.State() _, roots1, err := state1.CARoots(nil) - require.NoError(err) + require.NoError(r, err) + require.Len(r, roots1, 1) state2 := s2.fsm.State() _, roots2, err := state2.CARoots(nil) - require.NoError(err) - require.Equal(1, len(roots1)) - require.Equal(2, len(roots2)) - var oldSecondaryRoot *structs.CARoot - var newSecondaryRoot *structs.CARoot - if roots2[0].ID == oldSecondaryRootID { - oldSecondaryRoot = roots2[0] - newSecondaryRoot = roots2[1] - } else { - oldSecondaryRoot = roots2[1] - newSecondaryRoot = roots2[0] - } - require.Equal(roots1[0].ID, newSecondaryRoot.ID) - require.Equal(roots1[0].RootCert, newSecondaryRoot.RootCert) - require.NotEqual(newSecondaryRoot.ID, oldSecondaryRoot.ID) - require.NotEqual(newSecondaryRoot.RootCert, oldSecondaryRoot.RootCert) - } + require.NoError(r, err) + require.Len(r, roots2, 1) + + // ensure the roots are the same + require.Equal(r, roots1[0].ID, roots2[0].ID) + require.Equal(r, roots1[0].RootCert, roots2[0].RootCert) + + inter, err := secondaryProvider.ActiveIntermediate() + require.NoError(r, err) + require.NotEmpty(r, inter, "should have valid intermediate") + }) _, caRoot := s1.getCAProvider() intermediatePEM, err := secondaryProvider.ActiveIntermediate() - require.NoError(err) + require.NoError(t, err) // Have dc2 sign a leaf cert and make sure the chain is correct. 
spiffeService := &connect.SpiffeIDService{ @@ -436,13 +436,13 @@ func TestLeader_SecondaryCA_UpgradeBeforePrimary(t *testing.T) { raw, _ := connect.TestCSR(t, spiffeService) leafCsr, err := connect.ParseCSR(raw) - require.NoError(err) + require.NoError(t, err) leafPEM, err := secondaryProvider.Sign(leafCsr) - require.NoError(err) + require.NoError(t, err) cert, err := connect.ParseCert(leafPEM) - require.NoError(err) + require.NoError(t, err) // Check that the leaf signed by the new cert can be verified using the // returned cert chain (signed intermediate + remote root). @@ -455,7 +455,7 @@ func TestLeader_SecondaryCA_UpgradeBeforePrimary(t *testing.T) { Intermediates: intermediatePool, Roots: rootPool, }) - require.NoError(err) + require.NoError(t, err) } func TestLeader_ReplicateIntentions(t *testing.T) { @@ -841,3 +841,189 @@ func TestLeader_GenerateCASignRequest(t *testing.T) { req := s.generateCASignRequest(csr) assert.Equal(t, "east", req.RequestDatacenter()) } + +func TestLeader_CARootPruning(t *testing.T) { + t.Parallel() + + caRootPruneInterval = 200 * time.Millisecond + + require := require.New(t) + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForTestAgent(t, s1.RPC, "dc1") + + // Get the current root + rootReq := &structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var rootList structs.IndexedCARoots + require.Nil(msgpackrpc.CallWithCodec(codec, "ConnectCA.Roots", rootReq, &rootList)) + require.Len(rootList.Roots, 1) + oldRoot := rootList.Roots[0] + + // Update the provider config to use a new private key, which should + // cause a rotation. 
+ _, newKey, err := connect.GeneratePrivateKey() + require.NoError(err) + newConfig := &structs.CAConfiguration{ + Provider: "consul", + Config: map[string]interface{}{ + "LeafCertTTL": "500ms", + "PrivateKey": newKey, + "RootCert": "", + "RotationPeriod": "2160h", + "SkipValidate": true, + }, + } + { + args := &structs.CARequest{ + Datacenter: "dc1", + Config: newConfig, + } + var reply interface{} + + require.NoError(msgpackrpc.CallWithCodec(codec, "ConnectCA.ConfigurationSet", args, &reply)) + } + + // Should have 2 roots now. + _, roots, err := s1.fsm.State().CARoots(nil) + require.NoError(err) + require.Len(roots, 2) + + time.Sleep(2 * time.Second) + + // Now the old root should be pruned. + _, roots, err = s1.fsm.State().CARoots(nil) + require.NoError(err) + require.Len(roots, 1) + require.True(roots[0].Active) + require.NotEqual(roots[0].ID, oldRoot.ID) +} + +func TestLeader_PersistIntermediateCAs(t *testing.T) { + t.Parallel() + + require := require.New(t) + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + dir2, s2 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerDCBootstrap(t, "dc1", false) + defer os.RemoveAll(dir3) + defer s3.Shutdown() + + joinLAN(t, s2, s1) + joinLAN(t, s3, s1) + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // Get the current root + rootReq := &structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var rootList structs.IndexedCARoots + require.Nil(msgpackrpc.CallWithCodec(codec, "ConnectCA.Roots", rootReq, &rootList)) + require.Len(rootList.Roots, 1) + + // Update the provider config to use a new private key, which should + // cause a rotation. 
+ _, newKey, err := connect.GeneratePrivateKey() + require.NoError(err) + newConfig := &structs.CAConfiguration{ + Provider: "consul", + Config: map[string]interface{}{ + "PrivateKey": newKey, + "RootCert": "", + "RotationPeriod": 90 * 24 * time.Hour, + }, + } + { + args := &structs.CARequest{ + Datacenter: "dc1", + Config: newConfig, + } + var reply interface{} + + require.NoError(msgpackrpc.CallWithCodec(codec, "ConnectCA.ConfigurationSet", args, &reply)) + } + + // Get the active root before leader change. + _, root := s1.getCAProvider() + require.Len(root.IntermediateCerts, 1) + + // Force a leader change and make sure the root CA values are preserved. + s1.Leave() + s1.Shutdown() + + retry.Run(t, func(r *retry.R) { + var leader *Server + for _, s := range []*Server{s2, s3} { + if s.IsLeader() { + leader = s + break + } + } + if leader == nil { + r.Fatal("no leader") + } + + _, newLeaderRoot := leader.getCAProvider() + if !reflect.DeepEqual(newLeaderRoot, root) { + r.Fatalf("got %v, want %v", newLeaderRoot, root) + } + }) +} + +func TestLeader_ParseCARoot(t *testing.T) { + type test struct { + pem string + expectedError bool + } + tests := []test{ + {"", true}, + {`-----BEGIN CERTIFICATE----- +MIIDHDCCAsKgAwIBAgIQS+meruRVzrmVwEhXNrtk9jAKBggqhkjOPQQDAjCBuTEL +MAkGA1UEBhMCVVMxCzAJBgNVBAgTAkNBMRYwFAYDVQQHEw1TYW4gRnJhbmNpc2Nv +MRowGAYDVQQJExExMDEgU2Vjb25kIFN0cmVldDEOMAwGA1UEERMFOTQxMDUxFzAV +BgNVBAoTDkhhc2hpQ29ycCBJbmMuMUAwPgYDVQQDEzdDb25zdWwgQWdlbnQgQ0Eg +MTkzNzYxNzQwMjcxNzUxOTkyMzAyMzE1NDkxNjUzODYyMzAwNzE3MB4XDTE5MDQx +MjA5MTg0NVoXDTIwMDQxMTA5MTg0NVowHDEaMBgGA1UEAxMRY2xpZW50LmRjMS5j +b25zdWwwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAS2UroGUh5k7eR//iPsn9ne +CMCVsERnjqQnK6eDWnM5kTXgXcPPe5pcAS9xs0g8BZ+oVsJSc7sH6RYvX+gw6bCl +o4IBRjCCAUIwDgYDVR0PAQH/BAQDAgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMCBggr +BgEFBQcDATAMBgNVHRMBAf8EAjAAMGgGA1UdDgRhBF84NDphNDplZjoxYTpjODo1 +MzoxMDo1YTpjNTplYTpjZTphYTowZDo2ZjpjOTozODozZDphZjo0NTphZTo5OTo4 
+YzpiYjoyNzpiYzpiMzpmYTpmMDozMToxNDo4ZTozNDBqBgNVHSMEYzBhgF8yYTox +MjpjYTo0Mzo0NzowODpiZjoxYTo0Yjo4MTpkNDo2MzowNTo1ODowZToxYzo3Zjoy +NTo0ZjozNDpmNDozYjpmYzo5YTpkNzo4Mjo2YjpkYzpmODo3YjphMTo5ZDAtBgNV +HREEJjAkghFjbGllbnQuZGMxLmNvbnN1bIIJbG9jYWxob3N0hwR/AAABMAoGCCqG +SM49BAMCA0gAMEUCIHcLS74KSQ7RA+edwOprmkPTh1nolwXz9/y9CJ5nMVqEAiEA +h1IHCbxWsUT3AiARwj5/D/CUppy6BHIFkvcpOCQoVyo= +-----END CERTIFICATE-----`, false}, + } + for _, test := range tests { + root, err := parseCARoot(test.pem, "consul", "cluster") + if err == nil && test.expectedError { + require.Error(t, err) + } + if test.pem != "" { + rootCert, err := connect.ParseCert(test.pem) + require.NoError(t, err) + + // just to make sure these two are not the same + require.NotEqual(t, rootCert.AuthorityKeyId, rootCert.SubjectKeyId) + + require.Equal(t, connect.HexString(rootCert.SubjectKeyId), root.SigningKeyID) + } + } +} diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index e39472288..ea1a2f210 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -2,11 +2,9 @@ package consul import ( "os" - "reflect" "testing" "time" - "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" @@ -1066,148 +1064,6 @@ func TestLeader_ACL_Initialization(t *testing.T) { } } -func TestLeader_CARootPruning(t *testing.T) { - t.Parallel() - - caRootPruneInterval = 200 * time.Millisecond - - require := require.New(t) - dir1, s1 := testServer(t) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - codec := rpcClient(t, s1) - defer codec.Close() - - testrpc.WaitForTestAgent(t, s1.RPC, "dc1") - - // Get the current root - rootReq := &structs.DCSpecificRequest{ - Datacenter: "dc1", - } - var rootList structs.IndexedCARoots - require.Nil(msgpackrpc.CallWithCodec(codec, "ConnectCA.Roots", rootReq, &rootList)) - require.Len(rootList.Roots, 1) - oldRoot := rootList.Roots[0] - - // Update the 
provider config to use a new private key, which should - // cause a rotation. - _, newKey, err := connect.GeneratePrivateKey() - require.NoError(err) - newConfig := &structs.CAConfiguration{ - Provider: "consul", - Config: map[string]interface{}{ - "LeafCertTTL": "500ms", - "PrivateKey": newKey, - "RootCert": "", - "RotationPeriod": "2160h", - "SkipValidate": true, - }, - } - { - args := &structs.CARequest{ - Datacenter: "dc1", - Config: newConfig, - } - var reply interface{} - - require.NoError(msgpackrpc.CallWithCodec(codec, "ConnectCA.ConfigurationSet", args, &reply)) - } - - // Should have 2 roots now. - _, roots, err := s1.fsm.State().CARoots(nil) - require.NoError(err) - require.Len(roots, 2) - - time.Sleep(2 * time.Second) - - // Now the old root should be pruned. - _, roots, err = s1.fsm.State().CARoots(nil) - require.NoError(err) - require.Len(roots, 1) - require.True(roots[0].Active) - require.NotEqual(roots[0].ID, oldRoot.ID) -} - -func TestLeader_PersistIntermediateCAs(t *testing.T) { - t.Parallel() - - require := require.New(t) - dir1, s1 := testServer(t) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - codec := rpcClient(t, s1) - defer codec.Close() - - dir2, s2 := testServerDCBootstrap(t, "dc1", false) - defer os.RemoveAll(dir2) - defer s2.Shutdown() - - dir3, s3 := testServerDCBootstrap(t, "dc1", false) - defer os.RemoveAll(dir3) - defer s3.Shutdown() - - joinLAN(t, s2, s1) - joinLAN(t, s3, s1) - - testrpc.WaitForLeader(t, s1.RPC, "dc1") - - // Get the current root - rootReq := &structs.DCSpecificRequest{ - Datacenter: "dc1", - } - var rootList structs.IndexedCARoots - require.Nil(msgpackrpc.CallWithCodec(codec, "ConnectCA.Roots", rootReq, &rootList)) - require.Len(rootList.Roots, 1) - - // Update the provider config to use a new private key, which should - // cause a rotation. 
- _, newKey, err := connect.GeneratePrivateKey() - require.NoError(err) - newConfig := &structs.CAConfiguration{ - Provider: "consul", - Config: map[string]interface{}{ - "PrivateKey": newKey, - "RootCert": "", - "RotationPeriod": 90 * 24 * time.Hour, - }, - } - { - args := &structs.CARequest{ - Datacenter: "dc1", - Config: newConfig, - } - var reply interface{} - - require.NoError(msgpackrpc.CallWithCodec(codec, "ConnectCA.ConfigurationSet", args, &reply)) - } - - // Get the active root before leader change. - _, root := s1.getCAProvider() - require.Len(root.IntermediateCerts, 1) - - // Force a leader change and make sure the root CA values are preserved. - s1.Leave() - s1.Shutdown() - - retry.Run(t, func(r *retry.R) { - var leader *Server - for _, s := range []*Server{s2, s3} { - if s.IsLeader() { - leader = s - break - } - } - if leader == nil { - r.Fatal("no leader") - } - - _, newLeaderRoot := leader.getCAProvider() - if !reflect.DeepEqual(newLeaderRoot, root) { - r.Fatalf("got %v, want %v", newLeaderRoot, root) - } - }) -} - func TestLeader_ACLUpgrade(t *testing.T) { t.Parallel() dir1, s1 := testServerWithConfig(t, func(c *Config) { @@ -1302,47 +1158,3 @@ func TestLeader_ConfigEntryBootstrap(t *testing.T) { require.Equal(t, global_entry_init.Config, global.Config) }) } - -func TestLeader_ParseCARoot(t *testing.T) { - type test struct { - pem string - expectedError bool - } - tests := []test{ - {"", true}, - {`-----BEGIN CERTIFICATE----- -MIIDHDCCAsKgAwIBAgIQS+meruRVzrmVwEhXNrtk9jAKBggqhkjOPQQDAjCBuTEL -MAkGA1UEBhMCVVMxCzAJBgNVBAgTAkNBMRYwFAYDVQQHEw1TYW4gRnJhbmNpc2Nv -MRowGAYDVQQJExExMDEgU2Vjb25kIFN0cmVldDEOMAwGA1UEERMFOTQxMDUxFzAV -BgNVBAoTDkhhc2hpQ29ycCBJbmMuMUAwPgYDVQQDEzdDb25zdWwgQWdlbnQgQ0Eg -MTkzNzYxNzQwMjcxNzUxOTkyMzAyMzE1NDkxNjUzODYyMzAwNzE3MB4XDTE5MDQx -MjA5MTg0NVoXDTIwMDQxMTA5MTg0NVowHDEaMBgGA1UEAxMRY2xpZW50LmRjMS5j -b25zdWwwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAS2UroGUh5k7eR//iPsn9ne -CMCVsERnjqQnK6eDWnM5kTXgXcPPe5pcAS9xs0g8BZ+oVsJSc7sH6RYvX+gw6bCl 
-o4IBRjCCAUIwDgYDVR0PAQH/BAQDAgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMCBggr -BgEFBQcDATAMBgNVHRMBAf8EAjAAMGgGA1UdDgRhBF84NDphNDplZjoxYTpjODo1 -MzoxMDo1YTpjNTplYTpjZTphYTowZDo2ZjpjOTozODozZDphZjo0NTphZTo5OTo4 -YzpiYjoyNzpiYzpiMzpmYTpmMDozMToxNDo4ZTozNDBqBgNVHSMEYzBhgF8yYTox -MjpjYTo0Mzo0NzowODpiZjoxYTo0Yjo4MTpkNDo2MzowNTo1ODowZToxYzo3Zjoy -NTo0ZjozNDpmNDozYjpmYzo5YTpkNzo4Mjo2YjpkYzpmODo3YjphMTo5ZDAtBgNV -HREEJjAkghFjbGllbnQuZGMxLmNvbnN1bIIJbG9jYWxob3N0hwR/AAABMAoGCCqG -SM49BAMCA0gAMEUCIHcLS74KSQ7RA+edwOprmkPTh1nolwXz9/y9CJ5nMVqEAiEA -h1IHCbxWsUT3AiARwj5/D/CUppy6BHIFkvcpOCQoVyo= ------END CERTIFICATE-----`, false}, - } - for _, test := range tests { - root, err := parseCARoot(test.pem, "consul", "cluster") - if err == nil && test.expectedError { - require.Error(t, err) - } - if test.pem != "" { - rootCert, err := connect.ParseCert(test.pem) - require.NoError(t, err) - - // just to make sure these two are not the same - require.NotEqual(t, rootCert.AuthorityKeyId, rootCert.SubjectKeyId) - - require.Equal(t, connect.HexString(rootCert.SubjectKeyId), root.SigningKeyID) - } - } -} diff --git a/agent/consul/server.go b/agent/consul/server.go index 31be21891..37aefb61d 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -142,12 +142,6 @@ type Server struct { caProviderRoot *structs.CARoot caProviderLock sync.RWMutex - // caPruningCh is used to shut down the CA root pruning goroutine when we - // lose leadership. 
- caPruningCh chan struct{} - caPruningLock sync.RWMutex - caPruningEnabled bool - // Consul configuration config *Config diff --git a/agent/consul/util.go b/agent/consul/util.go index 19ff2cc2b..534cc1136 100644 --- a/agent/consul/util.go +++ b/agent/consul/util.go @@ -276,9 +276,33 @@ func runtimeStats() map[string]string { // ServersMeetMinimumVersion returns whether the given alive servers are at least on the // given Consul version func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version) bool { + return ServersMeetRequirements(members, func(srv *metadata.Server) bool { + return srv.Status != serf.StatusAlive || !srv.Build.LessThan(minVersion) + }) +} + +// ServersInDCMeetMinimumVersion returns whether the given alive servers from a particular +// datacenter are at least on the given Consul version. The second return value reports whether at least 1 alive server was found in the DC +func ServersInDCMeetMinimumVersion(members []serf.Member, datacenter string, minVersion *version.Version) (bool, bool) { + found := false + ok := ServersMeetRequirements(members, func(srv *metadata.Server) bool { + if srv.Status != serf.StatusAlive || srv.Datacenter != datacenter { + return true + } + + found = true + return !srv.Build.LessThan(minVersion) + }) + + return ok, found +} + +// ServersMeetRequirements returns whether the given server members meet the requirements as defined by the +// callback function +func ServersMeetRequirements(members []serf.Member, meetsRequirements func(*metadata.Server) bool) bool { for _, member := range members { - if valid, parts := metadata.IsConsulServer(member); valid && parts.Status == serf.StatusAlive { - if parts.Build.LessThan(minVersion) { + if valid, parts := metadata.IsConsulServer(member); valid { + if !meetsRequirements(parts) { return false } } diff --git a/agent/consul/util_test.go b/agent/consul/util_test.go index 654673b62..83969089e 100644 --- a/agent/consul/util_test.go +++ b/agent/consul/util_test.go @@ -405,6 +405,94 @@ func
TestServersMeetMinimumVersion(t *testing.T) { } } +func TestServersInDCMeetMinimumVersion(t *testing.T) { + t.Parallel() + makeMember := func(version string, datacenter string) serf.Member { + return serf.Member{ + Name: "foo", + Addr: net.IP([]byte{127, 0, 0, 1}), + Tags: map[string]string{ + "role": "consul", + "id": "asdf", + "dc": datacenter, + "port": "10000", + "build": version, + "wan_join_port": "1234", + "vsn": "1", + "expect": "3", + "raft_vsn": "3", + }, + Status: serf.StatusAlive, + } + } + + cases := []struct { + members []serf.Member + ver *version.Version + expected bool + expectedFound bool + }{ + // One server, meets reqs + { + members: []serf.Member{ + makeMember("0.7.5", "primary"), + makeMember("0.7.3", "secondary"), + }, + ver: version.Must(version.NewVersion("0.7.5")), + expected: true, + expectedFound: true, + }, + // One server, doesn't meet reqs + { + members: []serf.Member{ + makeMember("0.7.5", "primary"), + makeMember("0.8.1", "secondary"), + }, + ver: version.Must(version.NewVersion("0.8.0")), + expected: false, + expectedFound: true, + }, + // Multiple servers, meets req version + { + members: []serf.Member{ + makeMember("0.7.5", "primary"), + makeMember("0.8.0", "primary"), + makeMember("0.7.0", "secondary"), + }, + ver: version.Must(version.NewVersion("0.7.5")), + expected: true, + expectedFound: true, + }, + // Multiple servers, doesn't meet req version + { + members: []serf.Member{ + makeMember("0.7.5", "primary"), + makeMember("0.8.0", "primary"), + makeMember("0.9.1", "secondary"), + }, + ver: version.Must(version.NewVersion("0.8.0")), + expected: false, + expectedFound: true, + }, + { + members: []serf.Member{ + makeMember("0.7.5", "secondary"), + makeMember("0.8.0", "secondary"), + makeMember("0.9.1", "secondary"), + }, + ver: version.Must(version.NewVersion("0.7.0")), + expected: true, + expectedFound: false, + }, + } + + for _, tc := range cases { + result, found := ServersInDCMeetMinimumVersion(tc.members, "primary", tc.ver) 
+ require.Equal(t, tc.expected, result) + require.Equal(t, tc.expectedFound, found) + } +} + func TestInterpolateHIL(t *testing.T) { for _, test := range []struct { name string