From 445f64eb39d9cfe0ce7d04a110d78f687cf4d57c Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 14 Apr 2015 16:44:48 -0700 Subject: [PATCH] vault: leader should advertise address --- vault/core.go | 60 ++++++++++++++++++++++++++++++++++++++-------- vault/core_test.go | 4 ++-- 2 files changed, 52 insertions(+), 12 deletions(-) diff --git a/vault/core.go b/vault/core.go index 1d21577f3..75f91b8fd 100644 --- a/vault/core.go +++ b/vault/core.go @@ -29,6 +29,10 @@ const ( // for a highly-available deploy. coreLockPath = "core/lock" + // coreLeaderPrefix is the prefix used for the UUID that contains + // the currently elected leader. + coreLeaderPrefix = "core/leader/" + // lockRetryInterval is the interval we re-attempt to acquire the // HA lock if an error is encountered lockRetryInterval = 10 * time.Second @@ -111,6 +115,9 @@ type Core struct { // HABackend may be available depending on the physical backend ha physical.HABackend + // AdvertiseAddr is the address we advertise as leader if held + advertiseAddr string + // physical backend is the un-trusted backend with durable data physical physical.Backend @@ -186,8 +193,9 @@ type CoreConfig struct { AuditBackends map[string]audit.Factory Physical physical.Backend Logger *log.Logger - DisableCache bool // Disables the LRU cache on the physical backend - CacheSize int // Custom cache size of zero for default + DisableCache bool // Disables the LRU cache on the physical backend + CacheSize int // Custom cache size of zero for default + AdvertiseAddr string // Set as the leader address for HA } // NewCore isk used to construct a new core @@ -197,6 +205,9 @@ func NewCore(conf *CoreConfig) (*Core, error) { if ha, ok := conf.Physical.(physical.HABackend); ok { haBackend = ha } + if haBackend != nil && conf.AdvertiseAddr == "" { + return nil, fmt.Errorf("missing advertisement address") + } // Wrap the backend in a cache unless disabled if !conf.DisableCache { @@ -221,13 +232,14 @@ func NewCore(conf *CoreConfig) (*Core, error) { // Setup the core c := &Core{ - ha: haBackend, - physical: conf.Physical, - barrier: barrier, - router: NewRouter(), - sealed: true, - standby: true, - logger: conf.Logger, + ha: haBackend, + advertiseAddr: conf.AdvertiseAddr, + physical: conf.Physical, + barrier: barrier, + router: NewRouter(), + sealed: true, + standby: true, + logger: conf.Logger, } // Setup the backends @@ -887,7 +899,8 @@ func (c *Core) runStandby(doneCh, stopCh chan struct{}) { } // Create a lock - lock, err := c.ha.LockWith(coreLockPath) + uuid := generateUUID() + lock, err := c.ha.LockWith(coreLockPath, uuid) if err != nil { c.logger.Printf("[ERR] core: failed to create lock: %v", err) return @@ -902,6 +915,13 @@ func (c *Core) runStandby(doneCh, stopCh chan struct{}) { } c.logger.Printf("[INFO] core: acquired lock, enabling active operation") + // Advertise ourself as leader + if err := c.advertiseLeader(uuid); err != nil { + c.logger.Printf("[ERR] core: leader advertisement setup failed: %v", err) + lock.Unlock() + continue + } + // Attempt the post-unseal process c.stateLock.Lock() err = c.postUnseal() @@ -925,6 +945,11 @@ func (c *Core) runStandby(doneCh, stopCh chan struct{}) { c.logger.Printf("[WARN] core: stopping active operation") } + // Clear ourself as leader + if err := c.clearLeader(uuid); err != nil { + c.logger.Printf("[ERR] core: clearing leader advertisement failed: %v", err) + } + // Attempt the pre-seal process c.stateLock.Lock() c.standby = true @@ -961,6 +986,21 @@ func (c *Core) acquireLock(lock physical.Lock, stopCh <-chan struct{}) <-chan st } } +// advertiseLeader is used to advertise the current node as leader +func (c *Core) advertiseLeader(uuid string) error { + ent := &Entry{ + Key: coreLeaderPrefix + uuid, + Value: []byte(c.advertiseAddr), + } + return c.barrier.Put(ent) +} + +// clearLeader is used to clear our leadership entry +func (c *Core) clearLeader(uuid string) error { + key := coreLeaderPrefix + uuid + return c.barrier.Delete(key) +} + // emitMetrics is used to periodically expose metrics while runnig func (c *Core) emitMetrics(stopCh chan struct{}) { for { diff --git a/vault/core_test.go b/vault/core_test.go index 3fd41e7cf..f99ee75c7 100644 --- a/vault/core_test.go +++ b/vault/core_test.go @@ -987,7 +987,7 @@ func TestCore_HandleRequest_CreateToken_Lease(t *testing.T) { func TestCore_Standby(t *testing.T) { // Create the first core and initialize it inm := physical.NewInmemHA() - core, err := NewCore(&CoreConfig{Physical: inm}) + core, err := NewCore(&CoreConfig{Physical: inm, AdvertiseAddr: "foo"}) if err != nil { t.Fatalf("err: %v", err) } @@ -1036,7 +1036,7 @@ func TestCore_Standby(t *testing.T) { } // Create a second core, attached to same in-memory store - core2, err := NewCore(&CoreConfig{Physical: inm}) + core2, err := NewCore(&CoreConfig{Physical: inm, AdvertiseAddr: "bar"}) if err != nil { t.Fatalf("err: %v", err) }