open-consul/agent/consul/gateway_locator_test.go
R.B. Boyer 813d69622e
agent: handle re-bootstrapping in a secondary datacenter when WAN federation via mesh gateways is configured (#7931)
The main fix here is to always union the `primary-gateways` list with
the list of mesh gateways in the primary returned from the replicated
federation states list. This will allow any replicated (incorrect) state
to be supplemented with user-configured (correct) state in the config
file. Eventually the game of random selection whack-a-mole will pick a
winning entry and re-replicate the latest federation states from the
primary. If the user-configured state is actually the incorrect one,
then the same eventual correct selection process will work in that case,
too.

The secondary fix is to finish making wanfed-via-mgws actually
work as originally designed. Once a secondary datacenter has replicated
federation states for the primary AND managed to stand up its own local
mesh gateways then all of the RPCs from a secondary to the primary
SHOULD go through two sets of mesh gateways to arrive in the consul
servers in the primary (one hop for the secondary datacenter's mesh
gateway, and one hop through the primary datacenter's mesh gateway).
This was neglected in the initial implementation. While everything
works, ideally we should treat communications that go around the mesh
gateways as just provided for bootstrapping purposes.

Now we heuristically use the success/failure history of the federation
state replicator goroutine loop to determine if our current mesh gateway
route is working as intended. If it is, we try using the local gateways,
and if those don't work we fall back on trying the primary via the union
of the replicated state and the go-discover configuration flags.

This can be improved slightly in the future by possibly initializing the
gateway choice to local on startup if we already have replicated state.
This PR does not address that improvement.

Fixes #7339
2020-05-27 11:31:10 -05:00

376 lines
9.2 KiB
Go

package consul
import (
"errors"
"testing"
"time"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
memdb "github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestGatewayLocator runs a GatewayLocator once per scenario against a shared
// state store and checks DialPrimaryThroughLocalGateway, the index returned by
// runOnce, the number of blocking queries issued, and both listGateways
// results. The first batch of scenarios runs before any federation state is
// written; the second batch runs after federation states for dc1 and dc2 have
// been stored.
func TestGatewayLocator(t *testing.T) {
	// Called "store" so the imported state package is not shadowed.
	store, err := state.NewStateStore(nil)
	require.NoError(t, err)

	dc1FedState := &structs.FederationState{
		Datacenter: "dc1",
		MeshGateways: []structs.CheckServiceNode{
			newTestMeshGatewayNode(
				"dc1", "gateway1", "1.2.3.4", 5555,
				map[string]string{structs.MetaWANFederationKey: "1"},
				api.HealthPassing,
			),
			newTestMeshGatewayNode(
				"dc1", "gateway2", "4.3.2.1", 9999,
				map[string]string{structs.MetaWANFederationKey: "1"},
				api.HealthPassing,
			),
		},
		UpdatedAt: time.Now().UTC(),
	}
	dc2FedState := &structs.FederationState{
		Datacenter: "dc2",
		MeshGateways: []structs.CheckServiceNode{
			newTestMeshGatewayNode(
				"dc2", "gateway1", "5.6.7.8", 5555,
				map[string]string{structs.MetaWANFederationKey: "1"},
				api.HealthPassing,
			),
			newTestMeshGatewayNode(
				"dc2", "gateway2", "8.7.6.5", 9999,
				map[string]string{structs.MetaWANFederationKey: "1"},
				api.HealthPassing,
			),
		},
		UpdatedAt: time.Now().UTC(),
	}

	// Each helper returns a fresh slice so that no scenario shares backing
	// storage with another, whatever the locator does with its input.
	dc1Addrs := func() []string {
		return []string{"1.2.3.4:5555", "4.3.2.1:9999"}
	}
	dc2Addrs := func() []string {
		return []string{"5.6.7.8:5555", "8.7.6.5:9999"}
	}
	fallbackAddrs := func() []string {
		return []string{"7.7.7.7:7777", "8.8.8.8:8888"}
	}
	dc1AndFallbackAddrs := func() []string {
		return append(dc1Addrs(), fallbackAddrs()...)
	}
	replFailure := func() error {
		return errors.New("fake")
	}

	type scenario struct {
		// name is the subtest name.
		name string
		// datacenter is the third NewGatewayLocator argument; the fourth is
		// always "dc1".
		datacenter string
		// fallback, when non-nil, is handed to
		// RefreshPrimaryGatewayFallbackAddresses before runOnce.
		fallback []string
		// replErrs are passed one by one, in order, to
		// SetLastFederationStateReplicationError before runOnce.
		replErrs []error
		// wantViaLocal is the expected DialPrimaryThroughLocalGateway result.
		wantViaLocal bool
		// wantIndex is the expected index returned by runOnce.
		wantIndex uint64
		// wantLocal is compared against listGateways(false).
		wantLocal []string
		// wantPrimary is compared against listGateways(true).
		wantPrimary []string
	}

	runAll := func(scenarios []scenario) {
		for _, sc := range scenarios {
			t.Run(sc.name, func(t *testing.T) {
				logger := testutil.Logger(t)
				tsd := &testServerDelegate{State: store, isLeader: true}
				g := NewGatewayLocator(logger, tsd, sc.datacenter, "dc1")

				if sc.fallback != nil {
					g.RefreshPrimaryGatewayFallbackAddresses(sc.fallback)
				}
				for _, replErr := range sc.replErrs {
					g.SetLastFederationStateReplicationError(replErr)
				}

				idx, err := g.runOnce(0)
				require.NoError(t, err)
				if sc.wantViaLocal {
					assert.True(t, g.DialPrimaryThroughLocalGateway())
				} else {
					assert.False(t, g.DialPrimaryThroughLocalGateway())
				}
				assert.Equal(t, sc.wantIndex, idx)
				assert.Len(t, tsd.Calls, 1)
				assert.Equal(t, sc.wantLocal, g.listGateways(false))
				assert.Equal(t, sc.wantPrimary, g.listGateways(true))
			})
		}
	}

	// Nothing has been written to the store yet. The nil wantLocal and
	// wantPrimary fields are deliberately nil slices, not empty ones.
	runAll([]scenario{
		{
			name:       "primary - no data",
			datacenter: "dc1",
			wantIndex:  1,
		},
		{
			name:       "secondary - no data",
			datacenter: "dc2",
			wantIndex:  1,
		},
		{
			name:        "secondary - just fallback",
			datacenter:  "dc2",
			fallback:    fallbackAddrs(),
			wantIndex:   1,
			wantPrimary: fallbackAddrs(),
		},
	})

	// Insert data for the dcs
	require.NoError(t, store.FederationStateSet(1, dc1FedState))
	require.NoError(t, store.FederationStateSet(2, dc2FedState))

	runAll([]scenario{
		{
			name:        "primary - with data",
			datacenter:  "dc1",
			wantIndex:   2,
			wantLocal:   dc1Addrs(),
			wantPrimary: dc1Addrs(),
		},
		{
			name:        "secondary - with data",
			datacenter:  "dc2",
			wantIndex:   2,
			wantLocal:   dc2Addrs(),
			wantPrimary: dc1Addrs(),
		},
		{
			name:        "secondary - with data and fallback - no repl",
			datacenter:  "dc2",
			fallback:    fallbackAddrs(),
			wantIndex:   2,
			wantLocal:   dc2Addrs(),
			wantPrimary: dc1AndFallbackAddrs(),
		},
		{
			name:         "secondary - with data and fallback - repl ok",
			datacenter:   "dc2",
			fallback:     fallbackAddrs(),
			replErrs:     []error{nil},
			wantViaLocal: true,
			wantIndex:    2,
			wantLocal:    dc2Addrs(),
			wantPrimary:  dc2Addrs(),
		},
		{
			name:         "secondary - with data and fallback - repl ok then failed 2 times",
			datacenter:   "dc2",
			fallback:     fallbackAddrs(),
			replErrs:     []error{nil, replFailure(), replFailure()},
			wantViaLocal: true,
			wantIndex:    2,
			wantLocal:    dc2Addrs(),
			wantPrimary:  dc2Addrs(),
		},
		{
			name:        "secondary - with data and fallback - repl ok then failed 3 times",
			datacenter:  "dc2",
			fallback:    fallbackAddrs(),
			replErrs:    []error{nil, replFailure(), replFailure(), replFailure()},
			wantIndex:   2,
			wantLocal:   dc2Addrs(),
			wantPrimary: dc1AndFallbackAddrs(),
		},
		{
			name:         "secondary - with data and fallback - repl ok then failed 3 times then ok again",
			datacenter:   "dc2",
			fallback:     fallbackAddrs(),
			replErrs:     []error{nil, replFailure(), replFailure(), replFailure(), nil},
			wantViaLocal: true,
			wantIndex:    2,
			wantLocal:    dc2Addrs(),
			wantPrimary:  dc2Addrs(),
		},
	})
}
// testServerDelegate is the stand-in delegate handed to NewGatewayLocator by
// the tests in this file.
type testServerDelegate struct {
// State is the store passed to the query function by blockingQuery.
State *state.Store
// Calls collects the minimum query index of each blockingQuery call, in
// call order.
Calls []uint64
// isLeader is the value returned by IsLeader.
isLeader bool
// lastContact is the value returned by LeaderLastContact.
lastContact time.Time
}
// blockingQuery is a minimal, non-blocking stand-in: just enough to exercise
// the logic under test. It appends the minimum query index from queryOpts to
// d.Calls, invokes fn once with a nil watch set and d.State, and returns fn's
// error. When fn succeeds and queryMeta reports an index below 1, the index is
// set to 1.
func (d *testServerDelegate) blockingQuery(
	queryOpts structs.QueryOptionsCompat,
	queryMeta structs.QueryMetaCompat,
	fn queryFn,
) error {
	d.Calls = append(d.Calls, queryOpts.GetMinQueryIndex())

	// The watch set is left nil; nothing here ever waits on it.
	var noWatches memdb.WatchSet
	if err := fn(noWatches, d.State); err != nil {
		return err
	}

	if queryMeta.GetIndex() < 1 {
		queryMeta.SetIndex(1)
	}
	return nil
}
// newFakeStateStore returns the result of state.NewStateStore(nil): a new
// store together with any error from constructing it.
func newFakeStateStore() (*state.Store, error) {
	store, err := state.NewStateStore(nil)
	return store, err
}
// IsLeader returns the isLeader value the test configured on the delegate.
func (d *testServerDelegate) IsLeader() bool {
return d.isLeader
}
// LeaderLastContact returns the lastContact value stored on the delegate; it
// is the zero time.Time unless a test sets it.
func (d *testServerDelegate) LeaderLastContact() time.Time {
return d.lastContact
}