Port some HA changes (#5186)
commit b7e33f1d2e (parent e8991e8fe9)
@@ -52,25 +52,6 @@ const (
 	// information for primaries
 	knownPrimaryAddrsPrefix = "core/primary-addrs/"
 
-	// lockRetryInterval is the interval we re-attempt to acquire the
-	// HA lock if an error is encountered
-	lockRetryInterval = 10 * time.Second
-
-	// leaderCheckInterval is how often a standby checks for a new leader
-	leaderCheckInterval = 2500 * time.Millisecond
-
-	// keyRotateCheckInterval is how often a standby checks for a key
-	// rotation taking place.
-	keyRotateCheckInterval = 30 * time.Second
-
-	// keyRotateGracePeriod is how long we allow an upgrade path
-	// for standby instances before we delete the upgrade keys
-	keyRotateGracePeriod = 2 * time.Minute
-
-	// leaderPrefixCleanDelay is how long to wait between deletions
-	// of orphaned leader keys, to prevent slamming the backend.
-	leaderPrefixCleanDelay = 200 * time.Millisecond
-
 	// coreKeyringCanaryPath is used as a canary to indicate to replicated
 	// clusters that they need to perform a rekey operation synchronously; this
 	// isn't keyring-canary to avoid ignoring it when ignoring core/keyring
vault/ha.go
@@ -16,11 +16,43 @@ import (
 	"github.com/hashicorp/vault/audit"
 	"github.com/hashicorp/vault/helper/consts"
 	"github.com/hashicorp/vault/helper/jsonutil"
+	"github.com/hashicorp/vault/helper/namespace"
 	"github.com/hashicorp/vault/logical"
 	"github.com/hashicorp/vault/physical"
+	"github.com/oklog/run"
 )
 
+const (
+	// lockRetryInterval is the interval we re-attempt to acquire the
+	// HA lock if an error is encountered
+	lockRetryInterval = 10 * time.Second
+
+	// leaderCheckInterval is how often a standby checks for a new leader
+	leaderCheckInterval = 2500 * time.Millisecond
+
+	// keyRotateCheckInterval is how often a standby checks for a key
+	// rotation taking place.
+	keyRotateCheckInterval = 10 * time.Second
+
+	// keyRotateGracePeriod is how long we allow an upgrade path
+	// for standby instances before we delete the upgrade keys
+	keyRotateGracePeriod = 2 * time.Minute
+
+	// leaderPrefixCleanDelay is how long to wait between deletions
+	// of orphaned leader keys, to prevent slamming the backend.
+	leaderPrefixCleanDelay = 200 * time.Millisecond
+)
+
+var (
+	addEnterpriseHaActors func(*Core, *run.Group) chan func()            = addEnterpriseHaActorsNoop
+	interruptPerfStandby  func(chan func(), chan struct{}) chan struct{} = interruptPerfStandbyNoop
+)
+
+func addEnterpriseHaActorsNoop(*Core, *run.Group) chan func() { return nil }
+func interruptPerfStandbyNoop(chan func(), chan struct{}) chan struct{} {
+	return make(chan struct{})
+}
+
 // Standby checks if the Vault is in standby mode
 func (c *Core) Standby() (bool, error) {
 	c.stateLock.RLock()
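
Note on the pattern in this hunk: the two package-level function variables default to no-op implementations, which lets an enterprise build swap in real actors at init time without the open-source core knowing about them. A minimal standalone sketch of that seam (the names below are illustrative, not from the commit):

package main

import "fmt"

// The core declares a hook with a no-op default; another build (e.g. an
// enterprise variant) can reassign it in an init() before the core runs.
var startExtraActors func() = startExtraActorsNoop

func startExtraActorsNoop() {}

func main() {
	startExtraActors() // no-op unless a build overrides the hook
	fmt.Println("core logic runs the same either way")
}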
@@ -275,6 +307,7 @@ func (c *Core) runStandby(doneCh, manualStepDownCh, stopCh chan struct{}) {
 	c.logger.Info("entering standby mode")
 
 	var g run.Group
+	newLeaderCh := addEnterpriseHaActors(c, &g)
 	{
 		// This will cause all the other actors to close when the stop channel
 		// is closed.
@@ -285,44 +318,38 @@ func (c *Core) runStandby(doneCh, manualStepDownCh, stopCh chan struct{}) {
 	}
 	{
 		// Monitor for key rotation
-		keyRotateDone := make(chan struct{})
 		keyRotateStop := make(chan struct{})
 
 		g.Add(func() error {
-			c.periodicCheckKeyUpgrade(context.Background(), keyRotateDone, keyRotateStop)
+			c.periodicCheckKeyUpgrade(context.Background(), keyRotateStop)
 			return nil
 		}, func(error) {
 			close(keyRotateStop)
 			c.logger.Debug("shutting down periodic key rotation checker")
-			<-keyRotateDone
 		})
 	}
 	{
 		// Monitor for new leadership
-		checkLeaderDone := make(chan struct{})
 		checkLeaderStop := make(chan struct{})
 
 		g.Add(func() error {
-			c.periodicLeaderRefresh(checkLeaderDone, checkLeaderStop)
+			c.periodicLeaderRefresh(newLeaderCh, checkLeaderStop)
 			return nil
 		}, func(error) {
 			close(checkLeaderStop)
 			c.logger.Debug("shutting down periodic leader refresh")
-			<-checkLeaderDone
 		})
 	}
 	{
 		// Wait for leadership
-		leaderDoneCh := make(chan struct{})
 		leaderStopCh := make(chan struct{})
 
 		g.Add(func() error {
-			c.waitForLeadership(leaderDoneCh, manualStepDownCh, leaderStopCh)
+			c.waitForLeadership(newLeaderCh, manualStepDownCh, leaderStopCh)
 			return nil
 		}, func(error) {
 			close(leaderStopCh)
 			c.logger.Debug("shutting down leader elections")
-			<-leaderDoneCh
 		})
 	}
 
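
This hunk follows oklog/run's actor model: each Add pairs a blocking execute function with an interrupt function, and Run returns after the first actor exits, once every other actor has been interrupted. A minimal sketch of the same shape, assuming github.com/oklog/run:

package main

import (
	"errors"
	"fmt"

	"github.com/oklog/run"
)

func main() {
	var g run.Group
	stop := make(chan struct{})

	// A long-running actor that blocks until told to stop.
	g.Add(func() error {
		<-stop
		return nil
	}, func(error) {
		close(stop)
	})

	// An actor that exits immediately, causing the group to interrupt
	// the actor above and return.
	g.Add(func() error {
		return errors.New("first exit wins")
	}, func(error) {})

	fmt.Println(g.Run()) // prints "first exit wins"
}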
@@ -333,11 +360,7 @@ func (c *Core) runStandby(doneCh, manualStepDownCh, stopCh chan struct{}) {
 // waitForLeadership is a long running routine that is used when an HA backend
 // is enabled. It waits until we are leader and switches this Vault to
 // active.
-func (c *Core) waitForLeadership(doneCh, manualStepDownCh, stopCh chan struct{}) {
-	defer close(doneCh)
-
-	c.logger.Info("entering standby mode")
-
+func (c *Core) waitForLeadership(newLeaderCh chan func(), manualStepDownCh, stopCh chan struct{}) {
 	var manualStepDown bool
 	for {
 		// Check for a shutdown
@@ -379,37 +402,19 @@ func (c *Core) waitForLeadership(doneCh, manualStepDownCh, stopCh chan struct{})
 		// detect flapping
 		activeTime := time.Now()
 
-		// Grab the lock as we need it for cluster setup, which needs to happen
-		// before advertising;
-
-		lockGrabbedCh := make(chan struct{})
-		go func() {
-			// Grab the lock
-			c.stateLock.Lock()
-			// If stopCh has been closed, which only happens while the
-			// stateLock is held, we have actually terminated, so we just
-			// instantly give up the lock, otherwise we notify that it's ready
-			// for consumption
-			select {
-			case <-stopCh:
-				c.stateLock.Unlock()
-			default:
-				close(lockGrabbedCh)
-			}
-		}()
-
-		select {
-		case <-stopCh:
+		continueCh := interruptPerfStandby(newLeaderCh, stopCh)
+		// Grab the statelock or stop
+		if stopped := grabLockOrStop(c.stateLock.Lock, c.stateLock.Unlock, stopCh); stopped {
 			lock.Unlock()
+			close(continueCh)
 			metrics.MeasureSince([]string{"core", "leadership_setup_failed"}, activeTime)
 			return
-		case <-lockGrabbedCh:
-			// We now have the lock and can use it
 		}
 
 		if c.Sealed() {
 			c.logger.Warn("grabbed HA lock but already sealed, exiting")
 			lock.Unlock()
+			close(continueCh)
 			c.stateLock.Unlock()
 			metrics.MeasureSince([]string{"core", "leadership_setup_failed"}, activeTime)
 			return
@@ -419,7 +424,7 @@ func (c *Core) waitForLeadership(doneCh, manualStepDownCh, stopCh chan struct{})
 		c.heldHALock = lock
 
 		// Create the active context
-		activeCtx, activeCtxCancel := context.WithCancel(context.Background())
+		activeCtx, activeCtxCancel := context.WithCancel(namespace.RootContext(nil))
 		c.activeContext = activeCtx
 		c.activeContextCancelFunc.Store(activeCtxCancel)
 
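
The switch from context.Background() to namespace.RootContext(nil) attaches Vault's root namespace to the active context, so namespace-aware request handling always finds one. A sketch of what a caller sees, assuming the helper/namespace API on this branch (where RootContext falls back to context.Background() when passed nil):

package main

import (
	"fmt"

	"github.com/hashicorp/vault/helper/namespace"
)

func main() {
	// RootContext(nil) starts from a background context and stores the
	// root namespace in the returned context.
	ctx := namespace.RootContext(nil)

	ns, err := namespace.FromContext(ctx)
	if err != nil {
		panic(err) // would fire for a context with no namespace attached
	}
	fmt.Println(ns.ID) // "root"
}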
@@ -441,6 +446,7 @@ func (c *Core) waitForLeadership(doneCh, manualStepDownCh, stopCh chan struct{})
 			go c.Shutdown()
 			c.heldHALock = nil
 			lock.Unlock()
+			close(continueCh)
 			c.stateLock.Unlock()
 			metrics.MeasureSince([]string{"core", "leadership_setup_failed"}, activeTime)
 			return
@@ -457,6 +463,7 @@ func (c *Core) waitForLeadership(doneCh, manualStepDownCh, stopCh chan struct{})
 		if err := c.setupCluster(activeCtx); err != nil {
 			c.heldHALock = nil
 			lock.Unlock()
+			close(continueCh)
 			c.stateLock.Unlock()
 			c.logger.Error("cluster setup failed", "error", err)
 			metrics.MeasureSince([]string{"core", "leadership_setup_failed"}, activeTime)
@@ -467,6 +474,7 @@ func (c *Core) waitForLeadership(doneCh, manualStepDownCh, stopCh chan struct{})
 		if err := c.advertiseLeader(activeCtx, uuid, leaderLostCh); err != nil {
 			c.heldHALock = nil
 			lock.Unlock()
+			close(continueCh)
 			c.stateLock.Unlock()
 			c.logger.Error("leader advertisement setup failed", "error", err)
 			metrics.MeasureSince([]string{"core", "leadership_setup_failed"}, activeTime)
@@ -479,6 +487,7 @@ func (c *Core) waitForLeadership(doneCh, manualStepDownCh, stopCh chan struct{})
 			c.standby = false
 		}
 
+		close(continueCh)
 		c.stateLock.Unlock()
 
 		// Handle a failure to unseal
@@ -553,13 +562,43 @@ func (c *Core) waitForLeadership(doneCh, manualStepDownCh, stopCh chan struct{})
 	}
 }
 
+func grabLockOrStop(lockFunc, unlockFunc func(), stopCh chan struct{}) (stopped bool) {
+	// Grab the lock as we need it for cluster setup, which needs to happen
+	// before advertising;
+	lockGrabbedCh := make(chan struct{})
+	go func() {
+		// Grab the lock
+		lockFunc()
+		// If stopCh has been closed, which only happens while the
+		// stateLock is held, we have actually terminated, so we just
+		// instantly give up the lock, otherwise we notify that it's ready
+		// for consumption
+		select {
+		case <-stopCh:
+			unlockFunc()
+		default:
+			close(lockGrabbedCh)
+		}
+	}()
+
+	select {
+	case <-stopCh:
+		return true
+	case <-lockGrabbedCh:
+		// We now have the lock and can use it
+	}
+
+	return false
+}
+
 // This checks the leader periodically to ensure that we switch RPC to a new
 // leader pretty quickly. There is logic in Leader() already to not make this
 // onerous and avoid more traffic than needed, so we just call that and ignore
 // the result.
-func (c *Core) periodicLeaderRefresh(doneCh, stopCh chan struct{}) {
-	defer close(doneCh)
+func (c *Core) periodicLeaderRefresh(newLeaderCh chan func(), stopCh chan struct{}) {
 	opCount := new(int32)
+
+	clusterAddr := ""
 	for {
 		select {
 		case <-time.After(leaderCheckInterval):
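
grabLockOrStop exists because sync.Mutex.Lock cannot appear in a select: the only way to abandon a pending acquisition is to let a goroutine complete it and hand the lock straight back if shutdown won the race. A self-contained sketch of a call site, pairing a plain mutex with the helper (helper body condensed from the hunk above; the demo setup is illustrative):

package main

import (
	"fmt"
	"sync"
	"time"
)

// grabLockOrStop, as introduced in the diff above: acquire via lockFunc
// unless stopCh closes first; a late acquisition is released immediately.
func grabLockOrStop(lockFunc, unlockFunc func(), stopCh chan struct{}) (stopped bool) {
	lockGrabbedCh := make(chan struct{})
	go func() {
		lockFunc()
		select {
		case <-stopCh:
			unlockFunc() // stop won the race; give the lock straight back
		default:
			close(lockGrabbedCh)
		}
	}()

	select {
	case <-stopCh:
		return true
	case <-lockGrabbedCh:
		return false
	}
}

func main() {
	var mu sync.Mutex
	stopCh := make(chan struct{})

	mu.Lock() // simulate contention on the state lock
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(stopCh) // shutdown arrives while the lock is still contended
	}()

	if stopped := grabLockOrStop(mu.Lock, mu.Unlock, stopCh); stopped {
		fmt.Println("stopped before the lock was acquired; nothing to release")
		return
	}
	defer mu.Unlock()
	fmt.Println("lock acquired")
}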
@@ -575,7 +614,18 @@ func (c *Core) periodicLeaderRefresh(doneCh, stopCh chan struct{}) {
 			go func() {
 				// Bind locally, as the race detector is tripping here
 				lopCount := opCount
-				c.Leader()
+				isLeader, _, newClusterAddr, _ := c.Leader()
+
+				if !isLeader && newClusterAddr != clusterAddr && newLeaderCh != nil {
+					select {
+					case newLeaderCh <- nil:
+						c.logger.Debug("new leader found, triggering new leader channel")
+						clusterAddr = newClusterAddr
+					default:
+						c.logger.Debug("new leader found, but still processing previous leader change")
+					}
+
+				}
 				atomic.AddInt32(lopCount, -1)
 			}()
 		case <-stopCh:
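
The select with a default branch in this hunk is Go's standard non-blocking send: if the consumer of newLeaderCh is still busy with the previous notification, the standby logs and drops this one rather than stalling the refresh loop. A minimal sketch of the pattern:

package main

import "fmt"

func main() {
	// Unbuffered: a send succeeds only if a receiver is ready right now.
	notifyCh := make(chan func())

	select {
	case notifyCh <- nil:
		fmt.Println("notification delivered")
	default:
		// No receiver waiting; drop the event instead of blocking.
		fmt.Println("consumer busy, dropping notification")
	}
}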
@@ -585,8 +635,7 @@ func (c *Core) periodicLeaderRefresh(doneCh, stopCh chan struct{}) {
 }
 
 // periodicCheckKeyUpgrade is used to watch for key rotation events as a standby
-func (c *Core) periodicCheckKeyUpgrade(ctx context.Context, doneCh, stopCh chan struct{}) {
-	defer close(doneCh)
+func (c *Core) periodicCheckKeyUpgrade(ctx context.Context, stopCh chan struct{}) {
 	opCount := new(int32)
 	for {
 		select {
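
Both periodic loops guard their per-tick goroutine with an atomic opCount so that a slow storage backend cannot pile up concurrent checks; only the decrement, atomic.AddInt32(lopCount, -1), is visible in the hunks above. A sketch of that style of guard, using CompareAndSwap rather than the commit's exact bookkeeping and capping in-flight checks at one:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	opCount := new(int32)

	for i := 0; i < 5; i++ {
		// Allow at most one check in flight; skip this tick otherwise.
		if !atomic.CompareAndSwapInt32(opCount, 0, 1) {
			fmt.Println("previous check still running, skipping tick")
			continue
		}
		go func(n int) {
			defer atomic.AddInt32(opCount, -1)
			time.Sleep(30 * time.Millisecond) // stand-in for a storage query
			fmt.Println("check", n, "finished")
		}(i)
		time.Sleep(10 * time.Millisecond) // stand-in for the ticker
	}
	time.Sleep(100 * time.Millisecond) // let the last check finish
}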