Rearrange shutdown logic to remove some lock switching (#4083)
* Rearrange shutdown logic to remove some lock switching and to allow giving up the HA lock to be skipped.
This commit is contained in:
parent
7042398f5f
commit
6b777dc63e
155
vault/core.go
155
vault/core.go
|
@ -110,6 +110,10 @@ var (
|
|||
LastRemoteWAL = lastRemoteWALImpl
|
||||
)
|
||||
|
||||
// StopOptions carries the options sent over a Core's standbyStopCh when
// sealInternal signals the runStandby goroutine to stop.
type StopOptions struct {
|
||||
// KeepLock, when true, makes runStandby skip clearing the leader
// advertisement and releasing the held HA lock as it gives up leadership.
KeepLock bool
|
||||
}
|
||||
|
||||
// NonFatalError is an error that can be returned during NewCore that should be
|
||||
// displayed but not cause a program exit
|
||||
type NonFatalError struct {
|
||||
|
@ -190,10 +194,12 @@ type Core struct {
|
|||
stateLock sync.RWMutex
|
||||
sealed bool
|
||||
|
||||
standby bool
|
||||
standbyDoneCh chan struct{}
|
||||
standbyStopCh chan struct{}
|
||||
manualStepDownCh chan struct{}
|
||||
standby bool
|
||||
standbyDoneCh chan struct{}
|
||||
lockAcquisitionStopCh chan struct{}
|
||||
standbyStopCh chan StopOptions
|
||||
manualStepDownCh chan struct{}
|
||||
heldHALock physical.Lock
|
||||
|
||||
// unlockInfo has the keys provided to Unseal until the threshold number of parts is available, as well as the operation nonce
|
||||
unlockInfo *unlockInformation
|
||||
|
@ -626,6 +632,7 @@ func NewCore(conf *CoreConfig) (*Core, error) {
|
|||
// problem. It is only used to gracefully quit in the case of HA so that failover
|
||||
// happens as quickly as possible.
|
||||
func (c *Core) Shutdown() error {
|
||||
c.logger.Trace("core: shutdown called")
|
||||
c.stateLock.RLock()
|
||||
// Tell any requests that know about this to stop
|
||||
if c.activeContextCancelFunc != nil {
|
||||
|
@ -633,15 +640,13 @@ func (c *Core) Shutdown() error {
|
|||
}
|
||||
c.stateLock.RUnlock()
|
||||
|
||||
c.logger.Trace("core: shutdown initiating internal seal")
|
||||
// Seal the Vault, causes a leader stepdown
|
||||
retChan := make(chan error)
|
||||
go func() {
|
||||
c.stateLock.Lock()
|
||||
defer c.stateLock.Unlock()
|
||||
retChan <- c.sealInternal()
|
||||
}()
|
||||
c.stateLock.Lock()
|
||||
defer c.stateLock.Unlock()
|
||||
|
||||
return <-retChan
|
||||
c.logger.Trace("core: shutdown running internal seal")
|
||||
return c.sealInternal(false)
|
||||
}
|
||||
|
||||
// CORSConfig returns the current CORS configuration
|
||||
|
@ -1237,9 +1242,10 @@ func (c *Core) unsealInternal(ctx context.Context, masterKey []byte) (bool, erro
|
|||
} else {
|
||||
// Go to standby mode, wait until we are active to unseal
|
||||
c.standbyDoneCh = make(chan struct{})
|
||||
c.standbyStopCh = make(chan struct{})
|
||||
c.manualStepDownCh = make(chan struct{})
|
||||
go c.runStandby(c.standbyDoneCh, c.standbyStopCh, c.manualStepDownCh)
|
||||
c.lockAcquisitionStopCh = make(chan struct{})
|
||||
c.standbyStopCh = make(chan StopOptions)
|
||||
go c.runStandby(c.standbyDoneCh, c.manualStepDownCh, c.lockAcquisitionStopCh, c.standbyStopCh)
|
||||
}
|
||||
|
||||
// Success!
|
||||
|
@ -1406,19 +1412,15 @@ func (c *Core) sealInitCommon(ctx context.Context, req *logical.Request) (retErr
|
|||
c.stateLock.RUnlock()
|
||||
|
||||
//Seal the Vault
|
||||
retChan := make(chan error)
|
||||
go func() {
|
||||
c.stateLock.Lock()
|
||||
defer c.stateLock.Unlock()
|
||||
retChan <- c.sealInternal()
|
||||
}()
|
||||
c.stateLock.Lock()
|
||||
defer c.stateLock.Unlock()
|
||||
sealErr := c.sealInternal(false)
|
||||
|
||||
funcErr := <-retChan
|
||||
if funcErr != nil {
|
||||
retErr = multierror.Append(retErr, funcErr)
|
||||
if sealErr != nil {
|
||||
retErr = multierror.Append(retErr, sealErr)
|
||||
}
|
||||
|
||||
return retErr
|
||||
return
|
||||
}
|
||||
|
||||
// StepDown is used to step down from leadership
|
||||
|
@ -1515,7 +1517,7 @@ func (c *Core) StepDown(req *logical.Request) (retErr error) {
|
|||
|
||||
// sealInternal is an internal method used to seal the vault. It does not do
|
||||
// any authorization checking. The stateLock must be held prior to calling.
|
||||
func (c *Core) sealInternal() error {
|
||||
func (c *Core) sealInternal(keepLock bool) error {
|
||||
if c.sealed {
|
||||
return nil
|
||||
}
|
||||
|
@ -1539,13 +1541,24 @@ func (c *Core) sealInternal() error {
|
|||
return fmt.Errorf("internal error")
|
||||
}
|
||||
} else {
|
||||
// Signal the standby goroutine to shutdown, wait for completion
|
||||
close(c.standbyStopCh)
|
||||
// If we are trying to acquire the lock, force it to return with nil so
|
||||
// runStandby will exit
|
||||
if c.lockAcquisitionStopCh != nil {
|
||||
c.logger.Trace("core: closing lock acquisition stop channel")
|
||||
close(c.lockAcquisitionStopCh)
|
||||
c.lockAcquisitionStopCh = nil
|
||||
}
|
||||
// If we are active, signal the standby goroutine to shut down and wait
|
||||
// for completion. We have the state lock here so nothing else should
|
||||
// be toggling standby status.
|
||||
if !c.standby {
|
||||
c.standbyStopCh <- StopOptions{KeepLock: keepLock}
|
||||
c.logger.Trace("core: finished triggering standbyStopCh for runStandby")
|
||||
|
||||
// Release the lock while we wait to avoid deadlocking
|
||||
c.stateLock.Unlock()
|
||||
<-c.standbyDoneCh
|
||||
c.stateLock.Lock()
|
||||
// Wait for runStandby to stop
|
||||
<-c.standbyDoneCh
|
||||
c.logger.Trace("core: runStandby done")
|
||||
}
|
||||
}
|
||||
|
||||
c.logger.Debug("core: sealing barrier")
|
||||
|
@ -1744,7 +1757,7 @@ func stopReplicationImpl(c *Core) error {
|
|||
// runStandby is a long running routine that is used when an HA backend
|
||||
// is enabled. It waits until we are leader and switches this Vault to
|
||||
// active.
|
||||
func (c *Core) runStandby(doneCh, stopCh, manualStepDownCh chan struct{}) {
|
||||
func (c *Core) runStandby(doneCh, manualStepDownCh, lockAcquisitionStopCh chan struct{}, stopCh chan StopOptions) {
|
||||
defer close(doneCh)
|
||||
defer close(manualStepDownCh)
|
||||
c.logger.Info("core: entering standby mode")
|
||||
|
@ -1758,18 +1771,29 @@ func (c *Core) runStandby(doneCh, stopCh, manualStepDownCh chan struct{}) {
|
|||
checkLeaderStop := make(chan struct{})
|
||||
go c.periodicLeaderRefresh(checkLeaderDone, checkLeaderStop)
|
||||
defer func() {
|
||||
c.logger.Trace("core: closed periodic key rotation checker stop channel")
|
||||
close(keyRotateStop)
|
||||
<-keyRotateDone
|
||||
close(checkLeaderStop)
|
||||
c.logger.Trace("core: closed periodic leader refresh stop channel")
|
||||
<-checkLeaderDone
|
||||
c.logger.Trace("core: periodic leader refresh returned")
|
||||
}()
|
||||
|
||||
var manualStepDown bool
|
||||
for {
|
||||
// Check for a shutdown
|
||||
select {
|
||||
case <-stopCh:
|
||||
c.logger.Trace("core: stop channel triggered in runStandby")
|
||||
return
|
||||
default:
|
||||
// If we've just stepped down, we could instantly grab the lock again. Give
|
||||
// the other nodes a chance.
|
||||
if manualStepDown {
|
||||
time.Sleep(manualStepDownSleepPeriod)
|
||||
manualStepDown = false
|
||||
}
|
||||
}
|
||||
|
||||
// Create a lock
|
||||
|
@ -1785,7 +1809,7 @@ func (c *Core) runStandby(doneCh, stopCh, manualStepDownCh chan struct{}) {
|
|||
}
|
||||
|
||||
// Attempt the acquisition
|
||||
leaderLostCh := c.acquireLock(lock, stopCh)
|
||||
leaderLostCh := c.acquireLock(lock, lockAcquisitionStopCh)
|
||||
|
||||
// Bail if we are being shutdown
|
||||
if leaderLostCh == nil {
|
||||
|
@ -1801,6 +1825,17 @@ func (c *Core) runStandby(doneCh, stopCh, manualStepDownCh chan struct{}) {
|
|||
// before advertising;
|
||||
c.stateLock.Lock()
|
||||
|
||||
if c.sealed {
|
||||
c.logger.Warn("core: grabbed HA lock but already sealed, exiting")
|
||||
lock.Unlock()
|
||||
c.stateLock.Unlock()
|
||||
metrics.MeasureSince([]string{"core", "leadership_setup_failed"}, activeTime)
|
||||
return
|
||||
}
|
||||
|
||||
// Store the lock so that we can manually clear it later if needed
|
||||
c.heldHALock = lock
|
||||
|
||||
// We haven't run postUnseal yet so we have nothing meaningful to use here
|
||||
ctx := context.Background()
|
||||
|
||||
|
@ -1818,10 +1853,11 @@ func (c *Core) runStandby(doneCh, stopCh, manualStepDownCh chan struct{}) {
|
|||
// statelock and have this shut us down; sealInternal has a
|
||||
// workflow where it watches for the stopCh to close so we want
|
||||
// to return from here
|
||||
go c.Shutdown()
|
||||
c.logger.Error("core: error performing key upgrades", "error", err)
|
||||
c.stateLock.Unlock()
|
||||
go c.Shutdown()
|
||||
c.heldHALock = nil
|
||||
lock.Unlock()
|
||||
c.stateLock.Unlock()
|
||||
metrics.MeasureSince([]string{"core", "leadership_setup_failed"}, activeTime)
|
||||
return
|
||||
}
|
||||
|
@ -1834,18 +1870,20 @@ func (c *Core) runStandby(doneCh, stopCh, manualStepDownCh chan struct{}) {
|
|||
c.localClusterPrivateKey.Store((*ecdsa.PrivateKey)(nil))
|
||||
|
||||
if err := c.setupCluster(ctx); err != nil {
|
||||
c.heldHALock = nil
|
||||
lock.Unlock()
|
||||
c.stateLock.Unlock()
|
||||
c.logger.Error("core: cluster setup failed", "error", err)
|
||||
lock.Unlock()
|
||||
metrics.MeasureSince([]string{"core", "leadership_setup_failed"}, activeTime)
|
||||
continue
|
||||
}
|
||||
|
||||
// Advertise as leader
|
||||
if err := c.advertiseLeader(ctx, uuid, leaderLostCh); err != nil {
|
||||
c.heldHALock = nil
|
||||
lock.Unlock()
|
||||
c.stateLock.Unlock()
|
||||
c.logger.Error("core: leader advertisement setup failed", "error", err)
|
||||
lock.Unlock()
|
||||
metrics.MeasureSince([]string{"core", "leadership_setup_failed"}, activeTime)
|
||||
continue
|
||||
}
|
||||
|
@ -1855,6 +1893,7 @@ func (c *Core) runStandby(doneCh, stopCh, manualStepDownCh chan struct{}) {
|
|||
if err == nil {
|
||||
c.standby = false
|
||||
}
|
||||
|
||||
c.stateLock.Unlock()
|
||||
|
||||
// Handle a failure to unseal
|
||||
|
@ -1866,12 +1905,16 @@ func (c *Core) runStandby(doneCh, stopCh, manualStepDownCh chan struct{}) {
|
|||
}
|
||||
|
||||
// Monitor a loss of leadership
|
||||
var manualStepDown bool
|
||||
var stopOpts StopOptions
|
||||
grabStateLock := true
|
||||
select {
|
||||
case <-leaderLostCh:
|
||||
c.logger.Warn("core: leadership lost, stopping active operation")
|
||||
case <-stopCh:
|
||||
c.logger.Warn("core: stopping active operation")
|
||||
case stopOpts = <-stopCh:
|
||||
// This case comes from sealInternal; the state lock is already held
|
||||
// by the caller, so we toggle grabStateLock to false
|
||||
grabStateLock = false
|
||||
close(stopCh)
|
||||
case <-manualStepDownCh:
|
||||
c.logger.Warn("core: stepping down from active operation to standby")
|
||||
manualStepDown = true
|
||||
|
@ -1879,35 +1922,33 @@ func (c *Core) runStandby(doneCh, stopCh, manualStepDownCh chan struct{}) {
|
|||
|
||||
metrics.MeasureSince([]string{"core", "leadership_lost"}, activeTime)
|
||||
|
||||
// Clear ourself as leader
|
||||
if err := c.clearLeader(uuid); err != nil {
|
||||
c.logger.Error("core: clearing leader advertisement failed", "error", err)
|
||||
}
|
||||
|
||||
// Tell any requests that know about this to stop
|
||||
if c.activeContextCancelFunc != nil {
|
||||
c.activeContextCancelFunc()
|
||||
}
|
||||
|
||||
// Attempt the pre-seal process
|
||||
c.stateLock.Lock()
|
||||
if grabStateLock {
|
||||
c.stateLock.Lock()
|
||||
}
|
||||
c.standby = true
|
||||
preSealErr := c.preSeal()
|
||||
c.stateLock.Unlock()
|
||||
if grabStateLock {
|
||||
c.stateLock.Unlock()
|
||||
}
|
||||
|
||||
// Give up leadership
|
||||
lock.Unlock()
|
||||
if !stopOpts.KeepLock {
|
||||
if err := c.clearLeader(uuid); err != nil {
|
||||
c.logger.Error("core: clearing leader advertisement failed", "error", err)
|
||||
}
|
||||
c.heldHALock.Unlock()
|
||||
c.heldHALock = nil
|
||||
}
|
||||
|
||||
// Check for a failure to prepare to seal
|
||||
if preSealErr != nil {
|
||||
c.logger.Error("core: pre-seal teardown failed", "error", err)
|
||||
}
|
||||
|
||||
// If we've merely stepped down, we could instantly grab the lock
|
||||
// again. Give the other nodes a chance.
|
||||
if manualStepDown {
|
||||
time.Sleep(manualStepDownSleepPeriod)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1920,7 +1961,11 @@ func (c *Core) periodicLeaderRefresh(doneCh, stopCh chan struct{}) {
|
|||
for {
|
||||
select {
|
||||
case <-time.After(leaderCheckInterval):
|
||||
c.Leader()
|
||||
// We do this in a goroutine because otherwise if this refresh is
|
||||
// called while we're shutting down the call to Leader() can
|
||||
// deadlock, which then means stopCh can never be seen and we can
|
||||
// block shutdown
|
||||
go c.Leader()
|
||||
case <-stopCh:
|
||||
return
|
||||
}
|
||||
|
|
|
@ -174,11 +174,12 @@ func testPlugin_CatalogRemoved(t *testing.T, btype logical.BackendType, testMoun
|
|||
if sealed {
|
||||
t.Fatal("should not be sealed")
|
||||
}
|
||||
// Wait for active so post-unseal takes place
|
||||
// If it fails, it means unseal process failed
|
||||
vault.TestWaitActive(t, core.Core)
|
||||
}
|
||||
|
||||
// Wait for active so post-unseal takes place
|
||||
// If it fails, it means unseal process failed
|
||||
vault.TestWaitActive(t, core.Core)
|
||||
|
||||
if testMount {
|
||||
// Mount the plugin at the same path after plugin is re-added to the catalog
|
||||
// and expect an error due to existing path.
|
||||
|
@ -286,11 +287,12 @@ func testPlugin_continueOnError(t *testing.T, btype logical.BackendType, mismatc
|
|||
if sealed {
|
||||
t.Fatal("should not be sealed")
|
||||
}
|
||||
// Wait for active so post-unseal takes place
|
||||
// If it fails, it means unseal process failed
|
||||
vault.TestWaitActive(t, core.Core)
|
||||
}
|
||||
|
||||
// Wait for active so post-unseal takes place
|
||||
// If it fails, it means unseal process failed
|
||||
vault.TestWaitActive(t, core.Core)
|
||||
|
||||
// Re-add the plugin to the catalog
|
||||
switch btype {
|
||||
case logical.TypeLogical:
|
||||
|
@ -394,10 +396,11 @@ func TestSystemBackend_Plugin_SealUnseal(t *testing.T) {
|
|||
if sealed {
|
||||
t.Fatal("should not be sealed")
|
||||
}
|
||||
// Wait for active so post-unseal takes place
|
||||
// If it fails, it means unseal process failed
|
||||
vault.TestWaitActive(t, core.Core)
|
||||
}
|
||||
|
||||
// Wait for active so post-unseal takes place
|
||||
// If it fails, it means unseal process failed
|
||||
vault.TestWaitActive(t, cluster.Cores[0].Core)
|
||||
}
|
||||
|
||||
func TestSystemBackend_Plugin_reload(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue