Rough implementation of Zookeeper HA physical backend. Contains breaking changes to 'path' config. Has unresolved TODO's.
This commit is contained in:
parent
740e7b5f20
commit
f6de41c31d
|
@ -14,7 +14,8 @@ import (
|
|||
// prefix within Zookeeper. It is used in production situations as
|
||||
// it allows Vault to run on multiple machines in a highly-available manner.
|
||||
type ZookeeperBackend struct {
|
||||
path string
|
||||
datapath string
|
||||
lockpath string
|
||||
client *zk.Conn
|
||||
}
|
||||
|
||||
|
@ -22,18 +23,20 @@ type ZookeeperBackend struct {
|
|||
// and the prefix in the KV store.
|
||||
func newZookeeperBackend(conf map[string]string) (Backend, error) {
|
||||
// Get the path in Zookeeper
|
||||
path, ok := conf["path"]
|
||||
basepath, ok := conf["path"]
|
||||
if !ok {
|
||||
path = "vault/"
|
||||
basepath = "vault/"
|
||||
}
|
||||
|
||||
// Ensure path is suffixed and prefixed (zk requires prefix /)
|
||||
if !strings.HasSuffix(path, "/") {
|
||||
path += "/"
|
||||
if !strings.HasSuffix(basepath, "/") {
|
||||
basepath += "/"
|
||||
}
|
||||
if !strings.HasPrefix(path, "/") {
|
||||
path = "/" + path
|
||||
if !strings.HasPrefix(basepath, "/") {
|
||||
basepath = "/" + basepath
|
||||
}
|
||||
datapath := basepath + "data/"
|
||||
lockpath := basepath + "lock/"
|
||||
|
||||
// Configure the client, default to localhost instance
|
||||
var machines string
|
||||
|
@ -50,7 +53,8 @@ func newZookeeperBackend(conf map[string]string) (Backend, error) {
|
|||
|
||||
// Setup the backend
|
||||
c := &ZookeeperBackend{
|
||||
path: path,
|
||||
datapath: datapath,
|
||||
lockpath: lockpath,
|
||||
client: client,
|
||||
}
|
||||
return c, nil
|
||||
|
@ -93,7 +97,7 @@ func (c *ZookeeperBackend) Put(entry *Entry) error {
|
|||
defer metrics.MeasureSince([]string{"zookeeper", "put"}, time.Now())
|
||||
|
||||
// Attempt to set the full path
|
||||
fullPath := c.path + entry.Key
|
||||
fullPath := c.datapath + entry.Key
|
||||
_, err := c.client.Set(fullPath, entry.Value, 0)
|
||||
|
||||
// If we get ErrNoNode, we need to construct the path hierarchy
|
||||
|
@ -108,7 +112,7 @@ func (c *ZookeeperBackend) Get(key string) (*Entry, error) {
|
|||
defer metrics.MeasureSince([]string{"zookeeper", "get"}, time.Now())
|
||||
|
||||
// Attempt to read the full path
|
||||
fullPath := c.path + key
|
||||
fullPath := c.datapath + key
|
||||
value, _, err := c.client.Get(fullPath)
|
||||
|
||||
// Ignore if the node does not exist
|
||||
|
@ -135,7 +139,7 @@ func (c *ZookeeperBackend) Delete(key string) error {
|
|||
defer metrics.MeasureSince([]string{"zookeeper", "delete"}, time.Now())
|
||||
|
||||
// Delete the full path
|
||||
fullPath := c.path + key
|
||||
fullPath := c.datapath + key
|
||||
err := c.client.Delete(fullPath, -1)
|
||||
|
||||
// Mask if the node does not exist
|
||||
|
@ -151,7 +155,7 @@ func (c *ZookeeperBackend) List(prefix string) ([]string, error) {
|
|||
defer metrics.MeasureSince([]string{"zookeeper", "list"}, time.Now())
|
||||
|
||||
// Query the children at the full path
|
||||
fullPath := strings.TrimSuffix(c.path+prefix, "/")
|
||||
fullPath := strings.TrimSuffix(c.datapath+prefix, "/")
|
||||
result, _, err := c.client.Children(fullPath)
|
||||
|
||||
// If the path nodes are missing, no children!
|
||||
|
@ -174,3 +178,106 @@ func (c *ZookeeperBackend) List(prefix string) ([]string, error) {
|
|||
sort.Strings(children)
|
||||
return children, nil
|
||||
}
|
||||
|
||||
// LockWith is used for mutual exclusion based on the given key.
|
||||
func (c *ZookeeperBackend) LockWith(key, value string) (Lock, error) {
|
||||
l := &ZookeeperHALock{
|
||||
in: c,
|
||||
key: key,
|
||||
value: value,
|
||||
}
|
||||
return l, nil
|
||||
}
|
||||
|
||||
func (c *ZookeeperBackend) DetectHostAddr() (string, error) {
|
||||
// TODO: implement this!
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// ZookeeperHALock is a Zookeeper Lock implementation for the HABackend
|
||||
type ZookeeperHALock struct {
|
||||
in *ZookeeperBackend
|
||||
key string
|
||||
value string
|
||||
|
||||
held bool
|
||||
leaderCh chan struct{}
|
||||
zkLock *zk.Lock
|
||||
}
|
||||
|
||||
func (i *ZookeeperHALock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
|
||||
if i.held {
|
||||
return nil, fmt.Errorf("lock already held")
|
||||
}
|
||||
|
||||
// Attempt an async acquisition
|
||||
didLock := make(chan struct{})
|
||||
failLock := make(chan error, 1)
|
||||
releaseCh := make(chan bool, 1)
|
||||
go func() {
|
||||
// Wait to acquire the lock in ZK
|
||||
acl := zk.WorldACL(zk.PermAll)
|
||||
lockpath := i.in.lockpath + i.key
|
||||
lock := zk.NewLock(i.in.client, lockpath, acl)
|
||||
err := lock.Lock()
|
||||
if err != nil {
|
||||
failLock <- err
|
||||
return
|
||||
}
|
||||
// Set node value
|
||||
err2 := i.in.ensurePath(lockpath, []byte(i.value))
|
||||
if err2 != nil {
|
||||
failLock <- err2
|
||||
lock.Unlock()
|
||||
return
|
||||
}
|
||||
i.zkLock = lock
|
||||
|
||||
// Signal that lock is held
|
||||
close(didLock)
|
||||
|
||||
// Handle an early abort
|
||||
release := <-releaseCh
|
||||
if release {
|
||||
lock.Unlock()
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for lock acquisition, failure, or shutdown
|
||||
select {
|
||||
case <-didLock:
|
||||
releaseCh <- false
|
||||
case err := <-failLock:
|
||||
return nil, err
|
||||
case <-stopCh:
|
||||
releaseCh <- true
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Create the leader channel
|
||||
i.held = true
|
||||
i.leaderCh = make(chan struct{})
|
||||
|
||||
// TODO: Watch for Events which could result in loss of our zkLock and close(i.leaderCh)
|
||||
|
||||
return i.leaderCh, nil
|
||||
}
|
||||
|
||||
func (i *ZookeeperHALock) Unlock() error {
|
||||
if !i.held {
|
||||
return nil
|
||||
}
|
||||
|
||||
close(i.leaderCh)
|
||||
i.leaderCh = nil
|
||||
i.held = false
|
||||
i.zkLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *ZookeeperHALock) Value() (bool, string, error) {
|
||||
lockpath := i.in.lockpath + i.key
|
||||
value, _, err := i.in.client.Get(lockpath)
|
||||
return i.held, string(value), err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue