server: initialize mgw-wanfed to use local gateways more on startup (#9528)

Fixes #9342
This commit is contained in:
R.B. Boyer 2021-01-25 17:30:38 -06:00 committed by GitHub
parent f3e2e3545c
commit 5777fa1f59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 219 additions and 132 deletions

3
.changelog/9528.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
server: When WAN federating via mesh gateways, default to using the local mesh gateways after initial federation unless the heuristic indicates a bypass is required.
```

View File

@ -4696,6 +4696,7 @@ func TestAgent_JoinWAN_viaMeshGateway(t *testing.T) {
}
# wanfed
primary_gateways = ["` + gwAddr + `"]
retry_interval_wan = "250ms"
connect {
enabled = true
enable_mesh_gateway_wan_federation = true
@ -4721,6 +4722,7 @@ func TestAgent_JoinWAN_viaMeshGateway(t *testing.T) {
}
# wanfed
primary_gateways = ["` + gwAddr + `"]
retry_interval_wan = "250ms"
connect {
enabled = true
enable_mesh_gateway_wan_federation = true

View File

@ -46,6 +46,7 @@ type GatewayLocator struct {
gatewaysLock sync.Mutex
primaryGateways []string // WAN addrs
localGateways []string // LAN addrs
populatedGateways bool
// primaryMeshGatewayDiscoveredAddresses is the current fallback addresses
// for the mesh gateways in the primary datacenter.
@ -205,6 +206,10 @@ func (g *GatewayLocator) listGateways(primary bool) []string {
g.gatewaysLock.Lock()
defer g.gatewaysLock.Unlock()
if !g.populatedGateways {
return nil // don't even do anything yet
}
var addrs []string
if primary {
if g.datacenter == g.primaryDatacenter {
@ -267,6 +272,7 @@ type serverDelegate interface {
blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta structs.QueryMetaCompat, fn queryFn) error
IsLeader() bool
LeaderLastContact() time.Time
setDatacenterSupportsFederationStates()
}
func NewGatewayLocator(
@ -283,6 +289,8 @@ func NewGatewayLocator(
primaryGatewaysReadyCh: make(chan struct{}),
}
g.logPrimaryDialingMessage(g.DialPrimaryThroughLocalGateway())
// initialize
g.SetLastFederationStateReplicationError(nil, false)
return g
}
@ -292,7 +300,10 @@ func (g *GatewayLocator) Run(ctx context.Context) {
var lastFetchIndex uint64
retryLoopBackoff(ctx, func() error {
idx, err := g.runOnce(lastFetchIndex)
if err != nil {
if errors.Is(err, errGatewayLocalStateNotInitialized) {
// don't do exponential backoff for something that's not broken
return nil
} else if err != nil {
return err
}
@ -300,9 +311,7 @@ func (g *GatewayLocator) Run(ctx context.Context) {
return nil
}, func(err error) {
if !errors.Is(err, errGatewayLocalStateNotInitialized) {
g.logger.Error("error tracking primary and local mesh gateways", "error", err)
}
})
}
@ -367,6 +376,10 @@ func (g *GatewayLocator) checkLocalStateIsReady() error {
}
func (g *GatewayLocator) updateFromState(results []*structs.FederationState) {
if len(results) > 0 {
g.srv.setDatacenterSupportsFederationStates()
}
var (
local structs.CheckServiceNodes
primary structs.CheckServiceNodes
@ -388,6 +401,8 @@ func (g *GatewayLocator) updateFromState(results []*structs.FederationState) {
g.gatewaysLock.Lock()
defer g.gatewaysLock.Unlock()
g.populatedGateways = true
changed := false
primaryReady := false
if !stringslice.Equal(g.primaryGateways, primaryAddrs) {

View File

@ -2,6 +2,7 @@ package consul
import (
"errors"
"sync/atomic"
"testing"
"time"
@ -64,13 +65,25 @@ func TestGatewayLocator(t *testing.T) {
)
g.SetUseReplicationSignal(isLeader)
t.Run("before first run", func(t *testing.T) {
assert.False(t, g.DialPrimaryThroughLocalGateway()) // not important
assert.Len(t, tsd.Calls, 0)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true))
assert.False(t, tsd.datacenterSupportsFederationStates())
})
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(1), idx)
t.Run("after first run", func(t *testing.T) {
assert.False(t, g.DialPrimaryThroughLocalGateway()) // not important
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true))
assert.False(t, tsd.datacenterSupportsFederationStates()) // no results, so we don't flip the bit yet
})
})
}
})
@ -93,17 +106,25 @@ func TestGatewayLocator(t *testing.T) {
)
g.SetUseReplicationSignal(isLeader)
t.Run("before first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
assert.Len(t, tsd.Calls, 0)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true))
assert.False(t, tsd.datacenterSupportsFederationStates())
})
idx, err := g.runOnce(0)
require.NoError(t, err)
if isLeader {
assert.False(t, g.DialPrimaryThroughLocalGateway())
} else {
assert.True(t, g.DialPrimaryThroughLocalGateway())
}
assert.Equal(t, uint64(1), idx)
t.Run("after first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true))
assert.False(t, tsd.datacenterSupportsFederationStates()) // no results, so we don't flip the bit yet
})
})
}
})
@ -130,20 +151,28 @@ func TestGatewayLocator(t *testing.T) {
"8.8.8.8:8888",
})
t.Run("before first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
assert.Len(t, tsd.Calls, 0)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
assert.False(t, tsd.datacenterSupportsFederationStates())
})
idx, err := g.runOnce(0)
require.NoError(t, err)
if isLeader {
assert.False(t, g.DialPrimaryThroughLocalGateway())
} else {
assert.True(t, g.DialPrimaryThroughLocalGateway())
}
assert.Equal(t, uint64(1), idx)
t.Run("after first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string{
"7.7.7.7:7777",
"8.8.8.8:8888",
}, g.listGateways(true))
assert.False(t, tsd.datacenterSupportsFederationStates()) // no results, so we don't flip the bit yet
})
})
}
})
@ -170,10 +199,20 @@ func TestGatewayLocator(t *testing.T) {
)
g.SetUseReplicationSignal(isLeader)
t.Run("before first run", func(t *testing.T) {
assert.False(t, g.DialPrimaryThroughLocalGateway()) // not important
assert.Len(t, tsd.Calls, 0)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
assert.False(t, tsd.datacenterSupportsFederationStates())
})
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(2), idx)
t.Run("after first run", func(t *testing.T) {
assert.False(t, g.DialPrimaryThroughLocalGateway()) // not important
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"1.2.3.4:5555",
@ -183,6 +222,8 @@ func TestGatewayLocator(t *testing.T) {
"1.2.3.4:5555",
"4.3.2.1:9999",
}, g.listGateways(true))
assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit
})
})
}
})
@ -205,66 +246,34 @@ func TestGatewayLocator(t *testing.T) {
)
g.SetUseReplicationSignal(isLeader)
idx, err := g.runOnce(0)
require.NoError(t, err)
if isLeader {
assert.False(t, g.DialPrimaryThroughLocalGateway())
} else {
assert.True(t, g.DialPrimaryThroughLocalGateway())
}
assert.Equal(t, uint64(2), idx)
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(false))
if isLeader {
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
}, g.listGateways(true))
} else {
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(true))
}
})
}
})
t.Run("secondary - with data and fallback - no repl", func(t *testing.T) {
// Only run for the leader.
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
g.SetUseReplicationSignal(true)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
t.Run("before first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
assert.Len(t, tsd.Calls, 0)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
assert.False(t, tsd.datacenterSupportsFederationStates())
})
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(2), idx)
t.Run("after first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(false))
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
"7.7.7.7:7777",
"8.8.8.8:8888",
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(true))
assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit
})
})
}
})
t.Run("secondary - with data and fallback - repl ok", func(t *testing.T) {
@ -286,10 +295,20 @@ func TestGatewayLocator(t *testing.T) {
g.SetLastFederationStateReplicationError(nil, true)
t.Run("before first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
assert.Len(t, tsd.Calls, 0)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
assert.False(t, tsd.datacenterSupportsFederationStates())
})
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.True(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(2), idx)
t.Run("after first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway())
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
@ -299,6 +318,8 @@ func TestGatewayLocator(t *testing.T) {
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(true))
assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit
})
})
t.Run("secondary - with data and fallback - repl ok then failed 2 times", func(t *testing.T) {
@ -322,10 +343,20 @@ func TestGatewayLocator(t *testing.T) {
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
t.Run("before first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
assert.Len(t, tsd.Calls, 0)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
assert.False(t, tsd.datacenterSupportsFederationStates())
})
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.True(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(2), idx)
t.Run("after first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway())
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
@ -335,6 +366,8 @@ func TestGatewayLocator(t *testing.T) {
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(true))
assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit
})
})
t.Run("secondary - with data and fallback - repl ok then failed 3 times", func(t *testing.T) {
@ -359,10 +392,20 @@ func TestGatewayLocator(t *testing.T) {
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
t.Run("before first run", func(t *testing.T) {
assert.False(t, g.DialPrimaryThroughLocalGateway()) // too many errors
assert.Len(t, tsd.Calls, 0)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
assert.False(t, tsd.datacenterSupportsFederationStates())
})
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(2), idx)
t.Run("after first run", func(t *testing.T) {
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
@ -374,6 +417,8 @@ func TestGatewayLocator(t *testing.T) {
"7.7.7.7:7777",
"8.8.8.8:8888",
}, g.listGateways(true))
assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit
})
})
t.Run("secondary - with data and fallback - repl ok then failed 3 times then ok again", func(t *testing.T) {
@ -399,10 +444,20 @@ func TestGatewayLocator(t *testing.T) {
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(nil, true)
t.Run("before first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // all better again
assert.Len(t, tsd.Calls, 0)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
assert.False(t, tsd.datacenterSupportsFederationStates())
})
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.True(t, g.DialPrimaryThroughLocalGateway())
assert.Equal(t, uint64(2), idx)
t.Run("after first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // all better again
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
@ -412,10 +467,14 @@ func TestGatewayLocator(t *testing.T) {
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(true))
assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit
})
})
}
type testServerDelegate struct {
dcSupportsFederationStates int32 // atomically accessed, at start to prevent alignment issues
State *state.Store
Calls []uint64
@ -424,6 +483,14 @@ type testServerDelegate struct {
lastContact time.Time
}
// setDatacenterSupportsFederationStates atomically stores 1 into the
// delegate's dcSupportsFederationStates field. Calling it again simply
// stores 1 again; nothing in this method ever resets the field.
func (d *testServerDelegate) setDatacenterSupportsFederationStates() {
	flag := &d.dcSupportsFederationStates
	atomic.StoreInt32(flag, 1)
}
// datacenterSupportsFederationStates atomically loads the delegate's
// dcSupportsFederationStates field and reports whether it is non-zero.
func (d *testServerDelegate) datacenterSupportsFederationStates() bool {
	current := atomic.LoadInt32(&d.dcSupportsFederationStates)
	return current != 0
}
// This is just enough to exercise the logic.
func (d *testServerDelegate) blockingQuery(
queryOpts structs.QueryOptionsCompat,