Backport of [NET-5163] Support locality testing in consul-container into release/1.16.x (#18503)
backport of commit 61b7c0d76ff33f883dfad875d1a39bfd115b332b Co-authored-by: Michael Zalimeni <michael.zalimeni@hashicorp.com>
This commit is contained in:
parent
86fb5dcf8c
commit
8936fe951a
|
@ -216,6 +216,12 @@ func DefaultFailer() *Timer {
|
|||
return &Timer{Timeout: 7 * time.Second, Wait: 25 * time.Millisecond}
|
||||
}
|
||||
|
||||
// ThirtySeconds repeats an operation for thirty seconds and waits 500ms in between.
|
||||
// Best for known slower operations like waiting on eventually consistent state.
|
||||
func ThirtySeconds() *Timer {
|
||||
return &Timer{Timeout: 30 * time.Second, Wait: 500 * time.Millisecond}
|
||||
}
|
||||
|
||||
// TwoSeconds repeats an operation for two seconds and waits 25ms in between.
|
||||
func TwoSeconds() *Timer {
|
||||
return &Timer{Timeout: 2 * time.Second, Wait: 25 * time.Millisecond}
|
||||
|
|
|
@ -216,7 +216,54 @@ func AssertFortioName(t *testing.T, urlbase string, name string, reqHost string)
|
|||
// client must be a custom http.Client
|
||||
func AssertFortioNameWithClient(t *testing.T, urlbase string, name string, reqHost string, client *http.Client) {
|
||||
t.Helper()
|
||||
var fortioNameRE = regexp.MustCompile(("\nFORTIO_NAME=(.+)\n"))
|
||||
foundName, err := FortioNameWithClient(t, urlbase, name, reqHost, client)
|
||||
require.NoError(t, err)
|
||||
t.Logf("got response from server name %q expect %q", foundName, name)
|
||||
assert.Equal(t, name, foundName)
|
||||
}
|
||||
|
||||
// WaitForFortioName is a convenience function for [WaitForFortioNameWithClient], using a [cleanhttp.DefaultClient()]
|
||||
func WaitForFortioName(t *testing.T, r retry.Retryer, urlbase string, name string, reqHost string) {
|
||||
t.Helper()
|
||||
client := cleanhttp.DefaultClient()
|
||||
WaitForFortioNameWithClient(t, r, urlbase, name, reqHost, client)
|
||||
}
|
||||
|
||||
// WaitForFortioNameWithClient enables waiting for FortioNameWithClient to return a specific
|
||||
// value. It uses the provided Retryer to wait for the expected name and only fails when
|
||||
// retries are exhausted.
|
||||
//
|
||||
// This is useful when performing failovers in tests and in other eventual consistency
|
||||
// scenarios that may take multiple seconds to resolve.
|
||||
//
|
||||
// Note that the underlying FortioNameWithClient has its own retry for successfully making
|
||||
// an HTTP request, which will be counted against the timeout of the provided Retryer if it
|
||||
// is a Timer, or incorporated into each attempt if it is a Counter.
|
||||
func WaitForFortioNameWithClient(t *testing.T, r retry.Retryer, urlbase string, name string, reqHost string, client *http.Client) {
|
||||
t.Helper()
|
||||
retry.RunWith(r, t, func(r *retry.R) {
|
||||
actual, err := FortioNameWithClient(r, urlbase, name, reqHost, client)
|
||||
require.NoError(r, err)
|
||||
if name != actual {
|
||||
r.Errorf("name %s did not match expected %s", name, actual)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// FortioNameWithClient returns the `FORTIO_NAME` returned by the fortio service at
|
||||
// urlbase/debug. This can be used to validate that the client is sending traffic to
|
||||
// the right envoy proxy.
|
||||
//
|
||||
// If reqHost is set, the Host field of the HTTP request will be set to its value.
|
||||
//
|
||||
// It retries with timeout defaultHTTPTimeout and wait defaultHTTPWait.
|
||||
//
|
||||
// client must be a custom http.Client
|
||||
func FortioNameWithClient(t retry.Failer, urlbase string, name string, reqHost string, client *http.Client) (string, error) {
|
||||
t.Helper()
|
||||
var fortioNameRE = regexp.MustCompile("\nFORTIO_NAME=(.+)\n")
|
||||
var body []byte
|
||||
|
||||
retry.RunWith(&retry.Timer{Timeout: defaultHTTPTimeout, Wait: defaultHTTPWait}, t, func(r *retry.R) {
|
||||
fullurl := fmt.Sprintf("%s/debug?env=dump", urlbase)
|
||||
req, err := http.NewRequest("GET", fullurl, nil)
|
||||
|
@ -236,16 +283,17 @@ func AssertFortioNameWithClient(t *testing.T, urlbase string, name string, reqHo
|
|||
r.Fatalf("could not make request to %q: status %d", fullurl, resp.StatusCode)
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
body, err = io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
r.Fatalf("failed to read response body from %q: %v", fullurl, err)
|
||||
}
|
||||
|
||||
m := fortioNameRE.FindStringSubmatch(string(body))
|
||||
require.GreaterOrEqual(r, len(m), 2)
|
||||
t.Logf("got response from server name %q expect %q", m[1], name)
|
||||
assert.Equal(r, name, m[1])
|
||||
})
|
||||
|
||||
m := fortioNameRE.FindStringSubmatch(string(body))
|
||||
if len(m) < 2 {
|
||||
return "", fmt.Errorf("fortio name not found %s", name)
|
||||
}
|
||||
return m[1], nil
|
||||
}
|
||||
|
||||
// AssertContainerState validates service container status
|
||||
|
|
|
@ -163,8 +163,9 @@ func (g ConnectContainer) GetStatus() (string, error) {
|
|||
type SidecarConfig struct {
|
||||
Name string
|
||||
ServiceID string
|
||||
Namespace string
|
||||
EnableTProxy bool
|
||||
Namespace string
|
||||
Partition string
|
||||
}
|
||||
|
||||
// NewConnectService returns a container that runs envoy sidecar, launched by
|
||||
|
|
|
@ -46,6 +46,8 @@ type ServiceOpts struct {
|
|||
Checks Checks
|
||||
Connect SidecarService
|
||||
Namespace string
|
||||
Partition string
|
||||
Locality *api.Locality
|
||||
}
|
||||
|
||||
// createAndRegisterStaticServerAndSidecar register the services and launch static-server containers
|
||||
|
@ -71,6 +73,7 @@ func createAndRegisterStaticServerAndSidecar(node libcluster.Agent, httpPort int
|
|||
Name: fmt.Sprintf("%s-sidecar", svc.ID),
|
||||
ServiceID: svc.ID,
|
||||
Namespace: svc.Namespace,
|
||||
Partition: svc.Partition,
|
||||
EnableTProxy: svc.Connect != nil &&
|
||||
svc.Connect.SidecarService != nil &&
|
||||
svc.Connect.SidecarService.Proxy != nil &&
|
||||
|
@ -117,6 +120,8 @@ func CreateAndRegisterStaticServerAndSidecar(node libcluster.Agent, serviceOpts
|
|||
},
|
||||
},
|
||||
Namespace: serviceOpts.Namespace,
|
||||
Partition: serviceOpts.Partition,
|
||||
Locality: serviceOpts.Locality,
|
||||
Meta: serviceOpts.Meta,
|
||||
Check: &agentCheck,
|
||||
}
|
||||
|
@ -144,7 +149,10 @@ func CreateAndRegisterStaticServerAndSidecarWithChecks(node libcluster.Agent, se
|
|||
TTL: serviceOpts.Checks.TTL,
|
||||
},
|
||||
},
|
||||
Meta: serviceOpts.Meta,
|
||||
Meta: serviceOpts.Meta,
|
||||
Namespace: serviceOpts.Namespace,
|
||||
Partition: serviceOpts.Partition,
|
||||
Locality: serviceOpts.Locality,
|
||||
}
|
||||
|
||||
return createAndRegisterStaticServerAndSidecar(node, serviceOpts.HTTPPort, serviceOpts.GRPCPort, req)
|
||||
|
@ -155,6 +163,7 @@ func CreateAndRegisterStaticClientSidecar(
|
|||
peerName string,
|
||||
localMeshGateway bool,
|
||||
enableTProxy bool,
|
||||
serviceOpts *ServiceOpts,
|
||||
) (*ConnectContainer, error) {
|
||||
// Do some trickery to ensure that partial completion is correctly torn
|
||||
// down, but successful execution is not.
|
||||
|
@ -196,6 +205,27 @@ func CreateAndRegisterStaticClientSidecar(
|
|||
},
|
||||
}
|
||||
|
||||
// Set relevant fields for static client if opts are provided
|
||||
if serviceOpts != nil {
|
||||
if serviceOpts.Connect.Proxy.Mode != "" {
|
||||
return nil, fmt.Errorf("this helper does not support directly setting connect proxy mode; use enableTProxy and/or localMeshGateway instead")
|
||||
}
|
||||
// These options are defaulted above, so only set them as overrides
|
||||
if serviceOpts.Name != "" {
|
||||
req.Name = serviceOpts.Name
|
||||
}
|
||||
if serviceOpts.HTTPPort != 0 {
|
||||
req.Port = serviceOpts.HTTPPort
|
||||
}
|
||||
if serviceOpts.Connect.Port != 0 {
|
||||
req.Connect.SidecarService.Port = serviceOpts.Connect.Port
|
||||
}
|
||||
req.Meta = serviceOpts.Meta
|
||||
req.Namespace = serviceOpts.Namespace
|
||||
req.Partition = serviceOpts.Partition
|
||||
req.Locality = serviceOpts.Locality
|
||||
}
|
||||
|
||||
if err := node.GetClient().Agent().ServiceRegister(req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -151,7 +151,7 @@ func BasicPeeringTwoClustersSetup(
|
|||
|
||||
// Create a service and proxy instance
|
||||
var err error
|
||||
clientSidecarService, err = libservice.CreateAndRegisterStaticClientSidecar(clientNode, DialingPeerName, true, false)
|
||||
clientSidecarService, err = libservice.CreateAndRegisterStaticClientSidecar(clientNode, DialingPeerName, true, false, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
libassert.CatalogServiceExists(t, dialingClient, "static-client-sidecar-proxy", nil)
|
||||
|
|
|
@ -45,7 +45,7 @@ func CreateServices(t *testing.T, cluster *libcluster.Cluster) (libservice.Servi
|
|||
libassert.CatalogServiceExists(t, client, libservice.StaticServerServiceName, nil)
|
||||
|
||||
// Create a client proxy instance with the server as an upstream
|
||||
clientConnectProxy, err := libservice.CreateAndRegisterStaticClientSidecar(node, "", false, false)
|
||||
clientConnectProxy, err := libservice.CreateAndRegisterStaticClientSidecar(node, "", false, false, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
libassert.CatalogServiceExists(t, client, fmt.Sprintf("%s-sidecar-proxy", libservice.StaticClientServiceName), nil)
|
||||
|
|
|
@ -110,7 +110,7 @@ func createServices(t *testing.T, cluster *libcluster.Cluster) libservice.Servic
|
|||
libassert.CatalogServiceExists(t, client, libservice.StaticServerServiceName, nil)
|
||||
|
||||
// Create a client proxy instance with the server as an upstream
|
||||
clientConnectProxy, err := libservice.CreateAndRegisterStaticClientSidecar(node, "", false, false)
|
||||
clientConnectProxy, err := libservice.CreateAndRegisterStaticClientSidecar(node, "", false, false, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
libassert.CatalogServiceExists(t, client, "static-client-sidecar-proxy", nil)
|
||||
|
|
|
@ -130,7 +130,7 @@ func createServices(t *testing.T, cluster *libcluster.Cluster) libservice.Servic
|
|||
libassert.CatalogServiceExists(t, client, libservice.StaticServerServiceName, apiOpts)
|
||||
|
||||
// Create a client proxy instance with the server as an upstream
|
||||
clientConnectProxy, err := libservice.CreateAndRegisterStaticClientSidecar(node, "", false, false)
|
||||
clientConnectProxy, err := libservice.CreateAndRegisterStaticClientSidecar(node, "", false, false, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
libassert.CatalogServiceExists(t, client, "static-client-sidecar-proxy", apiOpts)
|
||||
|
|
|
@ -215,7 +215,7 @@ func createServices(t *testing.T, cluster *libcluster.Cluster) libservice.Servic
|
|||
client := node.GetClient()
|
||||
|
||||
// Create a client proxy instance with the server as an upstream
|
||||
clientConnectProxy, err := libservice.CreateAndRegisterStaticClientSidecar(node, "", false, true)
|
||||
clientConnectProxy, err := libservice.CreateAndRegisterStaticClientSidecar(node, "", false, true, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
libassert.CatalogServiceExists(t, client, "static-client-sidecar-proxy", nil)
|
||||
|
|
|
@ -92,7 +92,7 @@ func TestIngressGateway_GRPC_UpgradeToTarget_fromLatest(t *testing.T) {
|
|||
serverNodes := cluster.Servers()
|
||||
require.NoError(t, err)
|
||||
require.True(t, len(serverNodes) > 0)
|
||||
staticClientSvcSidecar, err := libservice.CreateAndRegisterStaticClientSidecar(serverNodes[0], "", true, false)
|
||||
staticClientSvcSidecar, err := libservice.CreateAndRegisterStaticClientSidecar(serverNodes[0], "", true, false, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
tests := func(t *testing.T) {
|
||||
|
|
|
@ -349,7 +349,7 @@ func setup(t *testing.T) (*libcluster.Cluster, libservice.Service, libservice.Se
|
|||
require.NoError(t, err)
|
||||
|
||||
// Create a client proxy instance with the server as an upstream
|
||||
staticClientProxy, err := libservice.CreateAndRegisterStaticClientSidecar(node, "", false, false)
|
||||
staticClientProxy, err := libservice.CreateAndRegisterStaticClientSidecar(node, "", false, false, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -87,7 +87,7 @@ func TestPeering_ControlPlaneMGW(t *testing.T) {
|
|||
"upstream_cx_total", 1)
|
||||
require.NoError(t, accepting.Gateway.Start())
|
||||
|
||||
clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true, false)
|
||||
clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true, false, nil)
|
||||
require.NoError(t, err)
|
||||
_, port := clientSidecarService.GetAddr()
|
||||
_, adminPort := clientSidecarService.GetAdminAddr()
|
||||
|
|
|
@ -7,10 +7,12 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
|
||||
libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
|
||||
libtopology "github.com/hashicorp/consul/test/integration/consul-container/libs/topology"
|
||||
|
@ -83,13 +85,13 @@ func TestPeering_HTTPRouter(t *testing.T) {
|
|||
}
|
||||
require.NoError(t, acceptingCluster.ConfigEntryWrite(routerConfigEntry))
|
||||
_, appPort := dialing.Container.GetAddr()
|
||||
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d/static-server-2", appPort), "static-server-2", "")
|
||||
libassert.WaitForFortioName(t, retry.ThirtySeconds(), fmt.Sprintf("http://localhost:%d/static-server-2", appPort), "static-server-2", "")
|
||||
|
||||
peeringUpgrade(t, accepting, dialing, utils.TargetVersion)
|
||||
|
||||
peeringPostUpgradeValidation(t, dialing)
|
||||
// TODO: restart static-server-2's sidecar
|
||||
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d/static-server-2", appPort), "static-server-2", "")
|
||||
libassert.WaitForFortioName(t, retry.ThirtySeconds(), fmt.Sprintf("http://localhost:%d/static-server-2", appPort), "static-server-2", "")
|
||||
}
|
||||
|
||||
// Verify resolver and failover can direct traffic to server in peered cluster
|
||||
|
@ -171,11 +173,12 @@ func TestPeering_HTTPResolverAndFailover(t *testing.T) {
|
|||
|
||||
assertionAdditionalResources := func() {
|
||||
// assert traffic can fail-over to static-server in peered cluster and restor to local static-server
|
||||
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", appPorts[0]), "static-server-dialing", "")
|
||||
// timeouts in this segment of the test reflect previously implicit retries in fortio name assertions for parity
|
||||
libassert.WaitForFortioName(t, &retry.Timer{Timeout: 2 * time.Minute, Wait: 500 * time.Millisecond}, fmt.Sprintf("http://localhost:%d", appPorts[0]), "static-server-dialing", "")
|
||||
require.NoError(t, serverConnectProxy.Stop())
|
||||
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", appPorts[0]), libservice.StaticServerServiceName, "")
|
||||
libassert.WaitForFortioName(t, &retry.Timer{Timeout: 2 * time.Minute, Wait: 500 * time.Millisecond}, fmt.Sprintf("http://localhost:%d", appPorts[0]), libservice.StaticServerServiceName, "")
|
||||
require.NoError(t, serverConnectProxy.Start())
|
||||
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", appPorts[0]), "static-server-dialing", "")
|
||||
libassert.WaitForFortioName(t, &retry.Timer{Timeout: 2 * time.Minute, Wait: 500 * time.Millisecond}, fmt.Sprintf("http://localhost:%d", appPorts[0]), "static-server-dialing", "")
|
||||
|
||||
// assert peer-static-server resolves to static-server in peered cluster
|
||||
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", appPorts[1]), libservice.StaticServerServiceName, "")
|
||||
|
@ -352,7 +355,7 @@ func peeringUpgrade(t *testing.T, accepting, dialing *libtopology.BuiltCluster,
|
|||
func peeringPostUpgradeValidation(t *testing.T, dialing *libtopology.BuiltCluster) {
|
||||
t.Helper()
|
||||
|
||||
clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialing.Cluster.Servers()[0], libtopology.DialingPeerName, true, false)
|
||||
clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialing.Cluster.Servers()[0], libtopology.DialingPeerName, true, false, nil)
|
||||
require.NoError(t, err)
|
||||
_, port := clientSidecarService.GetAddr()
|
||||
_, adminPort := clientSidecarService.GetAdminAddr()
|
||||
|
|
|
@ -57,7 +57,7 @@ func TestPeering_WanFedSecondaryDC(t *testing.T) {
|
|||
require.NoError(t, service.Export("default", "alpha-to-secondary", c3Agent.GetClient()))
|
||||
|
||||
// Create a testing sidecar to proxy requests through
|
||||
clientConnectProxy, err := libservice.CreateAndRegisterStaticClientSidecar(c2Agent, "secondary-to-alpha", false, false)
|
||||
clientConnectProxy, err := libservice.CreateAndRegisterStaticClientSidecar(c2Agent, "secondary-to-alpha", false, false, nil)
|
||||
require.NoError(t, err)
|
||||
libassert.CatalogServiceExists(t, c2Agent.GetClient(), "static-client-sidecar-proxy", nil)
|
||||
|
||||
|
|
Loading…
Reference in New Issue