diff --git a/CHANGELOG.md b/CHANGELOG.md index 0421bf77b..16793cb4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,8 +6,9 @@ DEPRECATIONS/BREAKING CHANGES: IMPROVEMENTS: - * init: Base64-encoded PGP keys can be used with the CLI for `init` and `rekey` operations [GH-653] * core: Tokens can now renew themselves [GH-455] + * core: Stale leader entries will now be reaped [GH-679] + * init: Base64-encoded PGP keys can be used with the CLI for `init` and `rekey` operations [GH-653] * logical: Responses now contain a "warnings" key containing a list of warnings returned from the server. These are conditions that did not require failing an operation, but of which the client should be aware. [GH-676] BUG FIXES: diff --git a/vault/core.go b/vault/core.go index 507884947..1f66be6fe 100644 --- a/vault/core.go +++ b/vault/core.go @@ -239,6 +239,10 @@ type Core struct { // metricsCh is used to stop the metrics streaming metricsCh chan struct{} + // leaderPrefixCleanupCh is used to stop + // the leader prefix cleansing operation + leaderPrefixCleanupCh chan struct{} + defaultLeaseTTL time.Duration maxLeaseTTL time.Duration @@ -1603,6 +1607,8 @@ func (c *Core) acquireLock(lock physical.Lock, stopCh <-chan struct{}) <-chan st // advertiseLeader is used to advertise the current node as leader func (c *Core) advertiseLeader(uuid string) error { + c.leaderPrefixCleanupCh = make(chan struct{}) + go c.cleanLeaderPrefix(uuid) ent := &Entry{ Key: coreLeaderPrefix + uuid, Value: []byte(c.advertiseAddr), @@ -1610,8 +1616,35 @@ func (c *Core) advertiseLeader(uuid string) error { return c.barrier.Put(ent) } +func (c *Core) cleanLeaderPrefix(uuid string) { + keys, err := c.barrier.List(coreLeaderPrefix) + if err != nil { + c.logger.Printf("[ERR] core: failed to list entries in core/leader: %v", err) + return + } + if len(keys) == 0 { + c.logger.Print("[ERR] core: found no entries under core/leader (should have found our own)") + return + } + for { + select { + case <-time.After(time.Second): + if keys[0] != uuid { + c.barrier.Delete(coreLeaderPrefix + keys[0]) + } + keys = keys[1:] + if len(keys) == 0 { + return + } + case <-c.leaderPrefixCleanupCh: + return + } + } +} + // clearLeader is used to clear our leadership entry func (c *Core) clearLeader(uuid string) error { + close(c.leaderPrefixCleanupCh) key := coreLeaderPrefix + uuid return c.barrier.Delete(key) } diff --git a/vault/core_test.go b/vault/core_test.go index b31172343..1e4997a09 100644 --- a/vault/core_test.go +++ b/vault/core_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/hashicorp/otto/helper/uuid" "github.com/hashicorp/vault/audit" "github.com/hashicorp/vault/logical" "github.com/hashicorp/vault/physical" @@ -1068,6 +1069,152 @@ func TestCore_LimitedUseToken(t *testing.T) { } } +func TestCore_CleanLeaderPrefix(t *testing.T) { + // Create the first core and initialize it + inm := physical.NewInmemHA() + advertiseOriginal := "http://127.0.0.1:8200" + core, err := NewCore(&CoreConfig{ + Physical: inm, + AdvertiseAddr: advertiseOriginal, + DisableMlock: true, + }) + if err != nil { + t.Fatalf("err: %v", err) + } + key, root := TestCoreInit(t, core) + if _, err := core.Unseal(TestKeyCopy(key)); err != nil { + t.Fatalf("unseal err: %s", err) + } + + // Verify unsealed + sealed, err := core.Sealed() + if err != nil { + t.Fatalf("err checking seal status: %s", err) + } + if sealed { + t.Fatal("should not be sealed") + } + + // Wait for core to become active + testWaitActive(t, core) + + // Ensure that the original clean function has stopped running + time.Sleep(2 * time.Second) + + // Put several random entries + for i := 0; i < 5; i++ { + core.barrier.Put(&Entry{ + Key: coreLeaderPrefix + uuid.GenerateUUID(), + Value: []byte(uuid.GenerateUUID()), + }) + } + + entries, err := core.barrier.List(coreLeaderPrefix) + if err != nil { + t.Fatalf("err: %v", err) + } + if len(entries) != 6 { + t.Fatalf("wrong number of core leader prefix entries, got %d", len(entries)) + } + + // Check the leader is local + isLeader, advertise, err := core.Leader() + if err != nil { + t.Fatalf("err: %v", err) + } + if !isLeader { + t.Fatalf("should be leader") + } + if advertise != advertiseOriginal { + t.Fatalf("Bad advertise: %v", advertise) + } + + // Create a second core, attached to same in-memory store + advertiseOriginal2 := "http://127.0.0.1:8500" + core2, err := NewCore(&CoreConfig{ + Physical: inm, + AdvertiseAddr: advertiseOriginal2, + DisableMlock: true, + }) + if err != nil { + t.Fatalf("err: %v", err) + } + if _, err := core2.Unseal(TestKeyCopy(key)); err != nil { + t.Fatalf("unseal err: %s", err) + } + + // Verify unsealed + sealed, err = core2.Sealed() + if err != nil { + t.Fatalf("err checking seal status: %s", err) + } + if sealed { + t.Fatal("should not be sealed") + } + + // Core2 should be in standby + standby, err := core2.Standby() + if err != nil { + t.Fatalf("err: %v", err) + } + if !standby { + t.Fatalf("should be standby") + } + + // Check the leader is not local + isLeader, advertise, err = core2.Leader() + if err != nil { + t.Fatalf("err: %v", err) + } + if isLeader { + t.Fatalf("should not be leader") + } + if advertise != advertiseOriginal { + t.Fatalf("Bad advertise: %v", advertise) + } + + // Seal the first core, should step down + err = core.Seal(root) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Core should be in standby + standby, err = core.Standby() + if err != nil { + t.Fatalf("err: %v", err) + } + if !standby { + t.Fatalf("should be standby") + } + + // Wait for core2 to become active + testWaitActive(t, core2) + + // Check the leader is local + isLeader, advertise, err = core2.Leader() + if err != nil { + t.Fatalf("err: %v", err) + } + if !isLeader { + t.Fatalf("should be leader") + } + if advertise != advertiseOriginal2 { + t.Fatalf("Bad advertise: %v", advertise) + } + + // Give time for the entries to clear out; it is conservative at 1/second + time.Sleep(7 * time.Second) + + entries, err = core2.barrier.List(coreLeaderPrefix) + if err != nil { + t.Fatalf("err: %v", err) + } + if len(entries) != 1 { + t.Fatalf("wrong number of core leader prefix entries, got %d", len(entries)) + } +} + func TestCore_Standby(t *testing.T) { // Create the first core and initialize it inm := physical.NewInmemHA()