open-vault/vault/ha.go
ncabatoff 1c98152fa0
Shamir seals now come in two varieties: legacy and new-style. (#7694)
Shamir seals now come in two varieties: legacy and new-style. Legacy
Shamir is automatically converted to new-style when a rekey operation
is performed. All new Vault initializations using Shamir are new-style.

New-style Shamir writes an encrypted master key to storage, just like
AutoUnseal. The stored master key is encrypted using the shared key that
is split via Shamir's algorithm. Thus when unsealing, we take the key
fragments given, combine them into a Key-Encryption-Key, and use that
to decrypt the master key on disk. Then the master key is used to read
the keyring that decrypts the barrier.
2019-10-18 14:46:00 -04:00

981 lines
27 KiB
Go

package vault
import (
"context"
"crypto/ecdsa"
"crypto/x509"
"errors"
"fmt"
"github.com/hashicorp/vault/vault/seal/shamir"
"strings"
"sync/atomic"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/errwrap"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/physical/raft"
"github.com/hashicorp/vault/sdk/helper/certutil"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/sdk/helper/jsonutil"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/sdk/physical"
"github.com/oklog/run"
)
const (
// lockRetryInterval is the interval we re-attempt to acquire the
// HA lock if an error is encountered
lockRetryInterval = 10 * time.Second
// leaderCheckInterval is how often a standby checks for a new leader
leaderCheckInterval = 2500 * time.Millisecond
// keyRotateCheckInterval is how often a standby checks for a key
// rotation taking place.
keyRotateCheckInterval = 10 * time.Second
// leaderPrefixCleanDelay is how long to wait between deletions
// of orphaned leader keys, to prevent slamming the backend.
leaderPrefixCleanDelay = 200 * time.Millisecond
)
var (
// KeyRotateGracePeriod is how long we allow an upgrade path
// for standby instances before we delete the upgrade keys
KeyRotateGracePeriod = 2 * time.Minute
addEnterpriseHaActors func(*Core, *run.Group) chan func() = addEnterpriseHaActorsNoop
interruptPerfStandby func(chan func(), chan struct{}) chan struct{} = interruptPerfStandbyNoop
)
func addEnterpriseHaActorsNoop(*Core, *run.Group) chan func() { return nil }
func interruptPerfStandbyNoop(chan func(), chan struct{}) chan struct{} {
return make(chan struct{})
}
// Standby checks if the Vault is in standby mode
func (c *Core) Standby() (bool, error) {
c.stateLock.RLock()
standby := c.standby
c.stateLock.RUnlock()
return standby, nil
}
// PerfStandby checks if the vault is a performance standby
func (c *Core) PerfStandby() bool {
c.stateLock.RLock()
perfStandby := c.perfStandby
c.stateLock.RUnlock()
return perfStandby
}
// Leader is used to get the current active leader
func (c *Core) Leader() (isLeader bool, leaderAddr, clusterAddr string, err error) {
// Check if HA enabled. We don't need the lock for this check as it's set
// on startup and never modified
if c.ha == nil {
return false, "", "", ErrHANotEnabled
}
// Check if sealed
if c.Sealed() {
return false, "", "", consts.ErrSealed
}
c.stateLock.RLock()
// Check if we are the leader
if !c.standby {
c.stateLock.RUnlock()
return true, c.redirectAddr, c.ClusterAddr(), nil
}
// Initialize a lock
lock, err := c.ha.LockWith(CoreLockPath, "read")
if err != nil {
c.stateLock.RUnlock()
return false, "", "", err
}
// Read the value
held, leaderUUID, err := lock.Value()
if err != nil {
c.stateLock.RUnlock()
return false, "", "", err
}
if !held {
c.stateLock.RUnlock()
return false, "", "", nil
}
var localLeaderUUID, localRedirectAddr, localClusterAddr string
clusterLeaderParams := c.clusterLeaderParams.Load().(*ClusterLeaderParams)
if clusterLeaderParams != nil {
localLeaderUUID = clusterLeaderParams.LeaderUUID
localRedirectAddr = clusterLeaderParams.LeaderRedirectAddr
localClusterAddr = clusterLeaderParams.LeaderClusterAddr
}
// If the leader hasn't changed, return the cached value; nothing changes
// mid-leadership, and the barrier caches anyways
if leaderUUID == localLeaderUUID && localRedirectAddr != "" {
c.stateLock.RUnlock()
return false, localRedirectAddr, localClusterAddr, nil
}
c.logger.Trace("found new active node information, refreshing")
defer c.stateLock.RUnlock()
c.leaderParamsLock.Lock()
defer c.leaderParamsLock.Unlock()
// Validate base conditions again
clusterLeaderParams = c.clusterLeaderParams.Load().(*ClusterLeaderParams)
if clusterLeaderParams != nil {
localLeaderUUID = clusterLeaderParams.LeaderUUID
localRedirectAddr = clusterLeaderParams.LeaderRedirectAddr
localClusterAddr = clusterLeaderParams.LeaderClusterAddr
} else {
localLeaderUUID = ""
localRedirectAddr = ""
localClusterAddr = ""
}
if leaderUUID == localLeaderUUID && localRedirectAddr != "" {
return false, localRedirectAddr, localClusterAddr, nil
}
key := coreLeaderPrefix + leaderUUID
// Use background because postUnseal isn't run on standby
entry, err := c.barrier.Get(context.Background(), key)
if err != nil {
return false, "", "", err
}
if entry == nil {
return false, "", "", nil
}
var oldAdv bool
var adv activeAdvertisement
err = jsonutil.DecodeJSON(entry.Value, &adv)
if err != nil {
// Fall back to pre-struct handling
adv.RedirectAddr = string(entry.Value)
c.logger.Debug("parsed redirect addr for new active node", "redirect_addr", adv.RedirectAddr)
oldAdv = true
}
if !oldAdv {
c.logger.Debug("parsing information for new active node", "active_cluster_addr", adv.ClusterAddr, "active_redirect_addr", adv.RedirectAddr)
// Ensure we are using current values
err = c.loadLocalClusterTLS(adv)
if err != nil {
return false, "", "", err
}
// This will ensure that we both have a connection at the ready and that
// the address is the current known value
// Since this is standby, we don't use the active context. Later we may
// use a process-scoped context
err = c.refreshRequestForwardingConnection(context.Background(), adv.ClusterAddr)
if err != nil {
return false, "", "", err
}
}
// Don't set these until everything has been parsed successfully or we'll
// never try again
c.clusterLeaderParams.Store(&ClusterLeaderParams{
LeaderUUID: leaderUUID,
LeaderRedirectAddr: adv.RedirectAddr,
LeaderClusterAddr: adv.ClusterAddr,
})
return false, adv.RedirectAddr, adv.ClusterAddr, nil
}
// StepDown is used to step down from leadership
func (c *Core) StepDown(httpCtx context.Context, req *logical.Request) (retErr error) {
defer metrics.MeasureSince([]string{"core", "step_down"}, time.Now())
if req == nil {
retErr = multierror.Append(retErr, errors.New("nil request to step-down"))
return retErr
}
c.stateLock.RLock()
defer c.stateLock.RUnlock()
if c.Sealed() {
return nil
}
if c.ha == nil || c.standby {
return nil
}
ctx, cancel := context.WithCancel(namespace.RootContext(nil))
defer cancel()
go func() {
select {
case <-ctx.Done():
case <-httpCtx.Done():
cancel()
}
}()
acl, te, entity, identityPolicies, err := c.fetchACLTokenEntryAndEntity(ctx, req)
if err != nil {
retErr = multierror.Append(retErr, err)
return retErr
}
// Audit-log the request before going any further
auth := &logical.Auth{
ClientToken: req.ClientToken,
Accessor: req.ClientTokenAccessor,
}
if te != nil {
auth.IdentityPolicies = identityPolicies[te.NamespaceID]
delete(identityPolicies, te.NamespaceID)
auth.ExternalNamespacePolicies = identityPolicies
auth.TokenPolicies = te.Policies
auth.Policies = append(te.Policies, identityPolicies[te.NamespaceID]...)
auth.Metadata = te.Meta
auth.DisplayName = te.DisplayName
auth.EntityID = te.EntityID
auth.TokenType = te.Type
}
logInput := &logical.LogInput{
Auth: auth,
Request: req,
}
if err := c.auditBroker.LogRequest(ctx, logInput, c.auditedHeaders); err != nil {
c.logger.Error("failed to audit request", "request_path", req.Path, "error", err)
retErr = multierror.Append(retErr, errors.New("failed to audit request, cannot continue"))
return retErr
}
if entity != nil && entity.Disabled {
c.logger.Warn("permission denied as the entity on the token is disabled")
retErr = multierror.Append(retErr, logical.ErrPermissionDenied)
return retErr
}
if te != nil && te.EntityID != "" && entity == nil {
c.logger.Warn("permission denied as the entity on the token is invalid")
retErr = multierror.Append(retErr, logical.ErrPermissionDenied)
return retErr
}
// Attempt to use the token (decrement num_uses)
if te != nil {
te, err = c.tokenStore.UseToken(ctx, te)
if err != nil {
c.logger.Error("failed to use token", "error", err)
retErr = multierror.Append(retErr, ErrInternalError)
return retErr
}
if te == nil {
// Token has been revoked
retErr = multierror.Append(retErr, logical.ErrPermissionDenied)
return retErr
}
}
// Verify that this operation is allowed
authResults := c.performPolicyChecks(ctx, acl, te, req, entity, &PolicyCheckOpts{
RootPrivsRequired: true,
})
if !authResults.Allowed {
retErr = multierror.Append(retErr, authResults.Error)
if authResults.Error.ErrorOrNil() == nil || authResults.DeniedError {
retErr = multierror.Append(retErr, logical.ErrPermissionDenied)
}
return retErr
}
if te != nil && te.NumUses == tokenRevocationPending {
// Token needs to be revoked. We do this immediately here because
// we won't have a token store after sealing.
leaseID, err := c.expiration.CreateOrFetchRevocationLeaseByToken(c.activeContext, te)
if err == nil {
err = c.expiration.Revoke(c.activeContext, leaseID)
}
if err != nil {
c.logger.Error("token needed revocation before step-down but failed to revoke", "error", err)
retErr = multierror.Append(retErr, ErrInternalError)
}
}
select {
case c.manualStepDownCh <- struct{}{}:
default:
c.logger.Warn("manual step-down operation already queued")
}
return retErr
}
// runStandby is a long running process that manages a number of the HA
// subsystems.
func (c *Core) runStandby(doneCh, manualStepDownCh, stopCh chan struct{}) {
defer close(doneCh)
defer close(manualStepDownCh)
c.logger.Info("entering standby mode")
var g run.Group
newLeaderCh := addEnterpriseHaActors(c, &g)
{
// This will cause all the other actors to close when the stop channel
// is closed.
g.Add(func() error {
<-stopCh
return nil
}, func(error) {})
}
{
// Monitor for key rotations
keyRotateStop := make(chan struct{})
g.Add(func() error {
c.periodicCheckKeyUpgrades(context.Background(), keyRotateStop)
return nil
}, func(error) {
close(keyRotateStop)
c.logger.Debug("shutting down periodic key rotation checker")
})
}
{
// Monitor for new leadership
checkLeaderStop := make(chan struct{})
g.Add(func() error {
c.periodicLeaderRefresh(newLeaderCh, checkLeaderStop)
return nil
}, func(error) {
close(checkLeaderStop)
c.logger.Debug("shutting down periodic leader refresh")
})
}
{
// Wait for leadership
leaderStopCh := make(chan struct{})
g.Add(func() error {
c.waitForLeadership(newLeaderCh, manualStepDownCh, leaderStopCh)
return nil
}, func(error) {
close(leaderStopCh)
c.logger.Debug("shutting down leader elections")
})
}
// Start all the actors
g.Run()
}
// waitForLeadership is a long running routine that is used when an HA backend
// is enabled. It waits until we are leader and switches this Vault to
// active.
func (c *Core) waitForLeadership(newLeaderCh chan func(), manualStepDownCh, stopCh chan struct{}) {
var manualStepDown bool
for {
// Check for a shutdown
select {
case <-stopCh:
c.logger.Debug("stop channel triggered in runStandby")
return
default:
// If we've just down, we could instantly grab the lock again. Give
// the other nodes a chance.
if manualStepDown {
time.Sleep(manualStepDownSleepPeriod)
manualStepDown = false
}
}
// Create a lock
uuid, err := uuid.GenerateUUID()
if err != nil {
c.logger.Error("failed to generate uuid", "error", err)
return
}
lock, err := c.ha.LockWith(CoreLockPath, uuid)
if err != nil {
c.logger.Error("failed to create lock", "error", err)
return
}
// Attempt the acquisition
leaderLostCh := c.acquireLock(lock, stopCh)
// Bail if we are being shutdown
if leaderLostCh == nil {
return
}
if atomic.LoadUint32(c.neverBecomeActive) == 1 {
c.heldHALock = nil
lock.Unlock()
c.logger.Info("marked never become active, giving up active state")
continue
}
c.logger.Info("acquired lock, enabling active operation")
// This is used later to log a metrics event; this can be helpful to
// detect flapping
activeTime := time.Now()
continueCh := interruptPerfStandby(newLeaderCh, stopCh)
// Grab the statelock or stop
if stopped := grabLockOrStop(c.stateLock.Lock, c.stateLock.Unlock, stopCh); stopped {
lock.Unlock()
close(continueCh)
metrics.MeasureSince([]string{"core", "leadership_setup_failed"}, activeTime)
return
}
if c.Sealed() {
c.logger.Warn("grabbed HA lock but already sealed, exiting")
lock.Unlock()
close(continueCh)
c.stateLock.Unlock()
metrics.MeasureSince([]string{"core", "leadership_setup_failed"}, activeTime)
return
}
// Store the lock so that we can manually clear it later if needed
c.heldHALock = lock
// Create the active context
activeCtx, activeCtxCancel := context.WithCancel(namespace.RootContext(nil))
c.activeContext = activeCtx
c.activeContextCancelFunc.Store(activeCtxCancel)
// This block is used to wipe barrier/seal state and verify that
// everything is sane. If we have no sanity in the barrier, we actually
// seal, as there's little we can do.
{
c.seal.SetBarrierConfig(activeCtx, nil)
if c.seal.RecoveryKeySupported() {
c.seal.SetRecoveryConfig(activeCtx, nil)
}
if err := c.performKeyUpgrades(activeCtx); err != nil {
c.logger.Error("error performing key upgrades", "error", err)
// If we fail due to anything other than a context canceled
// error we should shutdown as we may have the incorrect Keys.
if !strings.Contains(err.Error(), context.Canceled.Error()) {
// We call this in a goroutine so that we can give up the
// statelock and have this shut us down; sealInternal has a
// workflow where it watches for the stopCh to close so we want
// to return from here
go c.Shutdown()
}
c.heldHALock = nil
lock.Unlock()
close(continueCh)
c.stateLock.Unlock()
metrics.MeasureSince([]string{"core", "leadership_setup_failed"}, activeTime)
// If we are shutting down we should return from this function,
// otherwise continue
if !strings.Contains(err.Error(), context.Canceled.Error()) {
continue
} else {
return
}
}
}
{
// Clear previous local cluster cert info so we generate new. Since the
// UUID will have changed, standbys will know to look for new info
c.localClusterParsedCert.Store((*x509.Certificate)(nil))
c.localClusterCert.Store(([]byte)(nil))
c.localClusterPrivateKey.Store((*ecdsa.PrivateKey)(nil))
if err := c.setupCluster(activeCtx); err != nil {
c.heldHALock = nil
lock.Unlock()
close(continueCh)
c.stateLock.Unlock()
c.logger.Error("cluster setup failed", "error", err)
metrics.MeasureSince([]string{"core", "leadership_setup_failed"}, activeTime)
continue
}
}
// Advertise as leader
if err := c.advertiseLeader(activeCtx, uuid, leaderLostCh); err != nil {
c.heldHALock = nil
lock.Unlock()
close(continueCh)
c.stateLock.Unlock()
c.logger.Error("leader advertisement setup failed", "error", err)
metrics.MeasureSince([]string{"core", "leadership_setup_failed"}, activeTime)
continue
}
// Attempt the post-unseal process
err = c.postUnseal(activeCtx, activeCtxCancel, standardUnsealStrategy{})
if err == nil {
c.standby = false
c.leaderUUID = uuid
}
close(continueCh)
c.stateLock.Unlock()
// Handle a failure to unseal
if err != nil {
c.logger.Error("post-unseal setup failed", "error", err)
lock.Unlock()
metrics.MeasureSince([]string{"core", "leadership_setup_failed"}, activeTime)
continue
}
// Monitor a loss of leadership
select {
case <-leaderLostCh:
c.logger.Warn("leadership lost, stopping active operation")
case <-stopCh:
case <-manualStepDownCh:
manualStepDown = true
c.logger.Warn("stepping down from active operation to standby")
}
// Stop Active Duty
{
// Spawn this in a go routine so we can cancel the context and
// unblock any inflight requests that are holding the statelock.
go func() {
select {
case <-activeCtx.Done():
// Attempt to drain any inflight requests
case <-time.After(DefaultMaxRequestDuration):
activeCtxCancel()
}
}()
// Grab lock if we are not stopped
stopped := grabLockOrStop(c.stateLock.Lock, c.stateLock.Unlock, stopCh)
// Cancel the context incase the above go routine hasn't done it
// yet
activeCtxCancel()
metrics.MeasureSince([]string{"core", "leadership_lost"}, activeTime)
// Mark as standby
c.standby = true
c.leaderUUID = ""
// Seal
if err := c.preSeal(); err != nil {
c.logger.Error("pre-seal teardown failed", "error", err)
}
// If we are not meant to keep the HA lock, clear it
if atomic.LoadUint32(c.keepHALockOnStepDown) == 0 {
if err := c.clearLeader(uuid); err != nil {
c.logger.Error("clearing leader advertisement failed", "error", err)
}
if err := c.heldHALock.Unlock(); err != nil {
c.logger.Error("unlocking HA lock failed", "error", err)
}
c.heldHALock = nil
}
// If we are stopped return, otherwise unlock the statelock
if stopped {
return
}
c.stateLock.Unlock()
}
}
}
// grabLockOrStop returns true if we failed to get the lock before stopCh
// was closed. Returns false if the lock was obtained, in which case it's
// the caller's responsibility to unlock it.
func grabLockOrStop(lockFunc, unlockFunc func(), stopCh chan struct{}) (stopped bool) {
// Grab the lock as we need it for cluster setup, which needs to happen
// before advertising;
lockGrabbedCh := make(chan struct{})
go func() {
// Grab the lock
lockFunc()
// If stopCh has been closed, which only happens while the
// stateLock is held, we have actually terminated, so we just
// instantly give up the lock, otherwise we notify that it's ready
// for consumption
select {
case <-stopCh:
unlockFunc()
default:
close(lockGrabbedCh)
}
}()
select {
case <-stopCh:
return true
case <-lockGrabbedCh:
// We now have the lock and can use it
}
return false
}
// This checks the leader periodically to ensure that we switch RPC to a new
// leader pretty quickly. There is logic in Leader() already to not make this
// onerous and avoid more traffic than needed, so we just call that and ignore
// the result.
func (c *Core) periodicLeaderRefresh(newLeaderCh chan func(), stopCh chan struct{}) {
opCount := new(int32)
clusterAddr := ""
for {
select {
case <-time.After(leaderCheckInterval):
count := atomic.AddInt32(opCount, 1)
if count > 1 {
atomic.AddInt32(opCount, -1)
continue
}
// We do this in a goroutine because otherwise if this refresh is
// called while we're shutting down the call to Leader() can
// deadlock, which then means stopCh can never been seen and we can
// block shutdown
go func() {
// Bind locally, as the race detector is tripping here
lopCount := opCount
isLeader, _, newClusterAddr, _ := c.Leader()
// If we are the leader reset the clusterAddr since the next
// failover might go to the node that was previously active.
if isLeader {
clusterAddr = ""
}
if !isLeader && newClusterAddr != clusterAddr && newLeaderCh != nil {
select {
case newLeaderCh <- nil:
c.logger.Debug("new leader found, triggering new leader channel")
clusterAddr = newClusterAddr
default:
c.logger.Debug("new leader found, but still processing previous leader change")
}
}
atomic.AddInt32(lopCount, -1)
}()
case <-stopCh:
return
}
}
}
// periodicCheckKeyUpgrade is used to watch for key rotation events as a standby
func (c *Core) periodicCheckKeyUpgrades(ctx context.Context, stopCh chan struct{}) {
opCount := new(int32)
_, isRaft := c.underlyingPhysical.(*raft.RaftBackend)
for {
select {
case <-time.After(keyRotateCheckInterval):
count := atomic.AddInt32(opCount, 1)
if count > 1 {
atomic.AddInt32(opCount, -1)
continue
}
go func() {
// Bind locally, as the race detector is tripping here
lopCount := opCount
// Only check if we are a standby
c.stateLock.RLock()
standby := c.standby
c.stateLock.RUnlock()
if !standby {
atomic.AddInt32(lopCount, -1)
return
}
// Check for a poison pill. If we can read it, it means we have stale
// keys (e.g. from replication being activated) and we need to seal to
// be unsealed again.
entry, _ := c.barrier.Get(ctx, poisonPillPath)
if entry != nil && len(entry.Value) > 0 {
c.logger.Warn("encryption keys have changed out from underneath us (possibly due to replication enabling), must be unsealed again")
// If we are using raft storage we do not want to shut down
// raft during replication secondary enablement. This will
// allow us to keep making progress on the raft log.
go c.sealInternalWithOptions(true, false, !isRaft)
atomic.AddInt32(lopCount, -1)
return
}
if err := c.checkKeyUpgrades(ctx); err != nil {
c.logger.Error("key rotation periodic upgrade check failed", "error", err)
}
if err := c.checkRaftTLSKeyUpgrades(ctx); err != nil {
c.logger.Error("raft tls periodic upgrade check failed", "error", err)
}
atomic.AddInt32(lopCount, -1)
return
}()
case <-stopCh:
return
}
}
}
// checkKeyUpgrades is used to check if there have been any key rotations
// and if there is a chain of upgrades available
func (c *Core) checkKeyUpgrades(ctx context.Context) error {
for {
// Check for an upgrade
didUpgrade, newTerm, err := c.barrier.CheckUpgrade(ctx)
if err != nil {
return err
}
// Nothing to do if no upgrade
if !didUpgrade {
break
}
if c.logger.IsInfo() {
c.logger.Info("upgraded to new key term", "term", newTerm)
}
}
return nil
}
func (c *Core) reloadMasterKey(ctx context.Context) error {
if err := c.barrier.ReloadMasterKey(ctx); err != nil {
return errwrap.Wrapf("error reloading master key: {{err}}", err)
}
return nil
}
func (c *Core) reloadShamirKey(ctx context.Context) error {
_ = c.seal.SetBarrierConfig(ctx, nil)
if cfg, _ := c.seal.BarrierConfig(ctx); cfg == nil {
return nil
}
var shamirKey []byte
switch c.seal.StoredKeysSupported() {
case StoredKeysSupportedGeneric:
return nil
case StoredKeysSupportedShamirMaster:
entry, err := c.barrier.Get(ctx, shamirKekPath)
if err != nil {
return err
}
if entry == nil {
return nil
}
shamirKey = entry.Value
case StoredKeysNotSupported:
keyring, err := c.barrier.Keyring()
if err != nil {
return errwrap.Wrapf("failed to update seal access: {{err}}", err)
}
shamirKey = keyring.masterKey
}
return c.seal.GetAccess().(*shamir.ShamirSeal).SetKey(shamirKey)
}
func (c *Core) performKeyUpgrades(ctx context.Context) error {
if err := c.checkKeyUpgrades(ctx); err != nil {
return errwrap.Wrapf("error checking for key upgrades: {{err}}", err)
}
if err := c.reloadMasterKey(ctx); err != nil {
return errwrap.Wrapf("error reloading master key: {{err}}", err)
}
if err := c.barrier.ReloadKeyring(ctx); err != nil {
return errwrap.Wrapf("error reloading keyring: {{err}}", err)
}
if err := c.reloadShamirKey(ctx); err != nil {
return errwrap.Wrapf("error reloading shamir kek key: {{err}}", err)
}
if err := c.scheduleUpgradeCleanup(ctx); err != nil {
return errwrap.Wrapf("error scheduling upgrade cleanup: {{err}}", err)
}
return nil
}
// scheduleUpgradeCleanup is used to ensure that all the upgrade paths
// are cleaned up in a timely manner if a leader failover takes place
func (c *Core) scheduleUpgradeCleanup(ctx context.Context) error {
// List the upgrades
upgrades, err := c.barrier.List(ctx, keyringUpgradePrefix)
if err != nil {
return errwrap.Wrapf("failed to list upgrades: {{err}}", err)
}
// Nothing to do if no upgrades
if len(upgrades) == 0 {
return nil
}
// Schedule cleanup for all of them
time.AfterFunc(KeyRotateGracePeriod, func() {
sealed, err := c.barrier.Sealed()
if err != nil {
c.logger.Warn("failed to check barrier status at upgrade cleanup time")
return
}
if sealed {
c.logger.Warn("barrier sealed at upgrade cleanup time")
return
}
for _, upgrade := range upgrades {
path := fmt.Sprintf("%s%s", keyringUpgradePrefix, upgrade)
if err := c.barrier.Delete(ctx, path); err != nil {
c.logger.Error("failed to cleanup upgrade", "path", path, "error", err)
}
}
})
return nil
}
// acquireLock blocks until the lock is acquired, returning the leaderLostCh
func (c *Core) acquireLock(lock physical.Lock, stopCh <-chan struct{}) <-chan struct{} {
for {
// Attempt lock acquisition
leaderLostCh, err := lock.Lock(stopCh)
if err == nil {
return leaderLostCh
}
// Retry the acquisition
c.logger.Error("failed to acquire lock", "error", err)
select {
case <-time.After(lockRetryInterval):
case <-stopCh:
return nil
}
}
}
// advertiseLeader is used to advertise the current node as leader
func (c *Core) advertiseLeader(ctx context.Context, uuid string, leaderLostCh <-chan struct{}) error {
if leaderLostCh != nil {
go c.cleanLeaderPrefix(ctx, uuid, leaderLostCh)
}
var key *ecdsa.PrivateKey
switch c.localClusterPrivateKey.Load().(type) {
case *ecdsa.PrivateKey:
key = c.localClusterPrivateKey.Load().(*ecdsa.PrivateKey)
default:
c.logger.Error("unknown cluster private key type", "key_type", fmt.Sprintf("%T", c.localClusterPrivateKey.Load()))
return fmt.Errorf("unknown cluster private key type %T", c.localClusterPrivateKey.Load())
}
keyParams := &certutil.ClusterKeyParams{
Type: corePrivateKeyTypeP521,
X: key.X,
Y: key.Y,
D: key.D,
}
locCert := c.localClusterCert.Load().([]byte)
localCert := make([]byte, len(locCert))
copy(localCert, locCert)
adv := &activeAdvertisement{
RedirectAddr: c.redirectAddr,
ClusterAddr: c.ClusterAddr(),
ClusterCert: localCert,
ClusterKeyParams: keyParams,
}
val, err := jsonutil.EncodeJSON(adv)
if err != nil {
return err
}
ent := &logical.StorageEntry{
Key: coreLeaderPrefix + uuid,
Value: val,
}
err = c.barrier.Put(ctx, ent)
if err != nil {
return err
}
sd, ok := c.ha.(physical.ServiceDiscovery)
if ok {
if err := sd.NotifyActiveStateChange(); err != nil {
if c.logger.IsWarn() {
c.logger.Warn("failed to notify active status", "error", err)
}
}
}
return nil
}
func (c *Core) cleanLeaderPrefix(ctx context.Context, uuid string, leaderLostCh <-chan struct{}) {
keys, err := c.barrier.List(ctx, coreLeaderPrefix)
if err != nil {
c.logger.Error("failed to list entries in core/leader", "error", err)
return
}
for len(keys) > 0 {
select {
case <-time.After(leaderPrefixCleanDelay):
if keys[0] != uuid {
c.barrier.Delete(ctx, coreLeaderPrefix+keys[0])
}
keys = keys[1:]
case <-leaderLostCh:
return
}
}
}
// clearLeader is used to clear our leadership entry
func (c *Core) clearLeader(uuid string) error {
key := coreLeaderPrefix + uuid
err := c.barrier.Delete(context.Background(), key)
// Advertise ourselves as a standby
sd, ok := c.ha.(physical.ServiceDiscovery)
if ok {
if err := sd.NotifyActiveStateChange(); err != nil {
if c.logger.IsWarn() {
c.logger.Warn("failed to notify standby status", "error", err)
}
}
}
return err
}
func (c *Core) SetNeverBecomeActive(on bool) {
if on {
atomic.StoreUint32(c.neverBecomeActive, 1)
} else {
atomic.StoreUint32(c.neverBecomeActive, 0)
}
}