Fix leader info repopulation (#6167)

* Two things:

* Change how we populate and clear the leader UUID. This fixes a case
where, if a standby disconnects from the active node and then reconnects
without the active node having restarted, the UUID doesn't change, so
triggers that should run when a new active node is seen never fire (see
the sketch after this list).

* Add a number of test helpers and assorted minor updates.
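A minimal sketch of the caching pattern this change moves to, using simplified stand-in types rather than Vault's actual Core (only the ClusterLeaderParams struct mirrors the one added in this commit; the leader() helper and the placeholder addresses are illustrative assumptions): the leader values are cached as a single pointer in an atomic.Value, the slow path republishes them under a lock, and clearing the cache when forwarding clients are torn down forces a reconnecting standby to repopulate leader info even though the leader UUID never changed.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// ClusterLeaderParams mirrors the struct added in this commit: the three
// leader values travel together so they can be swapped atomically.
type ClusterLeaderParams struct {
	LeaderUUID         string
	LeaderRedirectAddr string
	LeaderClusterAddr  string
}

// core is a simplified, hypothetical stand-in for the relevant slice of vault.Core.
type core struct {
	leaderParamsLock    sync.Mutex   // serializes the slow path below
	clusterLeaderParams atomic.Value // holds a *ClusterLeaderParams, possibly nil
}

// leader returns the cached values when the leader UUID is unchanged;
// otherwise it "re-reads" the leader entry (faked here) and republishes it.
func (c *core) leader(currentLeaderUUID string) *ClusterLeaderParams {
	if p, _ := c.clusterLeaderParams.Load().(*ClusterLeaderParams); p != nil && p.LeaderUUID == currentLeaderUUID {
		return p // fast path: nothing changes mid-leadership
	}

	c.leaderParamsLock.Lock()
	defer c.leaderParamsLock.Unlock()

	// Re-check under the lock in case another goroutine already refreshed.
	if p, _ := c.clusterLeaderParams.Load().(*ClusterLeaderParams); p != nil && p.LeaderUUID == currentLeaderUUID {
		return p
	}

	p := &ClusterLeaderParams{
		LeaderUUID:         currentLeaderUUID,
		LeaderRedirectAddr: "https://leader:8200", // placeholder values
		LeaderClusterAddr:  "https://leader:8201",
	}
	c.clusterLeaderParams.Store(p)
	return p
}

// clearForwardingClients models the piece the fix hinges on: dropping the
// cached params on disconnect means a later reconnect repopulates leader
// info even if the leader UUID is the same as before.
func (c *core) clearForwardingClients() {
	c.clusterLeaderParams.Store((*ClusterLeaderParams)(nil))
}

func main() {
	c := &core{}
	fmt.Println(c.leader("uuid-1").LeaderRedirectAddr) // populated on first call
	c.clearForwardingClients()                         // standby disconnects
	fmt.Println(c.leader("uuid-1").LeaderRedirectAddr) // repopulated despite the same UUID
}

Storing one struct through an atomic.Value replaces the three separate string fields guarded by clusterLeaderParamsLock in the old code, and gives clearForwardingClients a one-line way to invalidate everything at once.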
Jeff Mitchell, 2019-02-05 21:01:18 -05:00 (committed by GitHub)
parent 97a73d6bf8
commit 9ef0680e7f
13 changed files with 331 additions and 57 deletions

View File

@@ -16,7 +16,7 @@ const (
// ensure no overlap between old and new values.
ReplicationUnknown ReplicationState = 0
ReplicationPerformancePrimary ReplicationState = 1 << iota
ReplicationPerformancePrimary ReplicationState = 1 << iota // Note -- iota is 5 here!
ReplicationPerformanceSecondary
OldSplitReplicationBootstrapping
ReplicationDRPrimary
@@ -51,6 +51,39 @@ func (r ReplicationState) string() string {
return "unknown"
}
func (r ReplicationState) StateStrings() []string {
var ret []string
if r.HasState(ReplicationPerformanceSecondary) {
ret = append(ret, "perf-secondary")
}
if r.HasState(ReplicationPerformancePrimary) {
ret = append(ret, "perf-primary")
}
if r.HasState(ReplicationPerformanceBootstrapping) {
ret = append(ret, "perf-bootstrapping")
}
if r.HasState(ReplicationPerformanceDisabled) {
ret = append(ret, "perf-disabled")
}
if r.HasState(ReplicationDRPrimary) {
ret = append(ret, "dr-primary")
}
if r.HasState(ReplicationDRSecondary) {
ret = append(ret, "dr-secondary")
}
if r.HasState(ReplicationDRBootstrapping) {
ret = append(ret, "dr-bootstrapping")
}
if r.HasState(ReplicationDRDisabled) {
ret = append(ret, "dr-disabled")
}
if r.HasState(ReplicationPerformanceStandby) {
ret = append(ret, "perfstandby")
}
return ret
}
func (r ReplicationState) GetDRString() string {
switch {
case r.HasState(ReplicationDRBootstrapping):

View File

@@ -0,0 +1,11 @@
package cryptoutil
import "golang.org/x/crypto/blake2b"
func Blake2b256Hash(key string) []byte {
hf, _ := blake2b.New256(nil)
hf.Write([]byte(key))
return hf.Sum(nil)
}

View File

@@ -0,0 +1,11 @@
package cryptoutil
import "testing"
func TestBlake2b256Hash(t *testing.T) {
hashVal := Blake2b256Hash("sampletext")
if string(hashVal) == "" || string(hashVal) == "sampletext" {
t.Fatalf("failed to hash the text")
}
}

View File

@@ -1,8 +1,9 @@
package locksutil
import (
"crypto/md5"
"sync"
"github.com/hashicorp/vault/helper/cryptoutil"
)
const (
@@ -34,9 +35,7 @@ func CreateLocks() []*LockEntry {
}
func LockIndexForKey(key string) uint8 {
hf := md5.New()
hf.Write([]byte(key))
return uint8(hf.Sum(nil)[0])
return uint8(cryptoutil.Blake2b256Hash(key)[0])
}
func LockForKey(locks []*LockEntry, key string) *LockEntry {

View File

@@ -28,8 +28,12 @@ type ReplicatedTestClusters struct {
func (r *ReplicatedTestClusters) Cleanup() {
r.PerfPrimaryCluster.Cleanup()
r.PerfSecondaryCluster.Cleanup()
r.PerfPrimaryDRCluster.Cleanup()
r.PerfSecondaryDRCluster.Cleanup()
if r.PerfPrimaryDRCluster != nil {
r.PerfPrimaryDRCluster.Cleanup()
}
if r.PerfSecondaryDRCluster != nil {
r.PerfSecondaryDRCluster.Cleanup()
}
}
// Generates a root token on the target cluster.
@@ -100,34 +104,77 @@ func RandomWithPrefix(name string) string {
return fmt.Sprintf("%s-%d", name, rand.New(rand.NewSource(time.Now().UnixNano())).Int())
}
func EnsureCoresSealed(t testing.T, c *vault.TestCluster) {
t.Helper()
for _, core := range c.Cores {
EnsureCoreSealed(t, core)
}
}
func EnsureCoreSealed(t testing.T, core *vault.TestClusterCore) error {
core.Seal(t)
timeout := time.Now().Add(60 * time.Second)
for {
if time.Now().After(timeout) {
return fmt.Errorf("timeout waiting for core to seal")
}
if core.Core.Sealed() {
break
}
time.Sleep(250 * time.Millisecond)
}
return nil
}
func EnsureCoresUnsealed(t testing.T, c *vault.TestCluster) {
t.Helper()
for _, core := range c.Cores {
if !core.Sealed() {
continue
}
EnsureCoreUnsealed(t, c, core)
}
}
func EnsureCoreUnsealed(t testing.T, c *vault.TestCluster, core *vault.TestClusterCore) {
if !core.Sealed() {
return
}
client := core.Client
client.Sys().ResetUnsealProcess()
for j := 0; j < len(c.BarrierKeys); j++ {
statusResp, err := client.Sys().Unseal(base64.StdEncoding.EncodeToString(c.BarrierKeys[j]))
if err != nil {
// Sometimes when we get here it's already unsealed on its own
// and then this fails for DR secondaries so check again
if core.Sealed() {
t.Fatal(err)
}
break
}
if statusResp == nil {
t.Fatal("nil status response during unseal")
}
if !statusResp.Sealed {
break
}
}
if core.Sealed() {
t.Fatal("core is still sealed")
}
}
func EnsureCoreIsPerfStandby(t testing.T, core *vault.TestClusterCore) {
t.Helper()
start := time.Now()
for {
health, err := core.Client.Sys().Health()
if err != nil {
t.Fatal(err)
}
if health.PerformanceStandby {
break
}
time.Sleep(time.Millisecond * 500)
if time.Now().After(start.Add(time.Second * 60)) {
t.Fatal("did not become a perf standby")
}
}
}
@@ -175,6 +222,30 @@ func GetClusterAndCore(t testing.T, logger log.Logger, handlerFunc func(*vault.H
return cluster, core
}
func GetPerfReplicatedClusters(t testing.T, handlerFunc func(*vault.HandlerProperties) http.Handler) *ReplicatedTestClusters {
ret := &ReplicatedTestClusters{}
logger := log.New(&log.LoggerOptions{
Mutex: &sync.Mutex{},
Level: log.Trace,
})
// Set this lower so that state populates quickly to standby nodes
vault.HeartbeatInterval = 2 * time.Second
ret.PerfPrimaryCluster, _ = GetClusterAndCore(t, logger.Named("perf-pri"), handlerFunc)
ret.PerfSecondaryCluster, _ = GetClusterAndCore(t, logger.Named("perf-sec"), handlerFunc)
SetupTwoClusterPerfReplication(t, ret.PerfPrimaryCluster, ret.PerfSecondaryCluster)
// Wait until poison pills have been read
time.Sleep(45 * time.Second)
EnsureCoresUnsealed(t, ret.PerfPrimaryCluster)
EnsureCoresUnsealed(t, ret.PerfSecondaryCluster)
return ret
}
func GetFourReplicatedClusters(t testing.T, handlerFunc func(*vault.HandlerProperties) http.Handler) *ReplicatedTestClusters {
ret := &ReplicatedTestClusters{}
@@ -205,6 +276,46 @@ func GetFourReplicatedClusters(t testing.T, handlerFunc func(*vault.HandlerPrope
return ret
}
func SetupTwoClusterPerfReplication(t testing.T, perfPrimary, perfSecondary *vault.TestCluster) {
// Enable performance primary
_, err := perfPrimary.Cores[0].Client.Logical().Write("sys/replication/performance/primary/enable", nil)
if err != nil {
t.Fatal(err)
}
WaitForReplicationState(t, perfPrimary.Cores[0].Core, consts.ReplicationPerformancePrimary)
// get performance token
secret, err := perfPrimary.Cores[0].Client.Logical().Write("sys/replication/performance/primary/secondary-token", map[string]interface{}{
"id": "1",
})
if err != nil {
t.Fatal(err)
}
token := secret.WrapInfo.Token
// enable performance secondary
secret, err = perfSecondary.Cores[0].Client.Logical().Write("sys/replication/performance/secondary/enable", map[string]interface{}{
"token": token,
"ca_file": perfPrimary.CACertPEMFile,
})
if err != nil {
t.Fatal(err)
}
WaitForReplicationState(t, perfSecondary.Cores[0].Core, consts.ReplicationPerformanceSecondary)
time.Sleep(time.Second * 3)
perfSecondary.BarrierKeys = perfPrimary.BarrierKeys
EnsureCoresUnsealed(t, perfSecondary)
rootToken := GenerateRoot(t, perfSecondary, false)
perfSecondary.Cores[0].Client.SetToken(rootToken)
for _, core := range perfSecondary.Cores {
core.Client.SetToken(rootToken)
}
}
func SetupFourClusterReplication(t testing.T, perfPrimary, perfSecondary, perfDRSecondary, perfSecondaryDRSecondary *vault.TestCluster) {
// Enable dr primary
_, err := perfPrimary.Cores[0].Client.Logical().Write("sys/replication/dr/primary/enable", nil)
@@ -269,7 +380,9 @@ func SetupFourClusterReplication(t testing.T, perfPrimary, perfSecondary, perfDR
EnsureCoresUnsealed(t, perfSecondary)
rootToken := GenerateRoot(t, perfSecondary, false)
perfSecondary.Cores[0].Client.SetToken(rootToken)
for _, core := range perfSecondary.Cores {
core.Client.SetToken(rootToken)
}
// Enable dr primary on perf secondary
_, err = perfSecondary.Cores[0].Client.Logical().Write("sys/replication/dr/primary/enable", nil)

View File

@@ -0,0 +1,52 @@
package logical
import (
"context"
"github.com/hashicorp/vault/physical"
)
type LogicalStorage struct {
underlying physical.Backend
}
func (s *LogicalStorage) Get(ctx context.Context, key string) (*StorageEntry, error) {
entry, err := s.underlying.Get(ctx, key)
if err != nil {
return nil, err
}
if entry == nil {
return nil, nil
}
return &StorageEntry{
Key: entry.Key,
Value: entry.Value,
SealWrap: entry.SealWrap,
}, nil
}
func (s *LogicalStorage) Put(ctx context.Context, entry *StorageEntry) error {
return s.underlying.Put(ctx, &physical.Entry{
Key: entry.Key,
Value: entry.Value,
SealWrap: entry.SealWrap,
})
}
func (s *LogicalStorage) Delete(ctx context.Context, key string) error {
return s.underlying.Delete(ctx, key)
}
func (s *LogicalStorage) List(ctx context.Context, prefix string) ([]string, error) {
return s.underlying.List(ctx, prefix)
}
func (s *LogicalStorage) Underlying() physical.Backend {
return s.underlying
}
func NewLogicalStorage(underlying physical.Backend) *LogicalStorage {
return &LogicalStorage{
underlying: underlying,
}
}

View File

@@ -38,6 +38,12 @@ var (
ErrCannotForward = errors.New("cannot forward request; no connection or address not known")
)
type ClusterLeaderParams struct {
LeaderUUID string
LeaderRedirectAddr string
LeaderClusterAddr string
}
type ReplicatedClusters struct {
DR *ReplicatedCluster
Performance *ReplicatedCluster

View File

@@ -168,7 +168,7 @@ func TestCluster_ListenForRequests(t *testing.T) {
time.Sleep(manualStepDownSleepPeriod)
checkListenersFunc(false)
err = cores[0].Seal(cluster.RootToken)
err = cores[0].Core.Seal(cluster.RootToken)
if err != nil {
t.Fatal(err)
}

View File

@@ -338,15 +338,11 @@ type Core struct {
// Write lock used to ensure that we don't have multiple connections adjust
// this value at the same time
requestForwardingConnectionLock sync.RWMutex
// Most recent leader UUID. Used to avoid repeatedly JSON parsing the same
// values.
clusterLeaderUUID string
// Most recent leader redirect addr
clusterLeaderRedirectAddr string
// Most recent leader cluster addr
clusterLeaderClusterAddr string
// Lock for the cluster leader values
clusterLeaderParamsLock sync.RWMutex
// Lock for the leader values, ensuring we don't run the parts of Leader()
// that change things concurrently
leaderParamsLock sync.RWMutex
// Current cluster leader values
clusterLeaderParams *atomic.Value
// Info on cluster members
clusterPeerClusterAddrsCache *cache.Cache
// Stores whether we currently have a server running
@@ -416,6 +412,10 @@ type Core struct {
// Stores loggers so we can reset the level
allLoggers []log.Logger
allLoggersLock sync.RWMutex
// Can be toggled atomically to cause the core to never try to become
// active, or give up active as soon as it gets it
neverBecomeActive *uint32
}
// CoreConfig is used to parameterize a core
@@ -590,6 +590,8 @@ func NewCore(conf *CoreConfig) (*Core, error) {
activeContextCancelFunc: new(atomic.Value),
allLoggers: conf.AllLoggers,
builtinRegistry: conf.BuiltinRegistry,
neverBecomeActive: new(uint32),
clusterLeaderParams: new(atomic.Value),
}
atomic.StoreUint32(c.sealed, 1)
@@ -600,6 +602,8 @@ func NewCore(conf *CoreConfig) (*Core, error) {
c.localClusterParsedCert.Store((*x509.Certificate)(nil))
c.localClusterPrivateKey.Store((*ecdsa.PrivateKey)(nil))
c.clusterLeaderParams.Store((*ClusterLeaderParams)(nil))
c.activeContextCancelFunc.Store((context.CancelFunc)(nil))
if conf.ClusterCipherSuites != "" {

View File

@@ -108,28 +108,41 @@ func (c *Core) Leader() (isLeader bool, leaderAddr, clusterAddr string, err erro
return false, "", "", nil
}
c.clusterLeaderParamsLock.RLock()
localLeaderUUID := c.clusterLeaderUUID
localRedirAddr := c.clusterLeaderRedirectAddr
localClusterAddr := c.clusterLeaderClusterAddr
c.clusterLeaderParamsLock.RUnlock()
var localLeaderUUID, localRedirectAddr, localClusterAddr string
clusterLeaderParams := c.clusterLeaderParams.Load().(*ClusterLeaderParams)
if clusterLeaderParams != nil {
localLeaderUUID = clusterLeaderParams.LeaderUUID
localRedirectAddr = clusterLeaderParams.LeaderRedirectAddr
localClusterAddr = clusterLeaderParams.LeaderClusterAddr
}
// If the leader hasn't changed, return the cached value; nothing changes
// mid-leadership, and the barrier caches anyways
if leaderUUID == localLeaderUUID && localRedirAddr != "" {
if leaderUUID == localLeaderUUID && localRedirectAddr != "" {
c.stateLock.RUnlock()
return false, localRedirAddr, localClusterAddr, nil
return false, localRedirectAddr, localClusterAddr, nil
}
c.logger.Trace("found new active node information, refreshing")
defer c.stateLock.RUnlock()
c.clusterLeaderParamsLock.Lock()
defer c.clusterLeaderParamsLock.Unlock()
c.leaderParamsLock.Lock()
defer c.leaderParamsLock.Unlock()
// Validate base conditions again
if leaderUUID == c.clusterLeaderUUID && c.clusterLeaderRedirectAddr != "" {
return false, localRedirAddr, localClusterAddr, nil
clusterLeaderParams = c.clusterLeaderParams.Load().(*ClusterLeaderParams)
if clusterLeaderParams != nil {
localLeaderUUID = clusterLeaderParams.LeaderUUID
localRedirectAddr = clusterLeaderParams.LeaderRedirectAddr
localClusterAddr = clusterLeaderParams.LeaderClusterAddr
} else {
localLeaderUUID = ""
localRedirectAddr = ""
localClusterAddr = ""
}
if leaderUUID == localLeaderUUID && localRedirectAddr != "" {
return false, localRedirectAddr, localClusterAddr, nil
}
key := coreLeaderPrefix + leaderUUID
@@ -174,9 +187,11 @@ func (c *Core) Leader() (isLeader bool, leaderAddr, clusterAddr string, err erro
// Don't set these until everything has been parsed successfully or we'll
// never try again
c.clusterLeaderRedirectAddr = adv.RedirectAddr
c.clusterLeaderClusterAddr = adv.ClusterAddr
c.clusterLeaderUUID = leaderUUID
c.clusterLeaderParams.Store(&ClusterLeaderParams{
LeaderUUID: leaderUUID,
LeaderRedirectAddr: adv.RedirectAddr,
LeaderClusterAddr: adv.ClusterAddr,
})
return false, adv.RedirectAddr, adv.ClusterAddr, nil
}
@@ -403,6 +418,14 @@ func (c *Core) waitForLeadership(newLeaderCh chan func(), manualStepDownCh, stop
if leaderLostCh == nil {
return
}
if atomic.LoadUint32(c.neverBecomeActive) == 1 {
c.heldHALock = nil
lock.Unlock()
c.logger.Info("marked never become active, giving up active state")
continue
}
c.logger.Info("acquired lock, enabling active operation")
// This is used later to log a metrics event; this can be helpful to
@@ -410,6 +433,7 @@ func (c *Core) waitForLeadership(newLeaderCh chan func(), manualStepDownCh, stop
activeTime := time.Now()
continueCh := interruptPerfStandby(newLeaderCh, stopCh)
// Grab the statelock or stop
if stopped := grabLockOrStop(c.stateLock.Lock, c.stateLock.Unlock, stopCh); stopped {
lock.Unlock()
@@ -869,3 +893,11 @@ func (c *Core) clearLeader(uuid string) error {
return err
}
func (c *Core) SetNeverBecomeActive(on bool) {
if on {
atomic.StoreUint32(c.neverBecomeActive, 1)
} else {
atomic.StoreUint32(c.neverBecomeActive, 0)
}
}

View File

@@ -397,6 +397,8 @@ func (c *Core) clearForwardingClients() {
c.rpcClientConnContext = nil
c.rpcForwardingClient = nil
c.clusterLeaderParams.Store((*ClusterLeaderParams)(nil))
}
// ForwardRequest forwards a given request to the active node and returns the

View File

@@ -688,7 +688,7 @@ func TestWaitActiveWithError(core *Core) error {
start := time.Now()
var standby bool
var err error
for time.Now().Sub(start) < time.Second {
for time.Now().Sub(start) < 30*time.Second {
standby, err = core.Standby()
if err != nil {
return err
@@ -789,6 +789,13 @@ func (c *TestCluster) EnsureCoresSealed(t testing.T) {
}
}
func (c *TestClusterCore) Seal(t testing.T) {
t.Helper()
if err := c.Core.sealInternal(); err != nil {
t.Fatal(err)
}
}
func CleanupClusters(clusters []*TestCluster) {
wg := &sync.WaitGroup{}
for _, cluster := range clusters {

View File

@@ -2,9 +2,13 @@
package vault
import testing "github.com/mitchellh/go-testing-interface"
import (
testing "github.com/mitchellh/go-testing-interface"
)
func testGenerateCoreKeys() (interface{}, interface{}, error) { return nil, nil, nil }
func testGetLicensingConfig(interface{}) *LicensingConfig { return &LicensingConfig{} }
func testAdjustTestCore(*CoreConfig, *TestClusterCore) {}
func testExtraClusterCoresTestSetup(testing.T, interface{}, []*TestClusterCore) {}
func testAdjustTestCore(_ *CoreConfig, tcc *TestClusterCore) {
tcc.UnderlyingStorage = tcc.physical
}