vault: first pass at HA standby mode

This commit is contained in:
Armon Dadgar 2015-04-14 14:06:15 -07:00
parent 0be49a97b7
commit d7102e2661
1 changed files with 145 additions and 14 deletions

View File

@ -24,6 +24,14 @@ const (
// it even with the Vault sealed. This is required so that we know
// how many secret parts must be used to reconstruct the master key.
coreSealConfigPath = "core/seal-config"
// coreLockPath is the path used to acquire a coordinating lock
// for a highly-available deploy.
coreLockPath = "core/lock"
// lockRetryInterval is the interval we re-attempt to acquire the
// HA lock if an error is encountered
lockRetryInterval = 10 * time.Second
)
var (
@ -96,6 +104,9 @@ func (e *ErrInvalidKey) Error() string {
// interface for API handlers and is responsible for managing the logical and physical
// backends, router, security barrier, and audit trails.
type Core struct {
// HABackend may be available depending on the physical backend
ha physical.HABackend
// physical backend is the un-trusted backend with durable data
physical physical.Backend
@ -118,6 +129,10 @@ type Core struct {
stateLock sync.RWMutex
sealed bool
active bool
standbyDoneCh chan struct{}
standbyStopCh chan struct{}
// unlockParts has the keys provided to Unseal until
// the threshold number of parts is available.
unlockParts [][]byte
@ -173,6 +188,12 @@ type CoreConfig struct {
// NewCore isk used to construct a new core
func NewCore(conf *CoreConfig) (*Core, error) {
// Check if this backend supports an HA configuraiton
var haBackend physical.HABackend
if ha, ok := conf.Physical.(physical.HABackend); ok {
haBackend = ha
}
// Wrap the backend in a cache unless disabled
if !conf.DisableCache {
_, isCache := conf.Physical.(*physical.Cache)
@ -196,10 +217,12 @@ func NewCore(conf *CoreConfig) (*Core, error) {
// Setup the core
c := &Core{
ha: haBackend,
physical: conf.Physical,
barrier: barrier,
router: NewRouter(),
sealed: true,
active: false,
logger: conf.Logger,
}
@ -695,15 +718,22 @@ func (c *Core) Unseal(key []byte) (bool, error) {
}
c.logger.Printf("[INFO] core: vault is unsealed")
// Do post-unseal setup
c.logger.Printf("[INFO] core: post-unseal setup starting")
if err := c.postUnseal(); err != nil {
c.logger.Printf("[ERR] core: post-unseal setup failed: %v", err)
c.barrier.Seal()
c.logger.Printf("[WARN] core: vault is sealed")
return false, err
// Do post-unseal setup if HA is not enabled
if c.ha == nil {
c.active = true
if err := c.postUnseal(); err != nil {
c.logger.Printf("[ERR] core: post-unseal setup failed: %v", err)
c.barrier.Seal()
c.logger.Printf("[WARN] core: vault is sealed")
return false, err
}
} else {
// Go to standby mode, wait until we are active to unseal
c.logger.Printf("[INFO] core: HA backend configured, enabling standby")
c.standbyDoneCh = make(chan struct{})
c.standbyStopCh = make(chan struct{})
go c.standby(c.standbyDoneCh, c.standbyStopCh)
}
c.logger.Printf("[INFO] core: post-unseal setup complete")
// Success!
c.sealed = false
@ -726,15 +756,24 @@ func (c *Core) Seal(token string) error {
return err
}
// Enable that we are sealed to prevent furthur transactions
c.sealed = true
// Do pre-seal teardown
c.logger.Printf("[INFO] core: pre-seal teardown starting")
if err := c.preSeal(); err != nil {
c.logger.Printf("[ERR] core: pre-seal teardown failed: %v", err)
return fmt.Errorf("internal error")
// Do pre-seal teardown if HA is not enabled
if c.ha == nil {
if err := c.preSeal(); err != nil {
c.logger.Printf("[ERR] core: pre-seal teardown failed: %v", err)
return fmt.Errorf("internal error")
}
} else {
// Signal the standby goroutine to shutdown, wait for completion
close(c.standbyStopCh)
// Release the lock while we wait to avoid deadlocking
c.stateLock.Unlock()
<-c.standbyDoneCh
c.stateLock.Lock()
}
c.logger.Printf("[INFO] core: pre-seal teardown complete")
if err := c.barrier.Seal(); err != nil {
return err
@ -749,6 +788,7 @@ func (c *Core) Seal(token string) error {
// credential stores, etc.
func (c *Core) postUnseal() error {
defer metrics.MeasureSince([]string{"core", "post_unseal"}, time.Now())
c.logger.Printf("[INFO] core: post-unseal setup starting")
if cache, ok := c.physical.(*physical.Cache); ok {
cache.Purge()
}
@ -781,6 +821,7 @@ func (c *Core) postUnseal() error {
}
c.metricsCh = make(chan struct{})
go c.emitMetrics(c.metricsCh)
c.logger.Printf("[INFO] core: post-unseal setup complete")
return nil
}
@ -788,6 +829,7 @@ func (c *Core) postUnseal() error {
// for any state teardown required.
func (c *Core) preSeal() error {
defer metrics.MeasureSince([]string{"core", "pre_seal"}, time.Now())
c.logger.Printf("[INFO] core: pre-seal teardown starting")
if c.metricsCh != nil {
close(c.metricsCh)
c.metricsCh = nil
@ -813,9 +855,98 @@ func (c *Core) preSeal() error {
if cache, ok := c.physical.(*physical.Cache); ok {
cache.Purge()
}
c.logger.Printf("[INFO] core: pre-seal teardown complete")
return nil
}
// Standby is a long running routine that is used when an HA backend
// is enabled. It waits until we are leader and switches this Vault to
// active.
func (c *Core) standby(doneCh, stopCh chan struct{}) {
defer close(doneCh)
for {
// Check for a shutdown
select {
case <-stopCh:
return
default:
}
// Create a lock
lock, err := c.ha.LockWith(coreLockPath)
if err != nil {
c.logger.Printf("[ERR] core: failed to create lock: %v", err)
return
}
// Attempt the acquisition
leaderCh := c.acquireLock(lock, stopCh)
// Bail if we are being shutdown
if leaderCh == nil {
return
}
c.logger.Printf("[INFO] core: acquired lock, enabling active operation")
// Attempt the post-unseal process
c.stateLock.Lock()
err = c.postUnseal()
if err == nil {
c.active = true
}
c.stateLock.Unlock()
// Handle a failure to unseal
if err != nil {
c.logger.Printf("[ERR] core: post-unseal setup failed: %v", err)
lock.Unlock()
continue
}
// Monitor a loss of leadership
select {
case <-leaderCh:
c.logger.Printf("[WARN] core: leadership lost, stopping active operation")
case <-stopCh:
c.logger.Printf("[WARN] core: stopping active operation")
}
// Attempt the pre-seal process
c.stateLock.Lock()
c.active = false
err = c.preSeal()
c.stateLock.Unlock()
// Give up leadership
lock.Unlock()
// Check for a failure to prepare to seal
if err := c.preSeal(); err != nil {
c.logger.Printf("[ERR] core: pre-seal teardown failed: %v", err)
continue
}
}
}
// acquireLock blocks until the lock is acquired, returning the leaderCh
func (c *Core) acquireLock(lock physical.Lock, stopCh <-chan struct{}) <-chan struct{} {
for {
// Attempt lock acquisition
leaderCh, err := lock.Lock(stopCh)
if err == nil {
return leaderCh
}
// Retry the acquisition
c.logger.Printf("[ERR] core: failed to acquire lock: %v", err)
select {
case <-time.After(lockRetryInterval):
case <-stopCh:
return nil
}
}
}
// emitMetrics is used to periodically expose metrics while runnig
func (c *Core) emitMetrics(stopCh chan struct{}) {
for {