Revert stopoptions (#4088)

* Use an atomic value instead to communicate whether to give up HA lock

* Remove now-unneeded StopOptions struct

* Add a channel based mutex acquisition function to avoid a deadlock

* Make periodic leader refresh only spawn a single waiting goroutine and
extend logic to the key upgrade check
This commit is contained in:
Brian Kassouf 2018-03-06 18:35:58 -08:00 committed by Jeff Mitchell
parent 6b777dc63e
commit 34d958968d
1 changed files with 87 additions and 50 deletions

View File

@ -110,10 +110,6 @@ var (
LastRemoteWAL = lastRemoteWALImpl
)
type StopOptions struct {
KeepLock bool
}
// NonFatalError is an error that can be returned during NewCore that should be
// displayed but not cause a program exit
type NonFatalError struct {
@ -194,12 +190,12 @@ type Core struct {
stateLock sync.RWMutex
sealed bool
standby bool
standbyDoneCh chan struct{}
lockAcquisitionStopCh chan struct{}
standbyStopCh chan StopOptions
manualStepDownCh chan struct{}
heldHALock physical.Lock
standby bool
standbyDoneCh chan struct{}
standbyStopCh chan struct{}
manualStepDownCh chan struct{}
keepHALockOnStepDown uint32
heldHALock physical.Lock
// unlockInfo has the keys provided to Unseal until the threshold number of parts is available, as well as the operation nonce
unlockInfo *unlockInformation
@ -1243,9 +1239,8 @@ func (c *Core) unsealInternal(ctx context.Context, masterKey []byte) (bool, erro
// Go to standby mode, wait until we are active to unseal
c.standbyDoneCh = make(chan struct{})
c.manualStepDownCh = make(chan struct{})
c.lockAcquisitionStopCh = make(chan struct{})
c.standbyStopCh = make(chan StopOptions)
go c.runStandby(c.standbyDoneCh, c.manualStepDownCh, c.lockAcquisitionStopCh, c.standbyStopCh)
c.standbyStopCh = make(chan struct{})
go c.runStandby(c.standbyDoneCh, c.manualStepDownCh, c.standbyStopCh)
}
// Success!
@ -1541,24 +1536,21 @@ func (c *Core) sealInternal(keepLock bool) error {
return fmt.Errorf("internal error")
}
} else {
if keepLock {
atomic.StoreUint32(&c.keepHALockOnStepDown, 1)
}
// If we are trying to acquire the lock, force it to return with nil so
// runStandby will exit
if c.lockAcquisitionStopCh != nil {
c.logger.Trace("core: closing lock acquisition stop channel")
close(c.lockAcquisitionStopCh)
c.lockAcquisitionStopCh = nil
}
// If we are active, signal the standby goroutine to shut down and wait
// for completion. We have the state lock here so nothing else should
// be toggling standby status.
if !c.standby {
c.standbyStopCh <- StopOptions{KeepLock: keepLock}
c.logger.Trace("core: finished triggering standbyStopCh for runStandby")
close(c.standbyStopCh)
c.logger.Trace("core: finished triggering standbyStopCh for runStandby")
// Wait for runStandby to stop
<-c.standbyDoneCh
c.logger.Trace("core: runStandby done")
}
// Wait for runStandby to stop
<-c.standbyDoneCh
atomic.StoreUint32(&c.keepHALockOnStepDown, 0)
c.logger.Trace("core: runStandby done")
}
c.logger.Debug("core: sealing barrier")
@ -1757,7 +1749,7 @@ func stopReplicationImpl(c *Core) error {
// runStandby is a long running routine that is used when an HA backend
// is enabled. It waits until we are leader and switches this Vault to
// active.
func (c *Core) runStandby(doneCh, manualStepDownCh, lockAcquisitionStopCh chan struct{}, stopCh chan StopOptions) {
func (c *Core) runStandby(doneCh, manualStepDownCh, stopCh chan struct{}) {
defer close(doneCh)
defer close(manualStepDownCh)
c.logger.Info("core: entering standby mode")
@ -1809,7 +1801,7 @@ func (c *Core) runStandby(doneCh, manualStepDownCh, lockAcquisitionStopCh chan s
}
// Attempt the acquisition
leaderLostCh := c.acquireLock(lock, lockAcquisitionStopCh)
leaderLostCh := c.acquireLock(lock, stopCh)
// Bail if we are being shutdown
if leaderLostCh == nil {
@ -1823,7 +1815,31 @@ func (c *Core) runStandby(doneCh, manualStepDownCh, lockAcquisitionStopCh chan s
// Grab the lock as we need it for cluster setup, which needs to happen
// before advertising.
c.stateLock.Lock()
lockGrabbedCh := make(chan struct{})
go func() {
// Grab the lock
c.stateLock.Lock()
// If stopCh has been closed, which only happens while the
// stateLock is held, we have actually terminated, so we just
// instantly give up the lock, otherwise we notify that it's ready
// for consumption
select {
case <-stopCh:
c.stateLock.Unlock()
default:
close(lockGrabbedCh)
}
}()
select {
case <-stopCh:
lock.Unlock()
metrics.MeasureSince([]string{"core", "leadership_setup_failed"}, activeTime)
return
case <-lockGrabbedCh:
// We now have the lock and can use it
}
if c.sealed {
c.logger.Warn("core: grabbed HA lock but already sealed, exiting")
@ -1905,16 +1921,18 @@ func (c *Core) runStandby(doneCh, manualStepDownCh, lockAcquisitionStopCh chan s
}
// Monitor a loss of leadership
var stopOpts StopOptions
releaseHALock := true
grabStateLock := true
select {
case <-leaderLostCh:
c.logger.Warn("core: leadership lost, stopping active operation")
case stopOpts = <-stopCh:
case <-stopCh:
// This case comes from sealInternal, which already holds the state
// lock, so we set grabStateLock to false
if atomic.LoadUint32(&c.keepHALockOnStepDown) == 1 {
releaseHALock = false
}
grabStateLock = false
close(stopCh)
case <-manualStepDownCh:
c.logger.Warn("core: stepping down from active operation to standby")
manualStepDown = true
@ -1937,7 +1955,7 @@ func (c *Core) runStandby(doneCh, manualStepDownCh, lockAcquisitionStopCh chan s
c.stateLock.Unlock()
}
if !stopOpts.KeepLock {
if releaseHALock {
if err := c.clearLeader(uuid); err != nil {
c.logger.Error("core: clearing leader advertisement failed", "error", err)
}
@ -1958,14 +1976,23 @@ func (c *Core) runStandby(doneCh, manualStepDownCh, lockAcquisitionStopCh chan s
// the result.
func (c *Core) periodicLeaderRefresh(doneCh, stopCh chan struct{}) {
defer close(doneCh)
var opCount int32
for {
select {
case <-time.After(leaderCheckInterval):
count := atomic.AddInt32(&opCount, 1)
if count > 1 {
atomic.AddInt32(&opCount, -1)
continue
}
// We do this in a goroutine because otherwise if this refresh is
// called while we're shutting down the call to Leader() can
// deadlock, which then means stopCh can never be seen and we can
// block shutdown
go c.Leader()
go func() {
defer atomic.AddInt32(&opCount, -1)
c.Leader()
}()
case <-stopCh:
return
}
@ -1975,30 +2002,40 @@ func (c *Core) periodicLeaderRefresh(doneCh, stopCh chan struct{}) {
// periodicCheckKeyUpgrade is used to watch for key rotation events as a standby
func (c *Core) periodicCheckKeyUpgrade(ctx context.Context, doneCh, stopCh chan struct{}) {
defer close(doneCh)
var opCount int32
for {
select {
case <-time.After(keyRotateCheckInterval):
// Only check if we are a standby
c.stateLock.RLock()
standby := c.standby
c.stateLock.RUnlock()
if !standby {
count := atomic.AddInt32(&opCount, 1)
if count > 1 {
atomic.AddInt32(&opCount, -1)
continue
}
// Check for a poison pill. If we can read it, it means we have stale
// keys (e.g. from replication being activated) and we need to seal to
// be unsealed again.
entry, _ := c.barrier.Get(ctx, poisonPillPath)
if entry != nil && len(entry.Value) > 0 {
c.logger.Warn("core: encryption keys have changed out from underneath us (possibly due to replication enabling), must be unsealed again")
go c.Shutdown()
continue
}
go func() {
defer atomic.AddInt32(&opCount, -1)
// Only check if we are a standby
c.stateLock.RLock()
standby := c.standby
c.stateLock.RUnlock()
if !standby {
return
}
if err := c.checkKeyUpgrades(ctx); err != nil {
c.logger.Error("core: key rotation periodic upgrade check failed", "error", err)
}
// Check for a poison pill. If we can read it, it means we have stale
// keys (e.g. from replication being activated) and we need to seal to
// be unsealed again.
entry, _ := c.barrier.Get(ctx, poisonPillPath)
if entry != nil && len(entry.Value) > 0 {
c.logger.Warn("core: encryption keys have changed out from underneath us (possibly due to replication enabling), must be unsealed again")
go c.Shutdown()
return
}
if err := c.checkKeyUpgrades(ctx); err != nil {
c.logger.Error("core: key rotation periodic upgrade check failed", "error", err)
}
}()
case <-stopCh:
return
}