package testcluster

import (
	"context"
	"encoding/json"
	"fmt"
	"reflect"
	"strings"
	"time"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-secure-stdlib/strutil"
	"github.com/hashicorp/go-uuid"
	"github.com/hashicorp/vault/api"
	"github.com/hashicorp/vault/sdk/helper/consts"
	"github.com/mitchellh/mapstructure"
)
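
// GetPerformanceToken requests a performance secondary activation token from
// the primary cluster. If secondaryPublicKey is non-empty the token is
// returned in plaintext; otherwise the response-wrapped token is returned.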
func GetPerformanceToken(pri VaultCluster, id, secondaryPublicKey string) (string, error) {
	client := pri.Nodes()[0].APIClient()
	req := map[string]interface{}{
		"id": id,
	}
	if secondaryPublicKey != "" {
		req["secondary_public_key"] = secondaryPublicKey
	}
	secret, err := client.Logical().Write("sys/replication/performance/primary/secondary-token", req)
	if err != nil {
		return "", err
	}

	if secondaryPublicKey != "" {
		return secret.Data["token"].(string), nil
	}
	return secret.WrapInfo.Token, nil
}
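
// EnablePerfPrimary enables performance replication on the primary cluster,
// then waits for the primary replication state and for the active node and
// perf standbys to be ready.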
func EnablePerfPrimary(ctx context.Context, pri VaultCluster) error {
	client := pri.Nodes()[0].APIClient()
	_, err := client.Logical().WriteWithContext(ctx, "sys/replication/performance/primary/enable", nil)
	if err != nil {
		return err
	}

	err = WaitForPerfReplicationState(ctx, pri, consts.ReplicationPerformancePrimary)
	if err != nil {
		return err
	}
	return WaitForActiveNodeAndPerfStandbys(ctx, pri)
}
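
// WaitForPerfReplicationState polls the cluster's health endpoint until its
// performance replication mode matches the given state or ctx expires.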
func WaitForPerfReplicationState(ctx context.Context, cluster VaultCluster, state consts.ReplicationState) error {
	client := cluster.Nodes()[0].APIClient()
	var health *api.HealthResponse
	var err error
	for ctx.Err() == nil {
		health, err = client.Sys().HealthWithContext(ctx)
		if err == nil && health.ReplicationPerformanceMode == state.GetPerformanceString() {
			return nil
		}
		time.Sleep(500 * time.Millisecond)
	}
	if err == nil {
		err = ctx.Err()
	}
	return err
}
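
// EnablePerformanceSecondaryNoWait enables (or, when updatePrimary is true,
// updates) performance secondary replication on sec using the given
// activation token, waiting only for the secondary replication state.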
func EnablePerformanceSecondaryNoWait(ctx context.Context, perfToken string, pri, sec VaultCluster, updatePrimary bool) error {
	postData := map[string]interface{}{
		"token":   perfToken,
		"ca_file": DefaultCAFile,
	}
	path := "sys/replication/performance/secondary/enable"
	if updatePrimary {
		path = "sys/replication/performance/secondary/update-primary"
	}
	err := WaitForActiveNodeAndPerfStandbys(ctx, sec)
	if err != nil {
		return err
	}
	_, err = sec.Nodes()[0].APIClient().Logical().Write(path, postData)
	if err != nil {
		return err
	}

	return WaitForPerfReplicationState(ctx, sec, consts.ReplicationPerformanceSecondary)
}
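
// EnablePerformanceSecondary enables performance replication on sec, waits
// for the Merkle roots to match, for the secondary to finish its setup, and
// for replication to demonstrably work, then returns sec's new root token.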
func EnablePerformanceSecondary(ctx context.Context, perfToken string, pri, sec VaultCluster, updatePrimary, skipPoisonPill bool) (string, error) {
	if err := EnablePerformanceSecondaryNoWait(ctx, perfToken, pri, sec, updatePrimary); err != nil {
		return "", err
	}
	if err := WaitForMatchingMerkleRoots(ctx, "sys/replication/performance/", pri, sec); err != nil {
		return "", err
	}
	root, err := WaitForPerformanceSecondary(ctx, pri, sec, skipPoisonPill)
	if err != nil {
		return "", err
	}
	if err := WaitForPerfReplicationWorking(ctx, pri, sec); err != nil {
		return "", err
	}
	return root, nil
}
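
// WaitForMatchingMerkleRoots polls the replication status endpoint on both
// clusters until they report the same merkle_root, giving up after 30 tries
// one second apart.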
func WaitForMatchingMerkleRoots(ctx context.Context, endpoint string, pri, sec VaultCluster) error {
	getRoot := func(mode string, cli *api.Client) (string, error) {
		status, err := cli.Logical().Read(endpoint + "status")
		if err != nil {
			return "", err
		}
		if status == nil || status.Data == nil || status.Data["mode"] == nil {
			return "", fmt.Errorf("got nil secret or data")
		}
		if status.Data["mode"].(string) != mode {
			return "", fmt.Errorf("expected mode=%s, got %s", mode, status.Data["mode"].(string))
		}
		return status.Data["merkle_root"].(string), nil
	}

	secClient := sec.Nodes()[0].APIClient()
	priClient := pri.Nodes()[0].APIClient()
	for i := 0; i < 30; i++ {
		secRoot, err := getRoot("secondary", secClient)
		if err != nil {
			return err
		}
		priRoot, err := getRoot("primary", priClient)
		if err != nil {
			return err
		}

		if reflect.DeepEqual(priRoot, secRoot) {
			return nil
		}
		time.Sleep(time.Second)
	}

	return fmt.Errorf("roots did not become equal")
}
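
// WaitForPerformanceWAL waits until the secondary's last_remote_wal has
// caught up with the primary's last_performance_wal.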
func WaitForPerformanceWAL(ctx context.Context, pri, sec VaultCluster) error {
	endpoint := "sys/replication/performance/"
	if err := WaitForMatchingMerkleRoots(ctx, endpoint, pri, sec); err != nil {
		return err
	}
	getWAL := func(mode, walKey string, cli *api.Client) (int64, error) {
		status, err := cli.Logical().Read(endpoint + "status")
		if err != nil {
			return 0, err
		}
		if status == nil || status.Data == nil || status.Data["mode"] == nil {
			return 0, fmt.Errorf("got nil secret or data")
		}
		if status.Data["mode"].(string) != mode {
			return 0, fmt.Errorf("expected mode=%s, got %s", mode, status.Data["mode"].(string))
		}
		return status.Data[walKey].(json.Number).Int64()
	}

	secClient := sec.Nodes()[0].APIClient()
	priClient := pri.Nodes()[0].APIClient()
	for ctx.Err() == nil {
		secLastRemoteWAL, err := getWAL("secondary", "last_remote_wal", secClient)
		if err != nil {
			return err
		}
		priLastPerfWAL, err := getWAL("primary", "last_performance_wal", priClient)
		if err != nil {
			return err
		}

		if secLastRemoteWAL >= priLastPerfWAL {
			return nil
		}
		time.Sleep(time.Second)
	}

	return fmt.Errorf("performance WALs on the secondary did not catch up with the primary, context err: %w", ctx.Err())
}
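
// WaitForPerformanceSecondary copies the primary's unseal/recovery keys onto
// the secondary, deals with standbys sealing themselves after activation,
// unseals all nodes, and generates and returns a new root token for the
// secondary.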
func WaitForPerformanceSecondary(ctx context.Context, pri, sec VaultCluster, skipPoisonPill bool) (string, error) {
	if len(pri.GetRecoveryKeys()) > 0 {
		sec.SetBarrierKeys(pri.GetRecoveryKeys())
		sec.SetRecoveryKeys(pri.GetRecoveryKeys())
	} else {
		sec.SetBarrierKeys(pri.GetBarrierKeys())
		sec.SetRecoveryKeys(pri.GetBarrierKeys())
	}

	if len(sec.Nodes()) > 1 {
		if skipPoisonPill {
			// As part of prepareSecondary on the active node the keyring is
			// deleted from storage. Its absence can cause standbys to seal
			// themselves. But it's not reliable, so we'll seal them
			// ourselves to force the issue.
			for i := range sec.Nodes()[1:] {
				if err := SealNode(ctx, sec, i+1); err != nil {
					return "", err
				}
			}
		} else {
			// We want to make sure we unseal all the nodes so we first need to wait
			// until two of the nodes seal due to the poison pill being written
			if err := WaitForNCoresSealed(ctx, sec, len(sec.Nodes())-1); err != nil {
				return "", err
			}
		}
	}
	if _, err := WaitForActiveNode(ctx, sec); err != nil {
		return "", err
	}
	if err := UnsealAllNodes(ctx, sec); err != nil {
		return "", err
	}

	perfSecondaryRootToken, err := GenerateRoot(sec, GenerateRootRegular)
	if err != nil {
		return "", err
	}
	sec.SetRootToken(perfSecondaryRootToken)
	if err := WaitForActiveNodeAndPerfStandbys(ctx, sec); err != nil {
		return "", err
	}

	return perfSecondaryRootToken, nil
}
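
// WaitForPerfReplicationWorking mounts a temporary KV engine on the primary,
// writes a secret to it, waits until the write is visible on the secondary,
// and then unmounts it.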
func WaitForPerfReplicationWorking(ctx context.Context, pri, sec VaultCluster) error {
	priActiveIdx, err := WaitForActiveNode(ctx, pri)
	if err != nil {
		return err
	}
	secActiveIdx, err := WaitForActiveNode(ctx, sec)
	if err != nil {
		return err
	}

	priClient, secClient := pri.Nodes()[priActiveIdx].APIClient(), sec.Nodes()[secActiveIdx].APIClient()
	mountPoint, err := uuid.GenerateUUID()
	if err != nil {
		return err
	}
	err = priClient.Sys().Mount(mountPoint, &api.MountInput{
		Type:  "kv",
		Local: false,
	})
	if err != nil {
		return fmt.Errorf("unable to mount KV engine on primary: %w", err)
	}

	path := mountPoint + "/foo"
	_, err = priClient.Logical().Write(path, map[string]interface{}{
		"bar": 1,
	})
	if err != nil {
		return fmt.Errorf("unable to write KV on primary, path=%s: %w", path, err)
	}

	for ctx.Err() == nil {
		var secret *api.Secret
		secret, err = secClient.Logical().Read(path)
		if err == nil && secret != nil {
			err = priClient.Sys().Unmount(mountPoint)
			if err != nil {
				return fmt.Errorf("unable to unmount KV engine on primary: %w", err)
			}
			return nil
		}
		time.Sleep(100 * time.Millisecond)
	}
	if err == nil {
		err = ctx.Err()
	}
	return fmt.Errorf("unable to read replicated KV on secondary, path=%s, err=%v", path, err)
}
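
// SetupTwoClusterPerfReplication establishes performance replication between
// pri (the primary) and sec (the secondary) and waits until it is fully
// functional.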
func SetupTwoClusterPerfReplication(ctx context.Context, pri, sec VaultCluster) error {
	if err := EnablePerfPrimary(ctx, pri); err != nil {
		return err
	}
	perfToken, err := GetPerformanceToken(pri, sec.ClusterID(), "")
	if err != nil {
		return err
	}

	_, err = EnablePerformanceSecondary(ctx, perfToken, pri, sec, false, false)
	return err
}
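
// A minimal sketch of a typical call site, for illustration only; newTestCluster
// is a hypothetical helper standing in for whatever constructor a test uses to
// obtain a VaultCluster:
//
//	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
//	defer cancel()
//	pri, sec := newTestCluster(t, "pri"), newTestCluster(t, "sec")
//	if err := SetupTwoClusterPerfReplication(ctx, pri, sec); err != nil {
//		t.Fatal(err)
//	}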

// PassiveWaitForActiveNodeAndPerfStandbys should be used instead of
// WaitForActiveNodeAndPerfStandbys when you don't want to do any writes
// as a side-effect. It returns the active node and the perf standby nodes
// in the cluster.
func PassiveWaitForActiveNodeAndPerfStandbys(ctx context.Context, pri VaultCluster) (VaultClusterNode, []VaultClusterNode, error) {
	leaderNode, standbys, err := GetActiveAndStandbys(ctx, pri)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to derive standby nodes, %w", err)
	}

	for i, node := range standbys {
		client := node.APIClient()
		// Make sure we get perf standby nodes
		if err = EnsureCoreIsPerfStandby(ctx, client); err != nil {
			return nil, nil, fmt.Errorf("standby node %d is not a perfStandby, %w", i, err)
		}
	}

	return leaderNode, standbys, nil
}
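
// GetActiveAndStandbys waits for an active node and returns it along with the
// remaining (standby) nodes in the cluster.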
func GetActiveAndStandbys(ctx context.Context, cluster VaultCluster) (VaultClusterNode, []VaultClusterNode, error) {
	var leaderIndex int
	var err error
	if leaderIndex, err = WaitForActiveNode(ctx, cluster); err != nil {
		return nil, nil, err
	}

	var leaderNode VaultClusterNode
	var nodes []VaultClusterNode
	for i, node := range cluster.Nodes() {
		if i == leaderIndex {
			leaderNode = node
			continue
		}
		nodes = append(nodes, node)
	}

	return leaderNode, nodes, nil
}
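
// EnsureCoreIsPerfStandby polls the node's health endpoint until it reports
// itself as a performance standby or ctx expires.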
func EnsureCoreIsPerfStandby(ctx context.Context, client *api.Client) error {
	var err error
	var health *api.HealthResponse
	for ctx.Err() == nil {
		health, err = client.Sys().HealthWithContext(ctx)
		if err == nil && health.PerformanceStandby {
			return nil
		}
		time.Sleep(time.Millisecond * 500)
	}
	if err == nil {
		err = ctx.Err()
	}
	return err
}
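
// WaitForDRReplicationState polls the cluster's health endpoint until its DR
// replication mode matches the given state or ctx expires.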
func WaitForDRReplicationState(ctx context.Context, cluster VaultCluster, state consts.ReplicationState) error {
	client := cluster.Nodes()[0].APIClient()
	var health *api.HealthResponse
	var err error
	for ctx.Err() == nil {
		health, err = client.Sys().HealthWithContext(ctx)
		if err == nil && health.ReplicationDRMode == state.GetDRString() {
			return nil
		}
		time.Sleep(500 * time.Millisecond)
	}
	if err == nil {
		err = ctx.Err()
	}
	return err
}
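
// EnableDrPrimary enables DR replication on the primary cluster, then waits
// for the DR primary state and for the active node and perf standbys.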
func EnableDrPrimary(ctx context.Context, pri VaultCluster) error {
	client := pri.Nodes()[0].APIClient()
	_, err := client.Logical().Write("sys/replication/dr/primary/enable", nil)
	if err != nil {
		return err
	}

	err = WaitForDRReplicationState(ctx, pri, consts.ReplicationDRPrimary)
	if err != nil {
		return err
	}
	return WaitForActiveNodeAndPerfStandbys(ctx, pri)
}
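
// GenerateDRActivationToken requests a DR secondary activation token from the
// primary cluster. If secondaryPublicKey is non-empty the token is returned
// in plaintext; otherwise the response-wrapped token is returned.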
func GenerateDRActivationToken(pri VaultCluster, id, secondaryPublicKey string) (string, error) {
	client := pri.Nodes()[0].APIClient()
	req := map[string]interface{}{
		"id": id,
	}
	if secondaryPublicKey != "" {
		req["secondary_public_key"] = secondaryPublicKey
	}
	secret, err := client.Logical().Write("sys/replication/dr/primary/secondary-token", req)
	if err != nil {
		return "", err
	}

	if secondaryPublicKey != "" {
		return secret.Data["token"].(string), nil
	}
	return secret.WrapInfo.Token, nil
}
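
// WaitForDRSecondary copies the primary's unseal/recovery keys onto the DR
// secondary, deals with standbys sealing themselves after activation, unseals
// the nodes, and adopts the primary's root token.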
func WaitForDRSecondary(ctx context.Context, pri, sec VaultCluster, skipPoisonPill bool) error {
	if len(pri.GetRecoveryKeys()) > 0 {
		sec.SetBarrierKeys(pri.GetRecoveryKeys())
		sec.SetRecoveryKeys(pri.GetRecoveryKeys())
	} else {
		sec.SetBarrierKeys(pri.GetBarrierKeys())
		sec.SetRecoveryKeys(pri.GetBarrierKeys())
	}

	if len(sec.Nodes()) > 1 {
		if skipPoisonPill {
			// As part of prepareSecondary on the active node the keyring is
			// deleted from storage. Its absence can cause standbys to seal
			// themselves. But it's not reliable, so we'll seal them
			// ourselves to force the issue.
			for i := range sec.Nodes()[1:] {
				if err := SealNode(ctx, sec, i+1); err != nil {
					return err
				}
			}
		} else {
			// We want to make sure we unseal all the nodes so we first need to wait
			// until two of the nodes seal due to the poison pill being written
			if err := WaitForNCoresSealed(ctx, sec, len(sec.Nodes())-1); err != nil {
				return err
			}
		}
	}
	if _, err := WaitForActiveNode(ctx, sec); err != nil {
		return err
	}

	// unseal nodes
	for i := range sec.Nodes() {
		if err := UnsealNode(ctx, sec, i); err != nil {
			// Sometimes when we get here it's already unsealed on its own
			// and then this fails for DR secondaries so check again
			// The error is "path disabled in replication DR secondary mode".
			if healthErr := NodeHealthy(ctx, sec, i); healthErr != nil {
				// return the original error
				return err
			}
		}
	}

	sec.SetRootToken(pri.GetRootToken())

	if _, err := WaitForActiveNode(ctx, sec); err != nil {
		return err
	}

	return nil
}
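
// EnableDRSecondaryNoWait enables DR secondary replication on sec using the
// given activation token, waiting only for the DR secondary state.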
func EnableDRSecondaryNoWait(ctx context.Context, sec VaultCluster, drToken string) error {
	postData := map[string]interface{}{
		"token":   drToken,
		"ca_file": DefaultCAFile,
	}

	_, err := sec.Nodes()[0].APIClient().Logical().Write("sys/replication/dr/secondary/enable", postData)
	if err != nil {
		return err
	}

	return WaitForDRReplicationState(ctx, sec, consts.ReplicationDRSecondary)
}
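
// WaitForReplicationStatus polls the performance (or, when dr is true, DR)
// replication status endpoint until accept returns true for the returned
// status data or ctx expires.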
func WaitForReplicationStatus(ctx context.Context, client *api.Client, dr bool, accept func(map[string]interface{}) bool) error {
	url := "sys/replication/performance/status"
	if dr {
		url = "sys/replication/dr/status"
	}

	var err error
	var secret *api.Secret
	for ctx.Err() == nil {
		secret, err = client.Logical().Read(url)
		if err == nil && secret != nil && secret.Data != nil {
			if accept(secret.Data) {
				return nil
			}
		}
		time.Sleep(500 * time.Millisecond)
	}
	if err == nil {
		err = ctx.Err()
	}

	return fmt.Errorf("unable to get acceptable replication status: error=%v secret=%#v", err, secret)
}
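
// WaitForDRReplicationWorking waits for the DR secondary to reach stream-wals
// mode, writes a throwaway token on the primary, and then waits until the
// secondary reports a nonzero last_remote_wal.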
func WaitForDRReplicationWorking(ctx context.Context, pri, sec VaultCluster) error {
	priClient := pri.Nodes()[0].APIClient()
	secClient := sec.Nodes()[0].APIClient()

	// Make sure we've entered stream-wals mode
	err := WaitForReplicationStatus(ctx, secClient, true, func(secret map[string]interface{}) bool {
		return secret["state"] == string("stream-wals")
	})
	if err != nil {
		return err
	}

	// Now write some data and make sure that we see last_remote_wal nonzero, i.e.
	// at least one WAL has been streamed.
	secret, err := priClient.Auth().Token().Create(&api.TokenCreateRequest{})
	if err != nil {
		return err
	}

	// Revoke the token since some tests won't be happy to see it.
	err = priClient.Auth().Token().RevokeTree(secret.Auth.ClientToken)
	if err != nil {
		return err
	}

	err = WaitForReplicationStatus(ctx, secClient, true, func(secret map[string]interface{}) bool {
		if secret["state"] != string("stream-wals") {
			return false
		}
		if secret["last_remote_wal"] != nil {
			lastRemoteWal, _ := secret["last_remote_wal"].(json.Number).Int64()
			return lastRemoteWal > 0
		}

		return false
	})
	if err != nil {
		return err
	}
	return nil
}
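
// EnableDrSecondary enables DR replication on sec, then waits for matching
// Merkle roots, for the secondary to finish its setup, and for replication to
// demonstrably work.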
func EnableDrSecondary(ctx context.Context, pri, sec VaultCluster, drToken string) error {
	err := EnableDRSecondaryNoWait(ctx, sec, drToken)
	if err != nil {
		return err
	}

	if err = WaitForMatchingMerkleRoots(ctx, "sys/replication/dr/", pri, sec); err != nil {
		return err
	}

	err = WaitForDRSecondary(ctx, pri, sec, false)
	if err != nil {
		return err
	}

	if err = WaitForDRReplicationWorking(ctx, pri, sec); err != nil {
		return err
	}
	return nil
}
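
// SetupTwoClusterDRReplication establishes DR replication between pri (the
// primary) and sec (the DR secondary) and waits until it is fully functional.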
func SetupTwoClusterDRReplication(ctx context.Context, pri, sec VaultCluster) error {
	if err := EnableDrPrimary(ctx, pri); err != nil {
		return err
	}

	drToken, err := GenerateDRActivationToken(pri, sec.ClusterID(), "")
	if err != nil {
		return err
	}
	err = EnableDrSecondary(ctx, pri, sec, drToken)
	if err != nil {
		return err
	}
	return nil
}
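
// DemoteDRPrimary demotes the DR primary served by client to a DR secondary.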
func DemoteDRPrimary(client *api.Client) error {
	_, err := client.Logical().Write("sys/replication/dr/primary/demote", map[string]interface{}{})
	return err
}

func createBatchToken(client *api.Client, path string) (string, error) {
	// TODO: should these be more random in case more than one batch token needs to be created?
	suffix := strings.Replace(path, "/", "", -1)
	policyName := "path-batch-policy-" + suffix
	roleName := "path-batch-role-" + suffix

	rules := fmt.Sprintf(`path "%s" { capabilities = [ "read", "update" ] }`, path)

	// create policy
	_, err := client.Logical().Write("sys/policy/"+policyName, map[string]interface{}{
		"policy": rules,
	})
	if err != nil {
		return "", err
	}

	// create a role
	_, err = client.Logical().Write("auth/token/roles/"+roleName, map[string]interface{}{
		"allowed_policies": policyName,
		"orphan":           true,
		"renewable":        false,
		"token_type":       "batch",
	})
	if err != nil {
		return "", err
	}

	// create batch token
	secret, err := client.Logical().Write("auth/token/create/"+roleName, nil)
	if err != nil {
		return "", err
	}

	return secret.Auth.ClientToken, nil
}

// PromoteDRSecondaryWithBatchToken creates a batch token for DR promotion and
// demotes the primary cluster before promoting the secondary. The primary
// cluster must be functional in order to generate the batch token.
func PromoteDRSecondaryWithBatchToken(ctx context.Context, pri, sec VaultCluster) error {
	client := pri.Nodes()[0].APIClient()
	drToken, err := createBatchToken(client, "sys/replication/dr/secondary/promote")
	if err != nil {
		return err
	}

	err = DemoteDRPrimary(client)
	if err != nil {
		return err
	}

	return promoteDRSecondaryInternal(ctx, sec, drToken)
}

// PromoteDRSecondary generates a DR operation token on the secondary using
// unseal/recovery keys, so the primary cluster does not need to be available.
func PromoteDRSecondary(ctx context.Context, sec VaultCluster) error {
	// generate DR operation token to do update primary on vC to point to
	// the new perfSec primary vD
	drToken, err := GenerateRoot(sec, GenerateRootDR)
	if err != nil {
		return err
	}
	return promoteDRSecondaryInternal(ctx, sec, drToken)
}

func promoteDRSecondaryInternal(ctx context.Context, sec VaultCluster, drToken string) error {
	secClient := sec.Nodes()[0].APIClient()

	// Allow retries of 503s, e.g.: replication is still catching up,
	// try again later or provide the "force" argument
	oldMaxRetries := secClient.MaxRetries()
	secClient.SetMaxRetries(10)
	defer secClient.SetMaxRetries(oldMaxRetries)
	resp, err := secClient.Logical().Write("sys/replication/dr/secondary/promote", map[string]interface{}{
		"dr_operation_token": drToken,
	})
	if err != nil {
		return err
	}
	if resp == nil {
		return fmt.Errorf("nil status response during DR promotion")
	}

	if _, err := WaitForActiveNode(ctx, sec); err != nil {
		return err
	}

	return WaitForDRReplicationState(ctx, sec, consts.ReplicationDRPrimary)
}
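
// checkClusterAddr verifies that the primary's leader cluster address appears
// in the secondary's known_primary_cluster_addrs.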
func checkClusterAddr(ctx context.Context, pri, sec VaultCluster) error {
	priClient := pri.Nodes()[0].APIClient()
	priLeader, err := priClient.Sys().LeaderWithContext(ctx)
	if err != nil {
		return err
	}
	secClient := sec.Nodes()[0].APIClient()
	endpoint := "sys/replication/dr/"
	status, err := secClient.Logical().Read(endpoint + "status")
	if err != nil {
		return err
	}
	if status == nil || status.Data == nil {
		return fmt.Errorf("got nil secret or data")
	}

	var priAddrs []string
	err = mapstructure.Decode(status.Data["known_primary_cluster_addrs"], &priAddrs)
	if err != nil {
		return err
	}
	if !strutil.StrListContains(priAddrs, priLeader.LeaderClusterAddress) {
		return fmt.Errorf("failed to find the expected primary cluster address %v in known_primary_cluster_addrs", priLeader.LeaderClusterAddress)
	}

	return nil
}
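
// UpdatePrimary generates a DR operation token on sec and a fresh activation
// token on pri, issues update-primary on sec so that it points at pri, and
// then verifies the resulting replication state and known primary cluster
// addresses.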
func UpdatePrimary(ctx context.Context, pri, sec VaultCluster) error {
	// generate DR operation token to do update primary on vC to point to
	// the new perfSec primary vD
	rootToken, err := GenerateRoot(sec, GenerateRootDR)
	if err != nil {
		return err
	}

	// secondary activation token
	drToken, err := GenerateDRActivationToken(pri, sec.ClusterID(), "")
	if err != nil {
		return err
	}

	// update-primary on vC (new perfSec Dr secondary) to point to
	// the new perfSec Dr primary
	secClient := sec.Nodes()[0].APIClient()
	resp, err := secClient.Logical().Write("sys/replication/dr/secondary/update-primary", map[string]interface{}{
		"dr_operation_token": rootToken,
		"token":              drToken,
		"ca_file":            DefaultCAFile,
	})
	if err != nil {
		return err
	}
	if resp == nil {
		return fmt.Errorf("nil status response during update primary")
	}

	if _, err = WaitForActiveNode(ctx, sec); err != nil {
		return err
	}

	if err = WaitForDRReplicationState(ctx, sec, consts.ReplicationDRSecondary); err != nil {
		return err
	}

	if err = checkClusterAddr(ctx, pri, sec); err != nil {
		return err
	}

	return nil
}
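
// SetupFourClusterReplication sets up performance replication between pri and
// sec, then DR replication from pri to pridr and from sec to secdr.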
func SetupFourClusterReplication(ctx context.Context, pri, sec, pridr, secdr VaultCluster) error {
	err := SetupTwoClusterPerfReplication(ctx, pri, sec)
	if err != nil {
		return err
	}
	err = SetupTwoClusterDRReplication(ctx, pri, pridr)
	if err != nil {
		return err
	}
	err = SetupTwoClusterDRReplication(ctx, sec, secdr)
	if err != nil {
		return err
	}
	return nil
}

type ReplicationSet struct {
	// By convention, we recommend the following naming scheme for
	// clusters in this map:
	// A: perf primary
	// B: primary's DR
	// C: first perf secondary of A
	// D: C's DR
	// E: second perf secondary of A
	// F: E's DR
	// ... etc.
	//
	// We use generic names rather than role-specific names because
	// that's less confusing when promotions take place that result in role
	// changes. In other words, if D gets promoted to replace C as a perf
	// secondary, and C gets demoted and updated to become D's DR secondary,
	// they should maintain their initial names of D and C throughout.
	Clusters map[string]VaultCluster
	Builder  ClusterBuilder
	Logger   hclog.Logger
	CA       *CA
}
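
// ClusterBuilder constructs a named VaultCluster for use in a ReplicationSet.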
type ClusterBuilder func(ctx context.Context, name string, logger hclog.Logger) (VaultCluster, error)
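
// NewReplicationSet returns an empty ReplicationSet that uses b to build
// clusters on demand.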
func NewReplicationSet(b ClusterBuilder) (*ReplicationSet, error) {
	return &ReplicationSet{
		Clusters: map[string]VaultCluster{},
		Builder:  b,
		Logger:   hclog.NewNullLogger(),
	}, nil
}
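
// An illustrative sketch of how a test might drive a ReplicationSet; the
// builder body is a placeholder for whatever cluster constructor is actually
// available:
//
//	r, err := NewReplicationSet(func(ctx context.Context, name string, logger hclog.Logger) (VaultCluster, error) {
//		return newTestCluster(ctx, name, logger) // hypothetical constructor
//	})
//	if err != nil {
//		t.Fatal(err)
//	}
//	defer r.Cleanup()
//	if err := r.StandardPerfReplication(ctx); err != nil {
//		t.Fatal(err)
//	}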

func (r *ReplicationSet) StandardPerfReplication(ctx context.Context) error {
	for _, name := range []string{"A", "C"} {
		if _, ok := r.Clusters[name]; !ok {
			cluster, err := r.Builder(ctx, name, r.Logger)
			if err != nil {
				return err
			}
			r.Clusters[name] = cluster
		}
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	err := SetupTwoClusterPerfReplication(ctx, r.Clusters["A"], r.Clusters["C"])
	if err != nil {
		return err
	}

	return nil
}

func (r *ReplicationSet) StandardDRReplication(ctx context.Context) error {
	for _, name := range []string{"A", "B"} {
		if _, ok := r.Clusters[name]; !ok {
			cluster, err := r.Builder(ctx, name, r.Logger)
			if err != nil {
				return err
			}
			r.Clusters[name] = cluster
		}
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	err := SetupTwoClusterDRReplication(ctx, r.Clusters["A"], r.Clusters["B"])
	if err != nil {
		return err
	}

	return nil
}

func (r *ReplicationSet) GetFourReplicationCluster(ctx context.Context) error {
	for _, name := range []string{"A", "B", "C", "D"} {
		if _, ok := r.Clusters[name]; !ok {
			cluster, err := r.Builder(ctx, name, r.Logger)
			if err != nil {
				return err
			}
			r.Clusters[name] = cluster
		}
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	err := SetupFourClusterReplication(ctx, r.Clusters["A"], r.Clusters["C"], r.Clusters["B"], r.Clusters["D"])
	if err != nil {
		return err
	}
	return nil
}

func (r *ReplicationSet) Cleanup() {
	for _, cluster := range r.Clusters {
		cluster.Cleanup()
	}
}
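
// WaitForPerfReplicationConnectionStatus waits until the performance
// replication status reports at least one primary whose connection_status is
// "connected".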
func WaitForPerfReplicationConnectionStatus(ctx context.Context, client *api.Client) error {
	type Primary struct {
		APIAddress       string `mapstructure:"api_address"`
		ConnectionStatus string `mapstructure:"connection_status"`
		ClusterAddress   string `mapstructure:"cluster_address"`
		LastHeartbeat    string `mapstructure:"last_heartbeat"`
	}
	type Status struct {
		Primaries []Primary `mapstructure:"primaries"`
	}
	return WaitForPerfReplicationStatus(ctx, client, func(m map[string]interface{}) error {
		var status Status
		err := mapstructure.Decode(m, &status)
		if err != nil {
			return err
		}
		if len(status.Primaries) == 0 {
			return fmt.Errorf("primaries is zero")
		}
		for _, v := range status.Primaries {
			if v.ConnectionStatus == "connected" {
				return nil
			}
		}
		return fmt.Errorf("no primaries connected")
	})
}
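
// WaitForPerfReplicationStatus polls the performance replication status
// endpoint until accept returns nil for the returned status data or ctx
// expires.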
func WaitForPerfReplicationStatus(ctx context.Context, client *api.Client, accept func(map[string]interface{}) error) error {
	var err error
	var secret *api.Secret
	for ctx.Err() == nil {
		secret, err = client.Logical().Read("sys/replication/performance/status")
		if err == nil && secret != nil && secret.Data != nil {
			if err = accept(secret.Data); err == nil {
				return nil
			}
		}
		time.Sleep(500 * time.Millisecond)
	}
	return fmt.Errorf("unable to get acceptable replication status within allotted time: error=%v secret=%#v", err, secret)
}