upgrade test: fix flaky peering through mesh gateway (#16271)
This commit is contained in:
parent
f482e41f0d
commit
4b5c8d7edc
|
@ -127,7 +127,7 @@ func AssertEnvoyMetricAtLeast(t *testing.T, adminPort int, prefix, metric string
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
failer := func() *retry.Timer {
|
failer := func() *retry.Timer {
|
||||||
return &retry.Timer{Timeout: 30 * time.Second, Wait: 500 * time.Millisecond}
|
return &retry.Timer{Timeout: 60 * time.Second, Wait: 500 * time.Millisecond}
|
||||||
}
|
}
|
||||||
|
|
||||||
retry.RunWith(failer(), t, func(r *retry.R) {
|
retry.RunWith(failer(), t, func(r *retry.R) {
|
||||||
|
|
|
@ -109,6 +109,13 @@ func (g ConnectContainer) Start() error {
|
||||||
return g.container.Start(g.ctx)
|
return g.container.Start(g.ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g ConnectContainer) Stop() error {
|
||||||
|
if g.container == nil {
|
||||||
|
return fmt.Errorf("container has not been initialized")
|
||||||
|
}
|
||||||
|
return g.container.Stop(context.Background(), nil)
|
||||||
|
}
|
||||||
|
|
||||||
func (g ConnectContainer) Terminate() error {
|
func (g ConnectContainer) Terminate() error {
|
||||||
return cluster.TerminateContainer(g.ctx, g.container, true)
|
return cluster.TerminateContainer(g.ctx, g.container, true)
|
||||||
}
|
}
|
||||||
|
|
|
@ -101,6 +101,13 @@ func (g exampleContainer) Start() error {
|
||||||
return g.container.Start(context.Background())
|
return g.container.Start(context.Background())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g exampleContainer) Stop() error {
|
||||||
|
if g.container == nil {
|
||||||
|
return fmt.Errorf("container has not been initialized")
|
||||||
|
}
|
||||||
|
return g.container.Stop(context.Background(), nil)
|
||||||
|
}
|
||||||
|
|
||||||
func (c exampleContainer) Terminate() error {
|
func (c exampleContainer) Terminate() error {
|
||||||
return cluster.TerminateContainer(c.ctx, c.container, true)
|
return cluster.TerminateContainer(c.ctx, c.container, true)
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,6 +86,13 @@ func (g gatewayContainer) Start() error {
|
||||||
return g.container.Start(context.Background())
|
return g.container.Start(context.Background())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g gatewayContainer) Stop() error {
|
||||||
|
if g.container == nil {
|
||||||
|
return fmt.Errorf("container has not been initialized")
|
||||||
|
}
|
||||||
|
return g.container.Stop(context.Background(), nil)
|
||||||
|
}
|
||||||
|
|
||||||
func (c gatewayContainer) Terminate() error {
|
func (c gatewayContainer) Terminate() error {
|
||||||
return cluster.TerminateContainer(c.ctx, c.container, true)
|
return cluster.TerminateContainer(c.ctx, c.container, true)
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ type Service interface {
|
||||||
GetName() string
|
GetName() string
|
||||||
GetServiceName() string
|
GetServiceName() string
|
||||||
Start() (err error)
|
Start() (err error)
|
||||||
|
Stop() (err error)
|
||||||
Terminate() error
|
Terminate() error
|
||||||
Restart() error
|
Restart() error
|
||||||
GetStatus() (string, error)
|
GetStatus() (string, error)
|
||||||
|
|
|
@ -41,6 +41,7 @@ type BuiltCluster struct {
|
||||||
func BasicPeeringTwoClustersSetup(
|
func BasicPeeringTwoClustersSetup(
|
||||||
t *testing.T,
|
t *testing.T,
|
||||||
consulVersion string,
|
consulVersion string,
|
||||||
|
peeringThroughMeshgateway bool,
|
||||||
) (*BuiltCluster, *BuiltCluster) {
|
) (*BuiltCluster, *BuiltCluster) {
|
||||||
// acceptingCluster, acceptingCtx, acceptingClient := NewPeeringCluster(t, "dc1", 3, consulVersion, true)
|
// acceptingCluster, acceptingCtx, acceptingClient := NewPeeringCluster(t, "dc1", 3, consulVersion, true)
|
||||||
acceptingCluster, acceptingCtx, acceptingClient := NewPeeringCluster(t, 3, &libcluster.BuildOptions{
|
acceptingCluster, acceptingCtx, acceptingClient := NewPeeringCluster(t, 3, &libcluster.BuildOptions{
|
||||||
|
@ -53,6 +54,38 @@ func BasicPeeringTwoClustersSetup(
|
||||||
ConsulVersion: consulVersion,
|
ConsulVersion: consulVersion,
|
||||||
InjectAutoEncryption: true,
|
InjectAutoEncryption: true,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Create the mesh gateway for dataplane traffic and peering control plane traffic (if enabled)
|
||||||
|
acceptingClusterGateway, err := libservice.NewGatewayService(context.Background(), "mesh", "mesh", acceptingCluster.Clients()[0])
|
||||||
|
require.NoError(t, err)
|
||||||
|
dialingClusterGateway, err := libservice.NewGatewayService(context.Background(), "mesh", "mesh", dialingCluster.Clients()[0])
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Enable peering control plane traffic through mesh gateway
|
||||||
|
if peeringThroughMeshgateway {
|
||||||
|
req := &api.MeshConfigEntry{
|
||||||
|
Peering: &api.PeeringMeshConfig{
|
||||||
|
PeerThroughMeshGateways: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
configCluster := func(cli *api.Client) error {
|
||||||
|
libassert.CatalogServiceExists(t, cli, "mesh")
|
||||||
|
ok, _, err := cli.ConfigEntries().Set(req, &api.WriteOptions{})
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("config entry is not set")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error writing config entry: %s", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
err = configCluster(dialingClient)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = configCluster(acceptingClient)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
require.NoError(t, dialingCluster.PeerWithCluster(acceptingClient, AcceptingPeerName, DialingPeerName))
|
require.NoError(t, dialingCluster.PeerWithCluster(acceptingClient, AcceptingPeerName, DialingPeerName))
|
||||||
|
|
||||||
libassert.PeeringStatus(t, acceptingClient, AcceptingPeerName, api.PeeringStateActive)
|
libassert.PeeringStatus(t, acceptingClient, AcceptingPeerName, api.PeeringStateActive)
|
||||||
|
@ -60,7 +93,6 @@ func BasicPeeringTwoClustersSetup(
|
||||||
|
|
||||||
// Register an static-server service in acceptingCluster and export to dialing cluster
|
// Register an static-server service in acceptingCluster and export to dialing cluster
|
||||||
var serverService, serverSidecarService libservice.Service
|
var serverService, serverSidecarService libservice.Service
|
||||||
var acceptingClusterGateway libservice.Service
|
|
||||||
{
|
{
|
||||||
clientNode := acceptingCluster.Clients()[0]
|
clientNode := acceptingCluster.Clients()[0]
|
||||||
|
|
||||||
|
@ -81,15 +113,10 @@ func BasicPeeringTwoClustersSetup(
|
||||||
libassert.CatalogServiceExists(t, acceptingClient, "static-server-sidecar-proxy")
|
libassert.CatalogServiceExists(t, acceptingClient, "static-server-sidecar-proxy")
|
||||||
|
|
||||||
require.NoError(t, serverService.Export("default", AcceptingPeerName, acceptingClient))
|
require.NoError(t, serverService.Export("default", AcceptingPeerName, acceptingClient))
|
||||||
|
|
||||||
// Create the mesh gateway for dataplane traffic
|
|
||||||
acceptingClusterGateway, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register an static-client service in dialing cluster and set upstream to static-server service
|
// Register an static-client service in dialing cluster and set upstream to static-server service
|
||||||
var clientSidecarService *libservice.ConnectContainer
|
var clientSidecarService *libservice.ConnectContainer
|
||||||
var dialingClusterGateway libservice.Service
|
|
||||||
{
|
{
|
||||||
clientNode := dialingCluster.Clients()[0]
|
clientNode := dialingCluster.Clients()[0]
|
||||||
|
|
||||||
|
@ -100,9 +127,6 @@ func BasicPeeringTwoClustersSetup(
|
||||||
|
|
||||||
libassert.CatalogServiceExists(t, dialingClient, "static-client-sidecar-proxy")
|
libassert.CatalogServiceExists(t, dialingClient, "static-client-sidecar-proxy")
|
||||||
|
|
||||||
// Create the mesh gateway for dataplane traffic
|
|
||||||
dialingClusterGateway, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
|
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_, adminPort := clientSidecarService.GetAdminAddr()
|
_, adminPort := clientSidecarService.GetAdminAddr()
|
||||||
|
|
|
@ -50,7 +50,7 @@ import (
|
||||||
func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
|
func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, utils.TargetVersion)
|
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, utils.TargetVersion, false)
|
||||||
var (
|
var (
|
||||||
acceptingCluster = accepting.Cluster
|
acceptingCluster = accepting.Cluster
|
||||||
dialingCluster = dialing.Cluster
|
dialingCluster = dialing.Cluster
|
||||||
|
|
|
@ -42,7 +42,7 @@ func TestPeering_Upgrade_ControlPlane_MGW(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
run := func(t *testing.T, tc testcase) {
|
run := func(t *testing.T, tc testcase) {
|
||||||
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, tc.oldversion)
|
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, tc.oldversion, true)
|
||||||
var (
|
var (
|
||||||
acceptingCluster = accepting.Cluster
|
acceptingCluster = accepting.Cluster
|
||||||
dialingCluster = dialing.Cluster
|
dialingCluster = dialing.Cluster
|
||||||
|
@ -54,19 +54,6 @@ func TestPeering_Upgrade_ControlPlane_MGW(t *testing.T) {
|
||||||
acceptingClient, err := acceptingCluster.GetClient(nil, false)
|
acceptingClient, err := acceptingCluster.GetClient(nil, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Enable peering control plane traffic through mesh gateway
|
|
||||||
req := &api.MeshConfigEntry{
|
|
||||||
Peering: &api.PeeringMeshConfig{
|
|
||||||
PeerThroughMeshGateways: true,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
ok, _, err := dialingClient.ConfigEntries().Set(req, &api.WriteOptions{})
|
|
||||||
require.True(t, ok)
|
|
||||||
require.NoError(t, err)
|
|
||||||
ok, _, err = acceptingClient.ConfigEntries().Set(req, &api.WriteOptions{})
|
|
||||||
require.True(t, ok)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Verify control plane endpoints and traffic in gateway
|
// Verify control plane endpoints and traffic in gateway
|
||||||
_, gatewayAdminPort := dialing.Gateway.GetAdminAddr()
|
_, gatewayAdminPort := dialing.Gateway.GetAdminAddr()
|
||||||
libassert.AssertUpstreamEndpointStatus(t, gatewayAdminPort, "server.dc1.peering", "HEALTHY", 1)
|
libassert.AssertUpstreamEndpointStatus(t, gatewayAdminPort, "server.dc1.peering", "HEALTHY", 1)
|
||||||
|
@ -74,6 +61,9 @@ func TestPeering_Upgrade_ControlPlane_MGW(t *testing.T) {
|
||||||
libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort,
|
libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort,
|
||||||
"cluster.static-server.default.default.accepting-to-dialer.external",
|
"cluster.static-server.default.default.accepting-to-dialer.external",
|
||||||
"upstream_cx_total", 1)
|
"upstream_cx_total", 1)
|
||||||
|
libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort,
|
||||||
|
"cluster.server.dc1.peering",
|
||||||
|
"upstream_cx_total", 1)
|
||||||
|
|
||||||
// Upgrade the accepting cluster and assert peering is still ACTIVE
|
// Upgrade the accepting cluster and assert peering is still ACTIVE
|
||||||
require.NoError(t, acceptingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion))
|
require.NoError(t, acceptingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion))
|
||||||
|
@ -90,11 +80,12 @@ func TestPeering_Upgrade_ControlPlane_MGW(t *testing.T) {
|
||||||
// - Register a new static-client service in dialing cluster and
|
// - Register a new static-client service in dialing cluster and
|
||||||
// - set upstream to static-server service in peered cluster
|
// - set upstream to static-server service in peered cluster
|
||||||
|
|
||||||
// Restart the gateway & proxy sidecar
|
// Stop the accepting gateway and restart dialing gateway
|
||||||
|
// to force peering control plane traffic through dialing mesh gateway
|
||||||
|
require.NoError(t, accepting.Gateway.Stop())
|
||||||
require.NoError(t, dialing.Gateway.Restart())
|
require.NoError(t, dialing.Gateway.Restart())
|
||||||
require.NoError(t, dialing.Container.Restart())
|
|
||||||
|
|
||||||
// Restarted gateway should not have any measurement on data plane traffic
|
// Restarted dialing gateway should not have any measurement on data plane traffic
|
||||||
libassert.AssertEnvoyMetricAtMost(t, gatewayAdminPort,
|
libassert.AssertEnvoyMetricAtMost(t, gatewayAdminPort,
|
||||||
"cluster.static-server.default.default.accepting-to-dialer.external",
|
"cluster.static-server.default.default.accepting-to-dialer.external",
|
||||||
"upstream_cx_total", 0)
|
"upstream_cx_total", 0)
|
||||||
|
@ -102,6 +93,7 @@ func TestPeering_Upgrade_ControlPlane_MGW(t *testing.T) {
|
||||||
libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort,
|
libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort,
|
||||||
"cluster.server.dc1.peering",
|
"cluster.server.dc1.peering",
|
||||||
"upstream_cx_total", 1)
|
"upstream_cx_total", 1)
|
||||||
|
require.NoError(t, accepting.Gateway.Start())
|
||||||
|
|
||||||
clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true)
|
clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -99,7 +99,7 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
run := func(t *testing.T, tc testcase) {
|
run := func(t *testing.T, tc testcase) {
|
||||||
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, tc.oldversion)
|
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, tc.oldversion, false)
|
||||||
var (
|
var (
|
||||||
acceptingCluster = accepting.Cluster
|
acceptingCluster = accepting.Cluster
|
||||||
dialingCluster = dialing.Cluster
|
dialingCluster = dialing.Cluster
|
||||||
|
|
Loading…
Reference in New Issue