Simple perf replication test using docker containers (#20393)

This commit is contained in:
Nick Cabatoff 2023-04-28 15:43:30 -04:00 committed by GitHub
parent 7781b037da
commit e0093a2791
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 433 additions and 33 deletions

View File

@ -57,12 +57,7 @@ type DockerCluster struct {
ClusterNodes []*DockerClusterNode
// Certificate fields
CACert *x509.Certificate
CACertBytes []byte
CACertPEM []byte
CACertPEMFile string
CAKey *ecdsa.PrivateKey
CAKeyPEM []byte
*testcluster.CA
RootCAs *x509.CertPool
barrierKeys [][]byte
@ -154,6 +149,11 @@ func (dc *DockerCluster) RootToken() string {
return dc.rootToken
}
func (dc *DockerCluster) SetRootToken(s string) {
dc.Logger.Trace("cluster root token changed", "helpful_env", fmt.Sprintf("VAULT_TOKEN=%s VAULT_CACERT=/vault/config/ca.pem", dc.RootToken()))
dc.rootToken = s
}
func (n *DockerClusterNode) Name() string {
return n.Cluster.ClusterName + "-" + n.NodeID
}
@ -241,17 +241,16 @@ func (dc *DockerCluster) clusterReady(ctx context.Context) error {
func (dc *DockerCluster) setupCA(opts *DockerClusterOptions) error {
var err error
var ca testcluster.CA
var caKey *ecdsa.PrivateKey
if opts != nil && opts.CAKey != nil {
caKey = opts.CAKey
ca.CAKey = opts.CAKey
} else {
caKey, err = ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
ca.CAKey, err = ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
return err
}
}
dc.CAKey = caKey
var caBytes []byte
if opts != nil && len(opts.CACert) > 0 {
@ -269,7 +268,7 @@ func (dc *DockerCluster) setupCA(opts *DockerClusterOptions) error {
BasicConstraintsValid: true,
IsCA: true,
}
caBytes, err = x509.CreateCertificate(rand.Reader, CACertTemplate, CACertTemplate, caKey.Public(), caKey)
caBytes, err = x509.CreateCertificate(rand.Reader, CACertTemplate, CACertTemplate, ca.CAKey.Public(), ca.CAKey)
if err != nil {
return err
}
@ -278,25 +277,22 @@ func (dc *DockerCluster) setupCA(opts *DockerClusterOptions) error {
if err != nil {
return err
}
dc.CACert = CACert
dc.CACertBytes = caBytes
dc.RootCAs = x509.NewCertPool()
dc.RootCAs.AddCert(CACert)
ca.CACert = CACert
ca.CACertBytes = caBytes
CACertPEMBlock := &pem.Block{
Type: "CERTIFICATE",
Bytes: caBytes,
}
dc.CACertPEM = pem.EncodeToMemory(CACertPEMBlock)
ca.CACertPEM = pem.EncodeToMemory(CACertPEMBlock)
dc.CACertPEMFile = filepath.Join(dc.tmpDir, "ca", "ca.pem")
err = os.WriteFile(dc.CACertPEMFile, dc.CACertPEM, 0o755)
ca.CACertPEMFile = filepath.Join(dc.tmpDir, "ca", "ca.pem")
err = os.WriteFile(ca.CACertPEMFile, ca.CACertPEM, 0o755)
if err != nil {
return err
}
marshaledCAKey, err := x509.MarshalECPrivateKey(caKey)
marshaledCAKey, err := x509.MarshalECPrivateKey(ca.CAKey)
if err != nil {
return err
}
@ -304,7 +300,9 @@ func (dc *DockerCluster) setupCA(opts *DockerClusterOptions) error {
Type: "EC PRIVATE KEY",
Bytes: marshaledCAKey,
}
dc.CAKeyPEM = pem.EncodeToMemory(CAKeyPEMBlock)
ca.CAKeyPEM = pem.EncodeToMemory(CAKeyPEMBlock)
dc.CA = &ca
return nil
}
@ -418,6 +416,7 @@ func NewTestDockerCluster(t *testing.T, opts *DockerClusterOptions) *DockerClust
if err != nil {
t.Fatal(err)
}
dc.Logger.Trace("cluster started", "helpful_env", fmt.Sprintf("VAULT_TOKEN=%s VAULT_CACERT=/vault/config/ca.pem", dc.RootToken()))
return dc
}
@ -440,6 +439,7 @@ func NewDockerCluster(ctx context.Context, opts *DockerClusterOptions) (*DockerC
ClusterName: opts.ClusterName,
Logger: opts.Logger,
builtTags: map[string]struct{}{},
CA: opts.CA,
}
if err := dc.setupDockerCluster(ctx, opts); err != nil {
@ -500,7 +500,7 @@ func (n *DockerClusterNode) APIClient() *api.Client {
// bug in CloneConfig?
panic(fmt.Sprintf("NewClient error on cloned config: %v", err))
}
client.SetToken(n.client.Token())
client.SetToken(n.Cluster.rootToken)
return client
}
@ -794,7 +794,7 @@ type DockerClusterOptions struct {
NetworkName string
ImageRepo string
ImageTag string
CloneCA *DockerCluster
CA *testcluster.CA
VaultBinary string
Args []string
StartProbe func(*api.Client) error
@ -850,19 +850,13 @@ func (dc *DockerCluster) setupDockerCluster(ctx context.Context, opts *DockerClu
numCores = opts.NumCores
}
if opts.CloneCA != nil {
dc.CACert = opts.CloneCA.CACert
dc.CACertBytes = opts.CloneCA.CACertBytes
dc.CACertPEM = opts.CloneCA.CACertPEM
dc.CACertPEMFile = opts.CloneCA.CACertPEMFile
dc.CAKey = opts.CloneCA.CAKey
dc.CAKeyPEM = opts.CloneCA.CAKeyPEM
dc.RootCAs = opts.CloneCA.RootCAs
} else {
if dc.CA == nil {
if err := dc.setupCA(opts); err != nil {
return err
}
}
dc.RootCAs = x509.NewCertPool()
dc.RootCAs.AddCert(dc.CA.CACert)
for i := 0; i < numCores; i++ {
if err := dc.addNode(ctx, opts); err != nil {

View File

@ -0,0 +1,58 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package docker
import (
"context"
"os"
"strings"
"testing"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/sdk/helper/logging"
"github.com/hashicorp/vault/sdk/helper/testcluster"
)
func NewReplicationSetDocker(t *testing.T) (*testcluster.ReplicationSet, error) {
binary := os.Getenv("VAULT_BINARY")
if binary == "" {
t.Skip("only running docker test when $VAULT_BINARY present")
}
r := &testcluster.ReplicationSet{
Clusters: map[string]testcluster.VaultCluster{},
Logger: logging.NewVaultLogger(hclog.Trace).Named(t.Name()),
}
r.Builder = func(ctx context.Context, name string, baseLogger hclog.Logger) (testcluster.VaultCluster, error) {
cluster := NewTestDockerCluster(t, &DockerClusterOptions{
ImageRepo: "hashicorp/vault",
ImageTag: "latest",
VaultBinary: os.Getenv("VAULT_BINARY"),
ClusterOptions: testcluster.ClusterOptions{
ClusterName: strings.ReplaceAll(t.Name()+"-"+name, "/", "-"),
Logger: baseLogger.Named(name),
VaultNodeConfig: &testcluster.VaultNodeConfig{
LogLevel: "TRACE",
// If you want the test to run faster locally, you could
// uncomment this performance_multiplier change.
//StorageOptions: map[string]string{
// "performance_multiplier": "1",
//},
},
},
CA: r.CA,
})
return cluster, nil
}
a, err := r.Builder(context.TODO(), "A", r.Logger)
if err != nil {
return nil, err
}
r.Clusters["A"] = a
r.CA = a.(*DockerCluster).CA
return r, err
}

View File

@ -36,6 +36,10 @@ type ExecDevCluster struct {
Logger log.Logger
}
func (dc *ExecDevCluster) SetRootToken(token string) {
dc.rootToken = token
}
func (dc *ExecDevCluster) NamedLogger(s string) log.Logger {
return dc.Logger.Named(s)
}

View File

@ -0,0 +1,293 @@
package testcluster
import (
"context"
"fmt"
"reflect"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/sdk/helper/consts"
)
func GetPerformanceToken(pri VaultCluster, id, secondaryPublicKey string) (string, error) {
client := pri.Nodes()[0].APIClient()
req := map[string]interface{}{
"id": id,
}
if secondaryPublicKey != "" {
req["secondary_public_key"] = secondaryPublicKey
}
secret, err := client.Logical().Write("sys/replication/performance/primary/secondary-token", req)
if err != nil {
return "", err
}
if secondaryPublicKey != "" {
return secret.Data["token"].(string), nil
}
return secret.WrapInfo.Token, nil
}
func EnablePerfPrimary(ctx context.Context, pri VaultCluster) error {
client := pri.Nodes()[0].APIClient()
_, err := client.Logical().WriteWithContext(ctx, "sys/replication/performance/primary/enable", nil)
if err != nil {
return err
}
err = WaitForPerfReplicationState(ctx, pri, consts.ReplicationPerformancePrimary)
if err != nil {
return err
}
return WaitForActiveNodeAndPerfStandbys(ctx, pri)
}
func WaitForPerfReplicationState(ctx context.Context, cluster VaultCluster, state consts.ReplicationState) error {
client := cluster.Nodes()[0].APIClient()
var health *api.HealthResponse
var err error
for ctx.Err() == nil {
health, err = client.Sys().HealthWithContext(ctx)
if health.ReplicationPerformanceMode == state.GetPerformanceString() {
return nil
}
time.Sleep(500 * time.Millisecond)
}
if err == nil {
err = ctx.Err()
}
return err
}
func EnablePerformanceSecondaryNoWait(ctx context.Context, perfToken string, pri, sec VaultCluster, updatePrimary bool) error {
postData := map[string]interface{}{
"token": perfToken,
"ca_file": "/vault/config/ca.pem",
}
path := "sys/replication/performance/secondary/enable"
if updatePrimary {
path = "sys/replication/performance/secondary/update-primary"
}
err := WaitForActiveNodeAndPerfStandbys(ctx, sec)
if err != nil {
return err
}
_, err = sec.Nodes()[0].APIClient().Logical().Write(path, postData)
if err != nil {
return err
}
return WaitForPerfReplicationState(ctx, sec, consts.ReplicationPerformanceSecondary)
}
func EnablePerformanceSecondary(ctx context.Context, perfToken string, pri, sec VaultCluster, updatePrimary, skipPoisonPill bool) (string, error) {
if err := EnablePerformanceSecondaryNoWait(ctx, perfToken, pri, sec, updatePrimary); err != nil {
return "", err
}
if err := WaitForMatchingMerkleRoots(ctx, "sys/replication/performance/", pri, sec); err != nil {
return "", err
}
root, err := WaitForPerformanceSecondary(ctx, pri, sec, skipPoisonPill)
if err != nil {
return "", err
}
if err := WaitForPerfReplicationWorking(ctx, pri, sec); err != nil {
return "", err
}
return root, nil
}
func WaitForMatchingMerkleRoots(ctx context.Context, endpoint string, pri, sec VaultCluster) error {
getRoot := func(mode string, cli *api.Client) (string, error) {
status, err := cli.Logical().Read(endpoint + "status")
if err != nil {
return "", err
}
if status == nil || status.Data == nil || status.Data["mode"] == nil {
return "", fmt.Errorf("got nil secret or data")
}
if status.Data["mode"].(string) != mode {
return "", fmt.Errorf("expected mode=%s, got %s", mode, status.Data["mode"].(string))
}
return status.Data["merkle_root"].(string), nil
}
secClient := sec.Nodes()[0].APIClient()
priClient := pri.Nodes()[0].APIClient()
for i := 0; i < 30; i++ {
secRoot, err := getRoot("secondary", secClient)
if err != nil {
return err
}
priRoot, err := getRoot("primary", priClient)
if err != nil {
return err
}
if reflect.DeepEqual(priRoot, secRoot) {
return nil
}
time.Sleep(time.Second)
}
return fmt.Errorf("roots did not become equal")
}
func WaitForPerformanceSecondary(ctx context.Context, pri, sec VaultCluster, skipPoisonPill bool) (string, error) {
if len(pri.GetRecoveryKeys()) > 0 {
sec.SetBarrierKeys(pri.GetRecoveryKeys())
sec.SetRecoveryKeys(pri.GetRecoveryKeys())
} else {
sec.SetBarrierKeys(pri.GetBarrierKeys())
sec.SetRecoveryKeys(pri.GetBarrierKeys())
}
if len(sec.Nodes()) > 1 {
if skipPoisonPill {
// As part of prepareSecondary on the active node the keyring is
// deleted from storage. Its absence can cause standbys to seal
// themselves. But it's not reliable, so we'll seal them
// ourselves to force the issue.
for i := range sec.Nodes()[1:] {
if err := SealNode(ctx, sec, i+1); err != nil {
return "", err
}
}
} else {
// We want to make sure we unseal all the nodes so we first need to wait
// until two of the nodes seal due to the poison pill being written
if err := WaitForNCoresSealed(ctx, sec, len(sec.Nodes())-1); err != nil {
return "", err
}
}
}
if _, err := WaitForActiveNode(ctx, sec); err != nil {
return "", err
}
if err := UnsealAllNodes(ctx, sec); err != nil {
return "", err
}
perfSecondaryRootToken, err := GenerateRoot(sec, GenerateRootRegular)
if err != nil {
return "", err
}
sec.SetRootToken(perfSecondaryRootToken)
if err := WaitForActiveNodeAndPerfStandbys(ctx, sec); err != nil {
return "", err
}
return perfSecondaryRootToken, nil
}
func WaitForPerfReplicationWorking(ctx context.Context, pri, sec VaultCluster) error {
priClient, secClient := pri.Nodes()[0].APIClient(), sec.Nodes()[0].APIClient()
mountPoint, err := uuid.GenerateUUID()
if err != nil {
return err
}
err = priClient.Sys().Mount(mountPoint, &api.MountInput{
Type: "kv",
Local: false,
})
if err != nil {
return fmt.Errorf("unable to mount KV engine on primary")
}
path := mountPoint + "/foo"
_, err = priClient.Logical().Write(path, map[string]interface{}{
"bar": 1,
})
if err != nil {
return fmt.Errorf("unable to write KV on primary", "path", path)
}
for ctx.Err() == nil {
var secret *api.Secret
secret, err = secClient.Logical().Read(path)
if err == nil && secret != nil {
err = priClient.Sys().Unmount(mountPoint)
if err != nil {
return fmt.Errorf("unable to unmount KV engine on primary")
}
return nil
}
time.Sleep(100 * time.Millisecond)
}
return fmt.Errorf("unable to read replicated KV on secondary", "path", path, "err", err)
}
func SetupTwoClusterPerfReplication(ctx context.Context, pri, sec VaultCluster) error {
if err := EnablePerfPrimary(ctx, pri); err != nil {
return err
}
perfToken, err := GetPerformanceToken(pri, sec.ClusterID(), "")
if err != nil {
return err
}
_, err = EnablePerformanceSecondary(ctx, perfToken, pri, sec, false, false)
return err
}
type ReplicationSet struct {
// By convention, we recommend the following naming scheme for
// clusters in this map:
// A: perf primary
// B: primary's DR
// C: first perf secondary of A
// D: C's DR
// E: second perf secondary of A
// F: E's DR
// ... etc.
//
// We use generic names rather than role-specific names because
// that's less confusing when promotions take place that result in role
// changes. In other words, if D gets promoted to replace C as a perf
// secondary, and C gets demoted and updated to become D's DR secondary,
// they should maintain their initial names of D and C throughout.
Clusters map[string]VaultCluster
Builder ClusterBuilder
Logger hclog.Logger
CA *CA
}
type ClusterBuilder func(ctx context.Context, name string, logger hclog.Logger) (VaultCluster, error)
func NewReplicationSet(b ClusterBuilder) (*ReplicationSet, error) {
return &ReplicationSet{
Clusters: map[string]VaultCluster{},
Builder: b,
Logger: hclog.NewNullLogger(),
}, nil
}
func (r *ReplicationSet) StandardPerfReplication(ctx context.Context) error {
for _, name := range []string{"A", "C"} {
if _, ok := r.Clusters[name]; !ok {
cluster, err := r.Builder(ctx, name, r.Logger)
if err != nil {
return err
}
r.Clusters[name] = cluster
}
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
err := SetupTwoClusterPerfReplication(ctx, r.Clusters["A"], r.Clusters["C"])
if err != nil {
return err
}
return nil
}
func (r *ReplicationSet) Cleanup() {
for _, cluster := range r.Clusters {
cluster.Cleanup()
}
}

View File

@ -4,7 +4,9 @@
package testcluster
import (
"crypto/ecdsa"
"crypto/tls"
"crypto/x509"
"time"
"github.com/hashicorp/go-hclog"
@ -27,6 +29,7 @@ type VaultCluster interface {
Cleanup()
ClusterID() string
NamedLogger(string) hclog.Logger
SetRootToken(token string)
}
type VaultNodeConfig struct {
@ -96,3 +99,12 @@ type ClusterOptions struct {
VaultNodeConfig *VaultNodeConfig
VaultLicense string
}
type CA struct {
CACert *x509.Certificate
CACertBytes []byte
CACertPEM []byte
CACertPEMFile string
CAKey *ecdsa.PrivateKey
CAKeyPEM []byte
}

View File

@ -0,0 +1,35 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package replication_binary
/*
Example of how to use docker.NewReplicationSetDocker(t), assuming
you point VAULT_BINARY to an Enterprise Vault binary:
import (
"context"
"testing"
"time"
"github.com/hashicorp/vault/sdk/helper/testcluster/docker"
)
// TestStandardPerfReplication_Docker tests that we can create two 3-node
// clusters of docker containers and connect them using perf replication.
func TestStandardPerfReplication_Docker(t *testing.T) {
r, err := docker.NewReplicationSetDocker(t)
if err != nil {
t.Fatal(err)
}
defer r.Cleanup()
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
err = r.StandardPerfReplication(ctx)
if err != nil {
t.Fatal(err)
}
}
*/

View File

@ -801,6 +801,10 @@ type TestCluster struct {
opts *TestClusterOptions
}
func (c *TestCluster) SetRootToken(token string) {
c.RootToken = token
}
func (c *TestCluster) Start() {
}