integ-test: test consul upgrade from the snapshot of a running cluster (#15595)

* integ-test: test consul upgrade from the snapshot of a running cluster

* use Target version as default


Co-authored-by: Dan Stough <dan.stough@hashicorp.com>
This commit is contained in:
cskh 2022-12-01 10:39:09 -05:00 committed by GitHub
parent 2f56c1bdfe
commit 426c2b72d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 257 additions and 28 deletions

View File

@ -198,7 +198,7 @@ type Config struct {
NodeID *string `mapstructure:"node_id" json:"node_id,omitempty"`
NodeMeta map[string]string `mapstructure:"node_meta" json:"node_meta,omitempty"`
NodeName *string `mapstructure:"node_name" json:"node_name,omitempty"`
Peering Peering `mapstructure:"peering" json:"peering,omitempty"`
Peering Peering `mapstructure:"peering" json:"-"`
Performance Performance `mapstructure:"performance" json:"-"`
PidFile *string `mapstructure:"pid_file" json:"pid_file,omitempty"`
Ports Ports `mapstructure:"ports" json:"ports,omitempty"`

View File

@ -11,9 +11,11 @@ require (
github.com/hashicorp/serf v0.10.1
github.com/itchyny/gojq v0.12.9
github.com/pkg/errors v0.9.1
github.com/rogpeppe/go-internal v1.3.0
github.com/stretchr/testify v1.8.0
github.com/teris-io/shortid v0.0.0-20220617161101-71ec9f2aa569
github.com/testcontainers/testcontainers-go v0.13.0
golang.org/x/mod v0.4.2
)
require (

View File

@ -857,6 +857,7 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.3.0 h1:RR9dF3JtopPvtkroDZuVD7qquD0bnHlKSqaQhgwt8yk=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
@ -1037,6 +1038,7 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB
golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=

View File

@ -17,6 +17,8 @@ type Agent interface {
RegisterTermination(func() error)
Terminate() error
Upgrade(ctx context.Context, config Config, index int) error
Exec(ctx context.Context, cmd []string) (int, error)
DataDir() string
}
// Config is a set of configurations required to create a Agent

View File

@ -5,6 +5,7 @@ import (
"path/filepath"
"github.com/pkg/errors"
"golang.org/x/mod/semver"
agentconfig "github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
@ -26,6 +27,7 @@ type BuildContext struct {
injectAutoEncryption bool // initialize the built-in CA and set up agents to use auto-encrpt
injectCerts bool // initializes the built-in CA and distributes client certificates to agents
injectGossipEncryption bool // setup the agents to use a gossip encryption key
consulVersion string
}
// BuildOptions define the desired automated test setup overrides that are
@ -35,6 +37,7 @@ type BuildOptions struct {
InjectCerts bool // Provides a CA for all agents and (future) agent certs.
InjectAutoEncryption bool // Configures auto-encrypt for TLS and sets up certs. Overrides InjectCerts.
InjectGossipEncryption bool // Provides a gossip encryption key for all agents.
ConsulVersion string // The default Consul version for agents in the cluster when none is specified.
}
func NewBuildContext(opts BuildOptions) (*BuildContext, error) {
@ -43,6 +46,11 @@ func NewBuildContext(opts BuildOptions) (*BuildContext, error) {
injectAutoEncryption: opts.InjectAutoEncryption,
injectCerts: opts.InjectCerts,
injectGossipEncryption: opts.InjectGossipEncryption,
consulVersion: opts.ConsulVersion,
}
if opts.ConsulVersion == "" {
ctx.consulVersion = *utils.TargetVersion
}
if opts.InjectGossipEncryption {
@ -105,12 +113,16 @@ func NewConfigBuilder(ctx *BuildContext) *Builder {
HTTP: nil,
HTTPS: utils.IntToPointer(8501),
GRPC: utils.IntToPointer(8502),
GRPCTLS: utils.IntToPointer(8503),
SerfLAN: utils.IntToPointer(8301),
SerfWAN: utils.IntToPointer(8302),
Server: utils.IntToPointer(8300),
}
if ctx != nil && (ctx.consulVersion == "local" || semver.Compare("v"+ctx.consulVersion, "v1.14.0") >= 0) {
// Enable GRPCTLS for version after v1.14.0
b.conf.Ports.GRPCTLS = utils.IntToPointer(8503)
}
return b
}

View File

@ -109,11 +109,6 @@ func NewConsulContainer(ctx context.Context, config Config, network string, inde
return nil, err
}
localIP, err := podContainer.Host(ctx)
if err != nil {
return nil, err
}
mappedPort, err := podContainer.MappedPort(ctx, "8500")
if err != nil {
return nil, err
@ -136,7 +131,10 @@ func NewConsulContainer(ctx context.Context, config Config, network string, inde
Prefix: name,
})
uri := fmt.Sprintf("http://%s:%s", localIP, mappedPort.Port())
uri, err := podContainer.Endpoint(ctx, "http")
if err != nil {
return nil, err
}
apiConfig := api.DefaultConfig()
apiConfig.Address = uri
apiClient, err := api.NewClient(apiConfig)
@ -197,6 +195,10 @@ func (c *consulContainerNode) RegisterTermination(f func() error) {
c.terminateFuncs = append(c.terminateFuncs, f)
}
func (c *consulContainerNode) Exec(ctx context.Context, cmd []string) (int, error) {
return c.container.Exec(ctx, cmd)
}
func (c *consulContainerNode) Upgrade(ctx context.Context, config Config, index int) error {
pc, err := readSomeConfigFileFields(config.JSON)
if err != nil {
@ -237,14 +239,18 @@ func (c *consulContainerNode) Upgrade(ctx context.Context, config Config, index
_, consulReq2 := newContainerRequest(config, opts)
consulReq2.Env = c.consulReq.Env // copy license
if c.container != nil {
_ = c.container.StopLogProducer()
if err := c.container.Terminate(ctx); err != nil {
if err := c.container.Terminate(c.ctx); err != nil {
return err
}
}
c.consulReq = consulReq2
container, err := startContainer(ctx, c.consulReq)
c.ctx = ctx
c.container = container
if err != nil {
return err
}
@ -256,8 +262,6 @@ func (c *consulContainerNode) Upgrade(ctx context.Context, config Config, index
Prefix: name,
})
c.container = container
return nil
}
@ -265,13 +269,12 @@ func (c *consulContainerNode) Upgrade(ctx context.Context, config Config, index
// This might also include running termination functions for containers associated with the agent.
// On failure, an error will be returned and the reaper process (RYUK) will handle cleanup.
func (c *consulContainerNode) Terminate() error {
// Services might register a termination function that should also fire
// when the "agent" is cleaned up
for _, f := range c.terminateFuncs {
err := f()
if err != nil {
continue
}
}
@ -279,18 +282,31 @@ func (c *consulContainerNode) Terminate() error {
return nil
}
err := c.container.StopLogProducer()
state, err := c.container.State(context.Background())
if err == nil && state.Running {
// StopLogProducer can only be called on running containers
err = c.container.StopLogProducer()
if err1 := c.container.Terminate(c.ctx); err == nil {
err = err1
}
} else {
if err1 := c.container.Terminate(c.ctx); err == nil {
err = err1
}
}
c.container = nil
return err
}
func (c *consulContainerNode) DataDir() string {
return c.dataDir
}
func startContainer(ctx context.Context, req testcontainers.ContainerRequest) (testcontainers.Container, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*40)
defer cancel()
return testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
@ -313,12 +329,14 @@ type containerOpts struct {
func newContainerRequest(config Config, opts containerOpts) (podRequest, consulRequest testcontainers.ContainerRequest) {
skipReaper := isRYUKDisabled()
httpPort := "8500"
pod := testcontainers.ContainerRequest{
Image: pauseImage,
AutoRemove: false,
Name: opts.name + "-pod",
SkipReaper: skipReaper,
ExposedPorts: []string{"8500/tcp"},
ExposedPorts: []string{httpPort + "/tcp"},
Hostname: opts.hostname,
Networks: opts.addtionalNetworks,
}

View File

@ -3,6 +3,8 @@ package cluster
import (
"context"
"fmt"
"io/ioutil"
"path/filepath"
"strings"
"testing"
"time"
@ -121,6 +123,60 @@ func (c *Cluster) Remove(n libagent.Agent) error {
return nil
}
// StandardUpgrade upgrades a running consul cluster following the steps from
//
// https://developer.hashicorp.com/consul/docs/upgrading#standard-upgrades
//
// - takes a snapshot
// - terminate and rejoin the new version of consul
func (c *Cluster) StandardUpgrade(t *testing.T, ctx context.Context, targetVersion string) error {
execCode, err := c.Agents[0].Exec(context.Background(), []string{"consul", "snapshot", "save", "backup.snap"})
if execCode != 0 {
return fmt.Errorf("error taking snapshot of the cluster, returned code %d", execCode)
}
if err != nil {
return err
}
// verify only the leader can take a snapshot
snapshotCount := 0
for _, agent := range c.Agents {
files, err := ioutil.ReadDir(filepath.Join(agent.DataDir(), "raft", "snapshots"))
if err != nil {
return err
}
if len(files) >= 1 {
snapshotCount++
}
}
if snapshotCount != 1 {
return fmt.Errorf("only leader agent can have a snapshot file, got %d", snapshotCount)
}
// Upgrade individual agent to the target version
client := c.Agents[0].GetClient()
for _, agent := range c.Agents {
agent.Terminate()
if len(c.Agents) > 3 {
WaitForLeader(t, c, client)
} else {
time.Sleep(1 * time.Second)
}
config := agent.GetConfig()
config.Version = targetVersion
err = agent.Upgrade(context.Background(), config, 1)
if err != nil {
return err
}
// wait until the agent rejoin
WaitForLeader(t, c, client)
WaitForMembers(t, client, len(c.Agents))
}
return nil
}
// Terminate will attempt to terminate all agents in the cluster and its network. If any agent
// termination fails, Terminate will abort and return an error.
func (c *Cluster) Terminate() error {
@ -213,7 +269,7 @@ func (c *Cluster) Clients() ([]libagent.Agent, error) {
return clients, nil
}
const retryTimeout = 20 * time.Second
const retryTimeout = 90 * time.Second
const retryFrequency = 500 * time.Millisecond
func LongFailer() *retry.Timer {
@ -250,6 +306,6 @@ func WaitForMembers(t *testing.T, client *api.Client, expectN int) {
}
}
require.NoError(r, err)
require.Equal(r, activeMembers, expectN)
require.Equal(r, expectN, activeMembers)
})
}

View File

@ -17,10 +17,10 @@ import (
// A simulated client (a direct HTTP call) talks to it's upstream proxy through the
//
// Steps:
// * Create a single agent cluster.
// * Create the example static-server and sidecar containers, then register them both with Consul
// * Create an example static-client sidecar, then register both the service and sidecar with Consul
// * Make sure a call to the client sidecar local bind port returns a response from the upstream, static-server
// - Create a single agent cluster.
// - Create the example static-server and sidecar containers, then register them both with Consul
// - Create an example static-client sidecar, then register both the service and sidecar with Consul
// - Make sure a call to the client sidecar local bind port returns a response from the upstream, static-server
func TestBasicConnectService(t *testing.T) {
cluster := createCluster(t)
defer terminate(t, cluster)

View File

@ -0,0 +1,129 @@
package upgrade
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil/retry"
libagent "github.com/hashicorp/consul/test/integration/consul-container/libs/agent"
libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
)
// Test upgrade a cluster of latest version to the target version
func TestStandardUpgradeToTarget_fromLatest(t *testing.T) {
type testcase struct {
oldversion string
targetVersion string
expectErr bool
}
tcs := []testcase{
// Use the case of "1.12.3" ==> "1.13.0" to verify the test can
// catch the upgrade bug found in snapshot of 1.13.0
{
oldversion: "1.12.3",
targetVersion: "1.13.0",
expectErr: true,
},
{
oldversion: "1.13",
targetVersion: *utils.TargetVersion,
},
{
oldversion: "1.14",
targetVersion: *utils.TargetVersion,
},
}
run := func(t *testing.T, tc testcase) {
var configs []libagent.Config
configCtx, err := libagent.NewBuildContext(libagent.BuildOptions{
ConsulVersion: tc.oldversion,
})
require.NoError(t, err)
numServers := 1
leaderConf, err := libagent.NewConfigBuilder(configCtx).
Bootstrap(numServers).
ToAgentConfig()
require.NoError(t, err)
t.Logf("Cluster config:\n%s", leaderConf.JSON)
leaderConf.Version = tc.oldversion
for i := 0; i < numServers; i++ {
configs = append(configs, *leaderConf)
}
cluster, err := libcluster.New(configs)
require.NoError(t, err)
defer terminate(t, cluster)
server := cluster.Agents[0]
client := server.GetClient()
libcluster.WaitForLeader(t, cluster, client)
libcluster.WaitForMembers(t, client, numServers)
// Create a service to be stored in the snapshot
serviceName := "api"
index := serviceCreate(t, client, serviceName)
ch := make(chan []*api.ServiceEntry)
errCh := make(chan error)
go func() {
service, q, err := client.Health().Service(serviceName, "", false, &api.QueryOptions{WaitIndex: index})
if err == nil && q.QueryBackend != api.QueryBackendStreaming {
err = fmt.Errorf("invalid backend for this test %s", q.QueryBackend)
}
if err != nil {
errCh <- err
} else {
ch <- service
}
}()
require.NoError(t, client.Agent().ServiceRegister(
&api.AgentServiceRegistration{Name: serviceName, Port: 9998},
))
timer := time.NewTimer(1 * time.Second)
select {
case err := <-errCh:
require.NoError(t, err)
case service := <-ch:
require.Len(t, service, 1)
require.Equal(t, serviceName, service[0].Service.Service)
require.Equal(t, 9998, service[0].Service.Port)
case <-timer.C:
t.Fatalf("test timeout")
}
// upgrade the cluster to the Target version
err = cluster.StandardUpgrade(t, context.Background(), tc.targetVersion)
if !tc.expectErr {
require.NoError(t, err)
libcluster.WaitForLeader(t, cluster, client)
libcluster.WaitForMembers(t, client, numServers)
// Verify service is restored from the snapshot
retry.RunWith(&retry.Timer{Timeout: 5 * time.Second, Wait: 500 * time.Microsecond}, t, func(r *retry.R) {
service, _, err := client.Catalog().Service(serviceName, "", &api.QueryOptions{})
require.NoError(r, err)
require.Len(r, service, 1)
require.Equal(r, serviceName, service[0].ServiceName)
})
} else {
require.Error(t, fmt.Errorf("context deadline exceeded"))
}
}
for _, tc := range tcs {
t.Run(fmt.Sprintf("upgrade from %s to %s", tc.oldversion, tc.targetVersion),
func(t *testing.T) {
run(t, tc)
})
time.Sleep(5 * time.Second)
}
}

View File

@ -256,7 +256,15 @@ func clientsCreate(t *testing.T, numClients int, image string, version string, c
}
func serviceCreate(t *testing.T, client *api.Client, serviceName string) uint64 {
err := client.Agent().ServiceRegister(&api.AgentServiceRegistration{Name: serviceName, Port: 9999})
err := client.Agent().ServiceRegister(&api.AgentServiceRegistration{
Name: serviceName,
Port: 9999,
Connect: &api.AgentServiceConnect{
SidecarService: &api.AgentServiceRegistration{
Port: 22005,
},
},
})
require.NoError(t, err)
service, meta, err := client.Catalog().Service(serviceName, "", &api.QueryOptions{})
@ -272,7 +280,7 @@ func serversCluster(t *testing.T, numServers int, version string, image string)
var configs []libagent.Config
conf, err := libagent.NewConfigBuilder(nil).
Bootstrap(3).
Bootstrap(numServers).
ToAgentConfig()
require.NoError(t, err)