vault: leader should advertise address
This commit is contained in:
parent
b28dac7cb2
commit
445f64eb39
|
@ -29,6 +29,10 @@ const (
|
|||
// for a highly-available deploy.
|
||||
coreLockPath = "core/lock"
|
||||
|
||||
// coreLeaderPrefix is the prefix used for the UUID that contains
|
||||
// the currently elected leader.
|
||||
coreLeaderPrefix = "core/leader/"
|
||||
|
||||
// lockRetryInterval is the interval we re-attempt to acquire the
|
||||
// HA lock if an error is encountered
|
||||
lockRetryInterval = 10 * time.Second
|
||||
|
@ -111,6 +115,9 @@ type Core struct {
|
|||
// HABackend may be available depending on the physical backend
|
||||
ha physical.HABackend
|
||||
|
||||
// AdvertiseAddr is the address we advertise as leader if held
|
||||
advertiseAddr string
|
||||
|
||||
// physical backend is the un-trusted backend with durable data
|
||||
physical physical.Backend
|
||||
|
||||
|
@ -186,8 +193,9 @@ type CoreConfig struct {
|
|||
AuditBackends map[string]audit.Factory
|
||||
Physical physical.Backend
|
||||
Logger *log.Logger
|
||||
DisableCache bool // Disables the LRU cache on the physical backend
|
||||
CacheSize int // Custom cache size of zero for default
|
||||
DisableCache bool // Disables the LRU cache on the physical backend
|
||||
CacheSize int // Custom cache size of zero for default
|
||||
AdvertiseAddr string // Set as the leader address for HA
|
||||
}
|
||||
|
||||
// NewCore isk used to construct a new core
|
||||
|
@ -197,6 +205,9 @@ func NewCore(conf *CoreConfig) (*Core, error) {
|
|||
if ha, ok := conf.Physical.(physical.HABackend); ok {
|
||||
haBackend = ha
|
||||
}
|
||||
if haBackend != nil && conf.AdvertiseAddr == "" {
|
||||
return nil, fmt.Errorf("missing advertisement address")
|
||||
}
|
||||
|
||||
// Wrap the backend in a cache unless disabled
|
||||
if !conf.DisableCache {
|
||||
|
@ -221,13 +232,14 @@ func NewCore(conf *CoreConfig) (*Core, error) {
|
|||
|
||||
// Setup the core
|
||||
c := &Core{
|
||||
ha: haBackend,
|
||||
physical: conf.Physical,
|
||||
barrier: barrier,
|
||||
router: NewRouter(),
|
||||
sealed: true,
|
||||
standby: true,
|
||||
logger: conf.Logger,
|
||||
ha: haBackend,
|
||||
advertiseAddr: conf.AdvertiseAddr,
|
||||
physical: conf.Physical,
|
||||
barrier: barrier,
|
||||
router: NewRouter(),
|
||||
sealed: true,
|
||||
standby: true,
|
||||
logger: conf.Logger,
|
||||
}
|
||||
|
||||
// Setup the backends
|
||||
|
@ -887,7 +899,8 @@ func (c *Core) runStandby(doneCh, stopCh chan struct{}) {
|
|||
}
|
||||
|
||||
// Create a lock
|
||||
lock, err := c.ha.LockWith(coreLockPath)
|
||||
uuid := generateUUID()
|
||||
lock, err := c.ha.LockWith(coreLockPath, uuid)
|
||||
if err != nil {
|
||||
c.logger.Printf("[ERR] core: failed to create lock: %v", err)
|
||||
return
|
||||
|
@ -902,6 +915,13 @@ func (c *Core) runStandby(doneCh, stopCh chan struct{}) {
|
|||
}
|
||||
c.logger.Printf("[INFO] core: acquired lock, enabling active operation")
|
||||
|
||||
// Advertise ourself as leader
|
||||
if err := c.advertiseLeader(uuid); err != nil {
|
||||
c.logger.Printf("[ERR] core: leader advertisement setup failed: %v", err)
|
||||
lock.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
// Attempt the post-unseal process
|
||||
c.stateLock.Lock()
|
||||
err = c.postUnseal()
|
||||
|
@ -925,6 +945,11 @@ func (c *Core) runStandby(doneCh, stopCh chan struct{}) {
|
|||
c.logger.Printf("[WARN] core: stopping active operation")
|
||||
}
|
||||
|
||||
// Clear ourself as leader
|
||||
if err := c.clearLeader(uuid); err != nil {
|
||||
c.logger.Printf("[ERR] core: clearing leader advertisement failed: %v", err)
|
||||
}
|
||||
|
||||
// Attempt the pre-seal process
|
||||
c.stateLock.Lock()
|
||||
c.standby = true
|
||||
|
@ -961,6 +986,21 @@ func (c *Core) acquireLock(lock physical.Lock, stopCh <-chan struct{}) <-chan st
|
|||
}
|
||||
}
|
||||
|
||||
// advertiseLeader is used to advertise the current node as leader
|
||||
func (c *Core) advertiseLeader(uuid string) error {
|
||||
ent := &Entry{
|
||||
Key: coreLeaderPrefix + uuid,
|
||||
Value: []byte(c.advertiseAddr),
|
||||
}
|
||||
return c.barrier.Put(ent)
|
||||
}
|
||||
|
||||
// clearLeader is used to clear our leadership entry
|
||||
func (c *Core) clearLeader(uuid string) error {
|
||||
key := coreLeaderPrefix + uuid
|
||||
return c.barrier.Delete(key)
|
||||
}
|
||||
|
||||
// emitMetrics is used to periodically expose metrics while runnig
|
||||
func (c *Core) emitMetrics(stopCh chan struct{}) {
|
||||
for {
|
||||
|
|
|
@ -987,7 +987,7 @@ func TestCore_HandleRequest_CreateToken_Lease(t *testing.T) {
|
|||
func TestCore_Standby(t *testing.T) {
|
||||
// Create the first core and initialize it
|
||||
inm := physical.NewInmemHA()
|
||||
core, err := NewCore(&CoreConfig{Physical: inm})
|
||||
core, err := NewCore(&CoreConfig{Physical: inm, AdvertiseAddr: "foo"})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -1036,7 +1036,7 @@ func TestCore_Standby(t *testing.T) {
|
|||
}
|
||||
|
||||
// Create a second core, attached to same in-memory store
|
||||
core2, err := NewCore(&CoreConfig{Physical: inm})
|
||||
core2, err := NewCore(&CoreConfig{Physical: inm, AdvertiseAddr: "bar"})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue