open-consul/agent/consul/gateway_locator_test.go
Dan Stough e502be8c6e
[OSS] gRPC Blocking Queries (#17426)
* feat: initial grpc blocking queries

* changelog and docs update
2023-05-23 17:29:10 -04:00

527 lines
15 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package consul
import (
"errors"
"sync/atomic"
"testing"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/blockingquery"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
)
// TestGatewayLocator drives a gateway locator through one runOnce cycle in a
// series of scenarios and checks DialPrimaryThroughLocalGateway, listGateways,
// the calls recorded by the stub delegate, and the delegate's
// federation-states flag both before and after that cycle.
//
// The scenarios run first against an empty state store and then again after
// federation states for dc1 and dc2 have been written. Judging by the subtest
// names, "dc1" acts as the primary datacenter and "dc2" as a secondary.
func TestGatewayLocator(t *testing.T) {
// One state store is shared by every subtest below; it starts out empty.
// NOTE(review): this local shadows the imported state package for the rest
// of the function.
state := state.NewStateStore(nil)
serverRoles := []string{"leader", "follower"}
// Used as the delegate's lastContact value for follower runs.
now := time.Now().UTC()
// Fixtures: two passing mesh gateways per datacenter, each carrying the WAN
// federation meta key.
dc1 := &structs.FederationState{
Datacenter: "dc1",
MeshGateways: []structs.CheckServiceNode{
newTestMeshGatewayNode(
"dc1", "gateway1", "1.2.3.4", 5555, map[string]string{structs.MetaWANFederationKey: "1"}, api.HealthPassing,
),
newTestMeshGatewayNode(
"dc1", "gateway2", "4.3.2.1", 9999, map[string]string{structs.MetaWANFederationKey: "1"}, api.HealthPassing,
),
},
UpdatedAt: time.Now().UTC(),
}
dc2 := &structs.FederationState{
Datacenter: "dc2",
MeshGateways: []structs.CheckServiceNode{
newTestMeshGatewayNode(
"dc2", "gateway1", "5.6.7.8", 5555, map[string]string{structs.MetaWANFederationKey: "1"}, api.HealthPassing,
),
newTestMeshGatewayNode(
"dc2", "gateway2", "8.7.6.5", 9999, map[string]string{structs.MetaWANFederationKey: "1"}, api.HealthPassing,
),
},
UpdatedAt: time.Now().UTC(),
}
// ---- Scenarios against the empty state store ----
t.Run("primary - no data", func(t *testing.T) {
for _, role := range serverRoles {
t.Run(role, func(t *testing.T) {
isLeader := role == "leader"
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: isLeader}
// Only followers are given a last-contact time.
if !isLeader {
tsd.lastContact = now
}
g := NewGatewayLocator(
logger,
tsd,
"dc1",
"dc1",
)
g.SetUseReplicationSignal(isLeader)
t.Run("before first run", func(t *testing.T) {
assert.False(t, g.DialPrimaryThroughLocalGateway()) // not important
assert.Len(t, tsd.Calls, 0)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true))
assert.False(t, tsd.datacenterSupportsFederationStates())
})
// With nothing stored the expected index is 1, presumably because the
// stub blockingQuery bumps the reported index to 1 when it is below 1.
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.Equal(t, uint64(1), idx)
t.Run("after first run", func(t *testing.T) {
assert.False(t, g.DialPrimaryThroughLocalGateway()) // not important
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true))
assert.False(t, tsd.datacenterSupportsFederationStates()) // no results, so we don't flip the bit yet
})
})
}
})
t.Run("secondary - no data", func(t *testing.T) {
for _, role := range serverRoles {
t.Run(role, func(t *testing.T) {
isLeader := role == "leader"
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: isLeader}
if !isLeader {
tsd.lastContact = now
}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
g.SetUseReplicationSignal(isLeader)
t.Run("before first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
assert.Len(t, tsd.Calls, 0)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true))
assert.False(t, tsd.datacenterSupportsFederationStates())
})
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.Equal(t, uint64(1), idx)
t.Run("after first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true))
assert.False(t, tsd.datacenterSupportsFederationStates()) // no results, so we don't flip the bit yet
})
})
}
})
t.Run("secondary - just fallback", func(t *testing.T) {
for _, role := range serverRoles {
t.Run(role, func(t *testing.T) {
isLeader := role == "leader"
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: isLeader}
if !isLeader {
tsd.lastContact = now
}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
g.SetUseReplicationSignal(isLeader)
// Fallback addresses are the only gateway source in this scenario.
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
t.Run("before first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
assert.Len(t, tsd.Calls, 0)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
assert.False(t, tsd.datacenterSupportsFederationStates())
})
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.Equal(t, uint64(1), idx)
t.Run("after first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string{
"7.7.7.7:7777",
"8.8.8.8:8888",
}, g.listGateways(true))
assert.False(t, tsd.datacenterSupportsFederationStates()) // no results, so we don't flip the bit yet
})
})
}
})
// ---- Scenarios with federation states present ----
// Insert data for the dcs. The scenarios after this point expect runOnce to
// report index 2.
require.NoError(t, state.FederationStateSet(1, dc1))
require.NoError(t, state.FederationStateSet(2, dc2))
t.Run("primary - with data", func(t *testing.T) {
for _, role := range serverRoles {
t.Run(role, func(t *testing.T) {
isLeader := role == "leader"
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: isLeader}
if !isLeader {
tsd.lastContact = now
}
g := NewGatewayLocator(
logger,
tsd,
"dc1",
"dc1",
)
g.SetUseReplicationSignal(isLeader)
t.Run("before first run", func(t *testing.T) {
assert.False(t, g.DialPrimaryThroughLocalGateway()) // not important
assert.Len(t, tsd.Calls, 0)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
assert.False(t, tsd.datacenterSupportsFederationStates())
})
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.Equal(t, uint64(2), idx)
t.Run("after first run", func(t *testing.T) {
assert.False(t, g.DialPrimaryThroughLocalGateway()) // not important
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
}, g.listGateways(false))
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
}, g.listGateways(true))
assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit
})
})
}
})
t.Run("secondary - with data", func(t *testing.T) {
for _, role := range serverRoles {
t.Run(role, func(t *testing.T) {
isLeader := role == "leader"
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: isLeader}
if !isLeader {
tsd.lastContact = now
}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
g.SetUseReplicationSignal(isLeader)
t.Run("before first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
assert.Len(t, tsd.Calls, 0)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
assert.False(t, tsd.datacenterSupportsFederationStates())
})
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.Equal(t, uint64(2), idx)
t.Run("after first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(false))
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(true))
assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit
})
})
}
})
// ---- Leader-only scenarios: data, fallback addresses, and a sequence of
// replication results fed in via SetLastFederationStateReplicationError ----
t.Run("secondary - with data and fallback - repl ok", func(t *testing.T) {
// Only run for the leader.
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
g.SetUseReplicationSignal(true)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
g.SetLastFederationStateReplicationError(nil, true)
t.Run("before first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
assert.Len(t, tsd.Calls, 0)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
assert.False(t, tsd.datacenterSupportsFederationStates())
})
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.Equal(t, uint64(2), idx)
t.Run("after first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway())
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(false))
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(true))
assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit
})
})
t.Run("secondary - with data and fallback - repl ok then failed 2 times", func(t *testing.T) {
// Only run for the leader.
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
g.SetUseReplicationSignal(true)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
// One success followed by two failures: dialing through the local gateway
// is still expected to be on.
g.SetLastFederationStateReplicationError(nil, true)
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
t.Run("before first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // defaults to sure!
assert.Len(t, tsd.Calls, 0)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
assert.False(t, tsd.datacenterSupportsFederationStates())
})
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.Equal(t, uint64(2), idx)
t.Run("after first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway())
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(false))
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(true))
assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit
})
})
t.Run("secondary - with data and fallback - repl ok then failed 3 times", func(t *testing.T) {
// Only run for the leader.
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
g.SetUseReplicationSignal(true)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
// One success followed by three failures: dialing through the local
// gateway is expected to be switched off.
g.SetLastFederationStateReplicationError(nil, true)
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
t.Run("before first run", func(t *testing.T) {
assert.False(t, g.DialPrimaryThroughLocalGateway()) // too many errors
assert.Len(t, tsd.Calls, 0)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
assert.False(t, tsd.datacenterSupportsFederationStates())
})
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.Equal(t, uint64(2), idx)
t.Run("after first run", func(t *testing.T) {
assert.False(t, g.DialPrimaryThroughLocalGateway())
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(false))
// With dialing through the local gateway off, the expected list is dc1's
// gateways followed by the fallback addresses.
assert.Equal(t, []string{
"1.2.3.4:5555",
"4.3.2.1:9999",
"7.7.7.7:7777",
"8.8.8.8:8888",
}, g.listGateways(true))
assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit
})
})
t.Run("secondary - with data and fallback - repl ok then failed 3 times then ok again", func(t *testing.T) {
// Only run for the leader.
logger := testutil.Logger(t)
tsd := &testServerDelegate{State: state, isLeader: true}
g := NewGatewayLocator(
logger,
tsd,
"dc2",
"dc1",
)
g.SetUseReplicationSignal(true)
g.RefreshPrimaryGatewayFallbackAddresses([]string{
"7.7.7.7:7777",
"8.8.8.8:8888",
})
// Success, three failures, then success again: dialing through the local
// gateway is expected to be back on.
g.SetLastFederationStateReplicationError(nil, true)
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(errors.New("fake"), true)
g.SetLastFederationStateReplicationError(nil, true)
t.Run("before first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // all better again
assert.Len(t, tsd.Calls, 0)
assert.Equal(t, []string(nil), g.listGateways(false))
assert.Equal(t, []string(nil), g.listGateways(true)) // don't return any data until we initialize
assert.False(t, tsd.datacenterSupportsFederationStates())
})
idx, err := g.runOnce(0)
require.NoError(t, err)
assert.Equal(t, uint64(2), idx)
t.Run("after first run", func(t *testing.T) {
assert.True(t, g.DialPrimaryThroughLocalGateway()) // all better again
assert.Len(t, tsd.Calls, 1)
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(false))
assert.Equal(t, []string{
"5.6.7.8:5555",
"8.7.6.5:9999",
}, g.listGateways(true))
assert.True(t, tsd.datacenterSupportsFederationStates()) // have results, so we flip the bit
})
})
}
// Compile-time check that *testServerDelegate satisfies serverDelegate.
var _ serverDelegate = (*testServerDelegate)(nil)
// testServerDelegate is a minimal serverDelegate stand-in for tests. It runs
// queries directly against State and records each blockingQuery invocation.
type testServerDelegate struct {
dcSupportsFederationStates int32 // atomically accessed, at start to prevent alignment issues
// State is the store handed to the query function in blockingQuery.
State *state.Store
// Calls holds the minimum query index of every blockingQuery invocation, in
// call order.
Calls []uint64
// isLeader is the value reported by IsLeader.
isLeader bool
// lastContact is the value reported by LeaderLastContact.
lastContact time.Time
}
// setDatacenterSupportsFederationStates atomically sets the
// dcSupportsFederationStates flag to 1. Nothing in this type clears it again.
func (d *testServerDelegate) setDatacenterSupportsFederationStates() {
atomic.StoreInt32(&d.dcSupportsFederationStates, 1)
}
// datacenterSupportsFederationStates reports whether the
// dcSupportsFederationStates flag is currently non-zero. The flag is read
// atomically.
func (d *testServerDelegate) datacenterSupportsFederationStates() bool {
	flag := atomic.LoadInt32(&d.dcSupportsFederationStates)
	return flag != 0
}
// blockingQuery is a non-blocking stand-in that is just enough to exercise the
// logic: it records the requested minimum query index in Calls, invokes fn
// once against d.State with a nil watch set, and returns fn's error.
//
// When fn succeeds and the response index is still below 1, the index is
// raised to 1.
func (d *testServerDelegate) blockingQuery(
	queryOpts blockingquery.RequestOptions,
	queryMeta blockingquery.ResponseMeta,
	fn blockingquery.QueryFn,
) error {
	d.Calls = append(d.Calls, queryOpts.GetMinQueryIndex())

	// The watch set is left nil; nothing in this stub ever waits on it.
	var watches memdb.WatchSet
	if err := fn(watches, d.State); err != nil {
		return err
	}

	if queryMeta.GetIndex() < 1 {
		queryMeta.SetIndex(1)
	}
	return nil
}
// IsLeader returns the isLeader value configured on the delegate.
func (d *testServerDelegate) IsLeader() bool {
return d.isLeader
}
// LeaderLastContact returns the lastContact value configured on the delegate;
// it is the zero time.Time when the test never set one.
func (d *testServerDelegate) LeaderLastContact() time.Time {
return d.lastContact
}