Merge pull request #684 from hashicorp/clean-leader-prefix

Add a cleanLeaderPrefix function to clean up stale leader entries in core/leader
This commit is contained in:
Jeff Mitchell 2015-10-08 14:45:03 -04:00
commit 670b6b89bb
3 changed files with 181 additions and 9 deletions

View file

@ -6,8 +6,9 @@ DEPRECATIONS/BREAKING CHANGES:
IMPROVEMENTS:
* init: Base64-encoded PGP keys can be used with the CLI for `init` and `rekey` operations [GH-653]
* core: Tokens can now renew themselves [GH-455]
* core: Stale leader entries will now be reaped [GH-679]
* init: Base64-encoded PGP keys can be used with the CLI for `init` and `rekey` operations [GH-653]
* logical: Responses now contain a "warnings" key containing a list of warnings returned from the server. These are conditions that did not require failing an operation, but of which the client should be aware. [GH-676]
BUG FIXES:

View file

@ -52,6 +52,10 @@ const (
// keyRotateGracePeriod is how long we allow an upgrade path
// for standby instances before we delete the upgrade keys
keyRotateGracePeriod = 2 * time.Minute
// leaderPrefixCleanDelay is how long to wait between deletions
// of orphaned leader keys, to prevent slamming the backend.
leaderPrefixCleanDelay = 200 * time.Millisecond
)
var (
@ -1454,16 +1458,16 @@ func (c *Core) runStandby(doneCh, stopCh chan struct{}) {
}
// Attempt the acquisition
leaderCh := c.acquireLock(lock, stopCh)
leaderLostCh := c.acquireLock(lock, stopCh)
// Bail if we are being shutdown
if leaderCh == nil {
if leaderLostCh == nil {
return
}
c.logger.Printf("[INFO] core: acquired lock, enabling active operation")
// Advertise ourself as leader
if err := c.advertiseLeader(uuid); err != nil {
if err := c.advertiseLeader(uuid, leaderLostCh); err != nil {
c.logger.Printf("[ERR] core: leader advertisement setup failed: %v", err)
lock.Unlock()
continue
@ -1486,7 +1490,7 @@ func (c *Core) runStandby(doneCh, stopCh chan struct{}) {
// Monitor a loss of leadership
select {
case <-leaderCh:
case <-leaderLostCh:
c.logger.Printf("[WARN] core: leadership lost, stopping active operation")
case <-stopCh:
c.logger.Printf("[WARN] core: stopping active operation")
@ -1582,13 +1586,13 @@ func (c *Core) scheduleUpgradeCleanup() error {
return nil
}
// acquireLock blocks until the lock is acquired, returning the leaderCh
// acquireLock blocks until the lock is acquired, returning the leaderLostCh
func (c *Core) acquireLock(lock physical.Lock, stopCh <-chan struct{}) <-chan struct{} {
for {
// Attempt lock acquisition
leaderCh, err := lock.Lock(stopCh)
leaderLostCh, err := lock.Lock(stopCh)
if err == nil {
return leaderCh
return leaderLostCh
}
// Retry the acquisition
@ -1602,7 +1606,8 @@ func (c *Core) acquireLock(lock physical.Lock, stopCh <-chan struct{}) <-chan st
}
// advertiseLeader is used to advertise the current node as leader
func (c *Core) advertiseLeader(uuid string) error {
func (c *Core) advertiseLeader(uuid string, leaderLostCh <-chan struct{}) error {
go c.cleanLeaderPrefix(uuid, leaderLostCh)
ent := &Entry{
Key: coreLeaderPrefix + uuid,
Value: []byte(c.advertiseAddr),
@ -1610,6 +1615,25 @@ func (c *Core) advertiseLeader(uuid string) error {
return c.barrier.Put(ent)
}
func (c *Core) cleanLeaderPrefix(uuid string, leaderLostCh <-chan struct{}) {
keys, err := c.barrier.List(coreLeaderPrefix)
if err != nil {
c.logger.Printf("[ERR] core: failed to list entries in core/leader: %v", err)
return
}
for len(keys) > 0 {
select {
case <-time.After(leaderPrefixCleanDelay):
if keys[0] != uuid {
c.barrier.Delete(coreLeaderPrefix + keys[0])
}
keys = keys[1:]
case <-leaderLostCh:
return
}
}
}
// clearLeader is used to clear our leadership entry
func (c *Core) clearLeader(uuid string) error {
key := coreLeaderPrefix + uuid

View file

@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/hashicorp/otto/helper/uuid"
"github.com/hashicorp/vault/audit"
"github.com/hashicorp/vault/logical"
"github.com/hashicorp/vault/physical"
@ -1068,6 +1069,152 @@ func TestCore_LimitedUseToken(t *testing.T) {
}
}
func TestCore_CleanLeaderPrefix(t *testing.T) {
// Create the first core and initialize it
inm := physical.NewInmemHA()
advertiseOriginal := "http://127.0.0.1:8200"
core, err := NewCore(&CoreConfig{
Physical: inm,
AdvertiseAddr: advertiseOriginal,
DisableMlock: true,
})
if err != nil {
t.Fatalf("err: %v", err)
}
key, root := TestCoreInit(t, core)
if _, err := core.Unseal(TestKeyCopy(key)); err != nil {
t.Fatalf("unseal err: %s", err)
}
// Verify unsealed
sealed, err := core.Sealed()
if err != nil {
t.Fatalf("err checking seal status: %s", err)
}
if sealed {
t.Fatal("should not be sealed")
}
// Wait for core to become active
testWaitActive(t, core)
// Ensure that the original clean function has stopped running
time.Sleep(2 * time.Second)
// Put several random entries
for i := 0; i < 5; i++ {
core.barrier.Put(&Entry{
Key: coreLeaderPrefix + uuid.GenerateUUID(),
Value: []byte(uuid.GenerateUUID()),
})
}
entries, err := core.barrier.List(coreLeaderPrefix)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(entries) != 6 {
t.Fatalf("wrong number of core leader prefix entries, got %d", len(entries))
}
// Check the leader is local
isLeader, advertise, err := core.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
if !isLeader {
t.Fatalf("should be leader")
}
if advertise != advertiseOriginal {
t.Fatalf("Bad advertise: %v", advertise)
}
// Create a second core, attached to same in-memory store
advertiseOriginal2 := "http://127.0.0.1:8500"
core2, err := NewCore(&CoreConfig{
Physical: inm,
AdvertiseAddr: advertiseOriginal2,
DisableMlock: true,
})
if err != nil {
t.Fatalf("err: %v", err)
}
if _, err := core2.Unseal(TestKeyCopy(key)); err != nil {
t.Fatalf("unseal err: %s", err)
}
// Verify unsealed
sealed, err = core2.Sealed()
if err != nil {
t.Fatalf("err checking seal status: %s", err)
}
if sealed {
t.Fatal("should not be sealed")
}
// Core2 should be in standby
standby, err := core2.Standby()
if err != nil {
t.Fatalf("err: %v", err)
}
if !standby {
t.Fatalf("should be standby")
}
// Check the leader is not local
isLeader, advertise, err = core2.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
if isLeader {
t.Fatalf("should not be leader")
}
if advertise != advertiseOriginal {
t.Fatalf("Bad advertise: %v", advertise)
}
// Seal the first core, should step down
err = core.Seal(root)
if err != nil {
t.Fatalf("err: %v", err)
}
// Core should be in standby
standby, err = core.Standby()
if err != nil {
t.Fatalf("err: %v", err)
}
if !standby {
t.Fatalf("should be standby")
}
// Wait for core2 to become active
testWaitActive(t, core2)
// Check the leader is local
isLeader, advertise, err = core2.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
if !isLeader {
t.Fatalf("should be leader")
}
if advertise != advertiseOriginal2 {
t.Fatalf("Bad advertise: %v", advertise)
}
// Give time for the entries to clear out; it is conservative at 1/second
time.Sleep(10 * leaderPrefixCleanDelay)
entries, err = core2.barrier.List(coreLeaderPrefix)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(entries) != 1 {
t.Fatalf("wrong number of core leader prefix entries, got %d", len(entries))
}
}
func TestCore_Standby(t *testing.T) {
// Create the first core and initialize it
inm := physical.NewInmemHA()