Add a cleanLeaderPrefix function to clean up stale leader entries in core/leader

Fixes #679.
This commit is contained in:
Jeff Mitchell 2015-10-08 13:47:21 -04:00
parent 89d40450cd
commit d58a3b601c
3 changed files with 182 additions and 1 deletions

View File

@ -6,8 +6,9 @@ DEPRECATIONS/BREAKING CHANGES:
IMPROVEMENTS:
* init: Base64-encoded PGP keys can be used with the CLI for `init` and `rekey` operations [GH-653]
* core: Tokens can now renew themselves [GH-455]
* core: Stale leader entries will now be reaped [GH-679]
* init: Base64-encoded PGP keys can be used with the CLI for `init` and `rekey` operations [GH-653]
* logical: Responses now contain a "warnings" key containing a list of warnings returned from the server. These are conditions that did not require failing an operation, but of which the client should be aware. [GH-676]
BUG FIXES:

View File

@ -239,6 +239,10 @@ type Core struct {
// metricsCh is used to stop the metrics streaming
metricsCh chan struct{}
// leaderPrefixCleanupCh is used to stop
// the leader prefix cleansing operation
leaderPrefixCleanupCh chan struct{}
defaultLeaseTTL time.Duration
maxLeaseTTL time.Duration
@ -1603,6 +1607,8 @@ func (c *Core) acquireLock(lock physical.Lock, stopCh <-chan struct{}) <-chan st
// advertiseLeader is used to advertise the current node as leader
func (c *Core) advertiseLeader(uuid string) error {
c.leaderPrefixCleanupCh = make(chan struct{})
go c.cleanLeaderPrefix(uuid)
ent := &Entry{
Key: coreLeaderPrefix + uuid,
Value: []byte(c.advertiseAddr),
@ -1610,8 +1616,35 @@ func (c *Core) advertiseLeader(uuid string) error {
return c.barrier.Put(ent)
}
func (c *Core) cleanLeaderPrefix(uuid string) {
keys, err := c.barrier.List(coreLeaderPrefix)
if err != nil {
c.logger.Printf("[ERR] core: failed to list entries in core/leader: %v", err)
return
}
if len(keys) == 0 {
c.logger.Print("[ERR] core: found no entries under core/leader (should have found our own)")
return
}
for {
select {
case <-time.After(time.Second):
if keys[0] != uuid {
c.barrier.Delete(coreLeaderPrefix + keys[0])
}
keys = keys[1:]
if len(keys) == 0 {
return
}
case <-c.leaderPrefixCleanupCh:
return
}
}
}
// clearLeader is used to clear our leadership entry
func (c *Core) clearLeader(uuid string) error {
close(c.leaderPrefixCleanupCh)
key := coreLeaderPrefix + uuid
return c.barrier.Delete(key)
}

View File

@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/hashicorp/otto/helper/uuid"
"github.com/hashicorp/vault/audit"
"github.com/hashicorp/vault/logical"
"github.com/hashicorp/vault/physical"
@ -1068,6 +1069,152 @@ func TestCore_LimitedUseToken(t *testing.T) {
}
}
func TestCore_CleanLeaderPrefix(t *testing.T) {
// Create the first core and initialize it
inm := physical.NewInmemHA()
advertiseOriginal := "http://127.0.0.1:8200"
core, err := NewCore(&CoreConfig{
Physical: inm,
AdvertiseAddr: advertiseOriginal,
DisableMlock: true,
})
if err != nil {
t.Fatalf("err: %v", err)
}
key, root := TestCoreInit(t, core)
if _, err := core.Unseal(TestKeyCopy(key)); err != nil {
t.Fatalf("unseal err: %s", err)
}
// Verify unsealed
sealed, err := core.Sealed()
if err != nil {
t.Fatalf("err checking seal status: %s", err)
}
if sealed {
t.Fatal("should not be sealed")
}
// Wait for core to become active
testWaitActive(t, core)
// Ensure that the original clean function has stopped running
time.Sleep(2 * time.Second)
// Put several random entries
for i := 0; i < 5; i++ {
core.barrier.Put(&Entry{
Key: coreLeaderPrefix + uuid.GenerateUUID(),
Value: []byte(uuid.GenerateUUID()),
})
}
entries, err := core.barrier.List(coreLeaderPrefix)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(entries) != 6 {
t.Fatalf("wrong number of core leader prefix entries, got %d", len(entries))
}
// Check the leader is local
isLeader, advertise, err := core.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
if !isLeader {
t.Fatalf("should be leader")
}
if advertise != advertiseOriginal {
t.Fatalf("Bad advertise: %v", advertise)
}
// Create a second core, attached to same in-memory store
advertiseOriginal2 := "http://127.0.0.1:8500"
core2, err := NewCore(&CoreConfig{
Physical: inm,
AdvertiseAddr: advertiseOriginal2,
DisableMlock: true,
})
if err != nil {
t.Fatalf("err: %v", err)
}
if _, err := core2.Unseal(TestKeyCopy(key)); err != nil {
t.Fatalf("unseal err: %s", err)
}
// Verify unsealed
sealed, err = core2.Sealed()
if err != nil {
t.Fatalf("err checking seal status: %s", err)
}
if sealed {
t.Fatal("should not be sealed")
}
// Core2 should be in standby
standby, err := core2.Standby()
if err != nil {
t.Fatalf("err: %v", err)
}
if !standby {
t.Fatalf("should be standby")
}
// Check the leader is not local
isLeader, advertise, err = core2.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
if isLeader {
t.Fatalf("should not be leader")
}
if advertise != advertiseOriginal {
t.Fatalf("Bad advertise: %v", advertise)
}
// Seal the first core, should step down
err = core.Seal(root)
if err != nil {
t.Fatalf("err: %v", err)
}
// Core should be in standby
standby, err = core.Standby()
if err != nil {
t.Fatalf("err: %v", err)
}
if !standby {
t.Fatalf("should be standby")
}
// Wait for core2 to become active
testWaitActive(t, core2)
// Check the leader is local
isLeader, advertise, err = core2.Leader()
if err != nil {
t.Fatalf("err: %v", err)
}
if !isLeader {
t.Fatalf("should be leader")
}
if advertise != advertiseOriginal2 {
t.Fatalf("Bad advertise: %v", advertise)
}
// Give time for the entries to clear out; it is conservative at 1/second
time.Sleep(7 * time.Second)
entries, err = core2.barrier.List(coreLeaderPrefix)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(entries) != 1 {
t.Fatalf("wrong number of core leader prefix entries, got %d", len(entries))
}
}
func TestCore_Standby(t *testing.T) {
// Create the first core and initialize it
inm := physical.NewInmemHA()