port ENT upgrade tests flattening (#16824)

This commit is contained in:
Nick Irvine 2023-03-30 13:07:16 -07:00 committed by GitHub
parent 60bf97bf80
commit d22d6d569f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 834 additions and 881 deletions

View File

@ -76,9 +76,7 @@ Below are the supported CLI options
## Adding a new upgrade integration test ## Adding a new upgrade integration test
All upgrade tests are defined in [test/integration/consul-container/test/upgrade](/test/integration/consul-container/test/upgrade) subdirectory. The test framework uses All upgrade tests are defined in [test/integration/consul-container/test/upgrade](/test/integration/consul-container/test/upgrade) subdirectory.
[functional table-driven tests in Go](https://yourbasic.org/golang/table-driven-unit-test/) and
using function types to modify the basic configuration for each test case.
Following is a guide for adding a new upgrade test case. Following is a guide for adding a new upgrade test case.
1. Create consul cluster(s) with a specified version. Some utility functions are provided to make 1. Create consul cluster(s) with a specified version. Some utility functions are provided to make
@ -91,63 +89,65 @@ a single cluster or two peered clusters:
NumClients: 1, NumClients: 1,
BuildOpts: &libcluster.BuildOptions{ BuildOpts: &libcluster.BuildOptions{
Datacenter: "dc1", Datacenter: "dc1",
ConsulVersion: oldVersion, ConsulVersion: utils.LatestVersion,
}, },
}) })
```
Or
```go
// BasicPeeringTwoClustersSetup creates two peered clusters, named accpeting and dialing // BasicPeeringTwoClustersSetup creates two peered clusters, named accpeting and dialing
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, oldVersion, false) accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, utils.LatestVersion, false)
``` ```
2. For tests with multiple test cases, it should always start by invoking Some workloads may require extra resources. They should be created in this setup section. For example,
```go [https://github.com/hashicorp/consul-enterprise/blob/19e515db29541132dbbda73efb7a458cd29d705f/test/integration/consul-container/test/upgrade/peering_http_test.go#L30-L41](this peering test creates a second static-server).
type testcase struct {
name string
create func()
extraAssertion func()
}
```
see example [here](./l7_traffic_management/resolver_default_subset_test.go). For upgrade tests with a single test case, they can be written like
```go
run := func(t *testing.T, oldVersion, targetVersion string) {
// insert test
}
t.Run(fmt.Sprintf("Upgrade from %s to %s", utils.LatestVersion, utils.TargetVersion),
func(t *testing.T) {
run(t, utils.LatestVersion, utils.TargetVersion)
})
```
see example [here](./acl_node_test.go)
Addtitional configurations or user-workload can be created with a customized [`create` function](./l7_traffic_management/resolver_default_subset_test.go). 2. Verify the workload
3. Call the upgrade method and assert the upgrading cluster succeeds.
We also restart the envoy proxy to make sure the upgraded agent can generate
the correct envoy configurations.
```go
err = cluster.StandardUpgrade(t, context.Background(), targetVersion)
require.NoError(t, err)
require.NoError(t, staticServerConnectProxy.Restart())
require.NoError(t, staticClientConnectProxy.Restart())
```
4. Verify the user workload after upgrade, e.g.,
```go ```go
libassert.HTTPServiceEchoes(t, "localhost", port, "") libassert.HTTPServiceEchoes(t, "localhost", port, "")
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", appPort), "static-server-2-v2", "") libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", appPort), "static-server-2-v2", "")
``` ```
3. Call the `StandardUpgrade` method and check that the upgrade succeeded.
We also restart the Envoy proxy to make sure the upgraded agent can generate
the correct Envoy configurations.
```go
require.NoError(t,
cluster.StandardUpgrade(t, context.Background(), utils.TargetVersion))
require.NoError(t, staticServerConnectProxy.Restart())
require.NoError(t, staticClientConnectProxy.Restart())
```
4. Verify the workload from step 3 again
```go
libassert.HTTPServiceEchoes(t, "localhost", port, "")
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", appPort), "static-server-2-v2", "")
```
For longer verifications, it can be nice to make a local function instead:
```go
tests := func() {
libassert.HTTPServiceEchoes(t, "localhost", port, "")
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", appPort), "static-server-2-v2", "")
}
tests()
// ... do upgrade
tests()
```
### Errors Test Cases ### Errors Test Cases
There are some caveats for special error handling of versions prior to `1.14`. There are some caveats for special error handling of versions prior to `1.14`.
Upgrade tests for features such peering, had API changes that returns an error if attempt to upgrade, and should be accounted for in upgrade tests. If running upgrade tests for any version before `1.14`, the following lines of code needs to be added to skip test or it will not pass. Upgrade tests for features such as peering had API changes that return an error if an upgrade is attempted, and should be accounted for in upgrade tests. If running upgrade tests for any version before `1.14`, the following lines of code need to be added to skip it or it will not pass.
```go ```go
fromVersion, err := version.NewVersion(utils.LatestVersion) fromVersion, err := version.NewVersion(utils.LatestVersion)
require.NoError(t, err) require.NoError(t, err)
if fromVersion.LessThan(utils.Version_1_14) { if fromVersion.LessThan(utils.Version_1_14) {
continue t.Skip("...")
} }
``` ```
See example [here](https://github.com/hashicorp/consul-enterprise/blob/005a0a92c5f39804cef4ad5c4cd6fd3334b95aa2/test/integration/consul-container/test/upgrade/peering_control_plane_mgw_test.go#L92-L96) See example [here](https://github.com/hashicorp/consul-enterprise/blob/005a0a92c5f39804cef4ad5c4cd6fd3334b95aa2/test/integration/consul-container/test/upgrade/peering_control_plane_mgw_test.go#L92-L96)

View File

@ -5,7 +5,6 @@ package upgrade
import ( import (
"context" "context"
"fmt"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -16,42 +15,35 @@ import (
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils" "github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
) )
// TestACL_Upgrade_Node_Token test verifies the following after upgrade // TestACL_NodeToken test verifies the following after upgrade
// 1. Upgraded agent can inherit the persistend token and join the cluster // 1. Upgraded agent can inherit the persistend token and join the cluster
// 2. Agent token prior to upgrade is still valid after upgrade // 2. Agent token prior to upgrade is still valid after upgrade
func TestACL_Upgrade_Node_Token(t *testing.T) { func TestACL_NodeToken(t *testing.T) {
t.Parallel() t.Parallel()
run := func(t *testing.T, oldVersion, targetVersion string) { // NOTE: Disable auto.encrypt due to its conflict with ACL token during bootstrap
// NOTE: Disable auto.encrypt due to its conflict with ACL token during bootstrap cluster, _, _ := libtopology.NewCluster(t, &libtopology.ClusterConfig{
cluster, _, _ := libtopology.NewCluster(t, &libtopology.ClusterConfig{ NumServers: 1,
NumServers: 1, NumClients: 1,
NumClients: 1, BuildOpts: &libcluster.BuildOptions{
BuildOpts: &libcluster.BuildOptions{ Datacenter: "dc1",
Datacenter: "dc1", ConsulVersion: utils.LatestVersion,
ConsulVersion: oldVersion, InjectAutoEncryption: false,
InjectAutoEncryption: false, ACLEnabled: true,
ACLEnabled: true, },
}, ApplyDefaultProxySettings: true,
ApplyDefaultProxySettings: true, })
})
agentToken, err := cluster.CreateAgentToken("dc1", agentToken, err := cluster.CreateAgentToken("dc1",
cluster.Agents[1].GetAgentName()) cluster.Agents[1].GetAgentName())
require.NoError(t, err) require.NoError(t, err)
err = cluster.StandardUpgrade(t, context.Background(), targetVersion) err = cluster.StandardUpgrade(t, context.Background(), utils.TargetVersion)
require.NoError(t, err) require.NoError(t, err)
// Post upgrade validation: agent token can be used to query the node // Post upgrade validation: agent token can be used to query the node
// Assert consul catalog nodes -token e3dc19d9-658d-a430-bcf4-7302efa397fc // Assert consul catalog nodes -token e3dc19d9-658d-a430-bcf4-7302efa397fc
client, err := cluster.Agents[1].NewClient(agentToken, false) client, err := cluster.Agents[1].NewClient(agentToken, false)
require.NoError(t, err) require.NoError(t, err)
libassert.CatalogNodeExists(t, client, cluster.Agents[1].GetAgentName()) libassert.CatalogNodeExists(t, client, cluster.Agents[1].GetAgentName())
}
t.Run(fmt.Sprintf("Upgrade from %s to %s", utils.LatestVersion, utils.TargetVersion),
func(t *testing.T) {
run(t, utils.LatestVersion, utils.TargetVersion)
})
} }

View File

@ -0,0 +1,85 @@
package upgrade
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil/retry"
libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
)
// Test upgrade a cluster of latest version to the target version
func TestBasic(t *testing.T) {
t.Parallel()
configCtx := libcluster.NewBuildContext(t, libcluster.BuildOptions{
ConsulImageName: utils.TargetImageName,
ConsulVersion: utils.LatestVersion,
})
const numServers = 1
serverConf := libcluster.NewConfigBuilder(configCtx).
Bootstrap(numServers).
ToAgentConfig(t)
t.Logf("Cluster config:\n%s", serverConf.JSON)
require.Equal(t, utils.LatestVersion, serverConf.Version) // TODO: remove
cluster, err := libcluster.NewN(t, *serverConf, numServers)
require.NoError(t, err)
client := cluster.APIClient(0)
libcluster.WaitForLeader(t, cluster, client)
libcluster.WaitForMembers(t, client, numServers)
// Create a service to be stored in the snapshot
const serviceName = "api"
index := libservice.ServiceCreate(t, client, serviceName)
require.NoError(t, client.Agent().ServiceRegister(
&api.AgentServiceRegistration{Name: serviceName, Port: 9998},
))
checkServiceHealth(t, client, "api", index)
// upgrade the cluster to the Target version
t.Logf("initiating standard upgrade to version=%q", utils.TargetVersion)
err = cluster.StandardUpgrade(t, context.Background(), utils.TargetVersion)
require.NoError(t, err)
libcluster.WaitForLeader(t, cluster, client)
libcluster.WaitForMembers(t, client, numServers)
// Verify service is restored from the snapshot
retry.RunWith(&retry.Timer{Timeout: 5 * time.Second, Wait: 500 * time.Microsecond}, t, func(r *retry.R) {
service, _, err := client.Catalog().Service(serviceName, "", &api.QueryOptions{})
require.NoError(r, err)
require.Len(r, service, 1)
require.Equal(r, serviceName, service[0].ServiceName)
})
}
func checkServiceHealth(t *testing.T, client *api.Client, serviceName string, index uint64) {
failer := func() *retry.Timer {
return &retry.Timer{Timeout: time.Second * 10, Wait: time.Second}
}
retry.RunWith(failer(), t, func(r *retry.R) {
ch, errCh := libservice.ServiceHealthBlockingQuery(client, serviceName, index)
select {
case err := <-errCh:
require.NoError(r, err)
case service := <-ch:
require.Equal(r, 1, len(service))
index = service[0].Service.ModifyIndex
require.Equal(r, serviceName, service[0].Service.Service)
require.Equal(r, 9998, service[0].Service.Port)
}
})
}

View File

@ -6,7 +6,6 @@ package upgrade
import ( import (
"context" "context"
"testing" "testing"
"time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -20,13 +19,15 @@ import (
) )
// Test health check GRPC call using Target Servers and Latest GA Clients // Test health check GRPC call using Target Servers and Latest GA Clients
// Note: this upgrade test doesn't use StandardUpgrade since it requires
// a cluster with clients and servers with mixed versions
func TestTargetServersWithLatestGAClients(t *testing.T) { func TestTargetServersWithLatestGAClients(t *testing.T) {
t.Parallel() t.Parallel()
fromVersion, err := version.NewVersion(utils.LatestVersion) fromVersion, err := version.NewVersion(utils.LatestVersion)
require.NoError(t, err) require.NoError(t, err)
if fromVersion.LessThan(utils.Version_1_14) { if fromVersion.LessThan(utils.Version_1_14) {
return t.Skip("TODO: why are we skipping this?")
} }
const ( const (
@ -59,22 +60,11 @@ func TestTargetServersWithLatestGAClients(t *testing.T) {
const serviceName = "api" const serviceName = "api"
index := libservice.ServiceCreate(t, client, serviceName) index := libservice.ServiceCreate(t, client, serviceName)
ch, errCh := libservice.ServiceHealthBlockingQuery(client, serviceName, index)
require.NoError(t, client.Agent().ServiceRegister( require.NoError(t, client.Agent().ServiceRegister(
&api.AgentServiceRegistration{Name: serviceName, Port: 9998}, &api.AgentServiceRegistration{Name: serviceName, Port: 9998},
)) ))
timer := time.NewTimer(3 * time.Second) checkServiceHealth(t, client, "api", index)
select {
case err := <-errCh:
require.NoError(t, err)
case service := <-ch:
require.Len(t, service, 1)
require.Equal(t, serviceName, service[0].Service.Service)
require.Equal(t, 9998, service[0].Service.Port)
case <-timer.C:
t.Fatalf("test timeout")
}
} }
// Test health check GRPC call using Mixed (majority latest) Servers and Latest GA Clients // Test health check GRPC call using Mixed (majority latest) Servers and Latest GA Clients
@ -156,21 +146,8 @@ func testMixedServersGAClient(t *testing.T, majorityIsTarget bool) {
const serviceName = "api" const serviceName = "api"
index := libservice.ServiceCreate(t, client, serviceName) index := libservice.ServiceCreate(t, client, serviceName)
ch, errCh := libservice.ServiceHealthBlockingQuery(client, serviceName, index)
require.NoError(t, client.Agent().ServiceRegister( require.NoError(t, client.Agent().ServiceRegister(
&api.AgentServiceRegistration{Name: serviceName, Port: 9998}, &api.AgentServiceRegistration{Name: serviceName, Port: 9998},
)) ))
checkServiceHealth(t, client, "api", index)
timer := time.NewTimer(3 * time.Second)
select {
case err := <-errCh:
require.NoError(t, err)
case service := <-ch:
require.Len(t, service, 1)
require.Equal(t, serviceName, service[0].Service.Service)
require.Equal(t, 9998, service[0].Service.Port)
case <-timer.C:
t.Fatalf("test timeout")
}
} }

View File

@ -18,403 +18,360 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
// TestTrafficManagement_ServiceResolver tests that upgraded cluster inherits and interpret // Test resolver directs traffic to default subset
// the resolver config entry correctly. // - Create 2 additional static-server instances: one in V1 subset and the other in V2 subset
// // - resolver directs traffic to the default subset, which is V2.
// The basic topology is a cluster with one static-client and one static-server. Addtional func TestTrafficManagement_ResolveDefaultSubset(t *testing.T) {
// services and resolver can be added to the create func() for each test cases.
func TestTrafficManagement_ServiceResolver(t *testing.T) {
t.Parallel() t.Parallel()
type testcase struct { cluster, staticServerProxy, staticClientProxy := setup(t)
name string
// create creates addtional resources in the cluster depending on cases, e.g., static-client, node := cluster.Agents[0]
// static server, and config-entries. It returns the proxy services of the client, an assertation client := node.GetClient()
// function to be called to verify the resources, and a restartFn to be called after upgrade.
create func(*libcluster.Cluster, libservice.Service) (libservice.Service, func(), func(), error) // Create static-server-v1 and static-server-v2
// extraAssertion adds additional assertion function to the common resources across cases. serviceOptsV1 := &libservice.ServiceOpts{
// common resources includes static-client in dialing cluster, and static-server in accepting cluster. Name: libservice.StaticServerServiceName,
// ID: "static-server-v1",
// extraAssertion needs to be run before and after upgrade Meta: map[string]string{"version": "v1"},
extraAssertion func(libservice.Service) HTTPPort: 8081,
GRPCPort: 8078,
} }
tcs := []testcase{ _, serverConnectProxyV1, err := libservice.CreateAndRegisterStaticServerAndSidecar(node, serviceOptsV1)
{ require.NoError(t, err)
// Test resolver directs traffic to default subset
// - Create 2 additional static-server instances: one in V1 subset and the other in V2 subset
// - resolver directs traffic to the default subset, which is V2.
name: "resolver default subset",
create: func(cluster *libcluster.Cluster, clientConnectProxy libservice.Service) (libservice.Service, func(), func(), error) {
node := cluster.Agents[0]
client := node.GetClient()
// Create static-server-v1 and static-server-v2 serviceOptsV2 := &libservice.ServiceOpts{
serviceOptsV1 := &libservice.ServiceOpts{ Name: libservice.StaticServerServiceName,
Name: libservice.StaticServerServiceName, ID: "static-server-v2",
ID: "static-server-v1", Meta: map[string]string{"version": "v2"},
Meta: map[string]string{"version": "v1"}, HTTPPort: 8082,
HTTPPort: 8081, GRPCPort: 8077,
GRPCPort: 8078, }
} _, serverConnectProxyV2, err := libservice.CreateAndRegisterStaticServerAndSidecar(node, serviceOptsV2)
_, serverConnectProxyV1, err := libservice.CreateAndRegisterStaticServerAndSidecar(node, serviceOptsV1) require.NoError(t, err)
require.NoError(t, err) libassert.CatalogServiceExists(t, client, libservice.StaticServerServiceName, nil)
serviceOptsV2 := &libservice.ServiceOpts{ // TODO: verify the number of instance of static-server is 3
Name: libservice.StaticServerServiceName, libassert.AssertServiceHasHealthyInstances(t, node, libservice.StaticServerServiceName, true, 3)
ID: "static-server-v2",
Meta: map[string]string{"version": "v2"},
HTTPPort: 8082,
GRPCPort: 8077,
}
_, serverConnectProxyV2, err := libservice.CreateAndRegisterStaticServerAndSidecar(node, serviceOptsV2)
require.NoError(t, err)
libassert.CatalogServiceExists(t, client, libservice.StaticServerServiceName, nil)
// TODO: verify the number of instance of static-server is 3 // Register service resolver
libassert.AssertServiceHasHealthyInstances(t, node, libservice.StaticServerServiceName, true, 3) serviceResolver := &api.ServiceResolverConfigEntry{
Kind: api.ServiceResolver,
// Register service resolver Name: libservice.StaticServerServiceName,
serviceResolver := &api.ServiceResolverConfigEntry{ DefaultSubset: "v2",
Kind: api.ServiceResolver, Subsets: map[string]api.ServiceResolverSubset{
Name: libservice.StaticServerServiceName, "v1": {
DefaultSubset: "v2", Filter: "Service.Meta.version == v1",
Subsets: map[string]api.ServiceResolverSubset{
"v1": {
Filter: "Service.Meta.version == v1",
},
"v2": {
Filter: "Service.Meta.version == v2",
},
},
}
err = cluster.ConfigEntryWrite(serviceResolver)
if err != nil {
return nil, nil, nil, fmt.Errorf("error writing config entry %s", err)
}
_, serverAdminPortV1 := serverConnectProxyV1.GetAdminAddr()
_, serverAdminPortV2 := serverConnectProxyV2.GetAdminAddr()
restartFn := func() {
require.NoError(t, serverConnectProxyV1.Restart())
require.NoError(t, serverConnectProxyV2.Restart())
}
_, adminPort := clientConnectProxy.GetAdminAddr()
assertionFn := func() {
libassert.AssertEnvoyRunning(t, serverAdminPortV1)
libassert.AssertEnvoyRunning(t, serverAdminPortV2)
libassert.AssertEnvoyPresentsCertURI(t, serverAdminPortV1, libservice.StaticServerServiceName)
libassert.AssertEnvoyPresentsCertURI(t, serverAdminPortV2, libservice.StaticServerServiceName)
libassert.AssertUpstreamEndpointStatus(t, adminPort, "v2.static-server.default", "HEALTHY", 1)
// assert static-server proxies should be healthy
libassert.AssertServiceHasHealthyInstances(t, node, libservice.StaticServerServiceName, true, 3)
}
return nil, assertionFn, restartFn, nil
}, },
extraAssertion: func(clientConnectProxy libservice.Service) { "v2": {
_, port := clientConnectProxy.GetAddr() Filter: "Service.Meta.version == v2",
_, adminPort := clientConnectProxy.GetAdminAddr()
libassert.AssertUpstreamEndpointStatus(t, adminPort, "v2.static-server.default", "HEALTHY", 1)
// static-client upstream should connect to static-server-v2 because the default subset value is to v2 set in the service resolver
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", port), "static-server-v2", "")
},
},
{
// Test resolver resolves service instance based on their check status
// - Create one addtional static-server with checks and V1 subset
// - resolver directs traffic to "test" service
name: "resolver default onlypassing",
create: func(cluster *libcluster.Cluster, clientConnectProxy libservice.Service) (libservice.Service, func(), func(), error) {
node := cluster.Agents[0]
serviceOptsV1 := &libservice.ServiceOpts{
Name: libservice.StaticServerServiceName,
ID: "static-server-v1",
Meta: map[string]string{"version": "v1"},
HTTPPort: 8081,
GRPCPort: 8078,
Checks: libservice.Checks{
Name: "main",
TTL: "30m",
},
Connect: libservice.SidecarService{
Port: 21011,
},
}
_, serverConnectProxyV1, err := libservice.CreateAndRegisterStaticServerAndSidecarWithChecks(node, serviceOptsV1)
require.NoError(t, err)
// Register service resolver
serviceResolver := &api.ServiceResolverConfigEntry{
Kind: api.ServiceResolver,
Name: libservice.StaticServerServiceName,
DefaultSubset: "test",
Subsets: map[string]api.ServiceResolverSubset{
"test": {
OnlyPassing: true,
},
},
ConnectTimeout: 120 * time.Second,
}
_, serverAdminPortV1 := serverConnectProxyV1.GetAdminAddr()
restartFn := func() {
require.NoError(t, serverConnectProxyV1.Restart())
}
_, port := clientConnectProxy.GetAddr()
_, adminPort := clientConnectProxy.GetAdminAddr()
assertionFn := func() {
// force static-server-v1 into a warning state
err = node.GetClient().Agent().UpdateTTL("service:static-server-v1", "", "warn")
require.NoError(t, err)
// ###########################
// ## with onlypassing=true
// assert only one static-server proxy is healthy
err = cluster.ConfigEntryWrite(serviceResolver)
require.NoError(t, err)
libassert.AssertServiceHasHealthyInstances(t, node, libservice.StaticServerServiceName, true, 1)
libassert.AssertEnvoyRunning(t, serverAdminPortV1)
libassert.AssertEnvoyPresentsCertURI(t, serverAdminPortV1, libservice.StaticServerServiceName)
// assert static-server proxies should be healthy
libassert.AssertServiceHasHealthyInstances(t, node, libservice.StaticServerServiceName, true, 1)
// static-client upstream should have 1 healthy endpoint for test.static-server
libassert.AssertUpstreamEndpointStatus(t, adminPort, "test.static-server.default", "HEALTHY", 1)
// static-client upstream should have 1 unhealthy endpoint for test.static-server
libassert.AssertUpstreamEndpointStatus(t, adminPort, "test.static-server.default", "UNHEALTHY", 1)
// static-client upstream should connect to static-server since it is passing
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", port), libservice.StaticServerServiceName, "")
// ###########################
// ## with onlypassing=false
// revert to OnlyPassing=false by deleting the config
err = cluster.ConfigEntryDelete(serviceResolver)
require.NoError(t, err)
// Consul health check assert only one static-server proxy is healthy when onlyPassing is false
libassert.AssertServiceHasHealthyInstances(t, node, libservice.StaticServerServiceName, false, 2)
// Although the service status is in warning state, when onlypassing is set to false Envoy
// health check returns all service instances with "warning" or "passing" state as Healthy enpoints
libassert.AssertUpstreamEndpointStatus(t, adminPort, "static-server.default", "HEALTHY", 2)
// static-client upstream should have 0 unhealthy endpoint for static-server
libassert.AssertUpstreamEndpointStatus(t, adminPort, "static-server.default", "UNHEALTHY", 0)
}
return nil, assertionFn, restartFn, nil
},
extraAssertion: func(clientConnectProxy libservice.Service) {
},
},
{
// Test resolver directs traffic to default subset
// - Create 3 static-server-2 server instances: one in V1, one in V2, one without any version
// - service2Resolver directs traffic to static-server-2-v2
name: "resolver subset redirect",
create: func(cluster *libcluster.Cluster, clientConnectProxy libservice.Service) (libservice.Service, func(), func(), error) {
node := cluster.Agents[0]
client := node.GetClient()
serviceOpts2 := &libservice.ServiceOpts{
Name: libservice.StaticServer2ServiceName,
ID: "static-server-2",
HTTPPort: 8081,
GRPCPort: 8078,
}
_, server2ConnectProxy, err := libservice.CreateAndRegisterStaticServerAndSidecar(node, serviceOpts2)
require.NoError(t, err)
libassert.CatalogServiceExists(t, client, libservice.StaticServer2ServiceName, nil)
serviceOptsV1 := &libservice.ServiceOpts{
Name: libservice.StaticServer2ServiceName,
ID: "static-server-2-v1",
Meta: map[string]string{"version": "v1"},
HTTPPort: 8082,
GRPCPort: 8077,
}
_, server2ConnectProxyV1, err := libservice.CreateAndRegisterStaticServerAndSidecar(node, serviceOptsV1)
require.NoError(t, err)
serviceOptsV2 := &libservice.ServiceOpts{
Name: libservice.StaticServer2ServiceName,
ID: "static-server-2-v2",
Meta: map[string]string{"version": "v2"},
HTTPPort: 8083,
GRPCPort: 8076,
}
_, server2ConnectProxyV2, err := libservice.CreateAndRegisterStaticServerAndSidecar(node, serviceOptsV2)
require.NoError(t, err)
libassert.CatalogServiceExists(t, client, libservice.StaticServer2ServiceName, nil)
// Register static-server service resolver
serviceResolver := &api.ServiceResolverConfigEntry{
Kind: api.ServiceResolver,
Name: libservice.StaticServer2ServiceName,
Subsets: map[string]api.ServiceResolverSubset{
"v1": {
Filter: "Service.Meta.version == v1",
},
"v2": {
Filter: "Service.Meta.version == v2",
},
},
}
err = cluster.ConfigEntryWrite(serviceResolver)
require.NoError(t, err)
// Register static-server-2 service resolver to redirect traffic
// from static-server to static-server-2-v2
service2Resolver := &api.ServiceResolverConfigEntry{
Kind: api.ServiceResolver,
Name: libservice.StaticServerServiceName,
Redirect: &api.ServiceResolverRedirect{
Service: libservice.StaticServer2ServiceName,
ServiceSubset: "v2",
},
}
err = cluster.ConfigEntryWrite(service2Resolver)
require.NoError(t, err)
_, server2AdminPort := server2ConnectProxy.GetAdminAddr()
_, server2AdminPortV1 := server2ConnectProxyV1.GetAdminAddr()
_, server2AdminPortV2 := server2ConnectProxyV2.GetAdminAddr()
restartFn := func() {
require.NoErrorf(t, server2ConnectProxy.Restart(), "%s", server2ConnectProxy.GetName())
require.NoErrorf(t, server2ConnectProxyV1.Restart(), "%s", server2ConnectProxyV1.GetName())
require.NoErrorf(t, server2ConnectProxyV2.Restart(), "%s", server2ConnectProxyV2.GetName())
}
assertionFn := func() {
// assert 3 static-server-2 instances are healthy
libassert.AssertServiceHasHealthyInstances(t, node, libservice.StaticServer2ServiceName, false, 3)
libassert.AssertEnvoyRunning(t, server2AdminPort)
libassert.AssertEnvoyRunning(t, server2AdminPortV1)
libassert.AssertEnvoyRunning(t, server2AdminPortV2)
libassert.AssertEnvoyPresentsCertURI(t, server2AdminPort, libservice.StaticServer2ServiceName)
libassert.AssertEnvoyPresentsCertURI(t, server2AdminPortV1, libservice.StaticServer2ServiceName)
libassert.AssertEnvoyPresentsCertURI(t, server2AdminPortV2, libservice.StaticServer2ServiceName)
// assert static-server proxies should be healthy
libassert.AssertServiceHasHealthyInstances(t, node, libservice.StaticServer2ServiceName, true, 3)
}
return nil, assertionFn, restartFn, nil
},
extraAssertion: func(clientConnectProxy libservice.Service) {
_, appPort := clientConnectProxy.GetAddr()
_, adminPort := clientConnectProxy.GetAdminAddr()
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", appPort), "static-server-2-v2", "")
libassert.AssertUpstreamEndpointStatus(t, adminPort, "v2.static-server-2.default", "HEALTHY", 1)
}, },
}, },
} }
require.NoError(t, cluster.ConfigEntryWrite(serviceResolver))
run := func(t *testing.T, tc testcase, oldVersion, targetVersion string) { assertionFn := func() {
buildOpts := &libcluster.BuildOptions{ _, serverAdminPortV1 := serverConnectProxyV1.GetAdminAddr()
ConsulVersion: oldVersion, _, serverAdminPortV2 := serverConnectProxyV2.GetAdminAddr()
Datacenter: "dc1",
InjectAutoEncryption: true,
}
cluster, _, _ := topology.NewCluster(t, &topology.ClusterConfig{
NumServers: 1,
NumClients: 1,
BuildOpts: buildOpts,
ApplyDefaultProxySettings: true,
})
node := cluster.Agents[0]
client := node.GetClient()
staticClientProxy, staticServerProxy, err := createStaticClientAndServer(cluster)
require.NoError(t, err)
libassert.CatalogServiceExists(t, client, libservice.StaticServerServiceName, nil)
libassert.CatalogServiceExists(t, client, fmt.Sprintf("%s-sidecar-proxy", libservice.StaticClientServiceName), nil)
err = cluster.ConfigEntryWrite(&api.ProxyConfigEntry{
Kind: api.ProxyDefaults,
Name: "global",
Config: map[string]interface{}{
"protocol": "http",
},
})
require.NoError(t, err)
_, port := staticClientProxy.GetAddr()
_, adminPort := staticClientProxy.GetAdminAddr() _, adminPort := staticClientProxy.GetAdminAddr()
_, serverAdminPort := staticServerProxy.GetAdminAddr() _, port := staticClientProxy.GetAddr()
libassert.HTTPServiceEchoes(t, "localhost", port, "")
libassert.AssertEnvoyPresentsCertURI(t, adminPort, libservice.StaticClientServiceName)
libassert.AssertEnvoyPresentsCertURI(t, serverAdminPort, libservice.StaticServerServiceName)
_, assertionAdditionalResources, restartFn, err := tc.create(cluster, staticClientProxy) libassert.AssertEnvoyRunning(t, serverAdminPortV1)
require.NoError(t, err) libassert.AssertEnvoyRunning(t, serverAdminPortV2)
// validate client and proxy is up and running
libassert.AssertContainerState(t, staticClientProxy, "running")
assertionAdditionalResources()
tc.extraAssertion(staticClientProxy)
// Upgrade cluster, restart sidecars then begin service traffic validation libassert.AssertEnvoyPresentsCertURI(t, serverAdminPortV1, libservice.StaticServerServiceName)
require.NoError(t, cluster.StandardUpgrade(t, context.Background(), targetVersion)) libassert.AssertEnvoyPresentsCertURI(t, serverAdminPortV2, libservice.StaticServerServiceName)
require.NoError(t, staticClientProxy.Restart())
require.NoError(t, staticServerProxy.Restart())
restartFn()
// POST upgrade validation; repeat client & proxy validation libassert.AssertUpstreamEndpointStatus(t, adminPort, "v2.static-server.default", "HEALTHY", 1)
libassert.HTTPServiceEchoes(t, "localhost", port, "")
libassert.AssertEnvoyRunning(t, adminPort)
libassert.AssertEnvoyRunning(t, serverAdminPort)
// certs are valid // assert static-server proxies should be healthy
libassert.AssertEnvoyPresentsCertURI(t, adminPort, libservice.StaticClientServiceName) libassert.AssertServiceHasHealthyInstances(t, node, libservice.StaticServerServiceName, true, 3)
libassert.AssertEnvoyPresentsCertURI(t, serverAdminPort, libservice.StaticServerServiceName)
assertionAdditionalResources() libassert.AssertUpstreamEndpointStatus(t, adminPort, "v2.static-server.default", "HEALTHY", 1)
tc.extraAssertion(staticClientProxy)
// static-client upstream should connect to static-server-v2 because the default subset value is to v2 set in the service resolver
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", port), "static-server-v2", "")
} }
for _, tc := range tcs { // validate client and proxy is up and running
t.Run(fmt.Sprintf("%s upgrade from %s to %s", tc.name, utils.LatestVersion, utils.TargetVersion), libassert.AssertContainerState(t, staticClientProxy, "running")
func(t *testing.T) { validate(t, staticServerProxy, staticClientProxy)
run(t, tc, utils.LatestVersion, utils.TargetVersion) assertionFn()
})
} // Upgrade cluster, restart sidecars then begin service traffic validation
require.NoError(t, cluster.StandardUpgrade(t, context.Background(), utils.TargetVersion))
require.NoError(t, staticClientProxy.Restart())
require.NoError(t, staticServerProxy.Restart())
require.NoError(t, serverConnectProxyV1.Restart())
require.NoError(t, serverConnectProxyV2.Restart())
validate(t, staticServerProxy, staticClientProxy)
assertionFn()
} }
// createStaticClientAndServer creates a static-client and a static-server in the cluster // Test resolver resolves service instance based on their check status
func createStaticClientAndServer(cluster *libcluster.Cluster) (libservice.Service, libservice.Service, error) { // - Create one addtional static-server with checks and V1 subset
// - resolver directs traffic to "test" service
func TestTrafficManagement_ResolverDefaultOnlyPassing(t *testing.T) {
cluster, staticServerProxy, staticClientProxy := setup(t)
node := cluster.Agents[0] node := cluster.Agents[0]
serviceOptsV1 := &libservice.ServiceOpts{
Name: libservice.StaticServerServiceName,
ID: "static-server-v1",
Meta: map[string]string{"version": "v1"},
HTTPPort: 8081,
GRPCPort: 8078,
Checks: libservice.Checks{
Name: "main",
TTL: "30m",
},
Connect: libservice.SidecarService{
Port: 21011,
},
}
_, serverConnectProxyV1, err := libservice.CreateAndRegisterStaticServerAndSidecarWithChecks(node, serviceOptsV1)
require.NoError(t, err)
// Register service resolver
serviceResolver := &api.ServiceResolverConfigEntry{
Kind: api.ServiceResolver,
Name: libservice.StaticServerServiceName,
DefaultSubset: "test",
Subsets: map[string]api.ServiceResolverSubset{
"test": {
OnlyPassing: true,
},
},
ConnectTimeout: 120 * time.Second,
}
_, serverAdminPortV1 := serverConnectProxyV1.GetAdminAddr()
assertionFn := func() {
_, port := staticClientProxy.GetAddr()
_, adminPort := staticClientProxy.GetAdminAddr()
// force static-server-v1 into a warning state
err = node.GetClient().Agent().UpdateTTL("service:static-server-v1", "", "warn")
require.NoError(t, err)
// ###########################
// ## with onlypassing=true
// assert only one static-server proxy is healthy
err = cluster.ConfigEntryWrite(serviceResolver)
require.NoError(t, err)
libassert.AssertServiceHasHealthyInstances(t, node, libservice.StaticServerServiceName, true, 1)
libassert.AssertEnvoyRunning(t, serverAdminPortV1)
libassert.AssertEnvoyPresentsCertURI(t, serverAdminPortV1, libservice.StaticServerServiceName)
// assert static-server proxies should be healthy
libassert.AssertServiceHasHealthyInstances(t, node, libservice.StaticServerServiceName, true, 1)
// static-client upstream should have 1 healthy endpoint for test.static-server
libassert.AssertUpstreamEndpointStatus(t, adminPort, "test.static-server.default", "HEALTHY", 1)
// static-client upstream should have 1 unhealthy endpoint for test.static-server
libassert.AssertUpstreamEndpointStatus(t, adminPort, "test.static-server.default", "UNHEALTHY", 1)
// static-client upstream should connect to static-server since it is passing
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", port), libservice.StaticServerServiceName, "")
// ###########################
// ## with onlypassing=false
// revert to OnlyPassing=false by deleting the config
err = cluster.ConfigEntryDelete(serviceResolver)
require.NoError(t, err)
// Consul health check assert only one static-server proxy is healthy when onlyPassing is false
libassert.AssertServiceHasHealthyInstances(t, node, libservice.StaticServerServiceName, false, 2)
// Although the service status is in warning state, when onlypassing is set to false Envoy
// health check returns all service instances with "warning" or "passing" state as Healthy enpoints
libassert.AssertUpstreamEndpointStatus(t, adminPort, "static-server.default", "HEALTHY", 2)
// static-client upstream should have 0 unhealthy endpoint for static-server
libassert.AssertUpstreamEndpointStatus(t, adminPort, "static-server.default", "UNHEALTHY", 0)
}
// validate client and proxy is up and running
libassert.AssertContainerState(t, staticClientProxy, "running")
validate(t, staticServerProxy, staticClientProxy)
assertionFn()
// Upgrade cluster, restart sidecars then begin service traffic validation
require.NoError(t, cluster.StandardUpgrade(t, context.Background(), utils.TargetVersion))
require.NoError(t, staticClientProxy.Restart())
require.NoError(t, staticServerProxy.Restart())
require.NoError(t, serverConnectProxyV1.Restart())
validate(t, staticServerProxy, staticClientProxy)
assertionFn()
}
// Test resolver directs traffic to default subset
// - Create 3 static-server-2 server instances: one in V1, one in V2, one without any version
// - service2Resolver directs traffic to static-server-2-v2V
func TestTrafficManagement_ResolverSubsetRedirect(t *testing.T) {
cluster, staticServerProxy, staticClientProxy := setup(t)
node := cluster.Agents[0]
client := node.GetClient()
serviceOpts2 := &libservice.ServiceOpts{
Name: libservice.StaticServer2ServiceName,
ID: "static-server-2",
HTTPPort: 8081,
GRPCPort: 8078,
}
_, server2ConnectProxy, err := libservice.CreateAndRegisterStaticServerAndSidecar(node, serviceOpts2)
require.NoError(t, err)
libassert.CatalogServiceExists(t, client, libservice.StaticServer2ServiceName, nil)
serviceOptsV1 := &libservice.ServiceOpts{
Name: libservice.StaticServer2ServiceName,
ID: "static-server-2-v1",
Meta: map[string]string{"version": "v1"},
HTTPPort: 8082,
GRPCPort: 8077,
}
_, server2ConnectProxyV1, err := libservice.CreateAndRegisterStaticServerAndSidecar(node, serviceOptsV1)
require.NoError(t, err)
serviceOptsV2 := &libservice.ServiceOpts{
Name: libservice.StaticServer2ServiceName,
ID: "static-server-2-v2",
Meta: map[string]string{"version": "v2"},
HTTPPort: 8083,
GRPCPort: 8076,
}
_, server2ConnectProxyV2, err := libservice.CreateAndRegisterStaticServerAndSidecar(node, serviceOptsV2)
require.NoError(t, err)
libassert.CatalogServiceExists(t, client, libservice.StaticServer2ServiceName, nil)
// Register static-server service resolver
serviceResolver := &api.ServiceResolverConfigEntry{
Kind: api.ServiceResolver,
Name: libservice.StaticServer2ServiceName,
Subsets: map[string]api.ServiceResolverSubset{
"v1": {
Filter: "Service.Meta.version == v1",
},
"v2": {
Filter: "Service.Meta.version == v2",
},
},
}
require.NoError(t, cluster.ConfigEntryWrite(serviceResolver))
// Register static-server-2 service resolver to redirect traffic
// from static-server to static-server-2-v2
server2Resolver := &api.ServiceResolverConfigEntry{
Kind: api.ServiceResolver,
Name: libservice.StaticServerServiceName,
Redirect: &api.ServiceResolverRedirect{
Service: libservice.StaticServer2ServiceName,
ServiceSubset: "v2",
},
}
require.NoError(t, cluster.ConfigEntryWrite(server2Resolver))
assertionFn := func() {
_, server2AdminPort := server2ConnectProxy.GetAdminAddr()
_, server2AdminPortV1 := server2ConnectProxyV1.GetAdminAddr()
_, server2AdminPortV2 := server2ConnectProxyV2.GetAdminAddr()
// assert 3 static-server-2 instances are healthy
libassert.AssertServiceHasHealthyInstances(t, node, libservice.StaticServer2ServiceName, false, 3)
libassert.AssertEnvoyRunning(t, server2AdminPort)
libassert.AssertEnvoyRunning(t, server2AdminPortV1)
libassert.AssertEnvoyRunning(t, server2AdminPortV2)
libassert.AssertEnvoyPresentsCertURI(t, server2AdminPort, libservice.StaticServer2ServiceName)
libassert.AssertEnvoyPresentsCertURI(t, server2AdminPortV1, libservice.StaticServer2ServiceName)
libassert.AssertEnvoyPresentsCertURI(t, server2AdminPortV2, libservice.StaticServer2ServiceName)
// assert static-server proxies should be healthy
libassert.AssertServiceHasHealthyInstances(t, node, libservice.StaticServer2ServiceName, true, 3)
_, appPort := staticClientProxy.GetAddr()
_, adminPort := staticClientProxy.GetAdminAddr()
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", appPort), "static-server-2-v2", "")
libassert.AssertUpstreamEndpointStatus(t, adminPort, "v2.static-server-2.default", "HEALTHY", 1)
}
require.NoError(t, err)
// validate client and proxy is up and running
libassert.AssertContainerState(t, staticClientProxy, "running")
validate(t, staticServerProxy, staticClientProxy)
assertionFn()
// Upgrade cluster, restart sidecars then begin service traffic validation
require.NoError(t, cluster.StandardUpgrade(t, context.Background(), utils.TargetVersion))
require.NoError(t, staticClientProxy.Restart())
require.NoError(t, staticServerProxy.Restart())
require.NoErrorf(t, server2ConnectProxy.Restart(), "%s", server2ConnectProxy.GetName())
require.NoErrorf(t, server2ConnectProxyV1.Restart(), "%s", server2ConnectProxyV1.GetName())
require.NoErrorf(t, server2ConnectProxyV2.Restart(), "%s", server2ConnectProxyV2.GetName())
validate(t, staticServerProxy, staticClientProxy)
assertionFn()
}
func setup(t *testing.T) (*libcluster.Cluster, libservice.Service, libservice.Service) {
buildOpts := &libcluster.BuildOptions{
ConsulVersion: utils.LatestVersion,
Datacenter: "dc1",
InjectAutoEncryption: true,
}
cluster, _, _ := topology.NewCluster(t, &topology.ClusterConfig{
NumServers: 1,
NumClients: 1,
BuildOpts: buildOpts,
ApplyDefaultProxySettings: true,
})
node := cluster.Agents[0]
client := node.GetClient()
serviceOpts := &libservice.ServiceOpts{ serviceOpts := &libservice.ServiceOpts{
Name: libservice.StaticServerServiceName, Name: libservice.StaticServerServiceName,
ID: "static-server", ID: "static-server",
HTTPPort: 8080, HTTPPort: 8080,
GRPCPort: 8079, GRPCPort: 8079,
} }
_, serverConnectProxy, err := libservice.CreateAndRegisterStaticServerAndSidecar(node, serviceOpts) _, staticServerProxy, err := libservice.CreateAndRegisterStaticServerAndSidecar(node, serviceOpts)
if err != nil { require.NoError(t, err)
return nil, nil, err
}
// Create a client proxy instance with the server as an upstream // Create a client proxy instance with the server as an upstream
clientConnectProxy, err := libservice.CreateAndRegisterStaticClientSidecar(node, "", false) staticClientProxy, err := libservice.CreateAndRegisterStaticClientSidecar(node, "", false)
if err != nil { require.NoError(t, err)
return nil, nil, err
}
return clientConnectProxy, serverConnectProxy, nil require.NoError(t, err)
libassert.CatalogServiceExists(t, client, libservice.StaticServerServiceName, nil)
libassert.CatalogServiceExists(t, client, fmt.Sprintf("%s-sidecar-proxy", libservice.StaticClientServiceName), nil)
err = cluster.ConfigEntryWrite(&api.ProxyConfigEntry{
Kind: api.ProxyDefaults,
Name: "global",
Config: map[string]interface{}{
"protocol": "http",
},
})
require.NoError(t, err)
validate(t, staticServerProxy, staticClientProxy)
return cluster, staticServerProxy, staticClientProxy
}
func validate(t *testing.T, staticServerProxy, staticClientProxy libservice.Service) {
_, port := staticClientProxy.GetAddr()
_, adminPort := staticClientProxy.GetAdminAddr()
_, serverAdminPort := staticServerProxy.GetAdminAddr()
libassert.HTTPServiceEchoes(t, "localhost", port, "")
libassert.AssertEnvoyRunning(t, adminPort)
libassert.AssertEnvoyRunning(t, serverAdminPort)
libassert.AssertEnvoyPresentsCertURI(t, adminPort, libservice.StaticClientServiceName)
libassert.AssertEnvoyPresentsCertURI(t, serverAdminPort, libservice.StaticServerServiceName)
} }

View File

@ -17,82 +17,75 @@ import (
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils" "github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
) )
// TestPeering_Upgrade_ControlPlane_MGW verifies the peering control plane traffic go through the mesh gateway // TestPeering_ControlPlaneMGW verifies the peering control plane traffic go through the mesh gateway
// PeerThroughMeshGateways can be inheritted by the upgraded cluster. // PeerThroughMeshGateways can be inheritted by the upgraded cluster.
// //
// 1. Create the basic peering topology of one dialing cluster and one accepting cluster // 1. Create the basic peering topology of one dialing cluster and one accepting cluster
// 2. Set PeerThroughMeshGateways = true // 2. Set PeerThroughMeshGateways = true
// 3. Upgrade both clusters // 3. Upgrade both clusters
// 4. Verify the peering is re-established through mesh gateway // 4. Verify the peering is re-established through mesh gateway
func TestPeering_Upgrade_ControlPlane_MGW(t *testing.T) { func TestPeering_ControlPlaneMGW(t *testing.T) {
t.Parallel() t.Parallel()
run := func(t *testing.T, oldVersion, targetVersion string) { accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, utils.LatestVersion, true)
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, oldVersion, true) var (
var ( acceptingCluster = accepting.Cluster
acceptingCluster = accepting.Cluster dialingCluster = dialing.Cluster
dialingCluster = dialing.Cluster )
)
dialingClient, err := dialingCluster.GetClient(nil, false) dialingClient, err := dialingCluster.GetClient(nil, false)
require.NoError(t, err) require.NoError(t, err)
acceptingClient, err := acceptingCluster.GetClient(nil, false) acceptingClient, err := acceptingCluster.GetClient(nil, false)
require.NoError(t, err) require.NoError(t, err)
// Verify control plane endpoints and traffic in gateway // Verify control plane endpoints and traffic in gateway
_, gatewayAdminPort := dialing.Gateway.GetAdminAddr() _, gatewayAdminPort := dialing.Gateway.GetAdminAddr()
libassert.AssertUpstreamEndpointStatus(t, gatewayAdminPort, "server.dc1.peering", "HEALTHY", 1) libassert.AssertUpstreamEndpointStatus(t, gatewayAdminPort, "server.dc1.peering", "HEALTHY", 1)
libassert.AssertUpstreamEndpointStatus(t, gatewayAdminPort, "server.dc2.peering", "HEALTHY", 1) libassert.AssertUpstreamEndpointStatus(t, gatewayAdminPort, "server.dc2.peering", "HEALTHY", 1)
libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort, libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort,
"cluster.static-server.default.default.accepting-to-dialer.external", "cluster.static-server.default.default.accepting-to-dialer.external",
"upstream_cx_total", 1) "upstream_cx_total", 1)
libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort, libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort,
"cluster.server.dc1.peering", "cluster.server.dc1.peering",
"upstream_cx_total", 1) "upstream_cx_total", 1)
// Upgrade the accepting cluster and assert peering is still ACTIVE // Upgrade the accepting cluster and assert peering is still ACTIVE
require.NoError(t, acceptingCluster.StandardUpgrade(t, context.Background(), targetVersion)) require.NoError(t, acceptingCluster.StandardUpgrade(t, context.Background(), utils.TargetVersion))
libassert.PeeringStatus(t, acceptingClient, libtopology.AcceptingPeerName, api.PeeringStateActive) libassert.PeeringStatus(t, acceptingClient, libtopology.AcceptingPeerName, api.PeeringStateActive)
libassert.PeeringStatus(t, dialingClient, libtopology.DialingPeerName, api.PeeringStateActive) libassert.PeeringStatus(t, dialingClient, libtopology.DialingPeerName, api.PeeringStateActive)
require.NoError(t, dialingCluster.StandardUpgrade(t, context.Background(), targetVersion)) require.NoError(t, dialingCluster.StandardUpgrade(t, context.Background(), utils.TargetVersion))
libassert.PeeringStatus(t, acceptingClient, libtopology.AcceptingPeerName, api.PeeringStateActive) libassert.PeeringStatus(t, acceptingClient, libtopology.AcceptingPeerName, api.PeeringStateActive)
libassert.PeeringStatus(t, dialingClient, libtopology.DialingPeerName, api.PeeringStateActive) libassert.PeeringStatus(t, dialingClient, libtopology.DialingPeerName, api.PeeringStateActive)
// POST upgrade validation // POST upgrade validation
// - Restarted mesh gateway can receive consul generated configuration // - Restarted mesh gateway can receive consul generated configuration
// - control plane traffic is through mesh gateway // - control plane traffic is through mesh gateway
// - Register a new static-client service in dialing cluster and // - Register a new static-client service in dialing cluster and
// - set upstream to static-server service in peered cluster // - set upstream to static-server service in peered cluster
// Stop the accepting gateway and restart dialing gateway // Stop the accepting gateway and restart dialing gateway
// to force peering control plane traffic through dialing mesh gateway // to force peering control plane traffic through dialing mesh gateway
require.NoError(t, accepting.Gateway.Stop()) require.NoError(t, accepting.Gateway.Stop())
require.NoError(t, dialing.Gateway.Restart()) require.NoError(t, dialing.Gateway.Restart())
// Restarted dialing gateway should not have any measurement on data plane traffic // Restarted dialing gateway should not have any measurement on data plane traffic
libassert.AssertEnvoyMetricAtMost(t, gatewayAdminPort, libassert.AssertEnvoyMetricAtMost(t, gatewayAdminPort,
"cluster.static-server.default.default.accepting-to-dialer.external", "cluster.static-server.default.default.accepting-to-dialer.external",
"upstream_cx_total", 0) "upstream_cx_total", 0)
// control plane metrics should be observed // control plane metrics should be observed
libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort, libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort,
"cluster.server.dc1.peering", "cluster.server.dc1.peering",
"upstream_cx_total", 1) "upstream_cx_total", 1)
require.NoError(t, accepting.Gateway.Start()) require.NoError(t, accepting.Gateway.Start())
clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true) clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true)
require.NoError(t, err) require.NoError(t, err)
_, port := clientSidecarService.GetAddr() _, port := clientSidecarService.GetAddr()
_, adminPort := clientSidecarService.GetAdminAddr() _, adminPort := clientSidecarService.GetAdminAddr()
require.NoError(t, clientSidecarService.Restart()) require.NoError(t, clientSidecarService.Restart())
libassert.AssertUpstreamEndpointStatus(t, adminPort, fmt.Sprintf("static-server.default.%s.external", libtopology.DialingPeerName), "HEALTHY", 1) libassert.AssertUpstreamEndpointStatus(t, adminPort, fmt.Sprintf("static-server.default.%s.external", libtopology.DialingPeerName), "HEALTHY", 1)
libassert.HTTPServiceEchoes(t, "localhost", port, "") libassert.HTTPServiceEchoes(t, "localhost", port, "")
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", port), libservice.StaticServerServiceName, "") libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", port), libservice.StaticServerServiceName, "")
}
t.Run(fmt.Sprintf("Upgrade from %s to %s", utils.LatestVersion, utils.TargetVersion),
func(t *testing.T) {
run(t, utils.LatestVersion, utils.TargetVersion)
})
} }

View File

@ -18,370 +18,319 @@ import (
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils" "github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
) )
// TestPeering_UpgradeToTarget_fromLatest checks peering status after dialing cluster func TestPeering_Basic(t *testing.T) {
// and accepting cluster upgrade t.Parallel()
func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) { accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, utils.LatestVersion, false)
peeringUpgrade(t, accepting, dialing, utils.TargetVersion)
peeringPostUpgradeValidation(t, dialing)
}
func TestPeering_HTTPRouter(t *testing.T) {
t.Parallel()
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, utils.LatestVersion, false)
acceptingCluster := accepting.Cluster
// Create a second static-server at the client agent of accepting cluster and
// a service-router that routes /static-server-2 to static-server-2
serviceOpts := &libservice.ServiceOpts{
Name: libservice.StaticServer2ServiceName,
ID: "static-server-2",
Meta: map[string]string{"version": "v2"},
HTTPPort: 8081,
GRPCPort: 8078,
}
_, _, err := libservice.CreateAndRegisterStaticServerAndSidecar(acceptingCluster.Clients()[0], serviceOpts)
require.NoError(t, err, "creating static-server-2")
libassert.CatalogServiceExists(t, acceptingCluster.Clients()[0].GetClient(), libservice.StaticServer2ServiceName, nil)
require.NoError(t, acceptingCluster.ConfigEntryWrite(&api.ProxyConfigEntry{
Kind: api.ProxyDefaults,
Name: "global",
Config: map[string]interface{}{
"protocol": "http",
},
}))
routerConfigEntry := &api.ServiceRouterConfigEntry{
Kind: api.ServiceRouter,
Name: libservice.StaticServerServiceName,
Routes: []api.ServiceRoute{
{
Match: &api.ServiceRouteMatch{
HTTP: &api.ServiceRouteHTTPMatch{
PathPrefix: "/" + libservice.StaticServer2ServiceName + "/",
},
},
Destination: &api.ServiceRouteDestination{
Service: libservice.StaticServer2ServiceName,
PrefixRewrite: "/",
},
},
},
}
require.NoError(t, acceptingCluster.ConfigEntryWrite(routerConfigEntry))
_, appPort := dialing.Container.GetAddr()
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d/static-server-2", appPort), "static-server-2", "")
peeringUpgrade(t, accepting, dialing, utils.TargetVersion)
peeringPostUpgradeValidation(t, dialing)
// TODO: restart static-server-2's sidecar
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d/static-server-2", appPort), "static-server-2", "")
}
// Verify resolver and failover can direct traffic to server in peered cluster
// In addtional to the basic topology, this case provisions the following
// services in the dialing cluster:
//
// - a new static-client at server_0 that has two upstreams: static-server (5000)
// and peer-static-server (5001)
// - a local static-server service at client_0
// - service-resolved named static-server with failover to static-server in accepting cluster
// - service-resolved named peer-static-server to static-server in accepting cluster
func TestPeering_HTTPResolverAndFailover(t *testing.T) {
t.Parallel() t.Parallel()
type testcase struct { accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, utils.LatestVersion, false)
name string dialingCluster := dialing.Cluster
// create creates addtional resources in peered clusters depending on cases, e.g., static-client,
// static server, and config-entries. It returns the proxy services, an assertation function to
// be called to verify the resources.
create func(*cluster.Cluster, *cluster.Cluster) (libservice.Service, libservice.Service, func(), error)
// extraAssertion adds additional assertion function to the common resources across cases.
// common resources includes static-client in dialing cluster, and static-server in accepting cluster.
extraAssertion func(int)
}
tcs := []testcase{ require.NoError(t, dialingCluster.ConfigEntryWrite(&api.ProxyConfigEntry{
// { Kind: api.ProxyDefaults,
// TODO: API changed from 1.13 to 1.14 in , PeerName to Peer Name: "global",
// exportConfigEntry Config: map[string]interface{}{
// oldversion: "1.13", "protocol": "http",
// targetVersion: *utils.TargetVersion,
// },
{
name: "basic",
create: func(accepting *cluster.Cluster, dialing *cluster.Cluster) (libservice.Service, libservice.Service, func(), error) {
return nil, nil, func() {}, nil
},
extraAssertion: func(clientUpstreamPort int) {},
}, },
{ }))
name: "http_router",
// Create a second static-service at the client agent of accepting cluster and
// a service-router that routes /static-server-2 to static-server-2
create: func(accepting *cluster.Cluster, dialing *cluster.Cluster) (libservice.Service, libservice.Service, func(), error) {
c := accepting
serviceOpts := &libservice.ServiceOpts{
Name: libservice.StaticServer2ServiceName,
ID: "static-server-2",
Meta: map[string]string{"version": "v2"},
HTTPPort: 8081,
GRPCPort: 8078,
}
_, serverConnectProxy, err := libservice.CreateAndRegisterStaticServerAndSidecar(c.Clients()[0], serviceOpts)
if err != nil {
return nil, nil, nil, err
}
libassert.CatalogServiceExists(t, c.Clients()[0].GetClient(), libservice.StaticServer2ServiceName, nil)
err = c.ConfigEntryWrite(&api.ProxyConfigEntry{ clientConnectProxy, err := createAndRegisterStaticClientSidecarWith2Upstreams(dialingCluster,
Kind: api.ProxyDefaults, []string{libservice.StaticServerServiceName, "peer-static-server"}, true,
Name: "global", )
Config: map[string]interface{}{ require.NoErrorf(t, err, "error creating client connect proxy in cluster %s", dialingCluster.NetworkName)
"protocol": "http",
},
})
if err != nil {
return nil, nil, nil, err
}
routerConfigEntry := &api.ServiceRouterConfigEntry{
Kind: api.ServiceRouter,
Name: libservice.StaticServerServiceName,
Routes: []api.ServiceRoute{
{
Match: &api.ServiceRouteMatch{
HTTP: &api.ServiceRouteHTTPMatch{
PathPrefix: "/" + libservice.StaticServer2ServiceName + "/",
},
},
Destination: &api.ServiceRouteDestination{
Service: libservice.StaticServer2ServiceName,
PrefixRewrite: "/",
},
},
},
}
err = c.ConfigEntryWrite(routerConfigEntry)
return serverConnectProxy, nil, func() {}, err
},
extraAssertion: func(clientUpstreamPort int) {
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d/static-server-2", clientUpstreamPort), "static-server-2", "")
},
},
{
name: "http splitter and resolver",
// In addtional to the basic topology, this case provisions the following
// services in the dialing cluster:
//
// - a new static-client at server_0 that has two upstreams: split-static-server (5000)
// and peer-static-server (5001)
// - a local static-server service at client_0
// - service-splitter named split-static-server w/ 2 services: "local-static-server" and
// "peer-static-server".
// - service-resolved named local-static-server
// - service-resolved named peer-static-server
create: func(accepting *cluster.Cluster, dialing *cluster.Cluster) (libservice.Service, libservice.Service, func(), error) {
err := dialing.ConfigEntryWrite(&api.ProxyConfigEntry{
Kind: api.ProxyDefaults,
Name: "global",
Config: map[string]interface{}{
"protocol": "http",
},
})
if err != nil {
return nil, nil, nil, err
}
clientConnectProxy, err := createAndRegisterStaticClientSidecarWith2Upstreams(dialing, // make a resolver for service peer-static-server
[]string{"split-static-server", "peer-static-server"}, true, resolverConfigEntry := &api.ServiceResolverConfigEntry{
) Kind: api.ServiceResolver,
if err != nil { Name: "peer-static-server",
return nil, nil, nil, fmt.Errorf("error creating client connect proxy in cluster %s", dialing.NetworkName) Redirect: &api.ServiceResolverRedirect{
} Service: libservice.StaticServerServiceName,
Peer: libtopology.DialingPeerName,
// make a resolver for service peer-static-server
resolverConfigEntry := &api.ServiceResolverConfigEntry{
Kind: api.ServiceResolver,
Name: "peer-static-server",
Redirect: &api.ServiceResolverRedirect{
Service: libservice.StaticServerServiceName,
Peer: libtopology.DialingPeerName,
},
}
err = dialing.ConfigEntryWrite(resolverConfigEntry)
if err != nil {
return nil, nil, nil, fmt.Errorf("error writing resolver config entry for %s", resolverConfigEntry.Name)
}
// make a splitter for service split-static-server
splitter := &api.ServiceSplitterConfigEntry{
Kind: api.ServiceSplitter,
Name: "split-static-server",
Splits: []api.ServiceSplit{
{
Weight: 50,
Service: "local-static-server",
ResponseHeaders: &api.HTTPHeaderModifiers{
Set: map[string]string{
"x-test-split": "local",
},
},
},
{
Weight: 50,
Service: "peer-static-server",
ResponseHeaders: &api.HTTPHeaderModifiers{
Set: map[string]string{
"x-test-split": "peer",
},
},
},
},
}
err = dialing.ConfigEntryWrite(splitter)
if err != nil {
return nil, nil, nil, fmt.Errorf("error writing splitter config entry for %s", splitter.Name)
}
// make a resolver for service local-static-server
resolverConfigEntry = &api.ServiceResolverConfigEntry{
Kind: api.ServiceResolver,
Name: "local-static-server",
Redirect: &api.ServiceResolverRedirect{
Service: libservice.StaticServerServiceName,
},
}
err = dialing.ConfigEntryWrite(resolverConfigEntry)
if err != nil {
return nil, nil, nil, fmt.Errorf("error writing resolver config entry for %s", resolverConfigEntry.Name)
}
// Make a static-server in dialing cluster
serviceOpts := &libservice.ServiceOpts{
Name: libservice.StaticServerServiceName,
ID: "static-server",
HTTPPort: 8081,
GRPCPort: 8078,
}
_, serverConnectProxy, err := libservice.CreateAndRegisterStaticServerAndSidecar(dialing.Clients()[0], serviceOpts)
libassert.CatalogServiceExists(t, dialing.Clients()[0].GetClient(), libservice.StaticServerServiceName, nil)
if err != nil {
return nil, nil, nil, err
}
_, appPorts := clientConnectProxy.GetAddrs()
assertionFn := func() {
libassert.HTTPServiceEchoesResHeader(t, "localhost", appPorts[0], "", map[string]string{
"X-Test-Split": "local",
})
libassert.HTTPServiceEchoesResHeader(t, "localhost", appPorts[0], "", map[string]string{
"X-Test-Split": "peer",
})
libassert.HTTPServiceEchoes(t, "localhost", appPorts[0], "")
}
return serverConnectProxy, clientConnectProxy, assertionFn, nil
},
extraAssertion: func(clientUpstreamPort int) {},
},
{
name: "http resolver and failover",
// Verify resolver and failover can direct traffic to server in peered cluster
// In addtional to the basic topology, this case provisions the following
// services in the dialing cluster:
//
// - a new static-client at server_0 that has two upstreams: static-server (5000)
// and peer-static-server (5001)
// - a local static-server service at client_0
// - service-resolved named static-server with failover to static-server in accepting cluster
// - service-resolved named peer-static-server to static-server in accepting cluster
create: func(accepting *cluster.Cluster, dialing *cluster.Cluster) (libservice.Service, libservice.Service, func(), error) {
err := dialing.ConfigEntryWrite(&api.ProxyConfigEntry{
Kind: api.ProxyDefaults,
Name: "global",
Config: map[string]interface{}{
"protocol": "http",
},
})
if err != nil {
return nil, nil, nil, err
}
clientConnectProxy, err := createAndRegisterStaticClientSidecarWith2Upstreams(dialing,
[]string{libservice.StaticServerServiceName, "peer-static-server"}, true,
)
if err != nil {
return nil, nil, nil, fmt.Errorf("error creating client connect proxy in cluster %s", dialing.NetworkName)
}
// make a resolver for service peer-static-server
resolverConfigEntry := &api.ServiceResolverConfigEntry{
Kind: api.ServiceResolver,
Name: "peer-static-server",
Redirect: &api.ServiceResolverRedirect{
Service: libservice.StaticServerServiceName,
Peer: libtopology.DialingPeerName,
},
}
err = dialing.ConfigEntryWrite(resolverConfigEntry)
if err != nil {
return nil, nil, nil, fmt.Errorf("error writing resolver config entry for %s", resolverConfigEntry.Name)
}
// make a resolver for service static-server
resolverConfigEntry = &api.ServiceResolverConfigEntry{
Kind: api.ServiceResolver,
Name: libservice.StaticServerServiceName,
Failover: map[string]api.ServiceResolverFailover{
"*": {
Targets: []api.ServiceResolverFailoverTarget{
{
Peer: libtopology.DialingPeerName,
},
},
},
},
}
err = dialing.ConfigEntryWrite(resolverConfigEntry)
if err != nil {
return nil, nil, nil, fmt.Errorf("error writing resolver config entry for %s", resolverConfigEntry.Name)
}
// Make a static-server in dialing cluster
serviceOpts := &libservice.ServiceOpts{
Name: libservice.StaticServerServiceName,
ID: "static-server-dialing",
HTTPPort: 8081,
GRPCPort: 8078,
}
_, serverConnectProxy, err := libservice.CreateAndRegisterStaticServerAndSidecar(dialing.Clients()[0], serviceOpts)
libassert.CatalogServiceExists(t, dialing.Clients()[0].GetClient(), libservice.StaticServerServiceName, nil)
if err != nil {
return nil, nil, nil, err
}
_, appPorts := clientConnectProxy.GetAddrs()
assertionFn := func() {
// assert traffic can fail-over to static-server in peered cluster and restor to local static-server
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", appPorts[0]), "static-server-dialing", "")
require.NoError(t, serverConnectProxy.Stop())
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", appPorts[0]), libservice.StaticServerServiceName, "")
require.NoError(t, serverConnectProxy.Start())
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", appPorts[0]), "static-server-dialing", "")
// assert peer-static-server resolves to static-server in peered cluster
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", appPorts[1]), libservice.StaticServerServiceName, "")
}
return serverConnectProxy, clientConnectProxy, assertionFn, nil
},
extraAssertion: func(clientUpstreamPort int) {},
}, },
} }
require.NoErrorf(t, dialingCluster.ConfigEntryWrite(resolverConfigEntry),
"error writing resolver config entry for %s", resolverConfigEntry.Name)
run := func(t *testing.T, tc testcase, oldVersion, targetVersion string) { // make a resolver for service static-server
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, oldVersion, false) resolverConfigEntry = &api.ServiceResolverConfigEntry{
var ( Kind: api.ServiceResolver,
acceptingCluster = accepting.Cluster Name: libservice.StaticServerServiceName,
dialingCluster = dialing.Cluster Failover: map[string]api.ServiceResolverFailover{
) "*": {
Targets: []api.ServiceResolverFailoverTarget{
dialingClient, err := dialingCluster.GetClient(nil, false) {
require.NoError(t, err) Peer: libtopology.DialingPeerName,
},
acceptingClient, err := acceptingCluster.GetClient(nil, false) },
require.NoError(t, err) },
},
_, gatewayAdminPort := dialing.Gateway.GetAdminAddr()
_, staticClientPort := dialing.Container.GetAddr()
_, appPort := dialing.Container.GetAddr()
_, secondClientProxy, assertionAdditionalResources, err := tc.create(acceptingCluster, dialingCluster)
require.NoError(t, err)
assertionAdditionalResources()
tc.extraAssertion(appPort)
// Upgrade the accepting cluster and assert peering is still ACTIVE
require.NoError(t, acceptingCluster.StandardUpgrade(t, context.Background(), targetVersion))
libassert.PeeringStatus(t, acceptingClient, libtopology.AcceptingPeerName, api.PeeringStateActive)
libassert.PeeringStatus(t, dialingClient, libtopology.DialingPeerName, api.PeeringStateActive)
require.NoError(t, dialingCluster.StandardUpgrade(t, context.Background(), targetVersion))
libassert.PeeringStatus(t, acceptingClient, libtopology.AcceptingPeerName, api.PeeringStateActive)
libassert.PeeringStatus(t, dialingClient, libtopology.DialingPeerName, api.PeeringStateActive)
// POST upgrade validation
// - Register a new static-client service in dialing cluster and
// - set upstream to static-server service in peered cluster
// Restart the gateway & proxy sidecar, and verify existing connection
require.NoError(t, dialing.Gateway.Restart())
// Restarted gateway should not have any measurement on data plane traffic
libassert.AssertEnvoyMetricAtMost(t, gatewayAdminPort,
"cluster.static-server.default.default.accepting-to-dialer.external",
"upstream_cx_total", 0)
libassert.HTTPServiceEchoes(t, "localhost", staticClientPort, "")
require.NoError(t, dialing.Container.Restart())
libassert.HTTPServiceEchoes(t, "localhost", staticClientPort, "")
require.NoError(t, accepting.Container.Restart())
libassert.HTTPServiceEchoes(t, "localhost", staticClientPort, "")
// restart the secondClientProxy if exist
if secondClientProxy != nil {
require.NoError(t, secondClientProxy.Restart())
}
assertionAdditionalResources()
clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true)
require.NoError(t, err)
_, port := clientSidecarService.GetAddr()
_, adminPort := clientSidecarService.GetAdminAddr()
libassert.AssertUpstreamEndpointStatus(t, adminPort, fmt.Sprintf("static-server.default.%s.external", libtopology.DialingPeerName), "HEALTHY", 1)
libassert.HTTPServiceEchoes(t, "localhost", port, "")
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", port), libservice.StaticServerServiceName, "")
// TODO: restart static-server-2's sidecar
tc.extraAssertion(appPort)
} }
require.NoErrorf(t, dialingCluster.ConfigEntryWrite(resolverConfigEntry),
"error writing resolver config entry for %s", resolverConfigEntry.Name)
for _, tc := range tcs { // Make a static-server in dialing cluster
t.Run(fmt.Sprintf("%s upgrade from %s to %s", tc.name, utils.LatestVersion, utils.TargetVersion), serviceOpts := &libservice.ServiceOpts{
func(t *testing.T) { Name: libservice.StaticServerServiceName,
run(t, tc, utils.LatestVersion, utils.TargetVersion) ID: "static-server-dialing",
}) HTTPPort: 8081,
GRPCPort: 8078,
} }
_, serverConnectProxy, err := libservice.CreateAndRegisterStaticServerAndSidecar(dialingCluster.Clients()[0], serviceOpts)
require.NoError(t, err)
libassert.CatalogServiceExists(t, dialingCluster.Clients()[0].GetClient(), libservice.StaticServerServiceName, nil)
_, appPorts := clientConnectProxy.GetAddrs()
assertionAdditionalResources := func() {
// assert traffic can fail-over to static-server in peered cluster and restor to local static-server
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", appPorts[0]), "static-server-dialing", "")
require.NoError(t, serverConnectProxy.Stop())
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", appPorts[0]), libservice.StaticServerServiceName, "")
require.NoError(t, serverConnectProxy.Start())
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", appPorts[0]), "static-server-dialing", "")
// assert peer-static-server resolves to static-server in peered cluster
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", appPorts[1]), libservice.StaticServerServiceName, "")
}
assertionAdditionalResources()
peeringUpgrade(t, accepting, dialing, utils.TargetVersion)
require.NoError(t, clientConnectProxy.Restart())
assertionAdditionalResources()
peeringPostUpgradeValidation(t, dialing)
// TODO: restart static-server-2's sidecar
}
// In addtional to the basic topology, this case provisions the following
// services in the dialing cluster:
//
// - a new static-client at server_0 that has two upstreams: split-static-server (5000)
// and peer-static-server (5001)
// - a local static-server service at client_0
// - service-splitter named split-static-server w/ 2 services: "local-static-server" and
// "peer-static-server".
// - service-resolved named local-static-server
// - service-resolved named peer-static-server
func TestPeering_HTTPResolverAndSplitter(t *testing.T) {
t.Parallel()
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, utils.LatestVersion, false)
dialingCluster := dialing.Cluster
require.NoError(t, dialingCluster.ConfigEntryWrite(&api.ProxyConfigEntry{
Kind: api.ProxyDefaults,
Name: "global",
Config: map[string]interface{}{
"protocol": "http",
},
}))
clientConnectProxy, err := createAndRegisterStaticClientSidecarWith2Upstreams(dialingCluster,
[]string{"split-static-server", "peer-static-server"}, true,
)
require.NoErrorf(t, err, "creating client connect proxy in cluster %s", dialingCluster.NetworkName)
// make a resolver for service peer-static-server
resolverConfigEntry := &api.ServiceResolverConfigEntry{
Kind: api.ServiceResolver,
Name: "peer-static-server",
Redirect: &api.ServiceResolverRedirect{
Service: libservice.StaticServerServiceName,
Peer: libtopology.DialingPeerName,
},
}
require.NoErrorf(t, dialingCluster.ConfigEntryWrite(resolverConfigEntry),
"writing resolver config entry for %s", resolverConfigEntry.Name)
// make a splitter for service split-static-server
splitter := &api.ServiceSplitterConfigEntry{
Kind: api.ServiceSplitter,
Name: "split-static-server",
Splits: []api.ServiceSplit{
{
Weight: 50,
Service: "local-static-server",
ResponseHeaders: &api.HTTPHeaderModifiers{
Set: map[string]string{
"x-test-split": "local",
},
},
},
{
Weight: 50,
Service: "peer-static-server",
ResponseHeaders: &api.HTTPHeaderModifiers{
Set: map[string]string{
"x-test-split": "peer",
},
},
},
},
}
require.NoErrorf(t, dialingCluster.ConfigEntryWrite(splitter),
"error writing splitter config entry for %s", splitter.Name)
// make a resolver for service local-static-server
resolverConfigEntry = &api.ServiceResolverConfigEntry{
Kind: api.ServiceResolver,
Name: "local-static-server",
Redirect: &api.ServiceResolverRedirect{
Service: libservice.StaticServerServiceName,
},
}
require.NoErrorf(t, dialingCluster.ConfigEntryWrite(resolverConfigEntry),
"error writing resolver config entry for %s", resolverConfigEntry.Name)
// Make a static-server in dialing cluster
serviceOpts := &libservice.ServiceOpts{
Name: libservice.StaticServerServiceName,
ID: "static-server",
HTTPPort: 8081,
GRPCPort: 8078,
}
_, _, err = libservice.CreateAndRegisterStaticServerAndSidecar(dialingCluster.Clients()[0], serviceOpts)
require.NoError(t, err)
libassert.CatalogServiceExists(t, dialingCluster.Clients()[0].GetClient(), libservice.StaticServerServiceName, nil)
_, appPorts := clientConnectProxy.GetAddrs()
assertionAdditionalResources := func() {
libassert.HTTPServiceEchoesResHeader(t, "localhost", appPorts[0], "", map[string]string{
"X-Test-Split": "local",
})
libassert.HTTPServiceEchoesResHeader(t, "localhost", appPorts[0], "", map[string]string{
"X-Test-Split": "peer",
})
libassert.HTTPServiceEchoes(t, "localhost", appPorts[0], "")
}
assertionAdditionalResources()
peeringUpgrade(t, accepting, dialing, utils.TargetVersion)
require.NoError(t, clientConnectProxy.Restart())
assertionAdditionalResources()
peeringPostUpgradeValidation(t, dialing)
// TODO: restart static-server-2's sidecar
}
func peeringUpgrade(t *testing.T, accepting, dialing *libtopology.BuiltCluster, targetVersion string) {
t.Helper()
dialingClient, err := dialing.Cluster.GetClient(nil, false)
require.NoError(t, err)
acceptingClient, err := accepting.Cluster.GetClient(nil, false)
require.NoError(t, err)
_, gatewayAdminPort := dialing.Gateway.GetAdminAddr()
_, staticClientPort := dialing.Container.GetAddr()
// Upgrade the accepting cluster and assert peering is still ACTIVE
require.NoError(t, accepting.Cluster.StandardUpgrade(t, context.Background(), targetVersion))
libassert.PeeringStatus(t, acceptingClient, libtopology.AcceptingPeerName, api.PeeringStateActive)
libassert.PeeringStatus(t, dialingClient, libtopology.DialingPeerName, api.PeeringStateActive)
require.NoError(t, dialing.Cluster.StandardUpgrade(t, context.Background(), targetVersion))
libassert.PeeringStatus(t, acceptingClient, libtopology.AcceptingPeerName, api.PeeringStateActive)
libassert.PeeringStatus(t, dialingClient, libtopology.DialingPeerName, api.PeeringStateActive)
// POST upgrade validation
// - Register a new static-client service in dialing cluster and
// - set upstream to static-server service in peered cluster
// Restart the gateway & proxy sidecar, and verify existing connection
require.NoError(t, dialing.Gateway.Restart())
// Restarted gateway should not have any measurement on data plane traffic
libassert.AssertEnvoyMetricAtMost(t, gatewayAdminPort,
"cluster.static-server.default.default.accepting-to-dialer.external",
"upstream_cx_total", 0)
libassert.HTTPServiceEchoes(t, "localhost", staticClientPort, "")
require.NoError(t, dialing.Container.Restart())
libassert.HTTPServiceEchoes(t, "localhost", staticClientPort, "")
require.NoError(t, accepting.Container.Restart())
libassert.HTTPServiceEchoes(t, "localhost", staticClientPort, "")
}
func peeringPostUpgradeValidation(t *testing.T, dialing *libtopology.BuiltCluster) {
t.Helper()
clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialing.Cluster.Servers()[0], libtopology.DialingPeerName, true)
require.NoError(t, err)
_, port := clientSidecarService.GetAddr()
_, adminPort := clientSidecarService.GetAdminAddr()
libassert.AssertUpstreamEndpointStatus(t, adminPort, fmt.Sprintf("static-server.default.%s.external", libtopology.DialingPeerName), "HEALTHY", 1)
libassert.HTTPServiceEchoes(t, "localhost", port, "")
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", port), libservice.StaticServerServiceName, "")
} }
// createAndRegisterStaticClientSidecarWith2Upstreams creates a static-client that // createAndRegisterStaticClientSidecarWith2Upstreams creates a static-client that