Upgrade test: peering control plane traffic through mesh gateway (#16091)
This commit is contained in:
parent
c5f771b87c
commit
66067d8b7a
|
@ -2,12 +2,16 @@ package assert
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-cleanhttp"
|
||||
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
|
||||
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -24,7 +28,7 @@ func GetEnvoyListenerTCPFilters(t *testing.T, adminPort int) {
|
|||
}
|
||||
|
||||
retry.RunWith(failer(), t, func(r *retry.R) {
|
||||
dump, err = libservice.GetEnvoyConfigDump(adminPort, "")
|
||||
dump, err = GetEnvoyOutput(adminPort, "config_dump", map[string]string{})
|
||||
if err != nil {
|
||||
r.Fatal("could not fetch envoy configuration")
|
||||
}
|
||||
|
@ -61,18 +65,86 @@ func AssertUpstreamEndpointStatus(t *testing.T, adminPort int, clusterName, heal
|
|||
}
|
||||
|
||||
retry.RunWith(failer(), t, func(r *retry.R) {
|
||||
clusters, err = libservice.GetEnvoyClusters(adminPort)
|
||||
clusters, err = GetEnvoyOutput(adminPort, "clusters", map[string]string{"format": "json"})
|
||||
if err != nil {
|
||||
r.Fatal("could not fetch envoy configuration")
|
||||
r.Fatal("could not fetch envoy clusters")
|
||||
}
|
||||
|
||||
filter := fmt.Sprintf(`.cluster_statuses[] | select(.name|contains("%s")) | [.host_statuses[].health_status.eds_health_status] | [select(.[] == "%s")] | length`, clusterName, healthStatus)
|
||||
results, err := utils.JQFilter(clusters, filter)
|
||||
require.NoError(r, err, "could not parse envoy configuration")
|
||||
require.NoErrorf(r, err, "could not found cluster name %s", clusterName)
|
||||
require.Equal(r, count, len(results))
|
||||
})
|
||||
}
|
||||
|
||||
// AssertEnvoyMetricAtMost assert the filered metric by prefix and metric is >= count
|
||||
func AssertEnvoyMetricAtMost(t *testing.T, adminPort int, prefix, metric string, count int) {
|
||||
var (
|
||||
stats string
|
||||
err error
|
||||
)
|
||||
failer := func() *retry.Timer {
|
||||
return &retry.Timer{Timeout: 30 * time.Second, Wait: 500 * time.Millisecond}
|
||||
}
|
||||
|
||||
retry.RunWith(failer(), t, func(r *retry.R) {
|
||||
stats, err = GetEnvoyOutput(adminPort, "stats", nil)
|
||||
if err != nil {
|
||||
r.Fatal("could not fetch envoy stats")
|
||||
}
|
||||
lines := strings.Split(stats, "\n")
|
||||
err = processMetrics(lines, prefix, metric, func(v int) bool {
|
||||
return v <= count
|
||||
})
|
||||
require.NoError(r, err)
|
||||
})
|
||||
}
|
||||
|
||||
func processMetrics(metrics []string, prefix, metric string, condition func(v int) bool) error {
|
||||
for _, line := range metrics {
|
||||
if strings.Contains(line, prefix) &&
|
||||
strings.Contains(line, metric) {
|
||||
|
||||
metric := strings.Split(line, ":")
|
||||
fmt.Println(metric[1])
|
||||
|
||||
v, err := strconv.Atoi(strings.TrimSpace(metric[1]))
|
||||
if err != nil {
|
||||
return fmt.Errorf("err parse metric value %s: %s", metric[1], err)
|
||||
}
|
||||
|
||||
if condition(v) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("error processing stats")
|
||||
}
|
||||
|
||||
// AssertEnvoyMetricAtLeast assert the filered metric by prefix and metric is <= count
|
||||
func AssertEnvoyMetricAtLeast(t *testing.T, adminPort int, prefix, metric string, count int) {
|
||||
var (
|
||||
stats string
|
||||
err error
|
||||
)
|
||||
failer := func() *retry.Timer {
|
||||
return &retry.Timer{Timeout: 30 * time.Second, Wait: 500 * time.Millisecond}
|
||||
}
|
||||
|
||||
retry.RunWith(failer(), t, func(r *retry.R) {
|
||||
stats, err = GetEnvoyOutput(adminPort, "stats", nil)
|
||||
if err != nil {
|
||||
r.Fatal("could not fetch envoy stats")
|
||||
}
|
||||
lines := strings.Split(stats, "\n")
|
||||
|
||||
err = processMetrics(lines, prefix, metric, func(v int) bool {
|
||||
return v >= count
|
||||
})
|
||||
require.NoError(r, err)
|
||||
})
|
||||
}
|
||||
|
||||
// GetEnvoyHTTPrbacFilters validates that proxy was configured with an http connection manager
|
||||
// this assertion is currently unused current tests use http protocol
|
||||
func GetEnvoyHTTPrbacFilters(t *testing.T, port int) {
|
||||
|
@ -85,7 +157,7 @@ func GetEnvoyHTTPrbacFilters(t *testing.T, port int) {
|
|||
}
|
||||
|
||||
retry.RunWith(failer(), t, func(r *retry.R) {
|
||||
dump, err = libservice.GetEnvoyConfigDump(port, "")
|
||||
dump, err = GetEnvoyOutput(port, "config_dump", map[string]string{})
|
||||
if err != nil {
|
||||
r.Fatal("could not fetch envoy configuration")
|
||||
}
|
||||
|
@ -117,3 +189,33 @@ func sanitizeResult(s string) []string {
|
|||
result := strings.Split(strings.ReplaceAll(s, `,`, " "), " ")
|
||||
return append(result[:0], result[1:]...)
|
||||
}
|
||||
|
||||
func GetEnvoyOutput(port int, path string, query map[string]string) (string, error) {
|
||||
client := cleanhttp.DefaultClient()
|
||||
var u url.URL
|
||||
u.Host = fmt.Sprintf("localhost:%d", port)
|
||||
u.Scheme = "http"
|
||||
if path != "" {
|
||||
u.Path = path
|
||||
}
|
||||
q := u.Query()
|
||||
for k, v := range query {
|
||||
q.Add(k, v)
|
||||
}
|
||||
if query != nil {
|
||||
u.RawQuery = q.Encode()
|
||||
}
|
||||
|
||||
res, err := client.Get(u.String())
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
body, err := io.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return string(body), nil
|
||||
}
|
||||
|
|
|
@ -39,6 +39,10 @@ func (g ConnectContainer) GetAddr() (string, int) {
|
|||
return g.ip, g.appPort
|
||||
}
|
||||
|
||||
func (g ConnectContainer) Restart() error {
|
||||
return fmt.Errorf("Restart Unimplemented by ConnectContainer")
|
||||
}
|
||||
|
||||
func (g ConnectContainer) GetLogs() (string, error) {
|
||||
rc, err := g.container.Logs(context.Background())
|
||||
if err != nil {
|
||||
|
|
|
@ -49,6 +49,10 @@ func (g exampleContainer) GetAddr() (string, int) {
|
|||
return g.ip, g.httpPort
|
||||
}
|
||||
|
||||
func (g exampleContainer) Restart() error {
|
||||
return fmt.Errorf("Restart Unimplemented by ConnectContainer")
|
||||
}
|
||||
|
||||
func (g exampleContainer) GetLogs() (string, error) {
|
||||
rc, err := g.container.Logs(context.Background())
|
||||
if err != nil {
|
||||
|
|
|
@ -79,6 +79,23 @@ func (g gatewayContainer) GetAdminAddr() (string, int) {
|
|||
return "localhost", g.adminPort
|
||||
}
|
||||
|
||||
func (g gatewayContainer) Restart() error {
|
||||
_, err := g.container.State(context.Background())
|
||||
if err != nil {
|
||||
return fmt.Errorf("error get gateway state %s", err)
|
||||
}
|
||||
|
||||
err = g.container.Stop(context.Background(), nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error stop gateway %s", err)
|
||||
}
|
||||
err = g.container.Start(context.Background())
|
||||
if err != nil {
|
||||
return fmt.Errorf("error start gateway %s", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewGatewayService(ctx context.Context, name string, kind string, node libcluster.Agent) (Service, error) {
|
||||
nodeConfig := node.GetConfig()
|
||||
if nodeConfig.ScratchDir == "" {
|
||||
|
|
|
@ -3,9 +3,6 @@ package service
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/hashicorp/go-cleanhttp"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
|
||||
|
@ -124,39 +121,3 @@ func CreateAndRegisterStaticClientSidecar(
|
|||
|
||||
return clientConnectProxy, nil
|
||||
}
|
||||
|
||||
func GetEnvoyConfigDump(port int, filter string) (string, error) {
|
||||
client := cleanhttp.DefaultClient()
|
||||
url := fmt.Sprintf("http://localhost:%d/config_dump?%s", port, filter)
|
||||
|
||||
res, err := client.Get(url)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
body, err := io.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return string(body), nil
|
||||
}
|
||||
|
||||
func GetEnvoyClusters(port int) (string, error) {
|
||||
client := cleanhttp.DefaultClient()
|
||||
url := fmt.Sprintf("http://localhost:%d/clusters?format=json", port)
|
||||
|
||||
res, err := client.Get(url)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
body, err := io.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return string(body), nil
|
||||
}
|
||||
|
|
|
@ -14,4 +14,5 @@ type Service interface {
|
|||
GetServiceName() string
|
||||
Start() (err error)
|
||||
Terminate() error
|
||||
Restart() error
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ type BuiltCluster struct {
|
|||
Context *libcluster.BuildContext
|
||||
Service libservice.Service
|
||||
Container *libservice.ConnectContainer
|
||||
Gateway libservice.Service
|
||||
}
|
||||
|
||||
// BasicPeeringTwoClustersSetup sets up a scenario for testing peering, which consists of
|
||||
|
@ -50,6 +51,7 @@ func BasicPeeringTwoClustersSetup(
|
|||
|
||||
// Register an static-server service in acceptingCluster and export to dialing cluster
|
||||
var serverSidecarService libservice.Service
|
||||
var acceptingClusterGateway libservice.Service
|
||||
{
|
||||
clientNode := acceptingCluster.Clients()[0]
|
||||
|
||||
|
@ -62,10 +64,15 @@ func BasicPeeringTwoClustersSetup(
|
|||
libassert.CatalogServiceExists(t, acceptingClient, "static-server-sidecar-proxy")
|
||||
|
||||
require.NoError(t, serverSidecarService.Export("default", AcceptingPeerName, acceptingClient))
|
||||
|
||||
// Create the mesh gateway for dataplane traffic
|
||||
acceptingClusterGateway, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Register an static-client service in dialing cluster and set upstream to static-server service
|
||||
var clientSidecarService *libservice.ConnectContainer
|
||||
var dialingClusterGateway libservice.Service
|
||||
{
|
||||
clientNode := dialingCluster.Clients()[0]
|
||||
|
||||
|
@ -75,6 +82,10 @@ func BasicPeeringTwoClustersSetup(
|
|||
require.NoError(t, err)
|
||||
|
||||
libassert.CatalogServiceExists(t, dialingClient, "static-client-sidecar-proxy")
|
||||
|
||||
// Create the mesh gateway for dataplane traffic
|
||||
dialingClusterGateway, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
_, adminPort := clientSidecarService.GetAdminAddr()
|
||||
|
@ -87,12 +98,14 @@ func BasicPeeringTwoClustersSetup(
|
|||
Context: acceptingCtx,
|
||||
Service: serverSidecarService,
|
||||
Container: nil,
|
||||
Gateway: acceptingClusterGateway,
|
||||
},
|
||||
&BuiltCluster{
|
||||
Cluster: dialingCluster,
|
||||
Context: dialingCtx,
|
||||
Service: nil,
|
||||
Container: clientSidecarService,
|
||||
Gateway: dialingClusterGateway,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -204,9 +217,5 @@ func NewPeeringCluster(
|
|||
require.NoError(t, err)
|
||||
require.True(t, ok)
|
||||
|
||||
// Create the mesh gateway for dataplane traffic
|
||||
_, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
|
||||
require.NoError(t, err)
|
||||
|
||||
return cluster, ctx, client
|
||||
}
|
||||
|
|
|
@ -195,7 +195,7 @@ func verifySidecarHasTwoRootCAs(t *testing.T, sidecar libservice.Service) {
|
|||
}
|
||||
|
||||
retry.RunWith(failer(), t, func(r *retry.R) {
|
||||
dump, err := libservice.GetEnvoyConfigDump(adminPort, "include_eds")
|
||||
dump, err := libassert.GetEnvoyOutput(adminPort, "config_dump", map[string]string{})
|
||||
require.NoError(r, err, "could not fetch envoy configuration")
|
||||
|
||||
// Make sure there are two certs in the sidecar
|
||||
|
|
|
@ -0,0 +1,121 @@
|
|||
package upgrade
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/api"
|
||||
libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
|
||||
libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
|
||||
libtopology "github.com/hashicorp/consul/test/integration/consul-container/libs/topology"
|
||||
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
|
||||
)
|
||||
|
||||
// TestPeering_Upgrade_ControlPlane_MGW verifies the peering control plane traffic go through the mesh gateway
|
||||
// PeerThroughMeshGateways can be inheritted by the upgraded cluster.
|
||||
//
|
||||
// 1. Create the basic peering topology of one dialing cluster and one accepting cluster
|
||||
// 2. Set PeerThroughMeshGateways = true
|
||||
// 3. Upgrade both clusters
|
||||
// 4. Verify the peering is re-established through mesh gateway
|
||||
func TestPeering_Upgrade_ControlPlane_MGW(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
type testcase struct {
|
||||
oldversion string
|
||||
targetVersion string
|
||||
}
|
||||
tcs := []testcase{
|
||||
// {
|
||||
// TODO: API changed from 1.13 to 1.14 in , PeerName to Peer
|
||||
// exportConfigEntry
|
||||
// oldversion: "1.13",
|
||||
// targetVersion: *utils.TargetVersion,
|
||||
// },
|
||||
{
|
||||
oldversion: "1.14",
|
||||
targetVersion: utils.TargetVersion,
|
||||
},
|
||||
}
|
||||
|
||||
run := func(t *testing.T, tc testcase) {
|
||||
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, tc.oldversion)
|
||||
var (
|
||||
acceptingCluster = accepting.Cluster
|
||||
dialingCluster = dialing.Cluster
|
||||
)
|
||||
|
||||
dialingClient, err := dialingCluster.GetClient(nil, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
acceptingClient, err := acceptingCluster.GetClient(nil, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Enable peering control plane traffic through mesh gateway
|
||||
req := &api.MeshConfigEntry{
|
||||
Peering: &api.PeeringMeshConfig{
|
||||
PeerThroughMeshGateways: true,
|
||||
},
|
||||
}
|
||||
ok, _, err := dialingClient.ConfigEntries().Set(req, &api.WriteOptions{})
|
||||
require.True(t, ok)
|
||||
require.NoError(t, err)
|
||||
ok, _, err = acceptingClient.ConfigEntries().Set(req, &api.WriteOptions{})
|
||||
require.True(t, ok)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify control plane endpoints and traffic in gateway
|
||||
_, gatewayAdminPort := dialing.Gateway.GetAdminAddr()
|
||||
libassert.AssertUpstreamEndpointStatus(t, gatewayAdminPort, "server.dc1.peering", "HEALTHY", 1)
|
||||
libassert.AssertUpstreamEndpointStatus(t, gatewayAdminPort, "server.dc2.peering", "HEALTHY", 1)
|
||||
libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort,
|
||||
"cluster.static-server.default.default.accepting-to-dialer.external",
|
||||
"upstream_cx_total", 1)
|
||||
|
||||
// Upgrade the accepting cluster and assert peering is still ACTIVE
|
||||
require.NoError(t, acceptingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion))
|
||||
libassert.PeeringStatus(t, acceptingClient, libtopology.AcceptingPeerName, api.PeeringStateActive)
|
||||
libassert.PeeringStatus(t, dialingClient, libtopology.DialingPeerName, api.PeeringStateActive)
|
||||
|
||||
require.NoError(t, dialingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion))
|
||||
libassert.PeeringStatus(t, acceptingClient, libtopology.AcceptingPeerName, api.PeeringStateActive)
|
||||
libassert.PeeringStatus(t, dialingClient, libtopology.DialingPeerName, api.PeeringStateActive)
|
||||
|
||||
// POST upgrade validation
|
||||
// - Restarted mesh gateway can receive consul generated configuration
|
||||
// - control plane traffic is through mesh gateway
|
||||
// - Register a new static-client service in dialing cluster and
|
||||
// - set upstream to static-server service in peered cluster
|
||||
|
||||
// Restart the gateway
|
||||
err = dialing.Gateway.Restart()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Restarted gateway should not have any measurement on data plane traffic
|
||||
libassert.AssertEnvoyMetricAtMost(t, gatewayAdminPort,
|
||||
"cluster.static-server.default.default.accepting-to-dialer.external",
|
||||
"upstream_cx_total", 0)
|
||||
// control plane metrics should be observed
|
||||
libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort,
|
||||
"cluster.server.dc1.peering",
|
||||
"upstream_cx_total", 1)
|
||||
|
||||
clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true)
|
||||
require.NoError(t, err)
|
||||
_, port := clientSidecarService.GetAddr()
|
||||
_, adminPort := clientSidecarService.GetAdminAddr()
|
||||
libassert.AssertUpstreamEndpointStatus(t, adminPort, fmt.Sprintf("static-server.default.%s.external", libtopology.DialingPeerName), "HEALTHY", 1)
|
||||
libassert.HTTPServiceEchoes(t, "localhost", port, "")
|
||||
}
|
||||
|
||||
for _, tc := range tcs {
|
||||
t.Run(fmt.Sprintf("upgrade from %s to %s", tc.oldversion, tc.targetVersion),
|
||||
func(t *testing.T) {
|
||||
run(t, tc)
|
||||
})
|
||||
// time.Sleep(3 * time.Second)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue