core: Cancel context before taking state lock (#5020)
* core: Cancel context before taking state lock
* Create active context outside of postUnseal
* Attempt to drain requests before canceling context
* Fix test
This commit is contained in:
parent
9c6a14ba6b
commit
bb076f161d
|
@ -1,5 +1,11 @@
|
|||
## Next (Unreleased)
|
||||
|
||||
DEPRECATIONS/CHANGES:
|
||||
|
||||
* Request Timeouts: A default request timeout of 90s is now enforced. This
|
||||
setting can be overwritten in the config file. If you anticipate requests
|
||||
taking longer than 90s this setting should be updated before upgrading.
|
||||
|
||||
IMPROVEMENTS:
|
||||
|
||||
* agent: Add `exit_after_auth` to be able to use the Agent for a single
|
||||
|
|
|
@ -375,7 +375,7 @@ type Core struct {
|
|||
// This can be used to trigger operations to stop running when Vault is
|
||||
// going to be shut down, stepped down, or sealed
|
||||
activeContext context.Context
|
||||
activeContextCancelFunc context.CancelFunc
|
||||
activeContextCancelFunc *atomic.Value
|
||||
|
||||
// Stores the sealunwrapper for downgrade needs
|
||||
sealUnwrapper physical.Backend
|
||||
|
@ -501,6 +501,7 @@ func NewCore(conf *CoreConfig) (*Core, error) {
|
|||
localClusterParsedCert: new(atomic.Value),
|
||||
activeNodeReplicationState: new(uint32),
|
||||
keepHALockOnStepDown: new(uint32),
|
||||
activeContextCancelFunc: new(atomic.Value),
|
||||
}
|
||||
|
||||
atomic.StoreUint32(c.sealed, 1)
|
||||
|
@ -510,6 +511,8 @@ func NewCore(conf *CoreConfig) (*Core, error) {
|
|||
c.localClusterParsedCert.Store((*x509.Certificate)(nil))
|
||||
c.localClusterPrivateKey.Store((*ecdsa.PrivateKey)(nil))
|
||||
|
||||
c.activeContextCancelFunc.Store((context.CancelFunc)(nil))
|
||||
|
||||
if conf.ClusterCipherSuites != "" {
|
||||
suites, err := tlsutil.ParseCiphers(conf.ClusterCipherSuites)
|
||||
if err != nil {
|
||||
|
@ -886,7 +889,8 @@ func (c *Core) unsealInternal(ctx context.Context, masterKey []byte) (bool, erro
|
|||
return false, err
|
||||
}
|
||||
|
||||
if err := c.postUnseal(); err != nil {
|
||||
ctx, ctxCancel := context.WithCancel(context.Background())
|
||||
if err := c.postUnseal(ctx, ctxCancel); err != nil {
|
||||
c.logger.Error("post-unseal setup failed", "error", err)
|
||||
c.barrier.Seal()
|
||||
c.logger.Warn("vault is sealed")
|
||||
|
@ -1129,18 +1133,40 @@ func (c *Core) sealInternalWithOptions(grabStateLock, keepHALock bool) error {
|
|||
c.clearForwardingClients()
|
||||
c.requestForwardingConnectionLock.Unlock()
|
||||
|
||||
activeCtxCancel := c.activeContextCancelFunc.Load().(context.CancelFunc)
|
||||
cancelCtxAndLock := func() {
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
select {
|
||||
case <-doneCh:
|
||||
// Attempt to drain any inflight requests
|
||||
case <-time.After(DefaultMaxRequestDuration):
|
||||
if activeCtxCancel != nil {
|
||||
activeCtxCancel()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
c.stateLock.Lock()
|
||||
close(doneCh)
|
||||
// Stop requests from processing
|
||||
if activeCtxCancel != nil {
|
||||
activeCtxCancel()
|
||||
}
|
||||
}
|
||||
|
||||
// Do pre-seal teardown if HA is not enabled
|
||||
if c.ha == nil {
|
||||
if grabStateLock {
|
||||
c.stateLock.Lock()
|
||||
cancelCtxAndLock()
|
||||
defer c.stateLock.Unlock()
|
||||
}
|
||||
// Even in a non-HA context we key off of this for some things
|
||||
c.standby = true
|
||||
|
||||
// Stop requests from processing
|
||||
if c.activeContextCancelFunc != nil {
|
||||
c.activeContextCancelFunc()
|
||||
if activeCtxCancel != nil {
|
||||
activeCtxCancel()
|
||||
}
|
||||
|
||||
if err := c.preSeal(); err != nil {
|
||||
|
@ -1155,9 +1181,10 @@ func (c *Core) sealInternalWithOptions(grabStateLock, keepHALock bool) error {
|
|||
atomic.StoreUint32(c.keepHALockOnStepDown, 1)
|
||||
}
|
||||
if grabStateLock {
|
||||
c.stateLock.Lock()
|
||||
cancelCtxAndLock()
|
||||
defer c.stateLock.Unlock()
|
||||
}
|
||||
|
||||
// If we are trying to acquire the lock, force it to return with nil so
|
||||
// runStandby will exit
|
||||
// If we are active, signal the standby goroutine to shut down and wait
|
||||
|
@ -1198,18 +1225,19 @@ func (c *Core) sealInternalWithOptions(grabStateLock, keepHALock bool) error {
|
|||
// allowing any user operations. This allows us to setup any state that
|
||||
// requires the Vault to be unsealed such as mount tables, logical backends,
|
||||
// credential stores, etc.
|
||||
func (c *Core) postUnseal() (retErr error) {
|
||||
func (c *Core) postUnseal(ctx context.Context, ctxCancelFunc context.CancelFunc) (retErr error) {
|
||||
defer metrics.MeasureSince([]string{"core", "post_unseal"}, time.Now())
|
||||
|
||||
// Clear any out
|
||||
c.postUnsealFuncs = nil
|
||||
|
||||
// Create a new request context
|
||||
c.activeContext, c.activeContextCancelFunc = context.WithCancel(context.Background())
|
||||
c.activeContext = ctx
|
||||
c.activeContextCancelFunc.Store(ctxCancelFunc)
|
||||
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
c.activeContextCancelFunc()
|
||||
ctxCancelFunc()
|
||||
c.preSeal()
|
||||
}
|
||||
}()
|
||||
|
@ -1221,7 +1249,7 @@ func (c *Core) postUnseal() (retErr error) {
|
|||
c.requestForwardingConnectionLock.Unlock()
|
||||
|
||||
// Enable the cache
|
||||
c.physicalCache.Purge(c.activeContext)
|
||||
c.physicalCache.Purge(ctx)
|
||||
if !c.cachingDisabled {
|
||||
c.physicalCache.SetEnabled(true)
|
||||
}
|
||||
|
@ -1234,36 +1262,36 @@ func (c *Core) postUnseal() (retErr error) {
|
|||
}
|
||||
|
||||
// Purge these for safety in case of a rekey
|
||||
c.seal.SetBarrierConfig(c.activeContext, nil)
|
||||
c.seal.SetBarrierConfig(ctx, nil)
|
||||
if c.seal.RecoveryKeySupported() {
|
||||
c.seal.SetRecoveryConfig(c.activeContext, nil)
|
||||
c.seal.SetRecoveryConfig(ctx, nil)
|
||||
}
|
||||
|
||||
if err := enterprisePostUnseal(c); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.ensureWrappingKey(c.activeContext); err != nil {
|
||||
if err := c.ensureWrappingKey(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.setupPluginCatalog(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.loadMounts(c.activeContext); err != nil {
|
||||
if err := c.loadMounts(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.setupMounts(c.activeContext); err != nil {
|
||||
if err := c.setupMounts(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.setupPolicyStore(c.activeContext); err != nil {
|
||||
if err := c.setupPolicyStore(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.loadCORSConfig(c.activeContext); err != nil {
|
||||
if err := c.loadCORSConfig(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.loadCredentials(c.activeContext); err != nil {
|
||||
if err := c.loadCredentials(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.setupCredentials(c.activeContext); err != nil {
|
||||
if err := c.setupCredentials(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.startRollback(); err != nil {
|
||||
|
@ -1272,21 +1300,21 @@ func (c *Core) postUnseal() (retErr error) {
|
|||
if err := c.setupExpiration(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.loadAudits(c.activeContext); err != nil {
|
||||
if err := c.loadAudits(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.setupAudits(c.activeContext); err != nil {
|
||||
if err := c.setupAudits(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.loadIdentityStoreArtifacts(c.activeContext); err != nil {
|
||||
if err := c.loadIdentityStoreArtifacts(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.setupAuditedHeadersConfig(c.activeContext); err != nil {
|
||||
if err := c.setupAuditedHeadersConfig(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if c.ha != nil {
|
||||
if err := c.startClusterListener(c.activeContext); err != nil {
|
||||
if err := c.startClusterListener(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -1331,7 +1359,7 @@ func (c *Core) preSeal() error {
|
|||
if err := c.stopExpiration(); err != nil {
|
||||
result = multierror.Append(result, errwrap.Wrapf("error stopping expiration: {{err}}", err))
|
||||
}
|
||||
if err := c.teardownCredentials(c.activeContext); err != nil {
|
||||
if err := c.teardownCredentials(context.Background()); err != nil {
|
||||
result = multierror.Append(result, errwrap.Wrapf("error tearing down credentials: {{err}}", err))
|
||||
}
|
||||
if err := c.teardownPolicyStore(); err != nil {
|
||||
|
@ -1340,7 +1368,7 @@ func (c *Core) preSeal() error {
|
|||
if err := c.stopRollback(); err != nil {
|
||||
result = multierror.Append(result, errwrap.Wrapf("error stopping rollback: {{err}}", err))
|
||||
}
|
||||
if err := c.unloadMounts(c.activeContext); err != nil {
|
||||
if err := c.unloadMounts(context.Background()); err != nil {
|
||||
result = multierror.Append(result, errwrap.Wrapf("error unloading mounts: {{err}}", err))
|
||||
}
|
||||
if err := enterprisePreSeal(c); err != nil {
|
||||
|
@ -1356,7 +1384,7 @@ func (c *Core) preSeal() error {
|
|||
|
||||
// Purge the cache
|
||||
c.physicalCache.SetEnabled(false)
|
||||
c.physicalCache.Purge(c.activeContext)
|
||||
c.physicalCache.Purge(context.Background())
|
||||
|
||||
c.logger.Info("pre-seal teardown complete")
|
||||
return result
|
||||
|
|
67
vault/ha.go
67
vault/ha.go
|
@ -415,19 +415,21 @@ func (c *Core) waitForLeadership(doneCh, manualStepDownCh, stopCh chan struct{})
|
|||
// Store the lock so that we can manually clear it later if needed
|
||||
c.heldHALock = lock
|
||||
|
||||
// We haven't run postUnseal yet so we have nothing meaningful to use here
|
||||
ctx := context.Background()
|
||||
// Create the active context
|
||||
activeCtx, activeCtxCancel := context.WithCancel(context.Background())
|
||||
c.activeContext = activeCtx
|
||||
c.activeContextCancelFunc.Store(activeCtxCancel)
|
||||
|
||||
// This block is used to wipe barrier/seal state and verify that
|
||||
// everything is sane. If we have no sanity in the barrier, we actually
|
||||
// seal, as there's little we can do.
|
||||
{
|
||||
c.seal.SetBarrierConfig(ctx, nil)
|
||||
c.seal.SetBarrierConfig(activeCtx, nil)
|
||||
if c.seal.RecoveryKeySupported() {
|
||||
c.seal.SetRecoveryConfig(ctx, nil)
|
||||
c.seal.SetRecoveryConfig(activeCtx, nil)
|
||||
}
|
||||
|
||||
if err := c.performKeyUpgrades(ctx); err != nil {
|
||||
if err := c.performKeyUpgrades(activeCtx); err != nil {
|
||||
// We call this in a goroutine so that we can give up the
|
||||
// statelock and have this shut us down; sealInternal has a
|
||||
// workflow where it watches for the stopCh to close so we want
|
||||
|
@ -442,23 +444,24 @@ func (c *Core) waitForLeadership(doneCh, manualStepDownCh, stopCh chan struct{})
|
|||
}
|
||||
}
|
||||
|
||||
// Clear previous local cluster cert info so we generate new. Since the
|
||||
// UUID will have changed, standbys will know to look for new info
|
||||
c.localClusterParsedCert.Store((*x509.Certificate)(nil))
|
||||
c.localClusterCert.Store(([]byte)(nil))
|
||||
c.localClusterPrivateKey.Store((*ecdsa.PrivateKey)(nil))
|
||||
{
|
||||
// Clear previous local cluster cert info so we generate new. Since the
|
||||
// UUID will have changed, standbys will know to look for new info
|
||||
c.localClusterParsedCert.Store((*x509.Certificate)(nil))
|
||||
c.localClusterCert.Store(([]byte)(nil))
|
||||
c.localClusterPrivateKey.Store((*ecdsa.PrivateKey)(nil))
|
||||
|
||||
if err := c.setupCluster(ctx); err != nil {
|
||||
c.heldHALock = nil
|
||||
lock.Unlock()
|
||||
c.stateLock.Unlock()
|
||||
c.logger.Error("cluster setup failed", "error", err)
|
||||
metrics.MeasureSince([]string{"core", "leadership_setup_failed"}, activeTime)
|
||||
continue
|
||||
if err := c.setupCluster(activeCtx); err != nil {
|
||||
c.heldHALock = nil
|
||||
lock.Unlock()
|
||||
c.stateLock.Unlock()
|
||||
c.logger.Error("cluster setup failed", "error", err)
|
||||
metrics.MeasureSince([]string{"core", "leadership_setup_failed"}, activeTime)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Advertise as leader
|
||||
if err := c.advertiseLeader(ctx, uuid, leaderLostCh); err != nil {
|
||||
if err := c.advertiseLeader(activeCtx, uuid, leaderLostCh); err != nil {
|
||||
c.heldHALock = nil
|
||||
lock.Unlock()
|
||||
c.stateLock.Unlock()
|
||||
|
@ -468,7 +471,7 @@ func (c *Core) waitForLeadership(doneCh, manualStepDownCh, stopCh chan struct{})
|
|||
}
|
||||
|
||||
// Attempt the post-unseal process
|
||||
err = c.postUnseal()
|
||||
err = c.postUnseal(activeCtx, activeCtxCancel)
|
||||
if err == nil {
|
||||
c.standby = false
|
||||
}
|
||||
|
@ -483,14 +486,22 @@ func (c *Core) waitForLeadership(doneCh, manualStepDownCh, stopCh chan struct{})
|
|||
continue
|
||||
}
|
||||
|
||||
cancelCtxAndLock := func() {
|
||||
go func() {
|
||||
select {
|
||||
case <-activeCtx.Done():
|
||||
// Attempt to drain any inflight requests
|
||||
case <-time.After(DefaultMaxRequestDuration):
|
||||
activeCtxCancel()
|
||||
}
|
||||
}()
|
||||
c.stateLock.Lock()
|
||||
activeCtxCancel()
|
||||
}
|
||||
|
||||
runSealing := func() {
|
||||
metrics.MeasureSince([]string{"core", "leadership_lost"}, activeTime)
|
||||
|
||||
// Tell any requests that know about this to stop
|
||||
if c.activeContextCancelFunc != nil {
|
||||
c.activeContextCancelFunc()
|
||||
}
|
||||
|
||||
c.standby = true
|
||||
|
||||
if err := c.preSeal(); err != nil {
|
||||
|
@ -516,13 +527,13 @@ func (c *Core) waitForLeadership(doneCh, manualStepDownCh, stopCh chan struct{})
|
|||
select {
|
||||
case <-leaderLostCh:
|
||||
c.logger.Warn("leadership lost, stopping active operation")
|
||||
|
||||
c.stateLock.Lock()
|
||||
cancelCtxAndLock()
|
||||
runSealing()
|
||||
releaseHALock()
|
||||
c.stateLock.Unlock()
|
||||
|
||||
case <-stopCh:
|
||||
activeCtxCancel()
|
||||
runSealing()
|
||||
releaseHALock()
|
||||
return
|
||||
|
@ -531,7 +542,7 @@ func (c *Core) waitForLeadership(doneCh, manualStepDownCh, stopCh chan struct{})
|
|||
manualStepDown = true
|
||||
c.logger.Warn("stepping down from active operation to standby")
|
||||
|
||||
c.stateLock.Lock()
|
||||
cancelCtxAndLock()
|
||||
runSealing()
|
||||
releaseHALock()
|
||||
c.stateLock.Unlock()
|
||||
|
|
|
@ -195,7 +195,9 @@ func (c *Core) Initialize(ctx context.Context, initParams *InitParams) (*InitRes
|
|||
c.logger.Error("cluster setup failed during init", "error", err)
|
||||
return nil, err
|
||||
}
|
||||
if err := c.postUnseal(); err != nil {
|
||||
|
||||
activeCtx, ctxCancel := context.WithCancel(context.Background())
|
||||
if err := c.postUnseal(activeCtx, ctxCancel); err != nil {
|
||||
c.logger.Error("post-unseal setup failed during init", "error", err)
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -29,9 +29,7 @@ const (
|
|||
var (
|
||||
// DefaultMaxRequestDuration is the amount of time we'll wait for a request
|
||||
// to complete, unless overridden on a per-handler basis
|
||||
// FIXME: In 0.11 make this 90 seconds; for now keep it at essentially infinity if not set explicitly
|
||||
//DefaultMaxRequestDuration = 90 * time.Second
|
||||
DefaultMaxRequestDuration = 999999 * time.Hour
|
||||
DefaultMaxRequestDuration = 90 * time.Second
|
||||
)
|
||||
|
||||
// HandlerProperties is used to seed configuration into a vaulthttp.Handler.
|
||||
|
|
Loading…
Reference in New Issue