server: when wan federating via mesh gateways only do heuristic primary DC bypass on the leader (#9366)

Fixes #9341
This commit is contained in:
R.B. Boyer 2021-01-22 10:03:24 -06:00 committed by GitHub
parent 90cf562311
commit 0247f409a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 236 additions and 114 deletions

4
.changelog/9366.txt Normal file
View File

@ -0,0 +1,4 @@
```release-note:bug
server: When wan federating via mesh gateways only do heuristic primary DC bypass on the leader.
```

View File

@ -39,7 +39,7 @@ func (r *FederationStateReplicator) FetchRemote(lastRemoteIndex uint64) (int, in
}
lenRemote, remote, remoteIndex, err := r.fetchRemote(lastRemoteIndex)
if r.gatewayLocator != nil {
r.gatewayLocator.SetLastFederationStateReplicationError(err)
r.gatewayLocator.SetLastFederationStateReplicationError(err, true)
}
return lenRemote, remote, remoteIndex, err
}

View File

@ -59,11 +59,12 @@ type GatewayLocator struct {
// these are a collection of measurements that factor into deciding if we
// should directly dial the primary's mesh gateways or if we should try to
// route through our local gateways (if they are up).
lastReplLock sync.Mutex
lastReplSuccess time.Time
lastReplFailure time.Time
lastReplSuccesses uint64
lastReplFailures uint64
lastReplLock sync.Mutex
lastReplSuccess time.Time
lastReplFailure time.Time
lastReplSuccesses uint64
lastReplFailures uint64
useReplicationSignal bool // this should be set to true on the leader
}
// SetLastFederationStateReplicationError is used to indicate if the federation
@ -74,14 +75,23 @@ type GatewayLocator struct {
// our chosen mesh-gateway configuration can reach the primary's servers (like
// a ping or status RPC) we cheat and use the federation state replicator
// goroutine's success or failure as a proxy.
func (g *GatewayLocator) SetLastFederationStateReplicationError(err error) {
func (g *GatewayLocator) SetLastFederationStateReplicationError(err error, fromReplication bool) {
if g == nil {
return
}
g.lastReplLock.Lock()
defer g.lastReplLock.Unlock()
oldChoice := g.dialPrimaryThroughLocalGateway()
if err == nil {
g.lastReplSuccess = time.Now().UTC()
g.lastReplSuccesses++
g.lastReplFailures = 0
if fromReplication {
// If we get info from replication, assume replication is operating.
g.useReplicationSignal = true
}
} else {
g.lastReplFailure = time.Now().UTC()
g.lastReplFailures++
@ -93,6 +103,16 @@ func (g *GatewayLocator) SetLastFederationStateReplicationError(err error) {
}
}
func (g *GatewayLocator) SetUseReplicationSignal(newValue bool) {
if g == nil {
return
}
g.lastReplLock.Lock()
g.useReplicationSignal = newValue
g.lastReplLock.Unlock()
}
func (g *GatewayLocator) logPrimaryDialingMessage(useLocal bool) {
if g.datacenter == g.primaryDatacenter {
// These messages are useless when the server is in the primary
@ -140,6 +160,12 @@ func (g *GatewayLocator) DialPrimaryThroughLocalGateway() bool {
const localFederationStateReplicatorFailuresBeforeDialingDirectly = 3
func (g *GatewayLocator) dialPrimaryThroughLocalGateway() bool {
if !g.useReplicationSignal {
// Followers should blindly assume these gateways work. The leader will
// try to bypass them and correct the replicated federation state info
// that the followers will eventually pick up on.
return true
}
if g.lastReplSuccess.IsZero() && g.lastReplFailure.IsZero() {
return false // no data yet
}

View File

@ -18,6 +18,9 @@ import (
func TestGatewayLocator(t *testing.T) {
state := state.NewStateStore(nil)
serverRoles := []string{"leader", "follower"}
now := time.Now().UTC()
dc1 := &structs.FederationState{
Datacenter: "dc1",
MeshGateways: []structs.CheckServiceNode{
@ -44,67 +47,105 @@ func TestGatewayLocator(t *testing.T) {
}
t.Run("primary - no data", func(t *testing.T) {
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
logger,
tsd,
"dc1",
"dc1",
)
for _, role := range serverRoles {
t.Run(role, func(t *testing.T) {
isLeader := role == "leader"
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(1), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true))
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: isLeader}
if !isLeader {
tsd.lastContact = now
}
g := NewGatewayLocator(
logger,
tsd,
"dc1",
"dc1",
)
g.SetUseReplicationSignal(isLeader)
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(1), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true))
})
}
})
t.Run("secondary - no data", func(t *testing.T) {
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
for _, role := range serverRoles {
t.Run(role, func(t *testing.T) {
isLeader := role == "leader"
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(1), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true))
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: isLeader}
if !isLeader {
tsd.lastContact = now
}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
g.SetUseReplicationSignal(isLeader)
idx, err := g.runOnce(0)
require.NoError(t, err)
if isLeader {
assert.False(t, g.DialPrimaryThroughLocalGateway())
} else {
assert.True(t, g.DialPrimaryThroughLocalGateway())
}
assert.Equal(t, uint64(1), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true))
})
}
})
t.Run("secondary - just fallback", func(t *testing.T) {
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
for _, role := range serverRoles {
t.Run(role, func(t *testing.T) {
isLeader := role == "leader"
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(1), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string{
"7.7.7.7:7777",
"8.8.8.8:8888",
}, g.listGateways(true))
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: isLeader}
if !isLeader {
tsd.lastContact = now
}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
g.SetUseReplicationSignal(isLeader)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
idx, err := g.runOnce(0)
require.NoError(t, err)
if isLeader {
assert.False(t, g.DialPrimaryThroughLocalGateway())
} else {
assert.True(t, g.DialPrimaryThroughLocalGateway())
}
assert.Equal(t, uint64(1), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string{
"7.7.7.7:7777",
"8.8.8.8:8888",
}, g.listGateways(true))
})
}
})
// Insert data for the dcs
@ -112,56 +153,88 @@ func TestGatewayLocator(t *testing.T) {
require.NoError(t, state.FederationStateSet(2, dc2))
t.Run("primary - with data", func(t *testing.T) {
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
logger,
tsd,
"dc1",
"dc1",
)
for _, role := range serverRoles {
t.Run(role, func(t *testing.T) {
isLeader := role == "leader"
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(2), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
}, g.listGateways(false))
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
}, g.listGateways(true))
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: isLeader}
if !isLeader {
tsd.lastContact = now
}
g := NewGatewayLocator(
logger,
tsd,
"dc1",
"dc1",
)
g.SetUseReplicationSignal(isLeader)
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(2), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
}, g.listGateways(false))
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
}, g.listGateways(true))
})
}
})
t.Run("secondary - with data", func(t *testing.T) {
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
for _, role := range serverRoles {
t.Run(role, func(t *testing.T) {
isLeader := role == "leader"
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(2), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(false))
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
}, g.listGateways(true))
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: isLeader}
if !isLeader {
tsd.lastContact = now
}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
g.SetUseReplicationSignal(isLeader)
idx, err := g.runOnce(0)
require.NoError(t, err)
if isLeader {
assert.False(t, g.DialPrimaryThroughLocalGateway())
} else {
assert.True(t, g.DialPrimaryThroughLocalGateway())
}
assert.Equal(t, uint64(2), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(false))
if isLeader {
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
}, g.listGateways(true))
} else {
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(true))
}
})
}
})
t.Run("secondary - with data and fallback - no repl", func(t *testing.T) {
// Only run for the leader.
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
@ -170,6 +243,7 @@ func TestGatewayLocator(t *testing.T) {
"dc2",
"dc1",
)
g.SetUseReplicationSignal(true)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
@ -194,6 +268,7 @@ func TestGatewayLocator(t *testing.T) {
})
t.Run("secondary - with data and fallback - repl ok", func(t *testing.T) {
// Only run for the leader.
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
@ -202,13 +277,14 @@ func TestGatewayLocator(t *testing.T) {
"dc2",
"dc1",
)
g.SetUseReplicationSignal(true)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
g.SetLastFederationStateReplicationError(nil)
g.SetLastFederationStateReplicationError(nil, true)
idx, err := g.runOnce(0)
require.NoError(t, err)
@ -226,6 +302,7 @@ func TestGatewayLocator(t *testing.T) {
})
t.Run("secondary - with data and fallback - repl ok then failed 2 times", func(t *testing.T) {
// Only run for the leader.
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
@ -234,15 +311,16 @@ func TestGatewayLocator(t *testing.T) {
"dc2",
"dc1",
)
g.SetUseReplicationSignal(true)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
g.SetLastFederationStateReplicationError(nil)
g.SetLastFederationStateReplicationError(errors.New("fake"))
g.SetLastFederationStateReplicationError(errors.New("fake"))
g.SetLastFederationStateReplicationError(nil, true)
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
idx, err := g.runOnce(0)
require.NoError(t, err)
@ -260,6 +338,7 @@ func TestGatewayLocator(t *testing.T) {
})
t.Run("secondary - with data and fallback - repl ok then failed 3 times", func(t *testing.T) {
// Only run for the leader.
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
@ -268,16 +347,17 @@ func TestGatewayLocator(t *testing.T) {
"dc2",
"dc1",
)
g.SetUseReplicationSignal(true)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
g.SetLastFederationStateReplicationError(nil)
g.SetLastFederationStateReplicationError(errors.New("fake"))
g.SetLastFederationStateReplicationError(errors.New("fake"))
g.SetLastFederationStateReplicationError(errors.New("fake"))
g.SetLastFederationStateReplicationError(nil, true)
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
idx, err := g.runOnce(0)
require.NoError(t, err)
@ -297,6 +377,7 @@ func TestGatewayLocator(t *testing.T) {
})
t.Run("secondary - with data and fallback - repl ok then failed 3 times then ok again", func(t *testing.T) {
// Only run for the leader.
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
@ -305,17 +386,18 @@ func TestGatewayLocator(t *testing.T) {
"dc2",
"dc1",
)
g.SetUseReplicationSignal(true)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
g.SetLastFederationStateReplicationError(nil)
g.SetLastFederationStateReplicationError(errors.New("fake"))
g.SetLastFederationStateReplicationError(errors.New("fake"))
g.SetLastFederationStateReplicationError(errors.New("fake"))
g.SetLastFederationStateReplicationError(nil)
g.SetLastFederationStateReplicationError(nil, true)
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(nil, true)
idx, err := g.runOnce(0)
require.NoError(t, err)

View File

@ -978,12 +978,22 @@ func (s *Server) startFederationStateReplication() {
return
}
if s.gatewayLocator != nil {
s.gatewayLocator.SetUseReplicationSignal(true)
s.gatewayLocator.SetLastFederationStateReplicationError(nil, false)
}
s.leaderRoutineManager.Start(federationStateReplicationRoutineName, s.federationStateReplicator.Run)
}
func (s *Server) stopFederationStateReplication() {
// will be a no-op when not started
s.leaderRoutineManager.Stop(federationStateReplicationRoutineName)
if s.gatewayLocator != nil {
s.gatewayLocator.SetUseReplicationSignal(false)
s.gatewayLocator.SetLastFederationStateReplicationError(nil, false)
}
}
// getOrCreateAutopilotConfig is used to get the autopilot config, initializing it if necessary