server: don't activate federation state replication or anti-entropy until all servers are running 1.8.0+ (#8014)
This commit is contained in:
parent
7f14d3ac8a
commit
3ad570ba99
|
@ -1,6 +1,7 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
|
@ -11,6 +12,10 @@ import (
|
|||
memdb "github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
var (
|
||||
errFederationStatesNotEnabled = errors.New("Federation states are currently disabled until all servers in the datacenter support the feature")
|
||||
)
|
||||
|
||||
// FederationState endpoint is used to manipulate federation states from all
|
||||
// datacenters.
|
||||
type FederationState struct {
|
||||
|
@ -25,6 +30,11 @@ func (c *FederationState) Apply(args *structs.FederationStateRequest, reply *boo
|
|||
if done, err := c.srv.forward("FederationState.Apply", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
if !c.srv.DatacenterSupportsFederationStates() {
|
||||
return errFederationStatesNotEnabled
|
||||
}
|
||||
|
||||
defer metrics.MeasureSince([]string{"federation_state", "apply"}, time.Now())
|
||||
|
||||
// Fetch the ACL token, if any.
|
||||
|
@ -69,6 +79,11 @@ func (c *FederationState) Get(args *structs.FederationStateQuery, reply *structs
|
|||
if done, err := c.srv.forward("FederationState.Get", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
if !c.srv.DatacenterSupportsFederationStates() {
|
||||
return errFederationStatesNotEnabled
|
||||
}
|
||||
|
||||
defer metrics.MeasureSince([]string{"federation_state", "get"}, time.Now())
|
||||
|
||||
// Fetch the ACL token, if any.
|
||||
|
@ -105,6 +120,11 @@ func (c *FederationState) List(args *structs.DCSpecificRequest, reply *structs.I
|
|||
if done, err := c.srv.forward("FederationState.List", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
if !c.srv.DatacenterSupportsFederationStates() {
|
||||
return errFederationStatesNotEnabled
|
||||
}
|
||||
|
||||
defer metrics.MeasureSince([]string{"federation_state", "list"}, time.Now())
|
||||
|
||||
// Fetch the ACL token, if any.
|
||||
|
@ -143,6 +163,11 @@ func (c *FederationState) ListMeshGateways(args *structs.DCSpecificRequest, repl
|
|||
if done, err := c.srv.forward("FederationState.ListMeshGateways", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
if !c.srv.DatacenterSupportsFederationStates() {
|
||||
return errFederationStatesNotEnabled
|
||||
}
|
||||
|
||||
defer metrics.MeasureSince([]string{"federation_state", "list_mesh_gateways"}, time.Now())
|
||||
|
||||
return c.srv.blockingQuery(
|
||||
|
|
|
@ -2,6 +2,7 @@ package consul
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
@ -9,6 +10,12 @@ import (
|
|||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
var errFederationStatesNotSupported = errors.New("Not all servers in the datacenter support federation states - preventing replication")
|
||||
|
||||
func isErrFederationStatesNotSupported(err error) bool {
|
||||
return errors.Is(err, errFederationStatesNotSupported)
|
||||
}
|
||||
|
||||
type FederationStateReplicator struct {
|
||||
srv *Server
|
||||
gatewayLocator *GatewayLocator
|
||||
|
@ -27,6 +34,9 @@ func (r *FederationStateReplicator) MetricName() string { return "federation-sta
|
|||
|
||||
// FetchRemote implements IndexReplicatorDelegate.
|
||||
func (r *FederationStateReplicator) FetchRemote(lastRemoteIndex uint64) (int, interface{}, uint64, error) {
|
||||
if !r.srv.DatacenterSupportsFederationStates() {
|
||||
return 0, nil, 0, errFederationStatesNotSupported
|
||||
}
|
||||
lenRemote, remote, remoteIndex, err := r.fetchRemote(lastRemoteIndex)
|
||||
if r.gatewayLocator != nil {
|
||||
r.gatewayLocator.SetLastFederationStateReplicationError(err)
|
||||
|
|
|
@ -1529,3 +1529,64 @@ func (s *Server) reapTombstones(index uint64) {
|
|||
)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) DatacenterSupportsFederationStates() bool {
|
||||
if atomic.LoadInt32(&s.dcSupportsFederationStates) != 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
state := serversFederationStatesInfo{
|
||||
supported: true,
|
||||
found: false,
|
||||
}
|
||||
|
||||
// check if they are supported in the primary dc
|
||||
if s.config.PrimaryDatacenter != s.config.Datacenter {
|
||||
s.router.CheckServers(s.config.PrimaryDatacenter, state.update)
|
||||
|
||||
if !state.supported || !state.found {
|
||||
s.logger.Debug("federation states are not enabled in the primary dc")
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// check the servers in the local DC
|
||||
s.router.CheckServers(s.config.Datacenter, state.update)
|
||||
|
||||
if state.supported && state.found {
|
||||
atomic.StoreInt32(&s.dcSupportsFederationStates, 1)
|
||||
return true
|
||||
}
|
||||
|
||||
s.logger.Debug("federation states are not enabled in this datacenter", "datacenter", s.config.Datacenter)
|
||||
return false
|
||||
}
|
||||
|
||||
type serversFederationStatesInfo struct {
|
||||
// supported indicates whether every processed server supports federation states
|
||||
supported bool
|
||||
|
||||
// found indicates that at least one server was processed
|
||||
found bool
|
||||
}
|
||||
|
||||
func (s *serversFederationStatesInfo) update(srv *metadata.Server) bool {
|
||||
if srv.Status != serf.StatusAlive && srv.Status != serf.StatusFailed {
|
||||
// they are left or something so regardless we treat these servers as meeting
|
||||
// the version requirement
|
||||
return true
|
||||
}
|
||||
|
||||
// mark that we processed at least one server
|
||||
s.found = true
|
||||
|
||||
if supported, ok := srv.FeatureFlags["fs"]; ok && supported == 1 {
|
||||
return true
|
||||
}
|
||||
|
||||
// mark that at least one server does not support federation states
|
||||
s.supported = false
|
||||
|
||||
// prevent continuing server evaluation
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -43,6 +43,10 @@ func (s *Server) federationStateAntiEntropySync(ctx context.Context) error {
|
|||
var lastFetchIndex uint64
|
||||
|
||||
retryLoopBackoff(ctx.Done(), func() error {
|
||||
if !s.DatacenterSupportsFederationStates() {
|
||||
return nil
|
||||
}
|
||||
|
||||
idx, err := s.federationStateAntiEntropyMaybeSync(ctx, lastFetchIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -1226,3 +1226,299 @@ func TestLeader_ACLLegacyReplication(t *testing.T) {
|
|||
require.False(t, srv.leaderRoutineManager.IsRunning(aclRoleReplicationRoutineName))
|
||||
require.False(t, srv.leaderRoutineManager.IsRunning(aclTokenReplicationRoutineName))
|
||||
}
|
||||
|
||||
func TestDatacenterSupportsFederationStates(t *testing.T) {
|
||||
addGateway := func(t *testing.T, srv *Server, dc, node string) {
|
||||
t.Helper()
|
||||
arg := structs.RegisterRequest{
|
||||
Datacenter: dc,
|
||||
Node: node,
|
||||
Address: "127.0.0.1",
|
||||
Service: &structs.NodeService{
|
||||
Kind: structs.ServiceKindMeshGateway,
|
||||
ID: "mesh-gateway",
|
||||
Service: "mesh-gateway",
|
||||
Port: 8080,
|
||||
},
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
require.NoError(t, srv.RPC("Catalog.Register", &arg, &out))
|
||||
}
|
||||
|
||||
t.Run("one node primary with old version", func(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "node1"
|
||||
c.Datacenter = "dc1"
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
s1.updateSerfTags("ft_fs", "0")
|
||||
|
||||
waitForLeaderEstablishment(t, s1)
|
||||
|
||||
addGateway(t, s1, "dc1", "node1")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if s1.DatacenterSupportsFederationStates() {
|
||||
r.Fatal("server 1 shouldn't activate fedstates")
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("one node primary with new version", func(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "node1"
|
||||
c.Datacenter = "dc1"
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
waitForLeaderEstablishment(t, s1)
|
||||
|
||||
addGateway(t, s1, "dc1", "node1")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if !s1.DatacenterSupportsFederationStates() {
|
||||
r.Fatal("server 1 didn't activate fedstates")
|
||||
}
|
||||
})
|
||||
|
||||
// Wait until after AE runs at least once.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
arg := structs.FederationStateQuery{
|
||||
Datacenter: "dc1",
|
||||
TargetDatacenter: "dc1",
|
||||
}
|
||||
|
||||
var out structs.FederationStateResponse
|
||||
require.NoError(r, s1.RPC("FederationState.Get", &arg, &out))
|
||||
require.NotNil(r, out.State)
|
||||
require.Len(r, out.State.MeshGateways, 1)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("two node primary with mixed versions", func(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "node1"
|
||||
c.Datacenter = "dc1"
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
s1.updateSerfTags("ft_fs", "0")
|
||||
|
||||
waitForLeaderEstablishment(t, s1)
|
||||
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "node2"
|
||||
c.Datacenter = "dc1"
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
c.Bootstrap = false
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Put s1 last so we don't trigger a leader election.
|
||||
servers := []*Server{s2, s1}
|
||||
|
||||
// Try to join
|
||||
joinLAN(t, s2, s1)
|
||||
for _, s := range servers {
|
||||
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
|
||||
}
|
||||
|
||||
waitForLeaderEstablishment(t, s1)
|
||||
|
||||
addGateway(t, s1, "dc1", "node1")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if s1.DatacenterSupportsFederationStates() {
|
||||
r.Fatal("server 1 shouldn't activate fedstates")
|
||||
}
|
||||
})
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if s2.DatacenterSupportsFederationStates() {
|
||||
r.Fatal("server 2 shouldn't activate fedstates")
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("two node primary with new version", func(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "node1"
|
||||
c.Datacenter = "dc1"
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
waitForLeaderEstablishment(t, s1)
|
||||
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "node2"
|
||||
c.Datacenter = "dc1"
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
c.Bootstrap = false
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
// Put s1 last so we don't trigger a leader election.
|
||||
servers := []*Server{s2, s1}
|
||||
|
||||
// Try to join
|
||||
joinLAN(t, s2, s1)
|
||||
for _, s := range servers {
|
||||
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
|
||||
}
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc1")
|
||||
|
||||
addGateway(t, s1, "dc1", "node1")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if !s1.DatacenterSupportsFederationStates() {
|
||||
r.Fatal("server 1 didn't activate fedstates")
|
||||
}
|
||||
})
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if !s2.DatacenterSupportsFederationStates() {
|
||||
r.Fatal("server 2 didn't activate fedstates")
|
||||
}
|
||||
})
|
||||
|
||||
// Wait until after AE runs at least once.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
arg := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
|
||||
var out structs.IndexedFederationStates
|
||||
require.NoError(r, s1.RPC("FederationState.List", &arg, &out))
|
||||
require.Len(r, out.States, 1)
|
||||
require.Len(r, out.States[0].MeshGateways, 1)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("primary and secondary with new version", func(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "node1"
|
||||
c.Datacenter = "dc1"
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
waitForLeaderEstablishment(t, s1)
|
||||
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "node2"
|
||||
c.Datacenter = "dc2"
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
c.FederationStateReplicationRate = 100
|
||||
c.FederationStateReplicationBurst = 100
|
||||
c.FederationStateReplicationApplyLimit = 1000000
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
waitForLeaderEstablishment(t, s2)
|
||||
|
||||
// Try to join
|
||||
joinWAN(t, s2, s1)
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc2")
|
||||
|
||||
addGateway(t, s1, "dc1", "node1")
|
||||
addGateway(t, s2, "dc2", "node2")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if !s1.DatacenterSupportsFederationStates() {
|
||||
r.Fatal("server 1 didn't activate fedstates")
|
||||
}
|
||||
})
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if !s2.DatacenterSupportsFederationStates() {
|
||||
r.Fatal("server 2 didn't activate fedstates")
|
||||
}
|
||||
})
|
||||
|
||||
// Wait until after AE runs at least once for both.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
arg := structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
}
|
||||
|
||||
var out structs.IndexedFederationStates
|
||||
require.NoError(r, s1.RPC("FederationState.List", &arg, &out))
|
||||
require.Len(r, out.States, 2)
|
||||
require.Len(r, out.States[0].MeshGateways, 1)
|
||||
require.Len(r, out.States[1].MeshGateways, 1)
|
||||
})
|
||||
|
||||
// Wait until after replication runs for the secondary.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
arg := structs.DCSpecificRequest{
|
||||
Datacenter: "dc2",
|
||||
}
|
||||
|
||||
var out structs.IndexedFederationStates
|
||||
require.NoError(r, s1.RPC("FederationState.List", &arg, &out))
|
||||
require.Len(r, out.States, 2)
|
||||
require.Len(r, out.States[0].MeshGateways, 1)
|
||||
require.Len(r, out.States[1].MeshGateways, 1)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("primary and secondary with mixed versions", func(t *testing.T) {
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "node1"
|
||||
c.Datacenter = "dc1"
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
s1.updateSerfTags("ft_fs", "0")
|
||||
|
||||
waitForLeaderEstablishment(t, s1)
|
||||
|
||||
dir2, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "node2"
|
||||
c.Datacenter = "dc2"
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
c.FederationStateReplicationRate = 100
|
||||
c.FederationStateReplicationBurst = 100
|
||||
c.FederationStateReplicationApplyLimit = 1000000
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
waitForLeaderEstablishment(t, s2)
|
||||
|
||||
// Try to join
|
||||
joinWAN(t, s2, s1)
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc2")
|
||||
|
||||
addGateway(t, s1, "dc1", "node1")
|
||||
addGateway(t, s2, "dc2", "node2")
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if s1.DatacenterSupportsFederationStates() {
|
||||
r.Fatal("server 1 shouldn't activate fedstates")
|
||||
}
|
||||
})
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
if s2.DatacenterSupportsFederationStates() {
|
||||
r.Fatal("server 2 shouldn't activate fedstates")
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
|
|
@ -183,7 +183,7 @@ func (r *IndexReplicator) Replicate(ctx context.Context, lastRemoteIndex uint64,
|
|||
metrics.MeasureSince([]string{"leader", "replication", r.Delegate.MetricName(), "fetch"}, fetchStart)
|
||||
|
||||
if err != nil {
|
||||
return 0, false, fmt.Errorf("failed to retrieve %s: %v", r.Delegate.PluralNoun(), err)
|
||||
return 0, false, fmt.Errorf("failed to retrieve %s: %w", r.Delegate.PluralNoun(), err)
|
||||
}
|
||||
|
||||
r.Logger.Debug("finished fetching remote objects",
|
||||
|
|
|
@ -163,6 +163,12 @@ type Server struct {
|
|||
// federation states
|
||||
federationStateReplicator *Replicator
|
||||
|
||||
// dcSupportsFederationStates is used to determine whether we can
|
||||
// replicate federation states or not. All servers in the local
|
||||
// DC must be on a version of Consul supporting federation states
|
||||
// before this will get enabled.
|
||||
dcSupportsFederationStates int32
|
||||
|
||||
// tokens holds ACL tokens initially from the configuration, but can
|
||||
// be updated at runtime, so should always be used instead of going to
|
||||
// the configuration directly.
|
||||
|
@ -449,6 +455,7 @@ func NewServerLogger(config *Config, logger hclog.InterceptLogger, tokens *token
|
|||
Rate: s.config.FederationStateReplicationRate,
|
||||
Burst: s.config.FederationStateReplicationBurst,
|
||||
Logger: logger,
|
||||
SuppressErrorLog: isErrFederationStatesNotSupported,
|
||||
}
|
||||
s.federationStateReplicator, err = NewReplicator(&federationStateReplicatorConfig)
|
||||
if err != nil {
|
||||
|
|
|
@ -74,6 +74,9 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
|
|||
conf.Tags["acls"] = string(structs.ACLModeDisabled)
|
||||
}
|
||||
|
||||
// feature flag: advertise support for federation states
|
||||
conf.Tags["ft_fs"] = "1"
|
||||
|
||||
var subLoggerName string
|
||||
if wan {
|
||||
subLoggerName = logging.WAN
|
||||
|
|
Loading…
Reference in New Issue