diff --git a/helper/consts/replication.go b/helper/consts/replication.go
index bdad15522..a7e0edea1 100644
--- a/helper/consts/replication.go
+++ b/helper/consts/replication.go
@@ -16,7 +16,7 @@ const (
 	// ensure no overlap between old and new values.
 
 	ReplicationUnknown            ReplicationState = 0
-	ReplicationPerformancePrimary ReplicationState = 1 << iota
+	ReplicationPerformancePrimary ReplicationState = 1 << iota // Note -- iota is 5 here!
 	ReplicationPerformanceSecondary
 	OldSplitReplicationBootstrapping
 	ReplicationDRPrimary
@@ -51,6 +51,39 @@ func (r ReplicationState) string() string {
 	return "unknown"
 }
 
+func (r ReplicationState) StateStrings() []string {
+	var ret []string
+	if r.HasState(ReplicationPerformanceSecondary) {
+		ret = append(ret, "perf-secondary")
+	}
+	if r.HasState(ReplicationPerformancePrimary) {
+		ret = append(ret, "perf-primary")
+	}
+	if r.HasState(ReplicationPerformanceBootstrapping) {
+		ret = append(ret, "perf-bootstrapping")
+	}
+	if r.HasState(ReplicationPerformanceDisabled) {
+		ret = append(ret, "perf-disabled")
+	}
+	if r.HasState(ReplicationDRPrimary) {
+		ret = append(ret, "dr-primary")
+	}
+	if r.HasState(ReplicationDRSecondary) {
+		ret = append(ret, "dr-secondary")
+	}
+	if r.HasState(ReplicationDRBootstrapping) {
+		ret = append(ret, "dr-bootstrapping")
+	}
+	if r.HasState(ReplicationDRDisabled) {
+		ret = append(ret, "dr-disabled")
+	}
+	if r.HasState(ReplicationPerformanceStandby) {
+		ret = append(ret, "perfstandby")
+	}
+
+	return ret
+}
+
 func (r ReplicationState) GetDRString() string {
 	switch {
 	case r.HasState(ReplicationDRBootstrapping):
diff --git a/helper/cryptoutil/cryptoutil.go b/helper/cryptoutil/cryptoutil.go
new file mode 100644
index 000000000..a37086c64
--- /dev/null
+++ b/helper/cryptoutil/cryptoutil.go
@@ -0,0 +1,11 @@
+package cryptoutil
+
+import "golang.org/x/crypto/blake2b"
+
+func Blake2b256Hash(key string) []byte {
+	hf, _ := blake2b.New256(nil)
+
+	hf.Write([]byte(key))
+
+	return hf.Sum(nil)
+}
diff --git a/helper/cryptoutil/cryptoutil_test.go b/helper/cryptoutil/cryptoutil_test.go
new file mode 100644
index 000000000..a277e4fce
--- /dev/null
+++ b/helper/cryptoutil/cryptoutil_test.go
@@ -0,0 +1,11 @@
+package cryptoutil
+
+import "testing"
+
+func TestBlake2b256Hash(t *testing.T) {
+	hashVal := Blake2b256Hash("sampletext")
+
+	if string(hashVal) == "" || string(hashVal) == "sampletext" {
+		t.Fatalf("failed to hash the text")
+	}
+}
diff --git a/helper/locksutil/locks.go b/helper/locksutil/locks.go
index e0c2fcdd8..2ec4cf4e2 100644
--- a/helper/locksutil/locks.go
+++ b/helper/locksutil/locks.go
@@ -1,8 +1,9 @@
 package locksutil
 
 import (
-	"crypto/md5"
 	"sync"
+
+	"github.com/hashicorp/vault/helper/cryptoutil"
 )
 
 const (
@@ -34,9 +35,7 @@ func CreateLocks() []*LockEntry {
 }
 
 func LockIndexForKey(key string) uint8 {
-	hf := md5.New()
-	hf.Write([]byte(key))
-	return uint8(hf.Sum(nil)[0])
+	return uint8(cryptoutil.Blake2b256Hash(key)[0])
}
 
 func LockForKey(locks []*LockEntry, key string) *LockEntry {
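The locksutil change above swaps MD5 for BLAKE2b when picking a lock shard; only the first byte of the digest is used, so any uniformly distributed hash works and the same key always lands on the same lock. Below is a minimal, self-contained sketch of the sharding idea. The 256-entry table mirrors locksutil's fixed table size, and the code uses `blake2b.Sum256` directly instead of the `cryptoutil.Blake2b256Hash` helper introduced in this diff; both are illustrative assumptions, not part of the patch:

```go
package main

import (
	"fmt"
	"sync"

	"golang.org/x/crypto/blake2b"
)

// lockCount mirrors locksutil's 256-entry lock table (an assumption for
// this sketch; the real constant lives in helper/locksutil).
const lockCount = 256

var locks [lockCount]sync.RWMutex

// lockIndexForKey reproduces the new scheme: the first byte of a
// BLAKE2b-256 digest selects one of 256 locks, spreading distinct keys
// evenly across the table while keeping the mapping deterministic.
func lockIndexForKey(key string) uint8 {
	sum := blake2b.Sum256([]byte(key))
	return sum[0]
}

func main() {
	key := "secret/my-app/config"
	idx := lockIndexForKey(key)
	locks[idx].Lock()
	fmt.Printf("key %q -> lock %d\n", key, idx)
	locks[idx].Unlock()
}
```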
diff --git a/helper/testhelpers/testhelpers.go b/helper/testhelpers/testhelpers.go
index ae73c2e4a..61d2a219a 100644
--- a/helper/testhelpers/testhelpers.go
+++ b/helper/testhelpers/testhelpers.go
@@ -28,8 +28,12 @@ type ReplicatedTestClusters struct {
 func (r *ReplicatedTestClusters) Cleanup() {
 	r.PerfPrimaryCluster.Cleanup()
 	r.PerfSecondaryCluster.Cleanup()
-	r.PerfPrimaryDRCluster.Cleanup()
-	r.PerfSecondaryDRCluster.Cleanup()
+	if r.PerfPrimaryDRCluster != nil {
+		r.PerfPrimaryDRCluster.Cleanup()
+	}
+	if r.PerfSecondaryDRCluster != nil {
+		r.PerfSecondaryDRCluster.Cleanup()
+	}
 }
 
 // Generates a root token on the target cluster.
@@ -100,34 +104,77 @@ func RandomWithPrefix(name string) string {
 	return fmt.Sprintf("%s-%d", name, rand.New(rand.NewSource(time.Now().UnixNano())).Int())
 }
 
+func EnsureCoresSealed(t testing.T, c *vault.TestCluster) {
+	t.Helper()
+	for _, core := range c.Cores {
+		EnsureCoreSealed(t, core)
+	}
+}
+
+func EnsureCoreSealed(t testing.T, core *vault.TestClusterCore) error {
+	core.Seal(t)
+	timeout := time.Now().Add(60 * time.Second)
+	for {
+		if time.Now().After(timeout) {
+			return fmt.Errorf("timeout waiting for core to seal")
+		}
+		if core.Core.Sealed() {
+			break
+		}
+		time.Sleep(250 * time.Millisecond)
+	}
+	return nil
+}
+
 func EnsureCoresUnsealed(t testing.T, c *vault.TestCluster) {
 	t.Helper()
 	for _, core := range c.Cores {
-		if !core.Sealed() {
-			continue
-		}
+		EnsureCoreUnsealed(t, c, core)
+	}
+}
 
+func EnsureCoreUnsealed(t testing.T, c *vault.TestCluster, core *vault.TestClusterCore) {
+	if !core.Sealed() {
+		return
+	}
+
-		client := core.Client
-		client.Sys().ResetUnsealProcess()
-		for j := 0; j < len(c.BarrierKeys); j++ {
-			statusResp, err := client.Sys().Unseal(base64.StdEncoding.EncodeToString(c.BarrierKeys[j]))
-			if err != nil {
-				// Sometimes when we get here it's already unsealed on its own
-				// and then this fails for DR secondaries so check again
-				if core.Sealed() {
-					t.Fatal(err)
-				}
-				break
-			}
-			if statusResp == nil {
-				t.Fatal("nil status response during unseal")
-			}
-			if !statusResp.Sealed {
-				break
+	client := core.Client
+	client.Sys().ResetUnsealProcess()
+	for j := 0; j < len(c.BarrierKeys); j++ {
+		statusResp, err := client.Sys().Unseal(base64.StdEncoding.EncodeToString(c.BarrierKeys[j]))
+		if err != nil {
+			// Sometimes when we get here it's already unsealed on its own
+			// and then this fails for DR secondaries so check again
+			if core.Sealed() {
+				t.Fatal(err)
 			}
+			break
 		}
-		if core.Sealed() {
-			t.Fatal("core is still sealed")
+		if statusResp == nil {
+			t.Fatal("nil status response during unseal")
+		}
+		if !statusResp.Sealed {
+			break
+		}
+	}
+	if core.Sealed() {
+		t.Fatal("core is still sealed")
+	}
+}
+
+func EnsureCoreIsPerfStandby(t testing.T, core *vault.TestClusterCore) {
+	t.Helper()
+	start := time.Now()
+	for {
+		health, err := core.Client.Sys().Health()
+		if err != nil {
+			t.Fatal(err)
+		}
+		if health.PerformanceStandby {
+			break
+		}
+		time.Sleep(time.Millisecond * 500)
+		if time.Now().After(start.Add(time.Second * 60)) {
+			t.Fatal("did not become a perf standby")
 		}
 	}
 }
@@ -175,6 +222,30 @@ func GetClusterAndCore(t testing.T, logger log.Logger, handlerFunc func(*vault.H
 	return cluster, core
 }
 
+func GetPerfReplicatedClusters(t testing.T, handlerFunc func(*vault.HandlerProperties) http.Handler) *ReplicatedTestClusters {
+	ret := &ReplicatedTestClusters{}
+
+	logger := log.New(&log.LoggerOptions{
+		Mutex: &sync.Mutex{},
+		Level: log.Trace,
+	})
+	// Set this lower so that state populates quickly to standby nodes
+	vault.HeartbeatInterval = 2 * time.Second
+
+	ret.PerfPrimaryCluster, _ = GetClusterAndCore(t, logger.Named("perf-pri"), handlerFunc)
+
+	ret.PerfSecondaryCluster, _ = GetClusterAndCore(t, logger.Named("perf-sec"), handlerFunc)
+
+	SetupTwoClusterPerfReplication(t, ret.PerfPrimaryCluster, ret.PerfSecondaryCluster)
+
+	// Wait until poison pills have been read
+	time.Sleep(45 * time.Second)
+	EnsureCoresUnsealed(t, ret.PerfPrimaryCluster)
+	EnsureCoresUnsealed(t, ret.PerfSecondaryCluster)
+
+	return ret
+}
+
 func GetFourReplicatedClusters(t testing.T, handlerFunc func(*vault.HandlerProperties) http.Handler) *ReplicatedTestClusters {
 	ret := &ReplicatedTestClusters{}
 
@@ -205,6 +276,46 @@ func GetFourReplicatedClusters(t testing.T, handlerFunc func(*vault.HandlerPrope
 	return ret
 }
 
+func SetupTwoClusterPerfReplication(t testing.T, perfPrimary, perfSecondary *vault.TestCluster) {
+	// Enable performance primary
+	_, err := perfPrimary.Cores[0].Client.Logical().Write("sys/replication/performance/primary/enable", nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	WaitForReplicationState(t, perfPrimary.Cores[0].Core, consts.ReplicationPerformancePrimary)
+
+	// get performance token
+	secret, err := perfPrimary.Cores[0].Client.Logical().Write("sys/replication/performance/primary/secondary-token", map[string]interface{}{
+		"id": "1",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	token := secret.WrapInfo.Token
+
+	// enable performance secondary
+	secret, err = perfSecondary.Cores[0].Client.Logical().Write("sys/replication/performance/secondary/enable", map[string]interface{}{
+		"token":   token,
+		"ca_file": perfPrimary.CACertPEMFile,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	WaitForReplicationState(t, perfSecondary.Cores[0].Core, consts.ReplicationPerformanceSecondary)
+	time.Sleep(time.Second * 3)
+	perfSecondary.BarrierKeys = perfPrimary.BarrierKeys
+
+	EnsureCoresUnsealed(t, perfSecondary)
+	rootToken := GenerateRoot(t, perfSecondary, false)
+	perfSecondary.Cores[0].Client.SetToken(rootToken)
+	for _, core := range perfSecondary.Cores {
+		core.Client.SetToken(rootToken)
+	}
+}
+
 func SetupFourClusterReplication(t testing.T, perfPrimary, perfSecondary, perfDRSecondary, perfSecondaryDRSecondary *vault.TestCluster) {
 	// Enable dr primary
 	_, err := perfPrimary.Cores[0].Client.Logical().Write("sys/replication/dr/primary/enable", nil)
@@ -269,7 +380,9 @@ func SetupFourClusterReplication(t testing.T, perfPrimary, perfSecondary, perfDR
 
 	EnsureCoresUnsealed(t, perfSecondary)
 	rootToken := GenerateRoot(t, perfSecondary, false)
-	perfSecondary.Cores[0].Client.SetToken(rootToken)
+	for _, core := range perfSecondary.Cores {
+		core.Client.SetToken(rootToken)
+	}
 
 	// Enable dr primary on perf secondary
 	_, err = perfSecondary.Cores[0].Client.Logical().Write("sys/replication/dr/primary/enable", nil)
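For context, here is how a consumer test might drive the new two-cluster helpers above. This is a hypothetical sketch: the `vaulthttp.Handler` wiring and the default `secret/` mount come from Vault's usual test setup and are assumptions, not part of this diff:

```go
package mytest

import (
	"testing"

	"github.com/hashicorp/vault/helper/testhelpers"
	vaulthttp "github.com/hashicorp/vault/http"
)

func TestPerfReplicatedWrite(t *testing.T) {
	// Builds a perf primary and secondary, links them, and unseals both.
	clusters := testhelpers.GetPerfReplicatedClusters(t, vaulthttp.Handler)
	// Cleanup is now safe even though the DR cluster fields were never
	// populated, thanks to the nil checks added above.
	defer clusters.Cleanup()

	primary := clusters.PerfPrimaryCluster.Cores[0].Client
	if _, err := primary.Logical().Write("secret/replicated", map[string]interface{}{
		"ok": "true",
	}); err != nil {
		t.Fatal(err)
	}
}
```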
diff --git a/logical/logical_storage.go b/logical/logical_storage.go
new file mode 100644
index 000000000..cece9e3b2
--- /dev/null
+++ b/logical/logical_storage.go
@@ -0,0 +1,52 @@
+package logical
+
+import (
+	"context"
+
+	"github.com/hashicorp/vault/physical"
+)
+
+type LogicalStorage struct {
+	underlying physical.Backend
+}
+
+func (s *LogicalStorage) Get(ctx context.Context, key string) (*StorageEntry, error) {
+	entry, err := s.underlying.Get(ctx, key)
+	if err != nil {
+		return nil, err
+	}
+	if entry == nil {
+		return nil, nil
+	}
+	return &StorageEntry{
+		Key:      entry.Key,
+		Value:    entry.Value,
+		SealWrap: entry.SealWrap,
+	}, nil
+}
+
+func (s *LogicalStorage) Put(ctx context.Context, entry *StorageEntry) error {
+	return s.underlying.Put(ctx, &physical.Entry{
+		Key:      entry.Key,
+		Value:    entry.Value,
+		SealWrap: entry.SealWrap,
+	})
+}
+
+func (s *LogicalStorage) Delete(ctx context.Context, key string) error {
+	return s.underlying.Delete(ctx, key)
+}
+
+func (s *LogicalStorage) List(ctx context.Context, prefix string) ([]string, error) {
+	return s.underlying.List(ctx, prefix)
+}
+
+func (s *LogicalStorage) Underlying() physical.Backend {
+	return s.underlying
+}
+
+func NewLogicalStorage(underlying physical.Backend) *LogicalStorage {
+	return &LogicalStorage{
+		underlying: underlying,
+	}
+}
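A sketch of how the new `LogicalStorage` adapter might be used: it lets logical-layer code read and write through any `physical.Backend` by translating between `logical.StorageEntry` and `physical.Entry`. The in-memory backend and hclog defaults below are assumptions for illustration; neither appears in this diff:

```go
package main

import (
	"context"
	"fmt"

	log "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/vault/logical"
	"github.com/hashicorp/vault/physical/inmem"
)

func main() {
	// Wrap an in-memory physical backend so logical-layer code can use it.
	inm, err := inmem.NewInmem(nil, log.Default())
	if err != nil {
		panic(err)
	}
	s := logical.NewLogicalStorage(inm)

	ctx := context.Background()
	if err := s.Put(ctx, &logical.StorageEntry{Key: "foo", Value: []byte("bar")}); err != nil {
		panic(err)
	}
	entry, err := s.Get(ctx, "foo")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(entry.Value)) // "bar"
}
```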
diff --git a/vault/cluster.go b/vault/cluster.go
index d8497201b..5960c3b5d 100644
--- a/vault/cluster.go
+++ b/vault/cluster.go
@@ -38,6 +38,12 @@ var (
 	ErrCannotForward = errors.New("cannot forward request; no connection or address not known")
 )
 
+type ClusterLeaderParams struct {
+	LeaderUUID         string
+	LeaderRedirectAddr string
+	LeaderClusterAddr  string
+}
+
 type ReplicatedClusters struct {
 	DR          *ReplicatedCluster
 	Performance *ReplicatedCluster
diff --git a/vault/cluster_test.go b/vault/cluster_test.go
index 3509431ad..949670d27 100644
--- a/vault/cluster_test.go
+++ b/vault/cluster_test.go
@@ -168,7 +168,7 @@ func TestCluster_ListenForRequests(t *testing.T) {
 	time.Sleep(manualStepDownSleepPeriod)
 	checkListenersFunc(false)
 
-	err = cores[0].Seal(cluster.RootToken)
+	err = cores[0].Core.Seal(cluster.RootToken)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/vault/core.go b/vault/core.go
index 7dc7f95c3..fb6250fc7 100644
--- a/vault/core.go
+++ b/vault/core.go
@@ -338,15 +338,11 @@ type Core struct {
 	// Write lock used to ensure that we don't have multiple connections adjust
 	// this value at the same time
 	requestForwardingConnectionLock sync.RWMutex
-	// Most recent leader UUID. Used to avoid repeatedly JSON parsing the same
-	// values.
-	clusterLeaderUUID string
-	// Most recent leader redirect addr
-	clusterLeaderRedirectAddr string
-	// Most recent leader cluster addr
-	clusterLeaderClusterAddr string
-	// Lock for the cluster leader values
-	clusterLeaderParamsLock sync.RWMutex
+	// Lock for the leader values, ensuring we don't run the parts of Leader()
+	// that change things concurrently
+	leaderParamsLock sync.RWMutex
+	// Current cluster leader values
+	clusterLeaderParams *atomic.Value
 	// Info on cluster members
 	clusterPeerClusterAddrsCache *cache.Cache
 	// Stores whether we currently have a server running
@@ -416,6 +412,10 @@ type Core struct {
 	// Stores loggers so we can reset the level
 	allLoggers     []log.Logger
 	allLoggersLock sync.RWMutex
+
+	// Can be toggled atomically to cause the core to never try to become
+	// active, or give up active as soon as it gets it
+	neverBecomeActive *uint32
 }
 
 // CoreConfig is used to parameterize a core
@@ -590,6 +590,8 @@ func NewCore(conf *CoreConfig) (*Core, error) {
 		activeContextCancelFunc: new(atomic.Value),
 		allLoggers:              conf.AllLoggers,
 		builtinRegistry:         conf.BuiltinRegistry,
+		neverBecomeActive:       new(uint32),
+		clusterLeaderParams:     new(atomic.Value),
 	}
 
 	atomic.StoreUint32(c.sealed, 1)
@@ -600,6 +602,8 @@ func NewCore(conf *CoreConfig) (*Core, error) {
 	c.localClusterParsedCert.Store((*x509.Certificate)(nil))
 	c.localClusterPrivateKey.Store((*ecdsa.PrivateKey)(nil))
 
+	c.clusterLeaderParams.Store((*ClusterLeaderParams)(nil))
+
 	c.activeContextCancelFunc.Store((context.CancelFunc)(nil))
 
 	if conf.ClusterCipherSuites != "" {
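The core.go change above replaces three ad-hoc leader strings with a single `*ClusterLeaderParams` held in an `atomic.Value`, so readers always see a consistent snapshot instead of a torn mix of fields. A standalone sketch of the pattern, showing why `NewCore` seeds the value with a typed nil: `atomic.Value` requires every `Store` to use the same concrete type, and the typed nil keeps the plain type assertion safe before any leader has been cached:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type ClusterLeaderParams struct {
	LeaderUUID         string
	LeaderRedirectAddr string
	LeaderClusterAddr  string
}

func main() {
	v := new(atomic.Value)

	// Seed with a typed nil so every Load returns a *ClusterLeaderParams and
	// the plain type assertion below never panics, even before a leader is seen.
	v.Store((*ClusterLeaderParams)(nil))

	params := v.Load().(*ClusterLeaderParams)
	fmt.Println(params == nil) // true: no leader cached yet

	// Readers see either the old struct or the new one, never a partial
	// update -- the reason the three separate fields became one value.
	v.Store(&ClusterLeaderParams{LeaderUUID: "abc", LeaderRedirectAddr: "https://leader:8200"})
	fmt.Println(v.Load().(*ClusterLeaderParams).LeaderUUID) // "abc"
}
```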
diff --git a/vault/ha.go b/vault/ha.go
index b1affc56f..fc998132b 100644
--- a/vault/ha.go
+++ b/vault/ha.go
@@ -108,28 +108,41 @@ func (c *Core) Leader() (isLeader bool, leaderAddr, clusterAddr string, err erro
 		return false, "", "", nil
 	}
 
-	c.clusterLeaderParamsLock.RLock()
-	localLeaderUUID := c.clusterLeaderUUID
-	localRedirAddr := c.clusterLeaderRedirectAddr
-	localClusterAddr := c.clusterLeaderClusterAddr
-	c.clusterLeaderParamsLock.RUnlock()
+	var localLeaderUUID, localRedirectAddr, localClusterAddr string
+	clusterLeaderParams := c.clusterLeaderParams.Load().(*ClusterLeaderParams)
+	if clusterLeaderParams != nil {
+		localLeaderUUID = clusterLeaderParams.LeaderUUID
+		localRedirectAddr = clusterLeaderParams.LeaderRedirectAddr
+		localClusterAddr = clusterLeaderParams.LeaderClusterAddr
+	}
 
 	// If the leader hasn't changed, return the cached value; nothing changes
 	// mid-leadership, and the barrier caches anyways
-	if leaderUUID == localLeaderUUID && localRedirAddr != "" {
+	if leaderUUID == localLeaderUUID && localRedirectAddr != "" {
 		c.stateLock.RUnlock()
-		return false, localRedirAddr, localClusterAddr, nil
+		return false, localRedirectAddr, localClusterAddr, nil
 	}
 
 	c.logger.Trace("found new active node information, refreshing")
 
 	defer c.stateLock.RUnlock()
-	c.clusterLeaderParamsLock.Lock()
-	defer c.clusterLeaderParamsLock.Unlock()
+	c.leaderParamsLock.Lock()
+	defer c.leaderParamsLock.Unlock()
 
 	// Validate base conditions again
-	if leaderUUID == c.clusterLeaderUUID && c.clusterLeaderRedirectAddr != "" {
-		return false, localRedirAddr, localClusterAddr, nil
+	clusterLeaderParams = c.clusterLeaderParams.Load().(*ClusterLeaderParams)
+	if clusterLeaderParams != nil {
+		localLeaderUUID = clusterLeaderParams.LeaderUUID
+		localRedirectAddr = clusterLeaderParams.LeaderRedirectAddr
+		localClusterAddr = clusterLeaderParams.LeaderClusterAddr
+	} else {
+		localLeaderUUID = ""
+		localRedirectAddr = ""
+		localClusterAddr = ""
+	}
+
+	if leaderUUID == localLeaderUUID && localRedirectAddr != "" {
+		return false, localRedirectAddr, localClusterAddr, nil
 	}
 
 	key := coreLeaderPrefix + leaderUUID
@@ -174,9 +187,11 @@ func (c *Core) Leader() (isLeader bool, leaderAddr, clusterAddr string, err erro
 
 	// Don't set these until everything has been parsed successfully or we'll
 	// never try again
-	c.clusterLeaderRedirectAddr = adv.RedirectAddr
-	c.clusterLeaderClusterAddr = adv.ClusterAddr
-	c.clusterLeaderUUID = leaderUUID
+	c.clusterLeaderParams.Store(&ClusterLeaderParams{
+		LeaderUUID:         leaderUUID,
+		LeaderRedirectAddr: adv.RedirectAddr,
+		LeaderClusterAddr:  adv.ClusterAddr,
+	})
 
 	return false, adv.RedirectAddr, adv.ClusterAddr, nil
 }
@@ -403,6 +418,14 @@ func (c *Core) waitForLeadership(newLeaderCh chan func(), manualStepDownCh, stop
 		if leaderLostCh == nil {
 			return
 		}
+
+		if atomic.LoadUint32(c.neverBecomeActive) == 1 {
+			c.heldHALock = nil
+			lock.Unlock()
+			c.logger.Info("marked never become active, giving up active state")
+			continue
+		}
+
 		c.logger.Info("acquired lock, enabling active operation")
 
 		// This is used later to log a metrics event; this can be helpful to
@@ -410,6 +433,7 @@ func (c *Core) waitForLeadership(newLeaderCh chan func(), manualStepDownCh, stop
 		activeTime := time.Now()
 
 		continueCh := interruptPerfStandby(newLeaderCh, stopCh)
+
 		// Grab the statelock or stop
 		if stopped := grabLockOrStop(c.stateLock.Lock, c.stateLock.Unlock, stopCh); stopped {
 			lock.Unlock()
@@ -869,3 +893,11 @@ func (c *Core) clearLeader(uuid string) error {
 
 	return err
 }
+
+func (c *Core) SetNeverBecomeActive(on bool) {
+	if on {
+		atomic.StoreUint32(c.neverBecomeActive, 1)
+	} else {
+		atomic.StoreUint32(c.neverBecomeActive, 0)
+	}
+}
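A hypothetical sketch of how `SetNeverBecomeActive` might be used in a test to pin a core as a permanent standby. The three-node inmem-HA default of `vault.NewTestCluster` and the step-down sequencing are assumptions for illustration, not part of this diff:

```go
package mytest

import (
	"testing"

	vaulthttp "github.com/hashicorp/vault/http"
	"github.com/hashicorp/vault/vault"
)

func TestNeverBecomeActive(t *testing.T) {
	cluster := vault.NewTestCluster(t, nil, &vault.TestClusterOptions{
		HandlerFunc: vaulthttp.Handler,
	})
	cluster.Start()
	defer cluster.Cleanup()

	// Pin the second core as a permanent standby: whenever it wins the HA
	// lock, waitForLeadership sees the flag, logs "marked never become
	// active", and releases the lock instead of taking over.
	cluster.Cores[1].Core.SetNeverBecomeActive(true)

	// Step the current active node down; core 1 will now be skipped during
	// the ensuing leader election.
	if err := cluster.Cores[0].Client.Sys().StepDown(); err != nil {
		t.Fatal(err)
	}
}
```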
diff --git a/vault/request_forwarding.go b/vault/request_forwarding.go
index d0fbd2865..ff0eb5fd4 100644
--- a/vault/request_forwarding.go
+++ b/vault/request_forwarding.go
@@ -397,6 +397,8 @@ func (c *Core) clearForwardingClients() {
 
 	c.rpcClientConnContext = nil
 	c.rpcForwardingClient = nil
+
+	c.clusterLeaderParams.Store((*ClusterLeaderParams)(nil))
 }
 
 // ForwardRequest forwards a given request to the active node and returns the
diff --git a/vault/testing.go b/vault/testing.go
index a5faceb85..c5e9c4bce 100644
--- a/vault/testing.go
+++ b/vault/testing.go
@@ -688,7 +688,7 @@ func TestWaitActiveWithError(core *Core) error {
 	start := time.Now()
 	var standby bool
 	var err error
-	for time.Now().Sub(start) < time.Second {
+	for time.Now().Sub(start) < 30*time.Second {
 		standby, err = core.Standby()
 		if err != nil {
 			return err
@@ -789,6 +789,13 @@ func (c *TestCluster) EnsureCoresSealed(t testing.T) {
 	}
 }
 
+func (c *TestClusterCore) Seal(t testing.T) {
+	t.Helper()
+	if err := c.Core.sealInternal(); err != nil {
+		t.Fatal(err)
+	}
+}
+
 func CleanupClusters(clusters []*TestCluster) {
 	wg := &sync.WaitGroup{}
 	for _, cluster := range clusters {
diff --git a/vault/testing_util.go b/vault/testing_util.go
index 689ce11fd..26c7cde05 100644
--- a/vault/testing_util.go
+++ b/vault/testing_util.go
@@ -2,9 +2,13 @@
 
 package vault
 
-import testing "github.com/mitchellh/go-testing-interface"
+import (
+	testing "github.com/mitchellh/go-testing-interface"
+)
 
 func testGenerateCoreKeys() (interface{}, interface{}, error) { return nil, nil, nil }
 func testGetLicensingConfig(interface{}) *LicensingConfig     { return &LicensingConfig{} }
-func testAdjustTestCore(*CoreConfig, *TestClusterCore)        {}
 func testExtraClusterCoresTestSetup(testing.T, interface{}, []*TestClusterCore) {}
+func testAdjustTestCore(_ *CoreConfig, tcc *TestClusterCore) {
+	tcc.UnderlyingStorage = tcc.physical
+}
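Putting the testing changes together, here is a hypothetical seal/unseal round trip that exercises the new `TestClusterCore.Seal` helper via the `testhelpers` functions added above. The cluster construction mirrors Vault's standard test setup and is an assumption, not part of this diff:

```go
package mytest

import (
	"testing"

	"github.com/hashicorp/vault/helper/testhelpers"
	vaulthttp "github.com/hashicorp/vault/http"
	"github.com/hashicorp/vault/vault"
)

func TestSealUnsealRoundTrip(t *testing.T) {
	cluster := vault.NewTestCluster(t, nil, &vault.TestClusterOptions{
		HandlerFunc: vaulthttp.Handler,
	})
	cluster.Start()
	defer cluster.Cleanup()

	core := cluster.Cores[0]

	// The new TestClusterCore.Seal bypasses the API and seals directly via
	// sealInternal; EnsureCoreSealed then polls until the barrier reports
	// sealed, returning an error on timeout.
	if err := testhelpers.EnsureCoreSealed(t, core); err != nil {
		t.Fatal(err)
	}

	// Bring everything back using the cluster's shared barrier keys; cores
	// that never sealed are skipped.
	testhelpers.EnsureCoresUnsealed(t, cluster)
}
```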