diff --git a/test/integration/consul-container/libs/assert/envoy.go b/test/integration/consul-container/libs/assert/envoy.go index 1a562542f..7f1e59356 100644 --- a/test/integration/consul-container/libs/assert/envoy.go +++ b/test/integration/consul-container/libs/assert/envoy.go @@ -106,7 +106,6 @@ func processMetrics(metrics []string, prefix, metric string, condition func(v in strings.Contains(line, metric) { metric := strings.Split(line, ":") - fmt.Println(metric[1]) v, err := strconv.Atoi(strings.TrimSpace(metric[1])) if err != nil { diff --git a/test/integration/consul-container/libs/cluster/agent.go b/test/integration/consul-container/libs/cluster/agent.go index 68b10bbaa..4de8f9a51 100644 --- a/test/integration/consul-container/libs/cluster/agent.go +++ b/test/integration/consul-container/libs/cluster/agent.go @@ -23,7 +23,7 @@ type Agent interface { IsServer() bool RegisterTermination(func() error) Terminate() error - TerminateAndRetainPod() error + TerminateAndRetainPod(bool) error Upgrade(ctx context.Context, config Config) error Exec(ctx context.Context, cmd []string) (int, error) DataDir() string diff --git a/test/integration/consul-container/libs/cluster/cluster.go b/test/integration/consul-container/libs/cluster/cluster.go index 7eb1b81dd..4db66c020 100644 --- a/test/integration/consul-container/libs/cluster/cluster.go +++ b/test/integration/consul-container/libs/cluster/cluster.go @@ -250,7 +250,7 @@ func (c *Cluster) StandardUpgrade(t *testing.T, ctx context.Context, targetVersi } t.Logf("The number of followers = %d", len(followers)) - upgradeFn := func(agent Agent, clientFactory func() *api.Client) error { + upgradeFn := func(agent Agent, clientFactory func() (*api.Client, error)) error { config := agent.GetConfig() config.Version = targetVersion @@ -279,10 +279,14 @@ func (c *Cluster) StandardUpgrade(t *testing.T, ctx context.Context, targetVersi return err } - client := clientFactory() + client, err := clientFactory() + if err != nil { + return err + } - // wait until the agent rejoin + // wait until the agent rejoin and leader is elected WaitForMembers(t, client, len(c.Agents)) + WaitForLeader(t, c, client) return nil } @@ -290,22 +294,42 @@ func (c *Cluster) StandardUpgrade(t *testing.T, ctx context.Context, targetVersi for _, agent := range followers { t.Logf("Upgrade follower: %s", agent.GetName()) - if err := upgradeFn(agent, leader.GetClient); err != nil { + err := upgradeFn(agent, func() (*api.Client, error) { + return leader.GetClient(), nil + }) + if err != nil { return fmt.Errorf("error upgrading follower %q: %w", agent.GetName(), err) } } t.Logf("Upgrade leader: %s", leader.GetName()) - err = upgradeFn(leader, func() *api.Client { + err = upgradeFn(leader, func() (*api.Client, error) { if len(followers) > 0 { - return followers[0].GetClient() + return followers[0].GetClient(), nil } - return c.APIClient(0) + return leader.GetClient(), nil }) if err != nil { return fmt.Errorf("error upgrading leader %q: %w", leader.GetName(), err) } + clientAgents := c.Clients() + for _, agent := range clientAgents { + t.Logf("Upgrade client agent: %s", agent.GetName()) + + err = upgradeFn(agent, func() (*api.Client, error) { + leader, err = c.Leader() + if err != nil { + return nil, err + } + return leader.GetClient(), nil + }) + if err != nil { + return fmt.Errorf("error upgrading client agent %q: %w", agent.GetName(), err) + } + } + + t.Log("Update completed\n") return nil } diff --git a/test/integration/consul-container/libs/cluster/container.go b/test/integration/consul-container/libs/cluster/container.go index 85a20d3c6..6b833eb58 100644 --- a/test/integration/consul-container/libs/cluster/container.go +++ b/test/integration/consul-container/libs/cluster/container.go @@ -336,7 +336,7 @@ func (c *consulContainerNode) Upgrade(ctx context.Context, config Config) error return fmt.Errorf("new hostname %q should match old hostname %q", consulReq2.Hostname, c.consulReq.Hostname) } - if err := c.TerminateAndRetainPod(); err != nil { + if err := c.TerminateAndRetainPod(true); err != nil { return fmt.Errorf("error terminating running container during upgrade: %w", err) } @@ -365,18 +365,22 @@ func (c *consulContainerNode) Upgrade(ctx context.Context, config Config) error // This might also include running termination functions for containers associated with the agent. // On failure, an error will be returned and the reaper process (RYUK) will handle cleanup. func (c *consulContainerNode) Terminate() error { - return c.terminate(false) + return c.terminate(false, false) } -func (c *consulContainerNode) TerminateAndRetainPod() error { - return c.terminate(true) +func (c *consulContainerNode) TerminateAndRetainPod(skipFuncs bool) error { + return c.terminate(true, skipFuncs) } -func (c *consulContainerNode) terminate(retainPod bool) error { +func (c *consulContainerNode) terminate(retainPod bool, skipFuncs bool) error { // Services might register a termination function that should also fire - // when the "agent" is cleaned up - for _, f := range c.terminateFuncs { - err := f() - if err != nil { - continue + // when the "agent" is cleaned up. + // If skipFuncs is tru, We skip the terminateFuncs of connect sidecar, e.g., + // during upgrade + if !skipFuncs { + for _, f := range c.terminateFuncs { + err := f() + if err != nil { + continue + } } } diff --git a/test/integration/consul-container/libs/service/connect.go b/test/integration/consul-container/libs/service/connect.go index 9763e372f..9ac54d3bd 100644 --- a/test/integration/consul-container/libs/service/connect.go +++ b/test/integration/consul-container/libs/service/connect.go @@ -89,13 +89,13 @@ func (g ConnectContainer) GetAdminAddr() (string, int) { // node. The container exposes port serviceBindPort and envoy admin port // (19000) by mapping them onto host ports. The container's name has a prefix // combining datacenter and name. -func NewConnectService(ctx context.Context, name string, serviceName string, serviceBindPort int, node libcluster.Agent) (*ConnectContainer, error) { +func NewConnectService(ctx context.Context, sidecarServiceName string, serviceName string, serviceBindPort int, node libcluster.Agent) (*ConnectContainer, error) { nodeConfig := node.GetConfig() if nodeConfig.ScratchDir == "" { return nil, fmt.Errorf("node ScratchDir is required") } - namePrefix := fmt.Sprintf("%s-service-connect-%s", node.GetDatacenter(), name) + namePrefix := fmt.Sprintf("%s-service-connect-%s", node.GetDatacenter(), sidecarServiceName) containerName := utils.RandName(namePrefix) envoyVersion := getEnvoyVersion() @@ -174,11 +174,13 @@ func NewConnectService(ctx context.Context, name string, serviceName string, ser ip: info.IP, appPort: info.MappedPorts[appPortStr].Int(), adminPort: info.MappedPorts[adminPortStr].Int(), - serviceName: name, + serviceName: sidecarServiceName, } - fmt.Printf("NewConnectService: name %s, bind port %d, public listener port %d\\n\"", + fmt.Printf("NewConnectService: name %s, mapped App Port %d, service bind port %d\n", serviceName, out.appPort, serviceBindPort) + fmt.Printf("NewConnectService sidecar: name %s, mapped admin port %d, admin port %d\n", + sidecarServiceName, out.adminPort, adminPort) return out, nil } diff --git a/test/integration/consul-container/test/upgrade/peers_http_test.go b/test/integration/consul-container/test/upgrade/peers_http_test.go index 982f99dd0..846150f9e 100644 --- a/test/integration/consul-container/test/upgrade/peers_http_test.go +++ b/test/integration/consul-container/test/upgrade/peers_http_test.go @@ -49,6 +49,8 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) { acceptingClient, err := acceptingCluster.GetClient(nil, false) require.NoError(t, err) + _, gatewayAdminPort := dialing.Gateway.GetAdminAddr() + // Upgrade the accepting cluster and assert peering is still ACTIVE require.NoError(t, acceptingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion)) libassert.PeeringStatus(t, acceptingClient, libtopology.AcceptingPeerName, api.PeeringStateActive) @@ -61,6 +63,15 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) { // POST upgrade validation // - Register a new static-client service in dialing cluster and // - set upstream to static-server service in peered cluster + + // Restart the gateway + err = dialing.Gateway.Restart() + require.NoError(t, err) + // Restarted gateway should not have any measurement on data plane traffic + libassert.AssertEnvoyMetricAtMost(t, gatewayAdminPort, + "cluster.static-server.default.default.accepting-to-dialer.external", + "upstream_cx_total", 0) + clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true) require.NoError(t, err) _, port := clientSidecarService.GetAddr()