Upgrade test: retain sidecar containers during upgrade. (#16100)

This commit is contained in:
cskh 2023-01-30 09:49:52 -05:00 committed by GitHub
parent 77c95779de
commit c3f518405a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 63 additions and 23 deletions

View File

@ -106,7 +106,6 @@ func processMetrics(metrics []string, prefix, metric string, condition func(v in
strings.Contains(line, metric) {
metric := strings.Split(line, ":")
fmt.Println(metric[1])
v, err := strconv.Atoi(strings.TrimSpace(metric[1]))
if err != nil {

View File

@ -23,7 +23,7 @@ type Agent interface {
IsServer() bool
RegisterTermination(func() error)
Terminate() error
TerminateAndRetainPod() error
TerminateAndRetainPod(bool) error
Upgrade(ctx context.Context, config Config) error
Exec(ctx context.Context, cmd []string) (int, error)
DataDir() string

View File

@ -250,7 +250,7 @@ func (c *Cluster) StandardUpgrade(t *testing.T, ctx context.Context, targetVersi
}
t.Logf("The number of followers = %d", len(followers))
upgradeFn := func(agent Agent, clientFactory func() *api.Client) error {
upgradeFn := func(agent Agent, clientFactory func() (*api.Client, error)) error {
config := agent.GetConfig()
config.Version = targetVersion
@ -279,10 +279,14 @@ func (c *Cluster) StandardUpgrade(t *testing.T, ctx context.Context, targetVersi
return err
}
client := clientFactory()
client, err := clientFactory()
if err != nil {
return err
}
// wait until the agent rejoin
// wait until the agent rejoin and leader is elected
WaitForMembers(t, client, len(c.Agents))
WaitForLeader(t, c, client)
return nil
}
@ -290,22 +294,42 @@ func (c *Cluster) StandardUpgrade(t *testing.T, ctx context.Context, targetVersi
for _, agent := range followers {
t.Logf("Upgrade follower: %s", agent.GetName())
if err := upgradeFn(agent, leader.GetClient); err != nil {
err := upgradeFn(agent, func() (*api.Client, error) {
return leader.GetClient(), nil
})
if err != nil {
return fmt.Errorf("error upgrading follower %q: %w", agent.GetName(), err)
}
}
t.Logf("Upgrade leader: %s", leader.GetName())
err = upgradeFn(leader, func() *api.Client {
err = upgradeFn(leader, func() (*api.Client, error) {
if len(followers) > 0 {
return followers[0].GetClient()
return followers[0].GetClient(), nil
}
return c.APIClient(0)
return leader.GetClient(), nil
})
if err != nil {
return fmt.Errorf("error upgrading leader %q: %w", leader.GetName(), err)
}
clientAgents := c.Clients()
for _, agent := range clientAgents {
t.Logf("Upgrade client agent: %s", agent.GetName())
err = upgradeFn(agent, func() (*api.Client, error) {
leader, err = c.Leader()
if err != nil {
return nil, err
}
return leader.GetClient(), nil
})
if err != nil {
return fmt.Errorf("error upgrading client agent %q: %w", agent.GetName(), err)
}
}
t.Log("Update completed\n")
return nil
}

View File

@ -336,7 +336,7 @@ func (c *consulContainerNode) Upgrade(ctx context.Context, config Config) error
return fmt.Errorf("new hostname %q should match old hostname %q", consulReq2.Hostname, c.consulReq.Hostname)
}
if err := c.TerminateAndRetainPod(); err != nil {
if err := c.TerminateAndRetainPod(true); err != nil {
return fmt.Errorf("error terminating running container during upgrade: %w", err)
}
@ -365,18 +365,22 @@ func (c *consulContainerNode) Upgrade(ctx context.Context, config Config) error
// This might also include running termination functions for containers associated with the agent.
// On failure, an error will be returned and the reaper process (RYUK) will handle cleanup.
func (c *consulContainerNode) Terminate() error {
return c.terminate(false)
return c.terminate(false, false)
}
func (c *consulContainerNode) TerminateAndRetainPod() error {
return c.terminate(true)
func (c *consulContainerNode) TerminateAndRetainPod(skipFuncs bool) error {
return c.terminate(true, skipFuncs)
}
func (c *consulContainerNode) terminate(retainPod bool) error {
func (c *consulContainerNode) terminate(retainPod bool, skipFuncs bool) error {
// Services might register a termination function that should also fire
// when the "agent" is cleaned up
for _, f := range c.terminateFuncs {
err := f()
if err != nil {
continue
// when the "agent" is cleaned up.
// If skipFuncs is tru, We skip the terminateFuncs of connect sidecar, e.g.,
// during upgrade
if !skipFuncs {
for _, f := range c.terminateFuncs {
err := f()
if err != nil {
continue
}
}
}

View File

@ -89,13 +89,13 @@ func (g ConnectContainer) GetAdminAddr() (string, int) {
// node. The container exposes port serviceBindPort and envoy admin port
// (19000) by mapping them onto host ports. The container's name has a prefix
// combining datacenter and name.
func NewConnectService(ctx context.Context, name string, serviceName string, serviceBindPort int, node libcluster.Agent) (*ConnectContainer, error) {
func NewConnectService(ctx context.Context, sidecarServiceName string, serviceName string, serviceBindPort int, node libcluster.Agent) (*ConnectContainer, error) {
nodeConfig := node.GetConfig()
if nodeConfig.ScratchDir == "" {
return nil, fmt.Errorf("node ScratchDir is required")
}
namePrefix := fmt.Sprintf("%s-service-connect-%s", node.GetDatacenter(), name)
namePrefix := fmt.Sprintf("%s-service-connect-%s", node.GetDatacenter(), sidecarServiceName)
containerName := utils.RandName(namePrefix)
envoyVersion := getEnvoyVersion()
@ -174,11 +174,13 @@ func NewConnectService(ctx context.Context, name string, serviceName string, ser
ip: info.IP,
appPort: info.MappedPorts[appPortStr].Int(),
adminPort: info.MappedPorts[adminPortStr].Int(),
serviceName: name,
serviceName: sidecarServiceName,
}
fmt.Printf("NewConnectService: name %s, bind port %d, public listener port %d\\n\"",
fmt.Printf("NewConnectService: name %s, mapped App Port %d, service bind port %d\n",
serviceName, out.appPort, serviceBindPort)
fmt.Printf("NewConnectService sidecar: name %s, mapped admin port %d, admin port %d\n",
sidecarServiceName, out.adminPort, adminPort)
return out, nil
}

View File

@ -49,6 +49,8 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
acceptingClient, err := acceptingCluster.GetClient(nil, false)
require.NoError(t, err)
_, gatewayAdminPort := dialing.Gateway.GetAdminAddr()
// Upgrade the accepting cluster and assert peering is still ACTIVE
require.NoError(t, acceptingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion))
libassert.PeeringStatus(t, acceptingClient, libtopology.AcceptingPeerName, api.PeeringStateActive)
@ -61,6 +63,15 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
// POST upgrade validation
// - Register a new static-client service in dialing cluster and
// - set upstream to static-server service in peered cluster
// Restart the gateway
err = dialing.Gateway.Restart()
require.NoError(t, err)
// Restarted gateway should not have any measurement on data plane traffic
libassert.AssertEnvoyMetricAtMost(t, gatewayAdminPort,
"cluster.static-server.default.default.accepting-to-dialer.external",
"upstream_cx_total", 0)
clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true)
require.NoError(t, err)
_, port := clientSidecarService.GetAddr()