From 46b0b33bdaa68cf440e2e935cf6eef082df71eff Mon Sep 17 00:00:00 2001 From: Nick Cabatoff Date: Wed, 24 May 2023 16:21:10 -0400 Subject: [PATCH] Cluster test helper improvements (#20424) --- sdk/helper/testcluster/docker/environment.go | 42 ++++++++++++++ sdk/helper/testcluster/docker/replication.go | 58 +++++++------------ sdk/helper/testcluster/replication.go | 59 +++++++++++++++++++- sdk/helper/testcluster/util.go | 49 +++++++++++----- 4 files changed, 154 insertions(+), 54 deletions(-) diff --git a/sdk/helper/testcluster/docker/environment.go b/sdk/helper/testcluster/docker/environment.go index 1979c56c7..204bacbdb 100644 --- a/sdk/helper/testcluster/docker/environment.go +++ b/sdk/helper/testcluster/docker/environment.go @@ -773,6 +773,48 @@ func (n *DockerClusterNode) Start(ctx context.Context, opts *DockerClusterOption return nil } +func (n *DockerClusterNode) Pause(ctx context.Context) error { + return n.DockerAPI.ContainerPause(ctx, n.Container.ID) +} + +func (n *DockerClusterNode) AddNetworkDelay(ctx context.Context, delay time.Duration, targetIP string) error { + ip := net.ParseIP(targetIP) + if ip == nil { + return fmt.Errorf("targetIP %q is not an IP address", targetIP) + } + // Let's attempt to get a unique handle for the filter rule; we'll assume that + // every targetIP has a unique last octet, which is true currently for how + // we're doing docker networking. + lastOctet := ip.To4()[3] + + stdout, stderr, exitCode, err := n.runner.RunCmdWithOutput(ctx, n.Container.ID, []string{ + "/bin/sh", + "-xec", strings.Join([]string{ + fmt.Sprintf("echo isolating node %s", targetIP), + "apk add iproute2", + // If we're running this script a second time on the same node, + // the add dev will fail; since we only want to run the netem + // command once, we'll do so in the case where the add dev doesn't fail. + "tc qdisc add dev eth0 root handle 1: prio && " + + fmt.Sprintf("tc qdisc add dev eth0 parent 1:1 handle 2: netem delay %dms", delay/time.Millisecond), + // Here we create a u32 filter as per https://man7.org/linux/man-pages/man8/tc-u32.8.html + // Its parent is 1:0 (which I guess is the root?) 
+ // Its handle must be unique, so we base it on targetIP + fmt.Sprintf("tc filter add dev eth0 parent 1:0 protocol ip pref 55 handle ::%x u32 match ip dst %s flowid 2:1", lastOctet, targetIP), + }, "; "), + }) + if err != nil { + return err + } + + n.Logger.Trace(string(stdout)) + n.Logger.Trace(string(stderr)) + if exitCode != 0 { + return fmt.Errorf("got nonzero exit code from iptables: %d", exitCode) + } + return nil +} + type LogConsumerWriter struct { consumer func(string) } diff --git a/sdk/helper/testcluster/docker/replication.go b/sdk/helper/testcluster/docker/replication.go index 2661e654c..0bd8feea2 100644 --- a/sdk/helper/testcluster/docker/replication.go +++ b/sdk/helper/testcluster/docker/replication.go @@ -15,12 +15,22 @@ import ( "github.com/hashicorp/vault/sdk/helper/testcluster" ) -type ReplicationDockerOptions struct { - NumCores int - ClusterName string +func DefaultOptions(t *testing.T) *DockerClusterOptions { + return &DockerClusterOptions{ + ImageRepo: "hashicorp/vault", + ImageTag: "latest", + VaultBinary: os.Getenv("VAULT_BINARY"), + ClusterOptions: testcluster.ClusterOptions{ + NumCores: 3, + ClusterName: strings.ReplaceAll(t.Name(), "/", "-"), + VaultNodeConfig: &testcluster.VaultNodeConfig{ + LogLevel: "TRACE", + }, + }, + } } -func NewReplicationSetDocker(t *testing.T, opt *ReplicationDockerOptions) (*testcluster.ReplicationSet, error) { +func NewReplicationSetDocker(t *testing.T, opts *DockerClusterOptions) (*testcluster.ReplicationSet, error) { binary := os.Getenv("VAULT_BINARY") if binary == "" { t.Skip("only running docker test when $VAULT_BINARY present") @@ -31,48 +41,20 @@ func NewReplicationSetDocker(t *testing.T, opt *ReplicationDockerOptions) (*test Logger: logging.NewVaultLogger(hclog.Trace).Named(t.Name()), } - if opt == nil { - opt = &ReplicationDockerOptions{} - } - - var nc int - if opt.NumCores > 0 { - nc = opt.NumCores - } - - clusterName := t.Name() - if opt.ClusterName != "" { - clusterName = opt.ClusterName - } // clusterName is used for container name as well. // A container name should not exceed 64 chars. // There are additional chars that are added to the name as well // like "-A-core0". So, setting a max limit for a cluster name. - if len(clusterName) > MaxClusterNameLength { + if len(opts.ClusterName) > MaxClusterNameLength { return nil, fmt.Errorf("cluster name length exceeded the maximum allowed length of %v", MaxClusterNameLength) } r.Builder = func(ctx context.Context, name string, baseLogger hclog.Logger) (testcluster.VaultCluster, error) { - cluster := NewTestDockerCluster(t, &DockerClusterOptions{ - ImageRepo: "hashicorp/vault", - ImageTag: "latest", - VaultBinary: os.Getenv("VAULT_BINARY"), - ClusterOptions: testcluster.ClusterOptions{ - ClusterName: strings.ReplaceAll(clusterName+"-"+name, "/", "-"), - Logger: baseLogger.Named(name), - VaultNodeConfig: &testcluster.VaultNodeConfig{ - LogLevel: "TRACE", - // If you want the test to run faster locally, you could - // uncomment this performance_multiplier change. 
- //StorageOptions: map[string]string{ - // "performance_multiplier": "1", - //}, - }, - NumCores: nc, - }, - CA: r.CA, - }) - return cluster, nil + myOpts := *opts + myOpts.Logger = baseLogger.Named(name) + myOpts.ClusterName += "-" + strings.ReplaceAll(name, "/", "-") + myOpts.CA = r.CA + return NewTestDockerCluster(t, &myOpts), nil } a, err := r.Builder(context.TODO(), "A", r.Logger) diff --git a/sdk/helper/testcluster/replication.go b/sdk/helper/testcluster/replication.go index 0d7977fb5..72c8bc67c 100644 --- a/sdk/helper/testcluster/replication.go +++ b/sdk/helper/testcluster/replication.go @@ -228,7 +228,16 @@ func WaitForPerformanceSecondary(ctx context.Context, pri, sec VaultCluster, ski } func WaitForPerfReplicationWorking(ctx context.Context, pri, sec VaultCluster) error { - priClient, secClient := pri.Nodes()[0].APIClient(), sec.Nodes()[0].APIClient() + priActiveIdx, err := WaitForActiveNode(ctx, pri) + if err != nil { + return err + } + secActiveIdx, err := WaitForActiveNode(ctx, sec) + if err != nil { + return err + } + + priClient, secClient := pri.Nodes()[priActiveIdx].APIClient(), sec.Nodes()[secActiveIdx].APIClient() mountPoint, err := uuid.GenerateUUID() if err != nil { return err @@ -261,7 +270,10 @@ func WaitForPerfReplicationWorking(ctx context.Context, pri, sec VaultCluster) e } time.Sleep(100 * time.Millisecond) } - return fmt.Errorf("unable to read replicated KV on secondary", "path", path, "err", err) + if err == nil { + err = ctx.Err() + } + return fmt.Errorf("unable to read replicated KV on secondary, path=%s, err=%v", path, err) } func SetupTwoClusterPerfReplication(ctx context.Context, pri, sec VaultCluster) error { @@ -839,3 +851,46 @@ func (r *ReplicationSet) Cleanup() { cluster.Cleanup() } } + +func WaitForPerfReplicationConnectionStatus(ctx context.Context, client *api.Client) error { + type Primary struct { + APIAddress string `mapstructure:"api_address"` + ConnectionStatus string `mapstructure:"connection_status"` + ClusterAddress string `mapstructure:"cluster_address"` + LastHeartbeat string `mapstructure:"last_heartbeat"` + } + type Status struct { + Primaries []Primary `mapstructure:"primaries"` + } + return WaitForPerfReplicationStatus(ctx, client, func(m map[string]interface{}) error { + var status Status + err := mapstructure.Decode(m, &status) + if err != nil { + return err + } + if len(status.Primaries) == 0 { + return fmt.Errorf("primaries is zero") + } + for _, v := range status.Primaries { + if v.ConnectionStatus == "connected" { + return nil + } + } + return fmt.Errorf("no primaries connected") + }) +} + +func WaitForPerfReplicationStatus(ctx context.Context, client *api.Client, accept func(map[string]interface{}) error) error { + var err error + var secret *api.Secret + for ctx.Err() == nil { + secret, err = client.Logical().Read("sys/replication/performance/status") + if err == nil && secret != nil && secret.Data != nil { + if err = accept(secret.Data); err == nil { + return nil + } + } + time.Sleep(500 * time.Millisecond) + } + return fmt.Errorf("unable to get acceptable replication status within allotted time: error=%v secret=%#v", err, secret) +} diff --git a/sdk/helper/testcluster/util.go b/sdk/helper/testcluster/util.go index ce443164f..4ecf5f533 100644 --- a/sdk/helper/testcluster/util.go +++ b/sdk/helper/testcluster/util.go @@ -11,6 +11,7 @@ import ( "sync/atomic" "time" + "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-uuid" "github.com/hashicorp/vault/api" "github.com/hashicorp/vault/sdk/helper/xor" @@ -92,23 +93,43 @@ 
func NodeSealed(ctx context.Context, cluster VaultCluster, nodeIdx int) error { } func WaitForNCoresSealed(ctx context.Context, cluster VaultCluster, n int) error { - for ctx.Err() == nil { - sealed := 0 - for i := range cluster.Nodes() { - ctx, cancel := context.WithTimeout(ctx, 5*time.Millisecond) - if err := NodeSealed(ctx, cluster, i); err == nil { - sealed++ - } - cancel() - } + ctx, cancel := context.WithCancel(ctx) + defer cancel() - if sealed >= n { - return nil - } - time.Sleep(time.Second) + errs := make(chan error) + for i := range cluster.Nodes() { + go func(i int) { + var err error + for ctx.Err() == nil { + err = NodeSealed(ctx, cluster, i) + if err == nil { + errs <- nil + return + } + time.Sleep(100 * time.Millisecond) + } + if err == nil { + err = ctx.Err() + } + errs <- err + }(i) } - return fmt.Errorf("%d cores were not sealed", n) + var merr *multierror.Error + var sealed int + for range cluster.Nodes() { + err := <-errs + if err != nil { + merr = multierror.Append(merr, err) + } else { + sealed++ + if sealed == n { + return nil + } + } + } + + return fmt.Errorf("%d cores were not sealed, errs: %v", n, merr.ErrorOrNil()) } func NodeHealthy(ctx context.Context, cluster VaultCluster, nodeIdx int) error {
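
Taken together, DefaultOptions and the reworked NewReplicationSetDocker let a test start from shared defaults and override only the fields it cares about. Below is a minimal usage sketch; the Clusters map and the StandardPerfReplication convenience used here are assumptions about the surrounding testcluster package and do not appear in this diff.

package docker_test

import (
	"context"
	"testing"
	"time"

	"github.com/hashicorp/vault/sdk/helper/testcluster"
	"github.com/hashicorp/vault/sdk/helper/testcluster/docker"
)

func TestPerfReplicationSmoke(t *testing.T) {
	// Start from the shared defaults and tweak only what this test needs.
	opts := docker.DefaultOptions(t)
	opts.ImageTag = "1.13.2" // example override

	set, err := docker.NewReplicationSetDocker(t, opts)
	if err != nil {
		t.Fatal(err)
	}
	defer set.Cleanup()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	// Assumed convenience on ReplicationSet: builds cluster "B" and wires up
	// perf replication between "A" and "B".
	if err := set.StandardPerfReplication(ctx); err != nil {
		t.Fatal(err)
	}

	// WaitForPerfReplicationWorking now resolves the active node on each side
	// before writing to the primary and reading from the secondary.
	if err := testcluster.WaitForPerfReplicationWorking(ctx, set.Clusters["A"], set.Clusters["B"]); err != nil {
		t.Fatal(err)
	}
}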
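
The new DockerClusterNode helpers inject faults at the container level: Pause freezes the whole container via the Docker API, while AddNetworkDelay installs a tc/netem qdisc inside the container plus a u32 filter keyed on the target's last IP octet, so only traffic toward one target IP is delayed. A hedged sketch of how a test might combine the delay helper with the new wait helpers; the DockerCluster type name, its ClusterNodes slice, and the node's IPAddress field are assumptions about the docker cluster types, not part of this change.

package docker_test

import (
	"context"
	"testing"
	"time"

	"github.com/hashicorp/vault/sdk/helper/testcluster"
	"github.com/hashicorp/vault/sdk/helper/testcluster/docker"
)

func delayReplicationTraffic(ctx context.Context, t *testing.T, pri, sec *docker.DockerCluster) {
	// Find the primary's active node and its container IP (IPAddress is an assumed field).
	priActive, err := testcluster.WaitForActiveNode(ctx, pri)
	if err != nil {
		t.Fatal(err)
	}
	primaryIP := pri.ClusterNodes[priActive].IPAddress // assumed field

	// Add 500ms of latency from every secondary node toward the primary's
	// active node only; other destinations are unaffected because the helper
	// installs a per-target filter.
	for _, node := range sec.ClusterNodes {
		if err := node.AddNetworkDelay(ctx, 500*time.Millisecond, primaryIP); err != nil {
			t.Fatal(err)
		}
	}

	// Replication should ride out the extra latency: the secondary's active
	// node should still report a connected primary before ctx expires.
	secActive, err := testcluster.WaitForActiveNode(ctx, sec)
	if err != nil {
		t.Fatal(err)
	}
	if err := testcluster.WaitForPerfReplicationConnectionStatus(ctx, sec.Nodes()[secActive].APIClient()); err != nil {
		t.Fatal(err)
	}
}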
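
WaitForPerfReplicationStatus is the general polling primitive underneath WaitForPerfReplicationConnectionStatus: it re-reads sys/replication/performance/status until the caller-supplied accept callback returns nil or the context expires. A short sketch of a custom acceptance check follows; the "state" field and its "stream-wals" value reflect a typical perf-secondary status payload and are an assumption, not something this change guarantees.

package docker_test

import (
	"context"
	"fmt"

	"github.com/hashicorp/vault/api"
	"github.com/hashicorp/vault/sdk/helper/testcluster"
)

// Wait until the secondary reports it is streaming WALs from the primary.
func waitForStreamingWALs(ctx context.Context, secClient *api.Client) error {
	return testcluster.WaitForPerfReplicationStatus(ctx, secClient, func(m map[string]interface{}) error {
		if state, ok := m["state"].(string); ok && state == "stream-wals" {
			return nil
		}
		return fmt.Errorf("not yet streaming WALs, state=%v", m["state"])
	})
}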