Refactoring the peering integ test to accommodate coming changes of o… (#15885)

* Refactoring the peering integ test to accommodate coming changes of other upgrade scenarios.

- Add a utils package under test that contains methods to set up various test scenarios.
- Deduplication: have a single CreatingPeeringClusterAndSetup replace
  CreatingAcceptingClusterAndSetup and CreateDialingClusterAndSetup.
- Separate peering cluster creation and server registration.

* Apply suggestions from code review

Co-authored-by: Dan Stough <dan.stough@hashicorp.com>
This commit is contained in:
cskh 2023-01-04 15:28:15 -05:00 committed by GitHub
parent cb5389cc89
commit bb797ff36c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 262 additions and 201 deletions

View File

@ -13,7 +13,7 @@ import (
) )
const ( const (
defaultHTTPTimeout = 30 * time.Second defaultHTTPTimeout = 100 * time.Second
defaultHTTPWait = defaultWait defaultHTTPWait = defaultWait
) )

View File

@ -25,13 +25,14 @@ import (
// These fields are public in the event someone might want to surgically // These fields are public in the event someone might want to surgically
// craft a test case. // craft a test case.
type Cluster struct { type Cluster struct {
Agents []libagent.Agent Agents []libagent.Agent
CACert string BuildContext *libagent.BuildContext
CAKey string CACert string
ID string CAKey string
Index int ID string
Network testcontainers.Network Index int
NetworkName string Network testcontainers.Network
NetworkName string
} }
// New creates a Consul cluster. An agent will be started for each of the given // New creates a Consul cluster. An agent will be started for each of the given
@ -238,6 +239,31 @@ func (c *Cluster) Leader() (libagent.Agent, error) {
return nil, fmt.Errorf("leader not found") return nil, fmt.Errorf("leader not found")
} }
// GetClient returns a consul API client to the node if node is provided.
// Otherwise, GetClient returns the API client to the first node of either
// server or client agent.
func (c *Cluster) GetClient(node libagent.Agent, isServer bool) (*api.Client, error) {
var err error
if node != nil {
return node.GetClient(), err
}
nodes, err := c.Clients()
if isServer {
nodes, err = c.Servers()
}
if err != nil {
return nil, fmt.Errorf("unable to get the api client: %s", err)
}
if len(nodes) <= 0 {
return nil, fmt.Errorf("not enough node: %d", len(nodes))
}
return nodes[0].GetClient(), err
}
func getLeader(client *api.Client) (string, error) { func getLeader(client *api.Client) (string, error) {
leaderAdd, err := client.Status().Leader() leaderAdd, err := client.Status().Leader()
if err != nil { if err != nil {

View File

@ -8,27 +8,34 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
// "github.com/hashicorp/consul/sdk/testutil/retry"
libagent "github.com/hashicorp/consul/test/integration/consul-container/libs/agent" libagent "github.com/hashicorp/consul/test/integration/consul-container/libs/agent"
libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service" libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils" "github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
) )
// creatingAcceptingClusterAndSetup creates a cluster with 3 servers and 1 client. type Options struct {
// It also creates and registers a service+sidecar. Datacenter string
NumServer int
NumClient int
Version string
}
// CreatingPeeringClusterAndSetup creates a cluster with peering enabled
// It also creates and registers a mesh-gateway at the client agent.
// The API client returned is pointed at the client agent. // The API client returned is pointed at the client agent.
func CreatingAcceptingClusterAndSetup(t *testing.T, numServer int, version string, acceptingPeerName string) (*Cluster, *api.Client, *libagent.BuildContext) { func CreatingPeeringClusterAndSetup(t *testing.T, clusterOpts *Options) (*Cluster, *api.Client) {
var configs []libagent.Config var configs []libagent.Config
opts := libagent.BuildOptions{ opts := libagent.BuildOptions{
Datacenter: clusterOpts.Datacenter,
InjectAutoEncryption: true, InjectAutoEncryption: true,
InjectGossipEncryption: true, InjectGossipEncryption: true,
ConsulVersion: version, ConsulVersion: clusterOpts.Version,
} }
ctx, err := libagent.NewBuildContext(opts) ctx, err := libagent.NewBuildContext(opts)
require.NoError(t, err) require.NoError(t, err)
numServer := clusterOpts.NumServer
for i := 0; i < numServer; i++ { for i := 0; i < numServer; i++ {
serverConf, err := libagent.NewConfigBuilder(ctx). serverConf, err := libagent.NewConfigBuilder(ctx).
Bootstrap(numServer). Bootstrap(numServer).
@ -36,7 +43,7 @@ func CreatingAcceptingClusterAndSetup(t *testing.T, numServer int, version strin
RetryJoin(fmt.Sprintf("agent-%d", (i+1)%3)). // Round-robin join the servers RetryJoin(fmt.Sprintf("agent-%d", (i+1)%3)). // Round-robin join the servers
ToAgentConfig() ToAgentConfig()
require.NoError(t, err) require.NoError(t, err)
t.Logf("dc1 server config %d: \n%s", i, serverConf.JSON) t.Logf("%s server config %d: \n%s", clusterOpts.Datacenter, i, serverConf.JSON)
configs = append(configs, *serverConf) configs = append(configs, *serverConf)
} }
@ -49,16 +56,16 @@ func CreatingAcceptingClusterAndSetup(t *testing.T, numServer int, version strin
ToAgentConfig() ToAgentConfig()
require.NoError(t, err) require.NoError(t, err)
t.Logf("dc1 client config: \n%s", clientConf.JSON) t.Logf("%s client config: \n%s", clusterOpts.Datacenter, clientConf.JSON)
configs = append(configs, *clientConf) configs = append(configs, *clientConf)
cluster, err := New(configs) cluster, err := New(configs)
require.NoError(t, err) require.NoError(t, err)
cluster.BuildContext = ctx
// Use the client agent as the HTTP endpoint since we will not rotate it client, err := cluster.GetClient(nil, false)
clientNode := cluster.Agents[numServer] require.NoError(t, err)
client := clientNode.GetClient()
WaitForLeader(t, cluster, client) WaitForLeader(t, cluster, client)
WaitForMembers(t, client, numServer+1) WaitForMembers(t, client, numServer+1)
@ -68,78 +75,8 @@ func CreatingAcceptingClusterAndSetup(t *testing.T, numServer int, version strin
require.True(t, ok) require.True(t, ok)
// Create the mesh gateway for dataplane traffic // Create the mesh gateway for dataplane traffic
_, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode) clientNodes, _ := cluster.Clients()
_, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNodes[0])
require.NoError(t, err) require.NoError(t, err)
return cluster, client
// Create a service and proxy instance
_, _, err = libservice.CreateAndRegisterStaticServerAndSidecar(clientNode)
require.NoError(t, err)
libassert.CatalogServiceExists(t, client, "static-server")
libassert.CatalogServiceExists(t, client, "static-server-sidecar-proxy")
// Export the service
config := &api.ExportedServicesConfigEntry{
Name: "default",
Services: []api.ExportedService{
{
Name: "static-server",
Consumers: []api.ServiceConsumer{
// TODO: need to handle the changed field name in 1.13
{Peer: acceptingPeerName},
},
},
},
}
ok, _, err = client.ConfigEntries().Set(config, &api.WriteOptions{})
require.NoError(t, err)
require.True(t, ok)
return cluster, client, ctx
}
// createDialingClusterAndSetup creates a cluster for peering with a single dev agent
func CreateDialingClusterAndSetup(t *testing.T, version string, dialingPeerName string) (*Cluster, *api.Client, libservice.Service) {
opts := libagent.BuildOptions{
Datacenter: "dc2",
InjectAutoEncryption: true,
InjectGossipEncryption: true,
ConsulVersion: version,
}
ctx, err := libagent.NewBuildContext(opts)
require.NoError(t, err)
conf, err := libagent.NewConfigBuilder(ctx).
Peering(true).
ToAgentConfig()
require.NoError(t, err)
t.Logf("dc2 server config: \n%s", conf.JSON)
configs := []libagent.Config{*conf}
cluster, err := New(configs)
require.NoError(t, err)
node := cluster.Agents[0]
client := node.GetClient()
WaitForLeader(t, cluster, client)
WaitForMembers(t, client, 1)
// Default Proxy Settings
ok, err := utils.ApplyDefaultProxySettings(client)
require.NoError(t, err)
require.True(t, ok)
// Create the mesh gateway for dataplane traffic
_, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", node)
require.NoError(t, err)
// Create a service and proxy instance
clientProxyService, err := libservice.CreateAndRegisterStaticClientSidecar(node, dialingPeerName, true)
require.NoError(t, err)
libassert.CatalogServiceExists(t, client, "static-client-sidecar-proxy")
return cluster, client, clientProxyService
} }

View File

@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/docker/go-connections/nat" "github.com/docker/go-connections/nat"
"github.com/hashicorp/consul/api"
"github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait" "github.com/testcontainers/testcontainers-go/wait"
@ -15,12 +16,12 @@ import (
// ConnectContainer // ConnectContainer
type ConnectContainer struct { type ConnectContainer struct {
ctx context.Context ctx context.Context
container testcontainers.Container container testcontainers.Container
ip string ip string
appPort int appPort int
adminPort int adminPort int
req testcontainers.ContainerRequest serviceName string
} }
func (g ConnectContainer) GetName() string { func (g ConnectContainer) GetName() string {
@ -68,6 +69,19 @@ func (c ConnectContainer) Terminate() error {
return err return err
} }
func (g ConnectContainer) Export(partition, peer string, client *api.Client) error {
return fmt.Errorf("ConnectContainer export unimplemented")
}
func (g ConnectContainer) GetServiceName() string {
return g.serviceName
}
// NewConnectService returns a container that runs envoy sidecar, launched by
// "consul connect envoy", for service name (serviceName) on the specified
// node. The container exposes port serviceBindPort and envoy admin port (19000)
// by mapping them onto host ports. The container's name has a prefix
// combining datacenter and name.
func NewConnectService(ctx context.Context, name string, serviceName string, serviceBindPort int, node libnode.Agent) (*ConnectContainer, error) { func NewConnectService(ctx context.Context, name string, serviceName string, serviceBindPort int, node libnode.Agent) (*ConnectContainer, error) {
namePrefix := fmt.Sprintf("%s-service-connect-%s", node.GetDatacenter(), name) namePrefix := fmt.Sprintf("%s-service-connect-%s", node.GetDatacenter(), name)
containerName := utils.RandName(namePrefix) containerName := utils.RandName(namePrefix)
@ -93,7 +107,6 @@ func NewConnectService(ctx context.Context, name string, serviceName string, ser
Cmd: []string{ Cmd: []string{
"consul", "connect", "envoy", "consul", "connect", "envoy",
"-sidecar-for", serviceName, "-sidecar-for", serviceName,
"-service", name,
"-admin-bind", "0.0.0.0:19000", "-admin-bind", "0.0.0.0:19000",
"-grpc-addr", fmt.Sprintf("%s:8502", nodeIP), "-grpc-addr", fmt.Sprintf("%s:8502", nodeIP),
"-http-addr", fmt.Sprintf("%s:8500", nodeIP), "-http-addr", fmt.Sprintf("%s:8500", nodeIP),
@ -141,10 +154,14 @@ func NewConnectService(ctx context.Context, name string, serviceName string, ser
} }
node.RegisterTermination(terminate) node.RegisterTermination(terminate)
fmt.Printf("NewConnectService: name %s, mappedAppPort %d, bind port %d\n",
serviceName, mappedAppPort.Int(), serviceBindPort)
return &ConnectContainer{ return &ConnectContainer{
container: container, container: container,
ip: ip, ip: ip,
appPort: mappedAppPort.Int(), appPort: mappedAppPort.Int(),
adminPort: mappedAdminPort.Int(), adminPort: mappedAdminPort.Int(),
serviceName: name,
}, nil }, nil
} }

View File

@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/docker/go-connections/nat" "github.com/docker/go-connections/nat"
"github.com/hashicorp/consul/api"
"github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait" "github.com/testcontainers/testcontainers-go/wait"
@ -15,12 +16,12 @@ import (
// exampleContainer // exampleContainer
type exampleContainer struct { type exampleContainer struct {
ctx context.Context ctx context.Context
container testcontainers.Container container testcontainers.Container
ip string ip string
httpPort int httpPort int
grpcPort int grpcPort int
req testcontainers.ContainerRequest serviceName string
} }
func (g exampleContainer) GetName() string { func (g exampleContainer) GetName() string {
@ -64,6 +65,28 @@ func (c exampleContainer) Terminate() error {
return err return err
} }
func (g exampleContainer) Export(partition, peerName string, client *api.Client) error {
config := &api.ExportedServicesConfigEntry{
Name: partition,
Services: []api.ExportedService{
{
Name: g.GetServiceName(),
Consumers: []api.ServiceConsumer{
// TODO: need to handle the changed field name in 1.13
{Peer: peerName},
},
},
},
}
_, _, err := client.ConfigEntries().Set(config, &api.WriteOptions{})
return err
}
func (g exampleContainer) GetServiceName() string {
return g.serviceName
}
func NewExampleService(ctx context.Context, name string, httpPort int, grpcPort int, node libnode.Agent) (Service, error) { func NewExampleService(ctx context.Context, name string, httpPort int, grpcPort int, node libnode.Agent) (Service, error) {
namePrefix := fmt.Sprintf("%s-service-example-%s", node.GetDatacenter(), name) namePrefix := fmt.Sprintf("%s-service-example-%s", node.GetDatacenter(), name)
containerName := utils.RandName(namePrefix) containerName := utils.RandName(namePrefix)
@ -115,5 +138,6 @@ func NewExampleService(ctx context.Context, name string, httpPort int, grpcPort
} }
node.RegisterTermination(terminate) node.RegisterTermination(terminate)
return &exampleContainer{container: container, ip: ip, httpPort: mappedHTPPPort.Int(), grpcPort: mappedGRPCPort.Int()}, nil fmt.Printf("Example service exposed http port %d, gRPC port %d\n", mappedHTPPPort.Int(), mappedGRPCPort.Int())
return &exampleContainer{container: container, ip: ip, httpPort: mappedHTPPPort.Int(), grpcPort: mappedGRPCPort.Int(), serviceName: name}, nil
} }

View File

@ -8,17 +8,19 @@ import (
"github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait" "github.com/testcontainers/testcontainers-go/wait"
"github.com/hashicorp/consul/api"
libnode "github.com/hashicorp/consul/test/integration/consul-container/libs/agent" libnode "github.com/hashicorp/consul/test/integration/consul-container/libs/agent"
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils" "github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
) )
// gatewayContainer // gatewayContainer
type gatewayContainer struct { type gatewayContainer struct {
ctx context.Context ctx context.Context
container testcontainers.Container container testcontainers.Container
ip string ip string
port int port int
req testcontainers.ContainerRequest req testcontainers.ContainerRequest
serviceName string
} }
func (g gatewayContainer) GetName() string { func (g gatewayContainer) GetName() string {
@ -62,6 +64,14 @@ func (c gatewayContainer) Terminate() error {
return err return err
} }
func (g gatewayContainer) Export(partition, peer string, client *api.Client) error {
return fmt.Errorf("gatewayContainer export unimplemented")
}
func (g gatewayContainer) GetServiceName() string {
return g.serviceName
}
func NewGatewayService(ctx context.Context, name string, kind string, node libnode.Agent) (Service, error) { func NewGatewayService(ctx context.Context, name string, kind string, node libnode.Agent) (Service, error) {
namePrefix := fmt.Sprintf("%s-service-gateway-%s", node.GetDatacenter(), name) namePrefix := fmt.Sprintf("%s-service-gateway-%s", node.GetDatacenter(), name)
containerName := utils.RandName(namePrefix) containerName := utils.RandName(namePrefix)
@ -130,5 +140,5 @@ func NewGatewayService(ctx context.Context, name string, kind string, node libno
} }
node.RegisterTermination(terminate) node.RegisterTermination(terminate)
return &gatewayContainer{container: container, ip: ip, port: mappedPort.Int()}, nil return &gatewayContainer{container: container, ip: ip, port: mappedPort.Int(), serviceName: name}, nil
} }

View File

@ -1,5 +1,9 @@
package service package service
import (
"github.com/hashicorp/consul/api"
)
// Service represents a process that will be registered with the // Service represents a process that will be registered with the
// Consul catalog, including Consul components such as sidecars and gateways // Consul catalog, including Consul components such as sidecars and gateways
type Service interface { type Service interface {
@ -7,4 +11,7 @@ type Service interface {
GetName() string GetName() string
GetAddr() (string, int) GetAddr() (string, int)
Start() (err error) Start() (err error)
// Export a service to the peering cluster
Export(partition, peer string, client *api.Client) error
GetServiceName() string
} }

View File

@ -54,8 +54,8 @@ func createCluster(t *testing.T) *libcluster.Cluster {
cluster, err := libcluster.New(configs) cluster, err := libcluster.New(configs)
require.NoError(t, err) require.NoError(t, err)
node := cluster.Agents[0] client, err := cluster.GetClient(nil, true)
client := node.GetClient() require.NoError(t, err)
libcluster.WaitForLeader(t, cluster, client) libcluster.WaitForLeader(t, cluster, client)
libcluster.WaitForMembers(t, client, 1) libcluster.WaitForMembers(t, client, 1)

View File

@ -3,7 +3,6 @@ package peering
import ( import (
"context" "context"
"encoding/pem" "encoding/pem"
"sync"
"testing" "testing"
"time" "time"
@ -16,11 +15,7 @@ import (
libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster" libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service" libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils" "github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
) "github.com/hashicorp/consul/test/integration/consul-container/test/topology"
const (
acceptingPeerName = "accepting-to-dialer"
dialingPeerName = "dialing-to-acceptor"
) )
// TestPeering_RotateServerAndCAThenFail_ // TestPeering_RotateServerAndCAThenFail_
@ -33,10 +28,11 @@ const (
// upstream. // upstream.
// //
// ## Steps // ## Steps
//
// ### Setup
// - Setup the basic peering topology: 2 clusters, exporting service from accepting cluster to dialing cluster
//
// ### Part 1 // ### Part 1
// - Create an accepting cluster with 3 servers. 1 client should be used to host a service for export
// - Create a single agent dialing cluster.
// - Create the peering and export the service. Verify it is working
// - Incrementally replace the follower nodes. // - Incrementally replace the follower nodes.
// - Replace the leader agent // - Replace the leader agent
// - Verify the dialer can reach the new server nodes and the service becomes available. // - Verify the dialer can reach the new server nodes and the service becomes available.
@ -50,41 +46,20 @@ const (
// - Terminate the server nodes in the exporting cluster // - Terminate the server nodes in the exporting cluster
// - Make sure there is still service connectivity from the importing cluster // - Make sure there is still service connectivity from the importing cluster
func TestPeering_RotateServerAndCAThenFail_(t *testing.T) { func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
var acceptingCluster, dialingCluster *libcluster.Cluster acceptingCluster, dialingCluster, _, staticClientSvcSidecar := topology.BasicPeeringTwoClustersSetup(t, *utils.TargetVersion)
var acceptingClient, dialingClient *api.Client
var acceptingCtx *libagent.BuildContext
var clientSidecarService libservice.Service
var wg sync.WaitGroup
wg.Add(1)
go func() {
acceptingCluster, acceptingClient, acceptingCtx = libcluster.CreatingAcceptingClusterAndSetup(t, 3, *utils.TargetVersion, acceptingPeerName)
wg.Done()
}()
defer func() { defer func() {
terminate(t, acceptingCluster) err := acceptingCluster.Terminate()
require.NoErrorf(t, err, "termining accepting cluster")
dialingCluster.Terminate()
require.NoErrorf(t, err, "termining dialing cluster")
}() }()
wg.Add(1) dialingClient, err := dialingCluster.GetClient(nil, false)
go func() {
dialingCluster, dialingClient, clientSidecarService = libcluster.CreateDialingClusterAndSetup(t, *utils.TargetVersion, dialingPeerName)
wg.Done()
}()
defer func() {
terminate(t, dialingCluster)
}()
wg.Wait()
err := dialingCluster.PeerWithCluster(acceptingClient, acceptingPeerName, dialingPeerName)
require.NoError(t, err) require.NoError(t, err)
_, port := staticClientSvcSidecar.GetAddr()
libassert.PeeringStatus(t, acceptingClient, acceptingPeerName, api.PeeringStateActive) acceptingClient, err := acceptingCluster.GetClient(nil, false)
libassert.PeeringExports(t, acceptingClient, acceptingPeerName, 1) require.NoError(t, err)
_, port := clientSidecarService.GetAddr()
libassert.HTTPServiceEchoes(t, "localhost", port)
t.Run("test rotating servers", func(t *testing.T) { t.Run("test rotating servers", func(t *testing.T) {
@ -98,21 +73,21 @@ func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
for idx, follower := range followers { for idx, follower := range followers {
t.Log("Removing follower", idx) t.Log("Removing follower", idx)
rotateServer(t, acceptingCluster, acceptingClient, acceptingCtx, follower) rotateServer(t, acceptingCluster, acceptingClient, acceptingCluster.BuildContext, follower)
} }
t.Log("Removing leader") t.Log("Removing leader")
rotateServer(t, acceptingCluster, acceptingClient, acceptingCtx, leader) rotateServer(t, acceptingCluster, acceptingClient, acceptingCluster.BuildContext, leader)
libassert.PeeringStatus(t, acceptingClient, acceptingPeerName, api.PeeringStateActive) libassert.PeeringStatus(t, acceptingClient, topology.AcceptingPeerName, api.PeeringStateActive)
libassert.PeeringExports(t, acceptingClient, acceptingPeerName, 1) libassert.PeeringExports(t, acceptingClient, topology.AcceptingPeerName, 1)
libassert.HTTPServiceEchoes(t, "localhost", port) libassert.HTTPServiceEchoes(t, "localhost", port)
}) })
t.Run("rotate exporting cluster's root CA", func(t *testing.T) { t.Run("rotate exporting cluster's root CA", func(t *testing.T) {
// we will verify that the peering on the dialing side persists the updates CAs // we will verify that the peering on the dialing side persists the updates CAs
peeringBefore, peerMeta, err := dialingClient.Peerings().Read(context.Background(), dialingPeerName, &api.QueryOptions{}) peeringBefore, peerMeta, err := dialingClient.Peerings().Read(context.Background(), topology.DialingPeerName, &api.QueryOptions{})
require.NoError(t, err) require.NoError(t, err)
_, caMeta, err := acceptingClient.Connect().CAGetConfig(&api.QueryOptions{}) _, caMeta, err := acceptingClient.Connect().CAGetConfig(&api.QueryOptions{})
@ -141,7 +116,7 @@ func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// The peering object should reflect the update // The peering object should reflect the update
peeringAfter, _, err := dialingClient.Peerings().Read(context.Background(), dialingPeerName, &api.QueryOptions{ peeringAfter, _, err := dialingClient.Peerings().Read(context.Background(), topology.DialingPeerName, &api.QueryOptions{
WaitIndex: peerMeta.LastIndex, WaitIndex: peerMeta.LastIndex,
WaitTime: 30 * time.Second, WaitTime: 30 * time.Second,
}) })
@ -155,10 +130,10 @@ func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
require.Len(t, rootList.Roots, 2) require.Len(t, rootList.Roots, 2)
// Connectivity should still be contained // Connectivity should still be contained
_, port := clientSidecarService.GetAddr() _, port := staticClientSvcSidecar.GetAddr()
libassert.HTTPServiceEchoes(t, "localhost", port) libassert.HTTPServiceEchoes(t, "localhost", port)
verifySidecarHasTwoRootCAs(t, clientSidecarService) verifySidecarHasTwoRootCAs(t, staticClientSvcSidecar)
}) })
t.Run("terminate exporting clusters servers and ensure imported services are still reachable", func(t *testing.T) { t.Run("terminate exporting clusters servers and ensure imported services are still reachable", func(t *testing.T) {
@ -178,7 +153,7 @@ func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
// ensure any transitory actions like replication cleanup would not affect the next verifications // ensure any transitory actions like replication cleanup would not affect the next verifications
time.Sleep(30 * time.Second) time.Sleep(30 * time.Second)
_, port := clientSidecarService.GetAddr() _, port := staticClientSvcSidecar.GetAddr()
libassert.HTTPServiceEchoes(t, "localhost", port) libassert.HTTPServiceEchoes(t, "localhost", port)
}) })
} }
@ -190,7 +165,7 @@ func terminate(t *testing.T, cluster *libcluster.Cluster) {
// rotateServer add a new server agent to the cluster, then forces the prior agent to leave. // rotateServer add a new server agent to the cluster, then forces the prior agent to leave.
func rotateServer(t *testing.T, cluster *libcluster.Cluster, client *api.Client, ctx *libagent.BuildContext, node libagent.Agent) { func rotateServer(t *testing.T, cluster *libcluster.Cluster, client *api.Client, ctx *libagent.BuildContext, node libagent.Agent) {
conf, err := libagent.NewConfigBuilder(ctx). conf, err := libagent.NewConfigBuilder(cluster.BuildContext).
Bootstrap(0). Bootstrap(0).
Peering(true). Peering(true).
RetryJoin("agent-3"). // Always use the client agent since it never leaves the cluster RetryJoin("agent-3"). // Always use the client agent since it never leaves the cluster

View File

@ -0,0 +1,87 @@
package topology
import (
"sync"
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/api"
libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
)
const (
AcceptingPeerName = "accepting-to-dialer"
DialingPeerName = "dialing-to-acceptor"
)
// BasicPeeringTwoClustersSetup sets up a scenario for testing peering, which consists of
// - an accepting cluster with 3 servers and 1 client agnet. The client should be used to
// host a service for export: staticServerSvc.
// - an dialing cluster with 1 server and 1 client. The client should be used to host a
// service connecting to staticServerSvc.
// - Create the peering, export the service from accepting cluster, and verify service
// connectivity.
//
// It returns objects of the accepting cluster, dialing cluster, staticServerSvc, and staticClientSvcSidecar
func BasicPeeringTwoClustersSetup(t *testing.T, consulVersion string) (*libcluster.Cluster, *libcluster.Cluster, *libservice.Service, *libservice.ConnectContainer) {
var wg sync.WaitGroup
var acceptingCluster, dialingCluster *libcluster.Cluster
var acceptingClient *api.Client
wg.Add(1)
go func() {
opts := &libcluster.Options{
Datacenter: "dc1",
NumServer: 3,
NumClient: 1,
Version: consulVersion,
}
acceptingCluster, acceptingClient = libcluster.CreatingPeeringClusterAndSetup(t, opts)
wg.Done()
}()
wg.Add(1)
go func() {
opts := &libcluster.Options{
Datacenter: "dc2",
NumServer: 1,
NumClient: 1,
Version: consulVersion,
}
dialingCluster, _ = libcluster.CreatingPeeringClusterAndSetup(t, opts)
wg.Done()
}()
wg.Wait()
err := dialingCluster.PeerWithCluster(acceptingClient, AcceptingPeerName, DialingPeerName)
require.NoError(t, err)
libassert.PeeringStatus(t, acceptingClient, AcceptingPeerName, api.PeeringStateActive)
// Register an static-server service in acceptingCluster and export to dialing cluster
clientNodes, err := acceptingCluster.Clients()
require.NoError(t, err)
require.True(t, len(clientNodes) > 0)
staticServerSvc, _, err := libservice.CreateAndRegisterStaticServerAndSidecar(clientNodes[0])
require.NoError(t, err)
libassert.CatalogServiceExists(t, acceptingClient, "static-server")
libassert.CatalogServiceExists(t, acceptingClient, "static-server-sidecar-proxy")
staticServerSvc.Export("default", AcceptingPeerName, acceptingClient)
libassert.PeeringExports(t, acceptingClient, AcceptingPeerName, 1)
// Register an static-client service in dialing cluster and set upstream to static-server service
clientNodesDialing, err := dialingCluster.Clients()
require.NoError(t, err)
require.True(t, len(clientNodesDialing) > 0)
staticClientSvcSidecar, err := libservice.CreateAndRegisterStaticClientSidecar(clientNodesDialing[0], DialingPeerName, true)
require.NoError(t, err)
_, port := staticClientSvcSidecar.GetAddr()
libassert.HTTPServiceEchoes(t, "localhost", port)
return acceptingCluster, dialingCluster, &staticServerSvc, staticClientSvcSidecar
}

View File

@ -3,7 +3,6 @@ package upgrade
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"testing" "testing"
"time" "time"
@ -11,13 +10,8 @@ import (
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert" libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils" "github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
) "github.com/hashicorp/consul/test/integration/consul-container/test/topology"
const (
acceptingPeerName = "accepting-to-dialer"
dialingPeerName = "dialing-to-acceptor"
) )
// TestPeering_UpgradeToTarget_fromLatest checks peering status after dialing cluster // TestPeering_UpgradeToTarget_fromLatest checks peering status after dialing cluster
@ -41,46 +35,30 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
} }
run := func(t *testing.T, tc testcase) { run := func(t *testing.T, tc testcase) {
var acceptingCluster, dialingCluster *libcluster.Cluster acceptingCluster, dialingCluster, _, staticClientSvcSidecar := topology.BasicPeeringTwoClustersSetup(t, tc.oldversion)
var acceptingClient, dialingClient *api.Client // move to teardown
var wg sync.WaitGroup
wg.Add(1)
go func() {
acceptingCluster, acceptingClient, _ = libcluster.CreatingAcceptingClusterAndSetup(t, 3, tc.oldversion, acceptingPeerName)
wg.Done()
}()
defer func() { defer func() {
terminate(t, acceptingCluster) err := acceptingCluster.Terminate()
require.NoErrorf(t, err, "termining accepting cluster")
dialingCluster.Terminate()
require.NoErrorf(t, err, "termining dialing cluster")
}() }()
wg.Add(1) dialingClient, err := dialingCluster.GetClient(nil, false)
go func() {
dialingCluster, dialingClient, _ = libcluster.CreateDialingClusterAndSetup(t, tc.oldversion, dialingPeerName)
wg.Done()
}()
defer func() {
terminate(t, dialingCluster)
}()
wg.Wait()
err := dialingCluster.PeerWithCluster(acceptingClient, acceptingPeerName, dialingPeerName)
require.NoError(t, err) require.NoError(t, err)
_, port := staticClientSvcSidecar.GetAddr()
libassert.PeeringStatus(t, acceptingClient, acceptingPeerName, api.PeeringStateActive)
libassert.PeeringExports(t, acceptingClient, acceptingPeerName, 1)
// Upgrade the dialingCluster cluster and assert peering is still ACTIVE // Upgrade the dialingCluster cluster and assert peering is still ACTIVE
err = dialingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion) err = dialingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion)
require.NoError(t, err) require.NoError(t, err)
libassert.PeeringStatus(t, dialingClient, dialingPeerName, api.PeeringStateActive) libassert.PeeringStatus(t, dialingClient, topology.DialingPeerName, api.PeeringStateActive)
libassert.HTTPServiceEchoes(t, "localhost", port)
// Upgrade the accepting cluster and assert peering is still ACTIVE // Upgrade the accepting cluster and assert peering is still ACTIVE
err = acceptingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion) err = acceptingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion)
require.NoError(t, err) require.NoError(t, err)
libassert.PeeringStatus(t, acceptingClient, acceptingPeerName, api.PeeringStateActive) libassert.PeeringStatus(t, dialingClient, topology.DialingPeerName, api.PeeringStateActive)
} }
for _, tc := range tcs { for _, tc := range tcs {