Cluster test helper improvements (#20424)

Nick Cabatoff 2023-05-24 16:21:10 -04:00 committed by GitHub
parent 7956c382e6
commit 46b0b33bda
4 changed files with 154 additions and 54 deletions

@@ -773,6 +773,48 @@ func (n *DockerClusterNode) Start(ctx context.Context, opts *DockerClusterOption
return nil
}
func (n *DockerClusterNode) Pause(ctx context.Context) error {
return n.DockerAPI.ContainerPause(ctx, n.Container.ID)
}
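Pause has no resume counterpart in this commit; if a test needs to bring a paused node back, a sketch like the following would do it, assuming n.DockerAPI is the standard Docker client, whose ContainerUnpause mirrors ContainerPause:
// Hypothetical counterpart to Pause (not part of this commit); assumes
// n.DockerAPI is a *client.Client from github.com/docker/docker/client.
func (n *DockerClusterNode) Unpause(ctx context.Context) error {
	return n.DockerAPI.ContainerUnpause(ctx, n.Container.ID)
}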
func (n *DockerClusterNode) AddNetworkDelay(ctx context.Context, delay time.Duration, targetIP string) error {
ip := net.ParseIP(targetIP)
if ip == nil {
return fmt.Errorf("targetIP %q is not an IP address", targetIP)
}
// Derive a unique handle for the filter rule from the last octet of targetIP;
// we assume every targetIP has a unique last octet, which currently holds for
// how we set up docker networking.
lastOctet := ip.To4()[3]
stdout, stderr, exitCode, err := n.runner.RunCmdWithOutput(ctx, n.Container.ID, []string{
"/bin/sh",
"-xec", strings.Join([]string{
fmt.Sprintf("echo isolating node %s", targetIP),
"apk add iproute2",
// If this script runs a second time on the same node, the qdisc "add dev"
// will fail; since we only want to run the netem command once, we chain it
// so that it only runs when the "add dev" succeeds.
"tc qdisc add dev eth0 root handle 1: prio && " +
fmt.Sprintf("tc qdisc add dev eth0 parent 1:1 handle 2: netem delay %dms", delay/time.Millisecond),
// Here we create a u32 filter as per https://man7.org/linux/man-pages/man8/tc-u32.8.html
// Its parent is 1:0, i.e. the prio qdisc we attached at the root above.
// Its handle must be unique, so we base it on targetIP
fmt.Sprintf("tc filter add dev eth0 parent 1:0 protocol ip pref 55 handle ::%x u32 match ip dst %s flowid 2:1", lastOctet, targetIP),
}, "; "),
})
if err != nil {
return err
}
n.Logger.Trace(string(stdout))
n.Logger.Trace(string(stderr))
if exitCode != 0 {
return fmt.Errorf("got nonzero exit code from iptables: %d", exitCode)
}
return nil
}
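As a rough illustration (not part of this commit), a test in the same package might combine these helpers as sketched below. DefaultOptions and NewTestDockerCluster are the helpers referenced elsewhere in this commit; ClusterNodes and ContainerIPAddress are assumed names for how the cluster exposes its nodes and their IPs.
// Hypothetical test sketch: add latency from node 2 toward node 0's IP,
// then pause node 2 entirely. Names marked "assumed" are not from this diff.
func TestIsolateNode(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	cluster := NewTestDockerCluster(t, DefaultOptions(t))
	defer cluster.Cleanup()

	nodes := cluster.ClusterNodes // assumed field holding []*DockerClusterNode
	// 500ms of added delay on traffic from node 2 destined to node 0's IP.
	if err := nodes[2].AddNetworkDelay(ctx, 500*time.Millisecond, nodes[0].ContainerIPAddress); err != nil { // ContainerIPAddress is assumed
		t.Fatal(err)
	}
	// Take node 2 off the air completely.
	if err := nodes[2].Pause(ctx); err != nil {
		t.Fatal(err)
	}
}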
type LogConsumerWriter struct {
consumer func(string)
}

@@ -15,12 +15,22 @@ import (
"github.com/hashicorp/vault/sdk/helper/testcluster"
)
type ReplicationDockerOptions struct {
NumCores int
ClusterName string
}
func DefaultOptions(t *testing.T) *DockerClusterOptions {
return &DockerClusterOptions{
ImageRepo: "hashicorp/vault",
ImageTag: "latest",
VaultBinary: os.Getenv("VAULT_BINARY"),
ClusterOptions: testcluster.ClusterOptions{
NumCores: 3,
ClusterName: strings.ReplaceAll(t.Name(), "/", "-"),
VaultNodeConfig: &testcluster.VaultNodeConfig{
LogLevel: "TRACE",
},
},
}
}
func NewReplicationSetDocker(t *testing.T, opt *ReplicationDockerOptions) (*testcluster.ReplicationSet, error) {
func NewReplicationSetDocker(t *testing.T, opts *DockerClusterOptions) (*testcluster.ReplicationSet, error) {
binary := os.Getenv("VAULT_BINARY")
if binary == "" {
t.Skip("only running docker test when $VAULT_BINARY present")
@@ -31,48 +41,20 @@ func NewReplicationSetDocker(t *testing.T, opt *ReplicationDockerOptions) (*test
Logger: logging.NewVaultLogger(hclog.Trace).Named(t.Name()),
}
if opt == nil {
opt = &ReplicationDockerOptions{}
}
var nc int
if opt.NumCores > 0 {
nc = opt.NumCores
}
clusterName := t.Name()
if opt.ClusterName != "" {
clusterName = opt.ClusterName
}
// clusterName is also used as the container name. A container name must not
// exceed 64 chars, and extra suffixes such as "-A-core0" get appended to it,
// so we cap the cluster name length.
if len(clusterName) > MaxClusterNameLength {
if len(opts.ClusterName) > MaxClusterNameLength {
return nil, fmt.Errorf("cluster name length exceeded the maximum allowed length of %v", MaxClusterNameLength)
}
r.Builder = func(ctx context.Context, name string, baseLogger hclog.Logger) (testcluster.VaultCluster, error) {
cluster := NewTestDockerCluster(t, &DockerClusterOptions{
ImageRepo: "hashicorp/vault",
ImageTag: "latest",
VaultBinary: os.Getenv("VAULT_BINARY"),
ClusterOptions: testcluster.ClusterOptions{
ClusterName: strings.ReplaceAll(clusterName+"-"+name, "/", "-"),
Logger: baseLogger.Named(name),
VaultNodeConfig: &testcluster.VaultNodeConfig{
LogLevel: "TRACE",
// If you want the test to run faster locally, you could
// uncomment this performance_multiplier change.
//StorageOptions: map[string]string{
// "performance_multiplier": "1",
//},
},
NumCores: nc,
},
CA: r.CA,
})
return cluster, nil
myOpts := *opts
myOpts.Logger = baseLogger.Named(name)
myOpts.ClusterName += "-" + strings.ReplaceAll(name, "/", "-")
myOpts.CA = r.CA
return NewTestDockerCluster(t, &myOpts), nil
}
a, err := r.Builder(context.TODO(), "A", r.Logger)

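For orientation (not part of this diff): after the refactor a test starts from DefaultOptions and overrides only what it needs before calling NewReplicationSetDocker. A hedged sketch, where StandardPerfReplication is assumed to be the existing ReplicationSet method that wires up perf replication:
func TestPerfReplicationSmoke(t *testing.T) {
	opts := DefaultOptions(t)
	opts.NumCores = 1 // override just the pieces this test cares about

	rs, err := NewReplicationSetDocker(t, opts)
	if err != nil {
		t.Fatal(err)
	}
	defer rs.Cleanup()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	// Assumed helper on ReplicationSet that enables perf replication
	// between clusters A and B.
	if err := rs.StandardPerfReplication(ctx); err != nil {
		t.Fatal(err)
	}
}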
@@ -228,7 +228,16 @@ func WaitForPerformanceSecondary(ctx context.Context, pri, sec VaultCluster, ski
}
func WaitForPerfReplicationWorking(ctx context.Context, pri, sec VaultCluster) error {
priClient, secClient := pri.Nodes()[0].APIClient(), sec.Nodes()[0].APIClient()
priActiveIdx, err := WaitForActiveNode(ctx, pri)
if err != nil {
return err
}
secActiveIdx, err := WaitForActiveNode(ctx, sec)
if err != nil {
return err
}
priClient, secClient := pri.Nodes()[priActiveIdx].APIClient(), sec.Nodes()[secActiveIdx].APIClient()
mountPoint, err := uuid.GenerateUUID()
if err != nil {
return err
@@ -261,7 +270,10 @@ func WaitForPerfReplicationWorking(ctx context.Context, pri, sec VaultCluster) e
}
time.Sleep(100 * time.Millisecond)
}
return fmt.Errorf("unable to read replicated KV on secondary", "path", path, "err", err)
if err == nil {
err = ctx.Err()
}
return fmt.Errorf("unable to read replicated KV on secondary, path=%s, err=%v", path, err)
}
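For context, a minimal sketch of how this helper is usually chained after setup; pri, sec, and t are assumed to come from the surrounding test, with this package imported as testcluster, and the five-minute budget is arbitrary:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
if err := testcluster.SetupTwoClusterPerfReplication(ctx, pri, sec); err != nil {
	t.Fatal(err)
}
// With active nodes resolved on both sides, confirm a KV write on the
// primary becomes readable on the secondary.
if err := testcluster.WaitForPerfReplicationWorking(ctx, pri, sec); err != nil {
	t.Fatal(err)
}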
func SetupTwoClusterPerfReplication(ctx context.Context, pri, sec VaultCluster) error {
@@ -839,3 +851,46 @@ func (r *ReplicationSet) Cleanup() {
cluster.Cleanup()
}
}
func WaitForPerfReplicationConnectionStatus(ctx context.Context, client *api.Client) error {
type Primary struct {
APIAddress string `mapstructure:"api_address"`
ConnectionStatus string `mapstructure:"connection_status"`
ClusterAddress string `mapstructure:"cluster_address"`
LastHeartbeat string `mapstructure:"last_heartbeat"`
}
type Status struct {
Primaries []Primary `mapstructure:"primaries"`
}
return WaitForPerfReplicationStatus(ctx, client, func(m map[string]interface{}) error {
var status Status
err := mapstructure.Decode(m, &status)
if err != nil {
return err
}
if len(status.Primaries) == 0 {
return fmt.Errorf("primaries is zero")
}
for _, v := range status.Primaries {
if v.ConnectionStatus == "connected" {
return nil
}
}
return fmt.Errorf("no primaries connected")
})
}
func WaitForPerfReplicationStatus(ctx context.Context, client *api.Client, accept func(map[string]interface{}) error) error {
var err error
var secret *api.Secret
for ctx.Err() == nil {
secret, err = client.Logical().Read("sys/replication/performance/status")
if err == nil && secret != nil && secret.Data != nil {
if err = accept(secret.Data); err == nil {
return nil
}
}
time.Sleep(500 * time.Millisecond)
}
return fmt.Errorf("unable to get acceptable replication status within allotted time: error=%v secret=%#v", err, secret)
}
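A minimal usage sketch for the new status helpers, assuming secClient is the API client of the secondary's active node and that one minute is a reasonable budget:
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
// Block until at least one primary reports connection_status == "connected",
// or the context deadline expires.
if err := testcluster.WaitForPerfReplicationConnectionStatus(ctx, secClient); err != nil {
	t.Fatal(err)
}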

@@ -11,6 +11,7 @@ import (
"sync/atomic"
"time"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/sdk/helper/xor"
@@ -92,23 +93,43 @@ func NodeSealed(ctx context.Context, cluster VaultCluster, nodeIdx int) error {
}
func WaitForNCoresSealed(ctx context.Context, cluster VaultCluster, n int) error {
for ctx.Err() == nil {
sealed := 0
for i := range cluster.Nodes() {
ctx, cancel := context.WithTimeout(ctx, 5*time.Millisecond)
if err := NodeSealed(ctx, cluster, i); err == nil {
sealed++
}
cancel()
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if sealed >= n {
return nil
}
time.Sleep(time.Second)
errs := make(chan error)
for i := range cluster.Nodes() {
go func(i int) {
var err error
for ctx.Err() == nil {
err = NodeSealed(ctx, cluster, i)
if err == nil {
errs <- nil
return
}
time.Sleep(100 * time.Millisecond)
}
if err == nil {
err = ctx.Err()
}
errs <- err
}(i)
}
return fmt.Errorf("%d cores were not sealed", n)
var merr *multierror.Error
var sealed int
for range cluster.Nodes() {
err := <-errs
if err != nil {
merr = multierror.Append(merr, err)
} else {
sealed++
if sealed == n {
return nil
}
}
}
return fmt.Errorf("%d cores were not sealed, errs: %v", n, merr.ErrorOrNil())
}
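A hedged usage sketch of the reworked helper: seal two standbys through their API clients and let WaitForNCoresSealed poll every node concurrently until at least two report sealed or the context expires. cluster and t are assumed to come from the surrounding test.
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
// Seal nodes 1 and 2 directly via the Vault API, then wait for the helper
// above to observe both as sealed.
for _, idx := range []int{1, 2} {
	if err := cluster.Nodes()[idx].APIClient().Sys().Seal(); err != nil {
		t.Fatal(err)
	}
}
if err := testcluster.WaitForNCoresSealed(ctx, cluster, 2); err != nil {
	t.Fatal(err)
}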
func NodeHealthy(ctx context.Context, cluster VaultCluster, nodeIdx int) error {