server: use the presense of stored federation state data as a sign that we already activated the federation state feature flag (#9519)

This way we only have to wait for the serf barrier to pass once before
we can make use of federation state APIs Without this patch every
restart needs to re-compute the change.
This commit is contained in:
R.B. Boyer 2021-01-25 13:24:32 -06:00 committed by GitHub
parent 7c8cfb7a2a
commit 6622185d64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 105 additions and 1 deletions

3
.changelog/9519.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
server: use the presense of stored federation state data as a sign that we already activated the federation state feature flag
```

View File

@ -186,6 +186,16 @@ func waitForNewACLReplication(t *testing.T, server *Server, expectedReplicationT
}) })
} }
func waitForFederationStateFeature(t *testing.T, server *Server) {
t.Helper()
retry.Run(t, func(r *retry.R) {
require.True(r, server.DatacenterSupportsFederationStates())
})
require.True(t, server.DatacenterSupportsFederationStates())
}
func seeEachOther(a, b []serf.Member, addra, addrb string) bool { func seeEachOther(a, b []serf.Member, addra, addrb string) bool {
return serfMembersContains(a, addrb) && serfMembersContains(b, addra) return serfMembersContains(a, addrb) && serfMembersContains(b, addra)
} }

View File

@ -1484,6 +1484,10 @@ func (s *Server) reapTombstones(index uint64) {
} }
} }
func (s *Server) setDatacenterSupportsFederationStates() {
atomic.StoreInt32(&s.dcSupportsFederationStates, 1)
}
func (s *Server) DatacenterSupportsFederationStates() bool { func (s *Server) DatacenterSupportsFederationStates() bool {
if atomic.LoadInt32(&s.dcSupportsFederationStates) != 0 { if atomic.LoadInt32(&s.dcSupportsFederationStates) != 0 {
return true return true
@ -1508,7 +1512,7 @@ func (s *Server) DatacenterSupportsFederationStates() bool {
s.router.CheckServers(s.config.Datacenter, state.update) s.router.CheckServers(s.config.Datacenter, state.update)
if state.supported && state.found { if state.supported && state.found {
atomic.StoreInt32(&s.dcSupportsFederationStates, 1) s.setDatacenterSupportsFederationStates()
return true return true
} }

View File

@ -17,6 +17,16 @@ const (
) )
func (s *Server) startFederationStateAntiEntropy() { func (s *Server) startFederationStateAntiEntropy() {
// Check to see if we can skip waiting for serf feature detection below.
if !s.DatacenterSupportsFederationStates() {
_, fedStates, err := s.fsm.State().FederationStateList(nil)
if err != nil {
s.logger.Warn("Failed to check for existing federation states and activate the feature flag quicker; skipping this optimization", "error", err)
} else if len(fedStates) > 0 {
s.setDatacenterSupportsFederationStates()
}
}
if s.config.DisableFederationStateAntiEntropy { if s.config.DisableFederationStateAntiEntropy {
return return
} }

View File

@ -13,6 +13,83 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestLeader_FederationStateAntiEntropy_FeatureIsStickyEvenIfSerfTagsRegress(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
// We test this by having two datacenters with one server each. They
// initially come up and pass the serf barrier, then we power them both
// off. We leave the primary off permanently, and then we stand up the
// secondary. Hopefully it should transition to allow federation states.
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
waitForLeaderEstablishment(t, s1)
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.FederationStateReplicationRate = 100
c.FederationStateReplicationBurst = 100
c.FederationStateReplicationApplyLimit = 1000000
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
codec2 := rpcClient(t, s2)
defer codec2.Close()
waitForLeaderEstablishment(t, s2)
// Create the WAN link
joinWAN(t, s2, s1)
waitForLeaderEstablishment(t, s1)
waitForLeaderEstablishment(t, s2)
waitForFederationStateFeature(t, s1)
waitForFederationStateFeature(t, s2)
// Wait for everybody's AE to complete.
retry.Run(t, func(r *retry.R) {
_, states, err := s1.fsm.State().FederationStateList(nil)
require.NoError(r, err)
require.Len(r, states, 2)
})
// Shutdown s1 and s2.
s1.Shutdown()
s2.Shutdown()
// Restart just s2
dir2new, s2new := testServerWithConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc1"
c.FederationStateReplicationRate = 100
c.FederationStateReplicationBurst = 100
c.FederationStateReplicationApplyLimit = 1000000
c.DataDir = s2.config.DataDir
c.NodeName = s2.config.NodeName
c.NodeID = s2.config.NodeID
})
defer os.RemoveAll(dir2new)
defer s2new.Shutdown()
waitForLeaderEstablishment(t, s2new)
// It should be able to transition without connectivity to the primary.
waitForFederationStateFeature(t, s2new)
}
func TestLeader_FederationStateAntiEntropy_BlockingQuery(t *testing.T) { func TestLeader_FederationStateAntiEntropy_BlockingQuery(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")