diff --git a/test/integration/consul-container/libs/assert/service.go b/test/integration/consul-container/libs/assert/service.go
index 266dbdd71..ae2c50b8c 100644
--- a/test/integration/consul-container/libs/assert/service.go
+++ b/test/integration/consul-container/libs/assert/service.go
@@ -13,7 +13,7 @@ import (
 )
 
 const (
-	defaultHTTPTimeout = 30 * time.Second
+	defaultHTTPTimeout = 100 * time.Second
 	defaultHTTPWait    = defaultWait
 )
 
diff --git a/test/integration/consul-container/libs/cluster/cluster.go b/test/integration/consul-container/libs/cluster/cluster.go
index 44274ff21..4588cd5ad 100644
--- a/test/integration/consul-container/libs/cluster/cluster.go
+++ b/test/integration/consul-container/libs/cluster/cluster.go
@@ -25,13 +25,14 @@ import (
 // These fields are public in the event someone might want to surgically
 // craft a test case.
 type Cluster struct {
-	Agents      []libagent.Agent
-	CACert      string
-	CAKey       string
-	ID          string
-	Index       int
-	Network     testcontainers.Network
-	NetworkName string
+	Agents       []libagent.Agent
+	BuildContext *libagent.BuildContext
+	CACert       string
+	CAKey        string
+	ID           string
+	Index        int
+	Network      testcontainers.Network
+	NetworkName  string
 }
 
 // New creates a Consul cluster. An agent will be started for each of the given
@@ -238,6 +239,31 @@ func (c *Cluster) Leader() (libagent.Agent, error) {
 	return nil, fmt.Errorf("leader not found")
}
 
+// GetClient returns an API client to the given node. If node is nil, it
+// returns a client to the first client agent, or to the first server agent
+// when isServer is true.
+func (c *Cluster) GetClient(node libagent.Agent, isServer bool) (*api.Client, error) {
+	if node != nil {
+		return node.GetClient(), nil
+	}
+
+	var nodes []libagent.Agent
+	var err error
+	if isServer {
+		nodes, err = c.Servers()
+	} else {
+		nodes, err = c.Clients()
+	}
+	if err != nil {
+		return nil, fmt.Errorf("unable to get the API client: %w", err)
+	}
+
+	if len(nodes) == 0 {
+		return nil, fmt.Errorf("no agent available to create an API client")
+	}
+
+	return nodes[0].GetClient(), nil
+}
+
 func getLeader(client *api.Client) (string, error) {
 	leaderAdd, err := client.Status().Leader()
 	if err != nil {
diff --git a/test/integration/consul-container/libs/cluster/helpers.go b/test/integration/consul-container/libs/cluster/helpers.go
index 31eb76a44..f555a676a 100644
--- a/test/integration/consul-container/libs/cluster/helpers.go
+++ b/test/integration/consul-container/libs/cluster/helpers.go
@@ -8,27 +8,34 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/hashicorp/consul/api"
-	// "github.com/hashicorp/consul/sdk/testutil/retry"
 	libagent "github.com/hashicorp/consul/test/integration/consul-container/libs/agent"
-	libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
 	libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
 	"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
 )
 
-// creatingAcceptingClusterAndSetup creates a cluster with 3 servers and 1 client.
-// It also creates and registers a service+sidecar.
+type Options struct {
+	Datacenter string
+	NumServer  int
+	NumClient  int
+	Version    string
+}
+
+// CreatingPeeringClusterAndSetup creates a cluster with peering enabled.
+// It also creates and registers a mesh gateway on the client agent.
 // The API client returned is pointed at the client agent.
-func CreatingAcceptingClusterAndSetup(t *testing.T, numServer int, version string, acceptingPeerName string) (*Cluster, *api.Client, *libagent.BuildContext) {
+func CreatingPeeringClusterAndSetup(t *testing.T, clusterOpts *Options) (*Cluster, *api.Client) {
 	var configs []libagent.Config
 
 	opts := libagent.BuildOptions{
+		Datacenter:             clusterOpts.Datacenter,
 		InjectAutoEncryption:   true,
 		InjectGossipEncryption: true,
-		ConsulVersion:          version,
+		ConsulVersion:          clusterOpts.Version,
 	}
 	ctx, err := libagent.NewBuildContext(opts)
 	require.NoError(t, err)
 
+	numServer := clusterOpts.NumServer
 	for i := 0; i < numServer; i++ {
 		serverConf, err := libagent.NewConfigBuilder(ctx).
 			Bootstrap(numServer).
@@ -36,7 +43,7 @@ func CreatingAcceptingClusterAndSetup(t *testing.T, numServer int, version strin
 			RetryJoin(fmt.Sprintf("agent-%d", (i+1)%3)). // Round-robin join the servers
 			ToAgentConfig()
 		require.NoError(t, err)
-		t.Logf("dc1 server config %d: \n%s", i, serverConf.JSON)
+		t.Logf("%s server config %d: \n%s", clusterOpts.Datacenter, i, serverConf.JSON)
 
 		configs = append(configs, *serverConf)
 	}
@@ -49,16 +56,16 @@ func CreatingAcceptingClusterAndSetup(t *testing.T, numServer int, version strin
 		ToAgentConfig()
 	require.NoError(t, err)
 
-	t.Logf("dc1 client config: \n%s", clientConf.JSON)
+	t.Logf("%s client config: \n%s", clusterOpts.Datacenter, clientConf.JSON)
 
 	configs = append(configs, *clientConf)
 
 	cluster, err := New(configs)
 	require.NoError(t, err)
+	cluster.BuildContext = ctx
 
-	// Use the client agent as the HTTP endpoint since we will not rotate it
-	clientNode := cluster.Agents[numServer]
-	client := clientNode.GetClient()
+	client, err := cluster.GetClient(nil, false)
+	require.NoError(t, err)
 
 	WaitForLeader(t, cluster, client)
 	WaitForMembers(t, client, numServer+1)
@@ -68,78 +75,8 @@ func CreatingAcceptingClusterAndSetup(t *testing.T, numServer int, version strin
 	require.True(t, ok)
 
 	// Create the mesh gateway for dataplane traffic
-	_, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
+	clientNodes, err := cluster.Clients()
+	require.NoError(t, err)
+	require.NotEmpty(t, clientNodes)
+	_, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNodes[0])
 	require.NoError(t, err)
-
-	// Create a service and proxy instance
-	_, _, err = libservice.CreateAndRegisterStaticServerAndSidecar(clientNode)
-	require.NoError(t, err)
-
-	libassert.CatalogServiceExists(t, client, "static-server")
-	libassert.CatalogServiceExists(t, client, "static-server-sidecar-proxy")
-
-	// Export the service
-	config := &api.ExportedServicesConfigEntry{
-		Name: "default",
-		Services: []api.ExportedService{
-			{
-				Name: "static-server",
-				Consumers: []api.ServiceConsumer{
-					// TODO: need to handle the changed field name in 1.13
-					{Peer: acceptingPeerName},
-				},
-			},
-		},
-	}
-
-	ok, _, err = client.ConfigEntries().Set(config, &api.WriteOptions{})
-	require.NoError(t, err)
-	require.True(t, ok)
-
-	return cluster, client, ctx
-}
-
-// createDialingClusterAndSetup creates a cluster for peering with a single dev agent
-func CreateDialingClusterAndSetup(t *testing.T, version string, dialingPeerName string) (*Cluster, *api.Client, libservice.Service) {
-	opts := libagent.BuildOptions{
-		Datacenter:             "dc2",
-		InjectAutoEncryption:   true,
-		InjectGossipEncryption: true,
-		ConsulVersion:          version,
-	}
-	ctx, err := libagent.NewBuildContext(opts)
-	require.NoError(t, err)
-
-	conf, err := libagent.NewConfigBuilder(ctx).
-		Peering(true).
-		ToAgentConfig()
-	require.NoError(t, err)
-	t.Logf("dc2 server config: \n%s", conf.JSON)
-
-	configs := []libagent.Config{*conf}
-
-	cluster, err := New(configs)
-	require.NoError(t, err)
-
-	node := cluster.Agents[0]
-	client := node.GetClient()
-	WaitForLeader(t, cluster, client)
-	WaitForMembers(t, client, 1)
-
-	// Default Proxy Settings
-	ok, err := utils.ApplyDefaultProxySettings(client)
-	require.NoError(t, err)
-	require.True(t, ok)
-
-	// Create the mesh gateway for dataplane traffic
-	_, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", node)
-	require.NoError(t, err)
-
-	// Create a service and proxy instance
-	clientProxyService, err := libservice.CreateAndRegisterStaticClientSidecar(node, dialingPeerName, true)
-	require.NoError(t, err)
-
-	libassert.CatalogServiceExists(t, client, "static-client-sidecar-proxy")
-
-	return cluster, client, clientProxyService
+	return cluster, client
 }
diff --git a/test/integration/consul-container/libs/service/connect.go b/test/integration/consul-container/libs/service/connect.go
index b8a446139..94607d45c 100644
--- a/test/integration/consul-container/libs/service/connect.go
+++ b/test/integration/consul-container/libs/service/connect.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/docker/go-connections/nat"
+	"github.com/hashicorp/consul/api"
 	"github.com/testcontainers/testcontainers-go"
 	"github.com/testcontainers/testcontainers-go/wait"
 
@@ -15,12 +16,12 @@ import (
 
 // ConnectContainer
 type ConnectContainer struct {
-	ctx       context.Context
-	container testcontainers.Container
-	ip        string
-	appPort   int
-	adminPort int
-	req       testcontainers.ContainerRequest
+	ctx         context.Context
+	container   testcontainers.Container
+	ip          string
+	appPort     int
+	adminPort   int
+	serviceName string
 }
 
 func (g ConnectContainer) GetName() string {
@@ -68,6 +69,19 @@ func (c ConnectContainer) Terminate() error {
 	return err
 }
 
+func (g ConnectContainer) Export(partition, peer string, client *api.Client) error {
+	return fmt.Errorf("ConnectContainer export unimplemented")
+}
+
+func (g ConnectContainer) GetServiceName() string {
+	return g.serviceName
+}
+
+// NewConnectService returns a container running an Envoy sidecar, launched
+// by "consul connect envoy", for the service named serviceName on the given
+// node. The container exposes serviceBindPort and the Envoy admin port
+// (19000) by mapping them onto host ports. The container name is prefixed
+// with the datacenter and name.
 func NewConnectService(ctx context.Context, name string, serviceName string, serviceBindPort int, node libnode.Agent) (*ConnectContainer, error) {
 	namePrefix := fmt.Sprintf("%s-service-connect-%s", node.GetDatacenter(), name)
 	containerName := utils.RandName(namePrefix)
@@ -93,7 +107,6 @@ func NewConnectService(ctx context.Context, name string, serviceName string, ser
 		Cmd: []string{
 			"consul", "connect", "envoy",
 			"-sidecar-for", serviceName,
-			"-service", name,
 			"-admin-bind", "0.0.0.0:19000",
 			"-grpc-addr", fmt.Sprintf("%s:8502", nodeIP),
 			"-http-addr", fmt.Sprintf("%s:8500", nodeIP),
@@ -141,10 +154,14 @@ func NewConnectService(ctx context.Context, name string, serviceName string, ser
 	}
 	node.RegisterTermination(terminate)
 
+	fmt.Printf("NewConnectService: name %s, mappedAppPort %d, bind port %d\n",
+		serviceName, mappedAppPort.Int(), serviceBindPort)
+
 	return &ConnectContainer{
-		container: container,
-		ip:        ip,
-		appPort:   mappedAppPort.Int(),
-		adminPort: mappedAdminPort.Int(),
+		container:   container,
+		ip:          ip,
+		appPort:     mappedAppPort.Int(),
+		adminPort:   mappedAdminPort.Int(),
+		serviceName: name,
 	}, nil
 }
diff --git a/test/integration/consul-container/libs/service/examples.go b/test/integration/consul-container/libs/service/examples.go
index 743175d7b..85cb0a888 100644
--- a/test/integration/consul-container/libs/service/examples.go
+++ b/test/integration/consul-container/libs/service/examples.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/docker/go-connections/nat"
+	"github.com/hashicorp/consul/api"
 	"github.com/testcontainers/testcontainers-go"
 	"github.com/testcontainers/testcontainers-go/wait"
 
@@ -15,12 +16,12 @@ import (
 
 // exampleContainer
 type exampleContainer struct {
-	ctx       context.Context
-	container testcontainers.Container
-	ip        string
-	httpPort  int
-	grpcPort  int
-	req       testcontainers.ContainerRequest
+	ctx         context.Context
+	container   testcontainers.Container
+	ip          string
+	httpPort    int
+	grpcPort    int
+	serviceName string
 }
 
 func (g exampleContainer) GetName() string {
@@ -64,6 +65,28 @@ func (c exampleContainer) Terminate() error {
 	return err
 }
 
+func (g exampleContainer) Export(partition, peerName string, client *api.Client) error {
+	config := &api.ExportedServicesConfigEntry{
+		Name: partition,
+		Services: []api.ExportedService{
+			{
+				Name: g.GetServiceName(),
+				Consumers: []api.ServiceConsumer{
+					// TODO: need to handle the changed field name in 1.13
+					{Peer: peerName},
+				},
+			},
+		},
+	}
+
+	_, _, err := client.ConfigEntries().Set(config, &api.WriteOptions{})
+	return err
+}
+
+func (g exampleContainer) GetServiceName() string {
+	return g.serviceName
+}
+
 func NewExampleService(ctx context.Context, name string, httpPort int, grpcPort int, node libnode.Agent) (Service, error) {
 	namePrefix := fmt.Sprintf("%s-service-example-%s", node.GetDatacenter(), name)
 	containerName := utils.RandName(namePrefix)
@@ -115,5 +138,6 @@ func NewExampleService(ctx context.Context, name string, httpPort int, grpcPort
 	}
 	node.RegisterTermination(terminate)
 
-	return &exampleContainer{container: container, ip: ip, httpPort: mappedHTPPPort.Int(), grpcPort: mappedGRPCPort.Int()}, nil
+	fmt.Printf("Example service exposed http port %d, gRPC port %d\n", mappedHTPPPort.Int(), mappedGRPCPort.Int())
+	return &exampleContainer{container: container, ip: ip, httpPort: mappedHTPPPort.Int(), grpcPort: mappedGRPCPort.Int(), serviceName: name}, nil
 }
diff --git a/test/integration/consul-container/libs/service/gateway.go b/test/integration/consul-container/libs/service/gateway.go
index b518faf93..232619e7b 100644
--- a/test/integration/consul-container/libs/service/gateway.go
+++ b/test/integration/consul-container/libs/service/gateway.go
@@ -8,17 +8,19 @@ import (
 	"github.com/testcontainers/testcontainers-go"
 	"github.com/testcontainers/testcontainers-go/wait"
 
+	"github.com/hashicorp/consul/api"
 	libnode "github.com/hashicorp/consul/test/integration/consul-container/libs/agent"
 	"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
 )
 
 // gatewayContainer
 type gatewayContainer struct {
-	ctx       context.Context
-	container testcontainers.Container
-	ip        string
-	port      int
-	req       testcontainers.ContainerRequest
+	ctx         context.Context
+	container   testcontainers.Container
+	ip          string
+	port        int
+	req         testcontainers.ContainerRequest
+	serviceName string
 }
 
 func (g gatewayContainer) GetName() string {
@@ -62,6 +64,14 @@ func (c gatewayContainer) Terminate() error {
 	return err
 }
 
+func (g gatewayContainer) Export(partition, peer string, client *api.Client) error {
+	return fmt.Errorf("gatewayContainer export unimplemented")
+}
+
+func (g gatewayContainer) GetServiceName() string {
+	return g.serviceName
+}
+
 func NewGatewayService(ctx context.Context, name string, kind string, node libnode.Agent) (Service, error) {
 	namePrefix := fmt.Sprintf("%s-service-gateway-%s", node.GetDatacenter(), name)
 	containerName := utils.RandName(namePrefix)
@@ -130,5 +140,5 @@ func NewGatewayService(ctx context.Context, name string, kind string, node libno
 	}
 	node.RegisterTermination(terminate)
 
-	return &gatewayContainer{container: container, ip: ip, port: mappedPort.Int()}, nil
+	return &gatewayContainer{container: container, ip: ip, port: mappedPort.Int(), serviceName: name}, nil
 }
diff --git a/test/integration/consul-container/libs/service/service.go b/test/integration/consul-container/libs/service/service.go
index 3ecdf8768..94dfb6c95 100644
--- a/test/integration/consul-container/libs/service/service.go
+++ b/test/integration/consul-container/libs/service/service.go
@@ -1,5 +1,9 @@
 package service
 
+import (
+	"github.com/hashicorp/consul/api"
+)
+
 // Service represents a process that will be registered with the
 // Consul catalog, including Consul components such as sidecars and gateways
 type Service interface {
@@ -7,4 +11,7 @@ type Service interface {
 	GetName() string
 	GetAddr() (string, int)
 	Start() (err error)
+	// Export exports the service to a peered cluster
+	Export(partition, peer string, client *api.Client) error
+	GetServiceName() string
 }
diff --git a/test/integration/consul-container/test/basic/connect_service_test.go b/test/integration/consul-container/test/basic/connect_service_test.go
index 2ee6bc7af..aab4f1216 100644
--- a/test/integration/consul-container/test/basic/connect_service_test.go
+++ b/test/integration/consul-container/test/basic/connect_service_test.go
@@ -54,8 +54,8 @@ func createCluster(t *testing.T) *libcluster.Cluster {
 	cluster, err := libcluster.New(configs)
 	require.NoError(t, err)
 
-	node := cluster.Agents[0]
-	client := node.GetClient()
+	client, err := cluster.GetClient(nil, true)
+	require.NoError(t, err)
 
 	libcluster.WaitForLeader(t, cluster, client)
 	libcluster.WaitForMembers(t, client, 1)
diff --git a/test/integration/consul-container/test/peering/rotate_server_and_ca_then_fail_test.go b/test/integration/consul-container/test/peering/rotate_server_and_ca_then_fail_test.go
index 0ccc0abe2..7afbdc432 100644
--- a/test/integration/consul-container/test/peering/rotate_server_and_ca_then_fail_test.go
+++ b/test/integration/consul-container/test/peering/rotate_server_and_ca_then_fail_test.go
@@ -3,7 +3,6 @@ package peering
 import (
 	"context"
 	"encoding/pem"
-	"sync"
 	"testing"
 	"time"
 
@@ -16,11 +15,7 @@ import (
 	libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
 	libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
 	"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
-)
-
-const (
-	acceptingPeerName = "accepting-to-dialer"
-	dialingPeerName   = "dialing-to-acceptor"
+	"github.com/hashicorp/consul/test/integration/consul-container/test/topology"
 )
 
 // TestPeering_RotateServerAndCAThenFail_
@@ -33,10 +28,11 @@
 // upstream.
 //
 // ## Steps
+//
+// ### Setup
+// - Set up the basic peering topology: two clusters, exporting a service from the accepting cluster to the dialing cluster
+//
 // ### Part 1
-// - Create an accepting cluster with 3 servers. 1 client should be used to host a service for export
-// - Create a single agent dialing cluster.
-// - Create the peering and export the service. Verify it is working
 // - Incrementally replace the follower nodes.
 // - Replace the leader agent
 // - Verify the dialer can reach the new server nodes and the service becomes available.
@@ -50,41 +46,20 @@
 // - Terminate the server nodes in the exporting cluster
 // - Make sure there is still service connectivity from the importing cluster
 func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
-	var acceptingCluster, dialingCluster *libcluster.Cluster
-	var acceptingClient, dialingClient *api.Client
-	var acceptingCtx *libagent.BuildContext
-	var clientSidecarService libservice.Service
-
-	var wg sync.WaitGroup
-
-	wg.Add(1)
-	go func() {
-		acceptingCluster, acceptingClient, acceptingCtx = libcluster.CreatingAcceptingClusterAndSetup(t, 3, *utils.TargetVersion, acceptingPeerName)
-		wg.Done()
-	}()
+	acceptingCluster, dialingCluster, _, staticClientSvcSidecar := topology.BasicPeeringTwoClustersSetup(t, *utils.TargetVersion)
 	defer func() {
-		terminate(t, acceptingCluster)
+		err := acceptingCluster.Terminate()
+		require.NoErrorf(t, err, "terminating accepting cluster")
+		err = dialingCluster.Terminate()
+		require.NoErrorf(t, err, "terminating dialing cluster")
 	}()
 
-	wg.Add(1)
-	go func() {
-		dialingCluster, dialingClient, clientSidecarService = libcluster.CreateDialingClusterAndSetup(t, *utils.TargetVersion, dialingPeerName)
-		wg.Done()
-	}()
-	defer func() {
-		terminate(t, dialingCluster)
-	}()
-
-	wg.Wait()
-
-	err := dialingCluster.PeerWithCluster(acceptingClient, acceptingPeerName, dialingPeerName)
+	dialingClient, err := dialingCluster.GetClient(nil, false)
 	require.NoError(t, err)
+	_, port := staticClientSvcSidecar.GetAddr()
 
-	libassert.PeeringStatus(t, acceptingClient, acceptingPeerName, api.PeeringStateActive)
-	libassert.PeeringExports(t, acceptingClient, acceptingPeerName, 1)
-
-	_, port := clientSidecarService.GetAddr()
-	libassert.HTTPServiceEchoes(t, "localhost", port)
+	acceptingClient, err := acceptingCluster.GetClient(nil, false)
+	require.NoError(t, err)
 
 	t.Run("test rotating servers", func(t *testing.T) {
 
@@ -98,21 +73,21 @@ func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
 		for idx, follower := range followers {
 			t.Log("Removing follower", idx)
-			rotateServer(t, acceptingCluster, acceptingClient, acceptingCtx, follower)
+			rotateServer(t, acceptingCluster, acceptingClient, acceptingCluster.BuildContext, follower)
 		}
 
 		t.Log("Removing leader")
-		rotateServer(t, acceptingCluster, acceptingClient, acceptingCtx, leader)
+		rotateServer(t, acceptingCluster, acceptingClient, acceptingCluster.BuildContext, leader)
-		libassert.PeeringStatus(t, acceptingClient, acceptingPeerName, api.PeeringStateActive)
-		libassert.PeeringExports(t, acceptingClient, acceptingPeerName, 1)
+		libassert.PeeringStatus(t, acceptingClient, topology.AcceptingPeerName, api.PeeringStateActive)
+		libassert.PeeringExports(t, acceptingClient, topology.AcceptingPeerName, 1)
 		libassert.HTTPServiceEchoes(t, "localhost", port)
 	})
 
 	t.Run("rotate exporting cluster's root CA", func(t *testing.T) {
 		// we will verify that the peering on the dialing side persists the updates CAs
-		peeringBefore, peerMeta, err := dialingClient.Peerings().Read(context.Background(), dialingPeerName, &api.QueryOptions{})
+		peeringBefore, peerMeta, err := dialingClient.Peerings().Read(context.Background(), topology.DialingPeerName, &api.QueryOptions{})
 		require.NoError(t, err)
 
 		_, caMeta, err := acceptingClient.Connect().CAGetConfig(&api.QueryOptions{})
@@ -141,7 +116,7 @@ func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
 		require.NoError(t, err)
 
 		// The peering object should reflect the update
-		peeringAfter, _, err := dialingClient.Peerings().Read(context.Background(), dialingPeerName, &api.QueryOptions{
+		peeringAfter, _, err := dialingClient.Peerings().Read(context.Background(), topology.DialingPeerName, &api.QueryOptions{
 			WaitIndex: peerMeta.LastIndex,
 			WaitTime:  30 * time.Second,
 		})
@@ -155,10 +130,10 @@ func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
 		require.Len(t, rootList.Roots, 2)
 
 		// Connectivity should still be contained
-		_, port := clientSidecarService.GetAddr()
+		_, port := staticClientSvcSidecar.GetAddr()
 		libassert.HTTPServiceEchoes(t, "localhost", port)
 
-		verifySidecarHasTwoRootCAs(t, clientSidecarService)
+		verifySidecarHasTwoRootCAs(t, staticClientSvcSidecar)
 	})
 
 	t.Run("terminate exporting clusters servers and ensure imported services are still reachable", func(t *testing.T) {
@@ -178,7 +153,7 @@ func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
 		// ensure any transitory actions like replication cleanup would not affect the next verifications
 		time.Sleep(30 * time.Second)
 
-		_, port := clientSidecarService.GetAddr()
+		_, port := staticClientSvcSidecar.GetAddr()
 		libassert.HTTPServiceEchoes(t, "localhost", port)
 	})
 }
@@ -190,7 +165,7 @@ func terminate(t *testing.T, cluster *libcluster.Cluster) {
 
 // rotateServer add a new server agent to the cluster, then forces the prior agent to leave.
 func rotateServer(t *testing.T, cluster *libcluster.Cluster, client *api.Client, ctx *libagent.BuildContext, node libagent.Agent) {
-	conf, err := libagent.NewConfigBuilder(ctx).
+	conf, err := libagent.NewConfigBuilder(cluster.BuildContext).
 		Bootstrap(0).
 		Peering(true).
 		RetryJoin("agent-3"). // Always use the client agent since it never leaves the cluster
diff --git a/test/integration/consul-container/test/topology/peering_topology.go b/test/integration/consul-container/test/topology/peering_topology.go
new file mode 100644
index 000000000..31a53ad91
--- /dev/null
+++ b/test/integration/consul-container/test/topology/peering_topology.go
@@ -0,0 +1,87 @@
+package topology
+
+import (
+	"sync"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/hashicorp/consul/api"
+	libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
+	libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
+	libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
+)
+
+const (
+	AcceptingPeerName = "accepting-to-dialer"
+	DialingPeerName   = "dialing-to-acceptor"
+)
+
+// BasicPeeringTwoClustersSetup sets up a scenario for testing peering, which consists of
+// - an accepting cluster with 3 servers and 1 client agent. The client should be used to
+// host a service for export: staticServerSvc.
+// - a dialing cluster with 1 server and 1 client. The client should be used to host a
+// service connecting to staticServerSvc.
+// - creating the peering, exporting the service from the accepting cluster, and verifying
+// service connectivity.
+//
+// It returns the accepting cluster, the dialing cluster, staticServerSvc, and staticClientSvcSidecar.
+func BasicPeeringTwoClustersSetup(t *testing.T, consulVersion string) (*libcluster.Cluster, *libcluster.Cluster, *libservice.Service, *libservice.ConnectContainer) {
+	var wg sync.WaitGroup
+	var acceptingCluster, dialingCluster *libcluster.Cluster
+	var acceptingClient *api.Client
+
+	wg.Add(1)
+	go func() {
+		opts := &libcluster.Options{
+			Datacenter: "dc1",
+			NumServer:  3,
+			NumClient:  1,
+			Version:    consulVersion,
+		}
+		acceptingCluster, acceptingClient = libcluster.CreatingPeeringClusterAndSetup(t, opts)
+		wg.Done()
+	}()
+
+	wg.Add(1)
+	go func() {
+		opts := &libcluster.Options{
+			Datacenter: "dc2",
+			NumServer:  1,
+			NumClient:  1,
+			Version:    consulVersion,
+		}
+		dialingCluster, _ = libcluster.CreatingPeeringClusterAndSetup(t, opts)
+		wg.Done()
+	}()
+	wg.Wait()
+
+	err := dialingCluster.PeerWithCluster(acceptingClient, AcceptingPeerName, DialingPeerName)
+	require.NoError(t, err)
+
+	libassert.PeeringStatus(t, acceptingClient, AcceptingPeerName, api.PeeringStateActive)
+
+	// Register a static-server service in the accepting cluster and export it to the dialing cluster
+	clientNodes, err := acceptingCluster.Clients()
+	require.NoError(t, err)
+	require.True(t, len(clientNodes) > 0)
+	staticServerSvc, _, err := libservice.CreateAndRegisterStaticServerAndSidecar(clientNodes[0])
+	require.NoError(t, err)
+	libassert.CatalogServiceExists(t, acceptingClient, "static-server")
+	libassert.CatalogServiceExists(t, acceptingClient, "static-server-sidecar-proxy")
+
+	err = staticServerSvc.Export("default", AcceptingPeerName, acceptingClient)
+	require.NoError(t, err)
+	libassert.PeeringExports(t, acceptingClient, AcceptingPeerName, 1)
+
+	// Register a static-client service in the dialing cluster and set its upstream to the static-server service
+	clientNodesDialing, err := dialingCluster.Clients()
+	require.NoError(t, err)
+	require.True(t, len(clientNodesDialing) > 0)
+	staticClientSvcSidecar, err := libservice.CreateAndRegisterStaticClientSidecar(clientNodesDialing[0], DialingPeerName, true)
+	require.NoError(t, err)
+
+	_, port := staticClientSvcSidecar.GetAddr()
+	libassert.HTTPServiceEchoes(t, "localhost", port)
+
+	return acceptingCluster, dialingCluster, &staticServerSvc, staticClientSvcSidecar
+}
diff --git a/test/integration/consul-container/test/upgrade/peers_http_test.go b/test/integration/consul-container/test/upgrade/peers_http_test.go
index 0eabbf81b..944521260 100644
--- a/test/integration/consul-container/test/upgrade/peers_http_test.go
+++ b/test/integration/consul-container/test/upgrade/peers_http_test.go
@@ -3,7 +3,6 @@ package upgrade
 import (
 	"context"
 	"fmt"
-	"sync"
 	"testing"
 	"time"
 
@@ -11,13 +10,8 @@ import (
 	"github.com/hashicorp/consul/api"
 	libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
-	libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
 	"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
-)
-
-const (
-	acceptingPeerName = "accepting-to-dialer"
-	dialingPeerName   = "dialing-to-acceptor"
+	"github.com/hashicorp/consul/test/integration/consul-container/test/topology"
 )
 
 // TestPeering_UpgradeToTarget_fromLatest checks peering status after dialing cluster
@@ -41,46 +35,30 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
 	}
 
 	run := func(t *testing.T, tc testcase) {
-		var acceptingCluster, dialingCluster *libcluster.Cluster
-		var acceptingClient, dialingClient *api.Client
-
-		var wg sync.WaitGroup
-
-		wg.Add(1)
-		go func() {
-			acceptingCluster, acceptingClient, _ = libcluster.CreatingAcceptingClusterAndSetup(t, 3, tc.oldversion, acceptingPeerName)
-			wg.Done()
-		}()
+		acceptingCluster, dialingCluster, _, staticClientSvcSidecar := topology.BasicPeeringTwoClustersSetup(t, tc.oldversion)
+		// move to teardown
 		defer func() {
-			terminate(t, acceptingCluster)
+			err := acceptingCluster.Terminate()
+			require.NoErrorf(t, err, "terminating accepting cluster")
+			err = dialingCluster.Terminate()
+			require.NoErrorf(t, err, "terminating dialing cluster")
 		}()
 
-		wg.Add(1)
-		go func() {
-			dialingCluster, dialingClient, _ = libcluster.CreateDialingClusterAndSetup(t, tc.oldversion, dialingPeerName)
-			wg.Done()
-		}()
-		defer func() {
-			terminate(t, dialingCluster)
-		}()
-		wg.Wait()
-
-		err := dialingCluster.PeerWithCluster(acceptingClient, acceptingPeerName, dialingPeerName)
+		dialingClient, err := dialingCluster.GetClient(nil, false)
 		require.NoError(t, err)
-
-		libassert.PeeringStatus(t, acceptingClient, acceptingPeerName, api.PeeringStateActive)
-		libassert.PeeringExports(t, acceptingClient, acceptingPeerName, 1)
+		_, port := staticClientSvcSidecar.GetAddr()
 
 		// Upgrade the dialingCluster cluster and assert peering is still ACTIVE
 		err = dialingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion)
 		require.NoError(t, err)
-		libassert.PeeringStatus(t, dialingClient, dialingPeerName, api.PeeringStateActive)
+		libassert.PeeringStatus(t, dialingClient, topology.DialingPeerName, api.PeeringStateActive)
+		libassert.HTTPServiceEchoes(t, "localhost", port)
 
 		// Upgrade the accepting cluster and assert peering is still ACTIVE
 		err = acceptingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion)
 		require.NoError(t, err)
-		libassert.PeeringStatus(t, acceptingClient, acceptingPeerName, api.PeeringStateActive)
+		libassert.PeeringStatus(t, dialingClient, topology.DialingPeerName, api.PeeringStateActive)
 	}
 
 	for _, tc := range tcs {
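
For reviewers, a minimal sketch of how a new peering test would consume the helpers introduced in this change (`topology.BasicPeeringTwoClustersSetup` and `Cluster.GetClient`). The test name `TestPeering_Example` is hypothetical and not part of this diff; the calls and assertions mirror the ones used in the tests above.

```go
package peering

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/hashicorp/consul/api"
	libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
	"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
	"github.com/hashicorp/consul/test/integration/consul-container/test/topology"
)

// TestPeering_Example (hypothetical) shows the intended setup flow:
// build the standard two-cluster peering topology, then assert on it.
func TestPeering_Example(t *testing.T) {
	// Accepting cluster: 3 servers + 1 client hosting static-server.
	// Dialing cluster: 1 server + 1 client hosting static-client.
	// The helper peers the clusters and exports static-server.
	accepting, dialing, _, clientSidecar := topology.BasicPeeringTwoClustersSetup(t, *utils.TargetVersion)
	defer func() {
		require.NoError(t, accepting.Terminate())
		require.NoError(t, dialing.Terminate())
	}()

	// GetClient(nil, false) returns an API client to the first client agent.
	dialingClient, err := dialing.GetClient(nil, false)
	require.NoError(t, err)

	// The peering should be ACTIVE and the imported service reachable
	// through the static-client sidecar's host-mapped port.
	libassert.PeeringStatus(t, dialingClient, topology.DialingPeerName, api.PeeringStateActive)
	_, port := clientSidecar.GetAddr()
	libassert.HTTPServiceEchoes(t, "localhost", port)
}
```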