Make -dev-four-cluster an ent-only option and remove ent-specific testhelpers (#7215)
This commit is contained in:
parent
d9894e5505
commit
ec2a1a11a6
|
@ -63,6 +63,11 @@ var _ cli.CommandAutocomplete = (*ServerCommand)(nil)
|
|||
|
||||
var memProfilerEnabled = false
|
||||
|
||||
var enableFourClusterDev = func(c *ServerCommand, base *vault.CoreConfig, info map[string]string, infoKeys []string, devListenAddress, tempDir string) int {
|
||||
c.logger.Error("-dev-four-cluster only supported in enterprise Vault")
|
||||
return 1
|
||||
}
|
||||
|
||||
const storageMigrationLock = "core/migration"
|
||||
|
||||
type ServerCommand struct {
|
||||
|
@ -705,7 +710,7 @@ func (c *ServerCommand) Run(args []string) int {
|
|||
}
|
||||
|
||||
if c.flagDevFourCluster {
|
||||
return c.enableFourClusterDev(coreConfig, info, infoKeys, c.flagDevListenAddr, os.Getenv("VAULT_DEV_TEMP_DIR"))
|
||||
return enableFourClusterDev(c, coreConfig, info, infoKeys, c.flagDevListenAddr, os.Getenv("VAULT_DEV_TEMP_DIR"))
|
||||
}
|
||||
|
||||
var disableClustering bool
|
||||
|
|
|
@ -1,314 +0,0 @@
|
|||
package command
|
||||
|
||||
import (
|
||||
"crypto/ecdsa"
|
||||
"crypto/elliptic"
|
||||
"crypto/rand"
|
||||
"crypto/x509"
|
||||
"crypto/x509/pkix"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/big"
|
||||
mathrand "math/rand"
|
||||
"net"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/vault/helper/namespace"
|
||||
"github.com/hashicorp/vault/helper/testhelpers"
|
||||
vaulthttp "github.com/hashicorp/vault/http"
|
||||
"github.com/hashicorp/vault/sdk/logical"
|
||||
"github.com/hashicorp/vault/sdk/version"
|
||||
"github.com/hashicorp/vault/vault"
|
||||
shamirseal "github.com/hashicorp/vault/vault/seal/shamir"
|
||||
testing "github.com/mitchellh/go-testing-interface"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func (c *ServerCommand) enableFourClusterDev(base *vault.CoreConfig, info map[string]string, infoKeys []string, devListenAddress, tempDir string) int {
|
||||
var err error
|
||||
ctx := namespace.RootContext(nil)
|
||||
clusters := map[string]*vault.TestCluster{}
|
||||
|
||||
if base.DevToken == "" {
|
||||
base.DevToken = "root"
|
||||
}
|
||||
base.EnableRaw = true
|
||||
|
||||
// Without setting something in the future we get bombarded with warnings which are quite annoying during testing
|
||||
base.DevLicenseDuration = 6 * time.Hour
|
||||
|
||||
// Set a base temp dir
|
||||
if tempDir == "" {
|
||||
tempDir, err = ioutil.TempDir("", "vault-test-cluster-")
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("failed to create top-level temp dir: %s", err))
|
||||
return 1
|
||||
}
|
||||
}
|
||||
|
||||
caKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Failed to generate CA key: %s", err))
|
||||
return 1
|
||||
}
|
||||
certIPs := []net.IP{
|
||||
net.IPv6loopback,
|
||||
net.ParseIP("127.0.0.1"),
|
||||
}
|
||||
caCertTemplate := &x509.Certificate{
|
||||
Subject: pkix.Name{
|
||||
CommonName: "localhost",
|
||||
},
|
||||
DNSNames: []string{"localhost"},
|
||||
IPAddresses: certIPs,
|
||||
KeyUsage: x509.KeyUsage(x509.KeyUsageCertSign | x509.KeyUsageCRLSign),
|
||||
SerialNumber: big.NewInt(mathrand.Int63()),
|
||||
NotBefore: time.Now().Add(-30 * time.Second),
|
||||
NotAfter: time.Now().Add(262980 * time.Hour),
|
||||
BasicConstraintsValid: true,
|
||||
IsCA: true,
|
||||
}
|
||||
caBytes, err := x509.CreateCertificate(rand.Reader, caCertTemplate, caCertTemplate, caKey.Public(), caKey)
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Failed to generate certificate: %s", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
getCluster := func(name string) error {
|
||||
factory := c.PhysicalBackends["inmem_transactional_ha"]
|
||||
backend, err := factory(nil, c.logger)
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Error initializing storage of type %s: %s", "inmem_transactional_ha", err))
|
||||
return errors.New("")
|
||||
}
|
||||
base.Physical = backend
|
||||
base.Seal = vault.NewDefaultSeal(shamirseal.NewSeal(c.logger.Named("shamir")))
|
||||
|
||||
testCluster := vault.NewTestCluster(&testing.RuntimeT{}, base, &vault.TestClusterOptions{
|
||||
HandlerFunc: vaulthttp.Handler,
|
||||
//BaseListenAddress: c.flagDevListenAddr,
|
||||
Logger: c.logger.Named(name),
|
||||
TempDir: fmt.Sprintf("%s/%s", tempDir, name),
|
||||
CAKey: caKey,
|
||||
CACert: caBytes,
|
||||
})
|
||||
|
||||
clusters[name] = testCluster
|
||||
|
||||
for i, core := range testCluster.Cores {
|
||||
info[fmt.Sprintf("%s node %d redirect address", name, i)] = fmt.Sprintf("https://%s", core.Listeners[0].Address.String())
|
||||
infoKeys = append(infoKeys, fmt.Sprintf("%s node %d redirect address", name, i))
|
||||
core.Server.Handler = vaulthttp.Handler(&vault.HandlerProperties{
|
||||
Core: core.Core,
|
||||
})
|
||||
core.SetClusterHandler(core.Server.Handler)
|
||||
}
|
||||
|
||||
testCluster.Start()
|
||||
|
||||
req := &logical.Request{
|
||||
ID: "dev-gen-root",
|
||||
Operation: logical.UpdateOperation,
|
||||
ClientToken: testCluster.RootToken,
|
||||
Path: "auth/token/create",
|
||||
Data: map[string]interface{}{
|
||||
"id": base.DevToken,
|
||||
"policies": []string{"root"},
|
||||
"no_parent": true,
|
||||
"no_default_policy": true,
|
||||
},
|
||||
}
|
||||
resp, err := testCluster.Cores[0].HandleRequest(ctx, req)
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("failed to create root token with ID %s: %s", base.DevToken, err))
|
||||
return errors.New("")
|
||||
}
|
||||
if resp == nil {
|
||||
c.UI.Error(fmt.Sprintf("nil response when creating root token with ID %s", base.DevToken))
|
||||
return errors.New("")
|
||||
}
|
||||
if resp.Auth == nil {
|
||||
c.UI.Error(fmt.Sprintf("nil auth when creating root token with ID %s", base.DevToken))
|
||||
return errors.New("")
|
||||
}
|
||||
|
||||
testCluster.RootToken = resp.Auth.ClientToken
|
||||
|
||||
req.ID = "dev-revoke-init-root"
|
||||
req.Path = "auth/token/revoke-self"
|
||||
req.Data = nil
|
||||
resp, err = testCluster.Cores[0].HandleRequest(ctx, req)
|
||||
if err != nil {
|
||||
c.UI.Output(fmt.Sprintf("failed to revoke initial root token: %s", err))
|
||||
return errors.New("")
|
||||
}
|
||||
|
||||
for _, core := range testCluster.Cores {
|
||||
core.Client.SetToken(base.DevToken)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
err = getCluster("perf-pri")
|
||||
if err != nil {
|
||||
return 1
|
||||
}
|
||||
err = getCluster("perf-pri-dr")
|
||||
if err != nil {
|
||||
return 1
|
||||
}
|
||||
err = getCluster("perf-sec")
|
||||
if err != nil {
|
||||
return 1
|
||||
}
|
||||
err = getCluster("perf-sec-dr")
|
||||
if err != nil {
|
||||
return 1
|
||||
}
|
||||
|
||||
clusterCleanup := func() {
|
||||
for name, cluster := range clusters {
|
||||
cluster.Cleanup()
|
||||
|
||||
// Shutdown will wait until after Vault is sealed, which means the
|
||||
// request forwarding listeners will also be closed (and also
|
||||
// waited for).
|
||||
for _, core := range cluster.Cores {
|
||||
if err := core.Shutdown(); err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Error with cluster %s core shutdown: %s", name, err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
defer c.cleanupGuard.Do(clusterCleanup)
|
||||
|
||||
info["cluster parameters path"] = tempDir
|
||||
infoKeys = append(infoKeys, "cluster parameters path")
|
||||
|
||||
verInfo := version.GetVersion()
|
||||
info["version"] = verInfo.FullVersionNumber(false)
|
||||
infoKeys = append(infoKeys, "version")
|
||||
if verInfo.Revision != "" {
|
||||
info["version sha"] = strings.Trim(verInfo.Revision, "'")
|
||||
infoKeys = append(infoKeys, "version sha")
|
||||
}
|
||||
infoKeys = append(infoKeys, "cgo")
|
||||
info["cgo"] = "disabled"
|
||||
if version.CgoEnabled {
|
||||
info["cgo"] = "enabled"
|
||||
}
|
||||
|
||||
// Server configuration output
|
||||
padding := 40
|
||||
sort.Strings(infoKeys)
|
||||
c.UI.Output("==> Vault server configuration:\n")
|
||||
for _, k := range infoKeys {
|
||||
c.UI.Output(fmt.Sprintf(
|
||||
"%s%s: %s",
|
||||
strings.Repeat(" ", padding-len(k)),
|
||||
strings.Title(k),
|
||||
info[k]))
|
||||
}
|
||||
c.UI.Output("")
|
||||
|
||||
// Set the token
|
||||
tokenHelper, err := c.TokenHelper()
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Error getting token helper: %s", err))
|
||||
return 1
|
||||
}
|
||||
if err := tokenHelper.Store(base.DevToken); err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Error storing in token helper: %s", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
if err := ioutil.WriteFile(filepath.Join(tempDir, "root_token"), []byte(base.DevToken), 0755); err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Error writing token to tempfile: %s", err))
|
||||
return 1
|
||||
}
|
||||
|
||||
c.UI.Output(fmt.Sprintf(
|
||||
"\nRoot Token: %s\n", base.DevToken,
|
||||
))
|
||||
|
||||
for i, key := range clusters["perf-pri"].BarrierKeys {
|
||||
c.UI.Output(fmt.Sprintf(
|
||||
"Unseal Key %d: %s",
|
||||
i+1, base64.StdEncoding.EncodeToString(key),
|
||||
))
|
||||
}
|
||||
|
||||
c.UI.Output(fmt.Sprintf(
|
||||
"\nUseful env vars:\n"+
|
||||
"export VAULT_TOKEN=%s\n"+
|
||||
"export VAULT_CACERT=%s/perf-pri/ca_cert.pem\n",
|
||||
base.DevToken,
|
||||
tempDir,
|
||||
))
|
||||
c.UI.Output(fmt.Sprintf("Addresses of initial active nodes:"))
|
||||
clusterNames := []string{}
|
||||
for name := range clusters {
|
||||
clusterNames = append(clusterNames, name)
|
||||
}
|
||||
sort.Strings(clusterNames)
|
||||
for _, name := range clusterNames {
|
||||
c.UI.Output(fmt.Sprintf(
|
||||
"%s:\n"+
|
||||
"export VAULT_ADDR=%s\n",
|
||||
name,
|
||||
clusters[name].Cores[0].Client.Address(),
|
||||
))
|
||||
}
|
||||
|
||||
// Output the header that the server has started
|
||||
c.UI.Output("==> Vault clusters started! Log data will stream in below:\n")
|
||||
|
||||
// Inform any tests that the server is ready
|
||||
select {
|
||||
case c.startedCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
|
||||
// Release the log gate.
|
||||
c.logGate.Flush()
|
||||
|
||||
testhelpers.SetupFourClusterReplication(&testing.RuntimeT{},
|
||||
clusters["perf-pri"],
|
||||
clusters["perf-sec"],
|
||||
clusters["perf-pri-dr"],
|
||||
clusters["perf-sec-dr"],
|
||||
)
|
||||
|
||||
// Wait for shutdown
|
||||
shutdownTriggered := false
|
||||
|
||||
for !shutdownTriggered {
|
||||
select {
|
||||
case <-c.ShutdownCh:
|
||||
c.UI.Output("==> Vault shutdown triggered")
|
||||
|
||||
// Stop the listeners so that we don't process further client requests.
|
||||
c.cleanupGuard.Do(clusterCleanup)
|
||||
|
||||
shutdownTriggered = true
|
||||
|
||||
case <-c.SighupCh:
|
||||
c.UI.Output("==> Vault reload triggered")
|
||||
for name, cluster := range clusters {
|
||||
for _, core := range cluster.Cores {
|
||||
if err := c.Reload(core.ReloadFuncsLock, core.ReloadFuncs, nil); err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Error(s) were encountered during reload of cluster %s cores: %s", name, err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
|
@ -7,79 +7,22 @@ import (
|
|||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-uuid"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/vault/physical/raft"
|
||||
"github.com/hashicorp/vault/sdk/helper/logging"
|
||||
"github.com/hashicorp/vault/sdk/logical"
|
||||
"github.com/hashicorp/vault/vault/cluster"
|
||||
|
||||
log "github.com/hashicorp/go-hclog"
|
||||
raftlib "github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/vault/api"
|
||||
credAppRole "github.com/hashicorp/vault/builtin/credential/approle"
|
||||
credUserpass "github.com/hashicorp/vault/builtin/credential/userpass"
|
||||
"github.com/hashicorp/vault/helper/namespace"
|
||||
"github.com/hashicorp/vault/helper/xor"
|
||||
"github.com/hashicorp/vault/sdk/helper/consts"
|
||||
"github.com/hashicorp/vault/physical/raft"
|
||||
"github.com/hashicorp/vault/sdk/physical"
|
||||
"github.com/hashicorp/vault/sdk/physical/inmem"
|
||||
"github.com/hashicorp/vault/vault"
|
||||
testing "github.com/mitchellh/go-testing-interface"
|
||||
"github.com/mitchellh/go-testing-interface"
|
||||
)
|
||||
|
||||
type ReplicatedTestClusters struct {
|
||||
PerfPrimaryCluster *vault.TestCluster
|
||||
PerfSecondaryCluster *vault.TestCluster
|
||||
PerfPrimaryDRCluster *vault.TestCluster
|
||||
PerfSecondaryDRCluster *vault.TestCluster
|
||||
}
|
||||
|
||||
func (r *ReplicatedTestClusters) nonNilClusters() []*vault.TestCluster {
|
||||
all := []*vault.TestCluster{r.PerfPrimaryCluster, r.PerfSecondaryCluster,
|
||||
r.PerfPrimaryDRCluster, r.PerfSecondaryDRCluster}
|
||||
|
||||
var ret []*vault.TestCluster
|
||||
for _, cluster := range all {
|
||||
if cluster != nil {
|
||||
ret = append(ret, cluster)
|
||||
}
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (r *ReplicatedTestClusters) Cleanup() {
|
||||
for _, cluster := range r.nonNilClusters() {
|
||||
cluster.Cleanup()
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ReplicatedTestClusters) Primary() (*vault.TestCluster, *vault.TestClusterCore, *api.Client) {
|
||||
return r.PerfPrimaryCluster, r.PerfPrimaryCluster.Cores[0], r.PerfPrimaryCluster.Cores[0].Client
|
||||
}
|
||||
|
||||
func (r *ReplicatedTestClusters) Secondary() (*vault.TestCluster, *vault.TestClusterCore, *api.Client) {
|
||||
return r.PerfSecondaryCluster, r.PerfSecondaryCluster.Cores[0], r.PerfSecondaryCluster.Cores[0].Client
|
||||
}
|
||||
|
||||
func (r *ReplicatedTestClusters) PrimaryDR() (*vault.TestCluster, *vault.TestClusterCore, *api.Client) {
|
||||
return r.PerfPrimaryDRCluster, r.PerfPrimaryDRCluster.Cores[0], r.PerfPrimaryDRCluster.Cores[0].Client
|
||||
}
|
||||
|
||||
func (r *ReplicatedTestClusters) SecondaryDR() (*vault.TestCluster, *vault.TestClusterCore, *api.Client) {
|
||||
return r.PerfSecondaryDRCluster, r.PerfSecondaryDRCluster.Cores[0], r.PerfSecondaryDRCluster.Cores[0].Client
|
||||
}
|
||||
|
||||
// Generates a root token on the target cluster.
|
||||
func GenerateRoot(t testing.T, cluster *vault.TestCluster, drToken bool) string {
|
||||
token, err := GenerateRootWithError(t, cluster, drToken)
|
||||
|
@ -205,349 +148,6 @@ func EnsureCoreUnsealed(t testing.T, c *vault.TestCluster, core *vault.TestClust
|
|||
}
|
||||
}
|
||||
|
||||
func EnsureCoreIsPerfStandby(t testing.T, client *api.Client) {
|
||||
t.Helper()
|
||||
logger := logging.NewVaultLogger(log.Info).Named(t.Name())
|
||||
start := time.Now()
|
||||
for {
|
||||
health, err := client.Sys().Health()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if health.PerformanceStandby {
|
||||
break
|
||||
}
|
||||
|
||||
logger.Info("waiting for performance standby", "health", health)
|
||||
time.Sleep(time.Millisecond * 500)
|
||||
if time.Now().After(start.Add(time.Second * 60)) {
|
||||
t.Fatal("did not become a perf standby")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func WaitForReplicationState(t testing.T, c *vault.Core, state consts.ReplicationState) {
|
||||
timeout := time.Now().Add(10 * time.Second)
|
||||
for {
|
||||
if time.Now().After(timeout) {
|
||||
t.Fatalf("timeout waiting for core to have state %d", uint32(state))
|
||||
}
|
||||
state := c.ReplicationState()
|
||||
if state.HasState(state) {
|
||||
break
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
type PassthroughWithLocalPaths struct {
|
||||
logical.Backend
|
||||
}
|
||||
|
||||
func (p *PassthroughWithLocalPaths) SpecialPaths() *logical.Paths {
|
||||
return &logical.Paths{
|
||||
LocalStorage: []string{
|
||||
"*",
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func PassthroughWithLocalPathsFactory(ctx context.Context, c *logical.BackendConfig) (logical.Backend, error) {
|
||||
b, err := vault.PassthroughBackendFactory(ctx, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &PassthroughWithLocalPaths{b}, nil
|
||||
}
|
||||
|
||||
func ConfClusterAndCore(t testing.T, conf *vault.CoreConfig, opts *vault.TestClusterOptions) (*vault.TestCluster, *vault.TestClusterCore) {
|
||||
if conf.Physical != nil || conf.HAPhysical != nil {
|
||||
t.Fatalf("conf.Physical and conf.HAPhysical cannot be specified")
|
||||
}
|
||||
if opts.Logger == nil {
|
||||
t.Fatalf("opts.Logger must be specified")
|
||||
}
|
||||
|
||||
inm, err := inmem.NewTransactionalInmem(nil, opts.Logger)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
inmha, err := inmem.NewInmemHA(nil, opts.Logger)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
coreConfig := *conf
|
||||
coreConfig.Physical = inm
|
||||
coreConfig.HAPhysical = inmha.(physical.HABackend)
|
||||
coreConfig.CredentialBackends = map[string]logical.Factory{
|
||||
"approle": credAppRole.Factory,
|
||||
"userpass": credUserpass.Factory,
|
||||
}
|
||||
vault.AddNoopAudit(&coreConfig)
|
||||
cluster := vault.NewTestCluster(t, &coreConfig, opts)
|
||||
cluster.Start()
|
||||
|
||||
cores := cluster.Cores
|
||||
core := cores[0]
|
||||
|
||||
vault.TestWaitActive(t, core.Core)
|
||||
|
||||
return cluster, core
|
||||
}
|
||||
|
||||
func GetPerfReplicatedClusters(t testing.T, conf *vault.CoreConfig, opts *vault.TestClusterOptions) *ReplicatedTestClusters {
|
||||
ret := &ReplicatedTestClusters{}
|
||||
|
||||
var logger hclog.Logger
|
||||
if opts != nil {
|
||||
logger = opts.Logger
|
||||
}
|
||||
if logger == nil {
|
||||
logger = log.New(&log.LoggerOptions{
|
||||
Mutex: &sync.Mutex{},
|
||||
Level: log.Trace,
|
||||
})
|
||||
}
|
||||
|
||||
// Set this lower so that state populates quickly to standby nodes
|
||||
cluster.HeartbeatInterval = 2 * time.Second
|
||||
|
||||
numCores := opts.NumCores
|
||||
if numCores == 0 {
|
||||
numCores = vault.DefaultNumCores
|
||||
}
|
||||
|
||||
localopts := *opts
|
||||
localopts.Logger = logger.Named("perf-pri")
|
||||
ret.PerfPrimaryCluster, _ = ConfClusterAndCore(t, conf, &localopts)
|
||||
|
||||
localopts.Logger = logger.Named("perf-sec")
|
||||
localopts.FirstCoreNumber += numCores
|
||||
ret.PerfSecondaryCluster, _ = ConfClusterAndCore(t, conf, &localopts)
|
||||
|
||||
SetupTwoClusterPerfReplication(t, ret.PerfPrimaryCluster, ret.PerfSecondaryCluster)
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
func GetFourReplicatedClusters(t testing.T, handlerFunc func(*vault.HandlerProperties) http.Handler) *ReplicatedTestClusters {
|
||||
return GetFourReplicatedClustersWithConf(t, &vault.CoreConfig{}, &vault.TestClusterOptions{
|
||||
HandlerFunc: handlerFunc,
|
||||
})
|
||||
}
|
||||
|
||||
func GetFourReplicatedClustersWithConf(t testing.T, conf *vault.CoreConfig, opts *vault.TestClusterOptions) *ReplicatedTestClusters {
|
||||
ret := &ReplicatedTestClusters{}
|
||||
|
||||
logger := log.New(&log.LoggerOptions{
|
||||
Mutex: &sync.Mutex{},
|
||||
Level: log.Trace,
|
||||
})
|
||||
// Set this lower so that state populates quickly to standby nodes
|
||||
cluster.HeartbeatInterval = 2 * time.Second
|
||||
|
||||
numCores := opts.NumCores
|
||||
if numCores == 0 {
|
||||
numCores = vault.DefaultNumCores
|
||||
}
|
||||
|
||||
localopts := *opts
|
||||
localopts.Logger = logger.Named("perf-pri")
|
||||
ret.PerfPrimaryCluster, _ = ConfClusterAndCore(t, conf, &localopts)
|
||||
|
||||
localopts.Logger = logger.Named("perf-sec")
|
||||
localopts.FirstCoreNumber += numCores
|
||||
ret.PerfSecondaryCluster, _ = ConfClusterAndCore(t, conf, &localopts)
|
||||
|
||||
localopts.Logger = logger.Named("perf-pri-dr")
|
||||
localopts.FirstCoreNumber += numCores
|
||||
ret.PerfPrimaryDRCluster, _ = ConfClusterAndCore(t, conf, &localopts)
|
||||
|
||||
localopts.Logger = logger.Named("perf-sec-dr")
|
||||
localopts.FirstCoreNumber += numCores
|
||||
ret.PerfSecondaryDRCluster, _ = ConfClusterAndCore(t, conf, &localopts)
|
||||
|
||||
builder := &ReplicatedTestClustersBuilder{clusters: ret}
|
||||
builder.setupFourClusterReplication(t)
|
||||
|
||||
// Wait until poison pills have been read
|
||||
time.Sleep(45 * time.Second)
|
||||
EnsureCoresUnsealed(t, ret.PerfPrimaryCluster)
|
||||
EnsureCoresUnsealed(t, ret.PerfSecondaryCluster)
|
||||
EnsureCoresUnsealed(t, ret.PerfPrimaryDRCluster)
|
||||
EnsureCoresUnsealed(t, ret.PerfSecondaryDRCluster)
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
type ReplicatedTestClustersBuilder struct {
|
||||
clusters *ReplicatedTestClusters
|
||||
perfToken string
|
||||
drToken string
|
||||
perfSecondaryRootToken string
|
||||
perfSecondaryDRToken string
|
||||
}
|
||||
|
||||
func SetupTwoClusterPerfReplication(t testing.T, pri, sec *vault.TestCluster) {
|
||||
clusters := &ReplicatedTestClusters{
|
||||
PerfPrimaryCluster: pri,
|
||||
PerfSecondaryCluster: sec,
|
||||
}
|
||||
builder := &ReplicatedTestClustersBuilder{clusters: clusters}
|
||||
builder.setupTwoClusterReplication(t)
|
||||
}
|
||||
|
||||
func (r *ReplicatedTestClustersBuilder) setupTwoClusterReplication(t testing.T) {
|
||||
t.Log("enabling perf primary")
|
||||
r.enablePerfPrimary(t)
|
||||
WaitForActiveNode(t, r.clusters.PerfPrimaryCluster)
|
||||
r.getPerformanceToken(t)
|
||||
t.Log("enabling perf secondary")
|
||||
r.enablePerformanceSecondary(t)
|
||||
}
|
||||
|
||||
func SetupFourClusterReplication(t testing.T, pri, sec, pridr, secdr *vault.TestCluster) {
|
||||
clusters := &ReplicatedTestClusters{
|
||||
PerfPrimaryCluster: pri,
|
||||
PerfSecondaryCluster: sec,
|
||||
PerfPrimaryDRCluster: pridr,
|
||||
PerfSecondaryDRCluster: secdr,
|
||||
}
|
||||
builder := &ReplicatedTestClustersBuilder{clusters: clusters}
|
||||
builder.setupFourClusterReplication(t)
|
||||
}
|
||||
|
||||
func (r *ReplicatedTestClustersBuilder) setupFourClusterReplication(t testing.T) {
|
||||
t.Log("enabling perf primary")
|
||||
r.enablePerfPrimary(t)
|
||||
r.getPerformanceToken(t)
|
||||
|
||||
t.Log("enabling dr primary")
|
||||
enableDrPrimary(t, r.clusters.PerfPrimaryCluster)
|
||||
r.drToken = getDrToken(t, r.clusters.PerfPrimaryCluster, "primary-dr-secondary")
|
||||
WaitForActiveNode(t, r.clusters.PerfPrimaryCluster)
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
t.Log("enabling perf secondary")
|
||||
r.enablePerformanceSecondary(t)
|
||||
enableDrPrimary(t, r.clusters.PerfSecondaryCluster)
|
||||
r.perfSecondaryDRToken = getDrToken(t, r.clusters.PerfSecondaryCluster, "secondary-dr-secondary")
|
||||
|
||||
t.Log("enabling dr secondary on primary dr cluster")
|
||||
r.enableDrSecondary(t, r.clusters.PerfPrimaryDRCluster, r.drToken, r.clusters.PerfPrimaryCluster.CACertPEMFile)
|
||||
r.clusters.PerfPrimaryDRCluster.Cores[0].Client.SetToken(r.clusters.PerfPrimaryCluster.Cores[0].Client.Token())
|
||||
WaitForActiveNode(t, r.clusters.PerfPrimaryDRCluster)
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
t.Log("enabling dr secondary on secondary dr cluster")
|
||||
r.enableDrSecondary(t, r.clusters.PerfSecondaryDRCluster, r.perfSecondaryDRToken, r.clusters.PerfSecondaryCluster.CACertPEMFile)
|
||||
r.clusters.PerfSecondaryDRCluster.Cores[0].Client.SetToken(r.perfSecondaryRootToken)
|
||||
WaitForActiveNode(t, r.clusters.PerfSecondaryDRCluster)
|
||||
}
|
||||
|
||||
func (r *ReplicatedTestClustersBuilder) enablePerfPrimary(t testing.T) {
|
||||
c := r.clusters.PerfPrimaryCluster.Cores[0]
|
||||
_, err := c.Client.Logical().Write("sys/replication/performance/primary/enable", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
WaitForReplicationState(t, c.Core, consts.ReplicationPerformancePrimary)
|
||||
WaitForActiveNodeAndPerfStandbys(t, r.clusters.PerfPrimaryCluster)
|
||||
}
|
||||
|
||||
func (r *ReplicatedTestClustersBuilder) getPerformanceToken(t testing.T) {
|
||||
client := r.clusters.PerfPrimaryCluster.Cores[0].Client
|
||||
req := map[string]interface{}{
|
||||
"id": r.clusters.PerfSecondaryCluster.ID,
|
||||
}
|
||||
secret, err := client.Logical().Write("sys/replication/performance/primary/secondary-token", req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r.perfToken = secret.WrapInfo.Token
|
||||
}
|
||||
|
||||
func enableDrPrimary(t testing.T, tc *vault.TestCluster) {
|
||||
c := tc.Cores[0]
|
||||
_, err := c.Client.Logical().Write("sys/replication/dr/primary/enable", nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
WaitForReplicationStatus(t, c.Client, true, func(secret map[string]interface{}) bool {
|
||||
return secret["mode"] != nil && secret["mode"] == "primary"
|
||||
})
|
||||
}
|
||||
|
||||
func getDrToken(t testing.T, tc *vault.TestCluster, id string) string {
|
||||
req := map[string]interface{}{
|
||||
"id": id,
|
||||
}
|
||||
secret, err := tc.Cores[0].Client.Logical().Write("sys/replication/dr/primary/secondary-token", req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return secret.WrapInfo.Token
|
||||
}
|
||||
|
||||
func (r *ReplicatedTestClustersBuilder) enablePerformanceSecondary(t testing.T) {
|
||||
c := r.clusters.PerfSecondaryCluster.Cores[0]
|
||||
postData := map[string]interface{}{
|
||||
"token": r.perfToken,
|
||||
"ca_file": r.clusters.PerfPrimaryCluster.CACertPEMFile,
|
||||
}
|
||||
if r.clusters.PerfPrimaryCluster.ClientAuthRequired {
|
||||
p := r.clusters.PerfPrimaryCluster.Cores[0]
|
||||
postData["client_cert_pem"] = string(p.ServerCertPEM)
|
||||
postData["client_key_pem"] = string(p.ServerKeyPEM)
|
||||
}
|
||||
_, err := c.Client.Logical().Write("sys/replication/performance/secondary/enable", postData)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
WaitForReplicationState(t, c.Core, consts.ReplicationPerformanceSecondary)
|
||||
|
||||
r.clusters.PerfSecondaryCluster.BarrierKeys = r.clusters.PerfPrimaryCluster.BarrierKeys
|
||||
|
||||
// We want to make sure we unseal all the nodes so we first need to wait
|
||||
// until two of the nodes seal due to the poison pill being written
|
||||
WaitForNCoresSealed(t, r.clusters.PerfSecondaryCluster, 2)
|
||||
EnsureCoresUnsealed(t, r.clusters.PerfSecondaryCluster)
|
||||
WaitForActiveNode(t, r.clusters.PerfSecondaryCluster)
|
||||
|
||||
r.perfSecondaryRootToken = GenerateRoot(t, r.clusters.PerfSecondaryCluster, false)
|
||||
for _, core := range r.clusters.PerfSecondaryCluster.Cores {
|
||||
core.Client.SetToken(r.perfSecondaryRootToken)
|
||||
}
|
||||
|
||||
WaitForPerfReplicationWorking(t, r.clusters.PerfPrimaryCluster, r.clusters.PerfSecondaryCluster)
|
||||
}
|
||||
|
||||
func (r *ReplicatedTestClustersBuilder) enableDrSecondary(t testing.T, tc *vault.TestCluster, token, ca_file string) {
|
||||
_, err := tc.Cores[0].Client.Logical().Write("sys/replication/dr/secondary/enable", map[string]interface{}{
|
||||
"token": token,
|
||||
"ca_file": ca_file,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
WaitForReplicationState(t, tc.Cores[0].Core, consts.ReplicationDRSecondary)
|
||||
tc.BarrierKeys = r.clusters.PerfPrimaryCluster.BarrierKeys
|
||||
|
||||
// We want to make sure we unseal all the nodes so we first need to wait
|
||||
// until two of the nodes seal due to the poison pill being written
|
||||
WaitForNCoresSealed(t, tc, len(tc.Cores)-1)
|
||||
EnsureCoresUnsealed(t, tc)
|
||||
WaitForReplicationStatus(t, tc.Cores[0].Client, true, func(secret map[string]interface{}) bool {
|
||||
return secret["mode"] != nil && secret["mode"] == "secondary"
|
||||
})
|
||||
}
|
||||
|
||||
func EnsureStableActiveNode(t testing.T, cluster *vault.TestCluster) {
|
||||
activeCore := DeriveActiveCore(t, cluster)
|
||||
|
||||
|
@ -633,73 +233,6 @@ func WaitForNCoresSealed(t testing.T, cluster *vault.TestCluster, n int) {
|
|||
t.Fatalf("%d cores were not sealed", n)
|
||||
}
|
||||
|
||||
func WaitForActiveNodeAndPerfStandbys(t testing.T, cluster *vault.TestCluster) {
|
||||
t.Helper()
|
||||
mountPoint, err := uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = cluster.Cores[0].Client.Sys().Mount(mountPoint, &api.MountInput{
|
||||
Type: "kv",
|
||||
Local: false,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal("unable to mount KV engine")
|
||||
}
|
||||
path := mountPoint + "/foo"
|
||||
var standbys, actives int64
|
||||
var wg sync.WaitGroup
|
||||
deadline := time.Now().Add(30 * time.Second)
|
||||
for i, c := range cluster.Cores {
|
||||
wg.Add(1)
|
||||
go func(coreIdx int, client *api.Client) {
|
||||
defer wg.Done()
|
||||
val := 1
|
||||
for time.Now().Before(deadline) {
|
||||
if coreIdx == 0 {
|
||||
_, err = cluster.Cores[0].Client.Logical().Write(path, map[string]interface{}{
|
||||
"bar": val,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal("unable to write KV", "path", path)
|
||||
}
|
||||
}
|
||||
val++
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
|
||||
if coreIdx > 0 || atomic.LoadInt64(&actives) == 0 {
|
||||
leader, err := client.Sys().Leader()
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "Vault is sealed") {
|
||||
continue
|
||||
}
|
||||
t.Fatal(err)
|
||||
}
|
||||
if leader.IsSelf {
|
||||
atomic.AddInt64(&actives, 1)
|
||||
}
|
||||
if leader.PerfStandby && leader.PerfStandbyLastRemoteWAL > 0 {
|
||||
atomic.AddInt64(&standbys, 1)
|
||||
return
|
||||
}
|
||||
}
|
||||
if coreIdx == 0 && int(atomic.LoadInt64(&standbys)) == len(cluster.Cores)-1 {
|
||||
return
|
||||
}
|
||||
}
|
||||
}(i, c.Client)
|
||||
}
|
||||
wg.Wait()
|
||||
if actives != 1 || int(standbys) != len(cluster.Cores)-1 {
|
||||
t.Fatalf("expected 1 active core and %d standbys, got %d active and %d standbys",
|
||||
len(cluster.Cores)-1, actives, standbys)
|
||||
}
|
||||
err = cluster.Cores[0].Client.Sys().Unmount(mountPoint)
|
||||
if err != nil {
|
||||
t.Fatal("unable to unmount KV engine on primary")
|
||||
}
|
||||
}
|
||||
|
||||
func WaitForActiveNode(t testing.T, cluster *vault.TestCluster) *vault.TestClusterCore {
|
||||
t.Helper()
|
||||
for i := 0; i < 30; i++ {
|
||||
|
@ -716,90 +249,6 @@ func WaitForActiveNode(t testing.T, cluster *vault.TestCluster) *vault.TestClust
|
|||
return nil
|
||||
}
|
||||
|
||||
func WaitForMatchingMerkleRoots(t testing.T, endpoint string, primary, secondary *api.Client) {
|
||||
getRoot := func(mode string, cli *api.Client) string {
|
||||
status, err := cli.Logical().Read(endpoint + "status")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if status == nil || status.Data == nil || status.Data["mode"] == nil {
|
||||
t.Fatal("got nil secret or data")
|
||||
}
|
||||
if status.Data["mode"].(string) != mode {
|
||||
t.Fatalf("expected mode=%s, got %s", mode, status.Data["mode"].(string))
|
||||
}
|
||||
return status.Data["merkle_root"].(string)
|
||||
}
|
||||
|
||||
t.Helper()
|
||||
for i := 0; i < 30; i++ {
|
||||
secRoot := getRoot("secondary", secondary)
|
||||
priRoot := getRoot("primary", primary)
|
||||
|
||||
if reflect.DeepEqual(priRoot, secRoot) {
|
||||
return
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
t.Fatalf("roots did not become equal")
|
||||
}
|
||||
|
||||
func WaitForMatchingMerkleRootsCore(t testing.T, pri, sec *vault.TestClusterCore, dr bool) {
|
||||
rootFunc := vault.PerformanceMerkleRoot
|
||||
if dr {
|
||||
rootFunc = vault.DRMerkleRoot
|
||||
}
|
||||
|
||||
t.Helper()
|
||||
for i := 0; i < 30; i++ {
|
||||
secRoot := rootFunc(pri.Core)
|
||||
priRoot := rootFunc(pri.Core)
|
||||
|
||||
if reflect.DeepEqual(priRoot, secRoot) {
|
||||
return
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
t.Fatalf("roots did not become equal")
|
||||
}
|
||||
|
||||
func WaitForReplicationStatus(t testing.T, client *api.Client, dr bool, accept func(map[string]interface{}) bool) {
|
||||
t.Helper()
|
||||
url := "sys/replication/performance/status"
|
||||
if dr {
|
||||
url = "sys/replication/dr/status"
|
||||
}
|
||||
|
||||
var err error
|
||||
var secret *api.Secret
|
||||
for i := 0; i < 30; i++ {
|
||||
secret, err = client.Logical().Read(url)
|
||||
if err == nil && secret != nil && secret.Data != nil {
|
||||
if accept(secret.Data) {
|
||||
return
|
||||
}
|
||||
}
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
t.Fatalf("unable to get acceptable replication status: error=%v secret=%#v", err, secret)
|
||||
}
|
||||
|
||||
func WaitForWAL(t testing.T, c *vault.TestClusterCore, wal uint64) {
|
||||
t.Helper()
|
||||
timeout := time.Now().Add(3 * time.Second)
|
||||
for {
|
||||
if time.Now().After(timeout) {
|
||||
t.Fatal("timeout waiting for WAL", "segment", wal, "lastrmtwal", vault.LastRemoteWAL(c.Core))
|
||||
}
|
||||
if vault.LastRemoteWAL(c.Core) >= wal {
|
||||
break
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func RekeyCluster(t testing.T, cluster *vault.TestCluster) {
|
||||
client := cluster.Cores[0].Client
|
||||
|
||||
|
@ -927,43 +376,3 @@ func RaftClusterJoinNodes(t testing.T, cluster *vault.TestCluster) {
|
|||
|
||||
WaitForNCoresUnsealed(t, cluster, 3)
|
||||
}
|
||||
|
||||
// WaitForPerfReplicationWorking mounts a KV non-locally, writes to it on pri, and waits for the value to be readable on sec.
|
||||
func WaitForPerfReplicationWorking(t testing.T, pri, sec *vault.TestCluster) {
|
||||
t.Helper()
|
||||
priClient, secClient := pri.Cores[0].Client, sec.Cores[0].Client
|
||||
mountPoint, err := uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = priClient.Sys().Mount(mountPoint, &api.MountInput{
|
||||
Type: "kv",
|
||||
Local: false,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal("unable to mount KV engine on primary")
|
||||
}
|
||||
|
||||
path := mountPoint + "/foo"
|
||||
_, err = priClient.Logical().Write(path, map[string]interface{}{
|
||||
"bar": 1,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal("unable to write KV on primary", "path", path)
|
||||
}
|
||||
|
||||
deadline := time.Now().Add(30 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
var secret *api.Secret
|
||||
secret, err = secClient.Logical().Read(path)
|
||||
if err == nil && secret != nil {
|
||||
err = priClient.Sys().Unmount(mountPoint)
|
||||
if err != nil {
|
||||
t.Fatal("unable to unmount KV engine on primary")
|
||||
}
|
||||
return
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
t.Fatal("unable to read replicated KV on secondary", "path", path, "err", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue