commit
7916630479
276
physical/etcd.go
276
physical/etcd.go
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
|
@ -12,18 +13,34 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
|
||||
// Ideally, this prefix would match the "_" used in the file backend, but
|
||||
// that prefix has special meaining in etcd. Specifically, it excludes those
|
||||
// entries from directory listings.
|
||||
EtcdNodeFilePrefix = "."
|
||||
|
||||
// The lock prefix can (and probably should) cause an entry to be excluded
|
||||
// from diretory listings, so "_" works here.
|
||||
EtcdNodeLockPrefix = "_"
|
||||
|
||||
// The delimiter is the same as the `-C` flag of etcdctl.
|
||||
EtcdMachineDelimiter = ","
|
||||
|
||||
// The lock TTL matches the default that Consul API uses, 15 seconds.
|
||||
EtcdLockTTL = uint64(15)
|
||||
|
||||
// The ammount of time to wait if a watch fails before trying again.
|
||||
EtcdWatchRetryInterval = time.Second
|
||||
|
||||
// The number of times to re-try a failed watch before signaling that leadership is lost.
|
||||
EtcdWatchRetryMax = 5
|
||||
)
|
||||
|
||||
var (
|
||||
EtcdSyncClusterError = errors.New("client setup failed: unable to sync etcd cluster")
|
||||
EtcdSyncClusterError = errors.New("client setup failed: unable to sync etcd cluster")
|
||||
EtcdSemaphoreKeysEmptyError = errors.New("lock queue is empty")
|
||||
EtcdLockHeldError = errors.New("lock already held")
|
||||
EtcdLockNotHeldError = errors.New("lock not held")
|
||||
EtcdSemaphoreKeyRemovedError = errors.New("semaphore key removed before lock aquisition")
|
||||
)
|
||||
|
||||
// errorIsMissingKey returns true if the given error is an etcd error with an
|
||||
|
@ -43,7 +60,6 @@ type EtcdBackend struct {
|
|||
|
||||
// newEtcdBackend constructs a etcd backend using a given machine address.
|
||||
func newEtcdBackend(conf map[string]string) (Backend, error) {
|
||||
|
||||
// Get the etcd path form the configuration.
|
||||
path, ok := conf["path"]
|
||||
if !ok {
|
||||
|
@ -68,7 +84,7 @@ func newEtcdBackend(conf map[string]string) (Backend, error) {
|
|||
return nil, EtcdSyncClusterError
|
||||
}
|
||||
|
||||
// Setup the backend
|
||||
// Setup the backend.
|
||||
return &EtcdBackend{
|
||||
path: path,
|
||||
client: client,
|
||||
|
@ -117,7 +133,6 @@ func (c *EtcdBackend) Delete(key string) error {
|
|||
if err != nil && !errorIsMissingKey(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -154,7 +169,6 @@ func (c *EtcdBackend) List(prefix string) ([]string, error) {
|
|||
out[i] = name[1:]
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
|
@ -167,3 +181,253 @@ func (b *EtcdBackend) nodePath(key string) string {
|
|||
func (b *EtcdBackend) nodePathDir(key string) string {
|
||||
return filepath.Join(b.path, key) + "/"
|
||||
}
|
||||
|
||||
// nodePathLock returns an etcd directory path used specifically for semaphore
|
||||
// indicies based on the given key.
|
||||
func (b *EtcdBackend) nodePathLock(key string) string {
|
||||
return filepath.Join(b.path, filepath.Dir(key), EtcdNodeLockPrefix+filepath.Base(key)+"/")
|
||||
}
|
||||
|
||||
// Lock is used for mutual exclusion based on the given key.
|
||||
func (c *EtcdBackend) LockWith(key, value string) (Lock, error) {
|
||||
return &EtcdLock{
|
||||
client: c.client,
|
||||
value: value,
|
||||
semaphoreDirKey: c.nodePathLock(key),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// EtcdLock emplements a lock using and etcd backend.
|
||||
type EtcdLock struct {
|
||||
client *etcd.Client
|
||||
value, semaphoreDirKey, semaphoreKey string
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
// addSemaphoreKey aquires a new ordered semaphore key.
|
||||
func (c *EtcdLock) addSemaphoreKey() (string, uint64, error) {
|
||||
// CreateInOrder is an atomic operation that can be used to enqueue a
|
||||
// request onto a semaphore. In the rest of the comments, we refer to the
|
||||
// resulting key as a "semaphore key".
|
||||
// https://coreos.com/etcd/docs/2.0.8/api.html#atomically-creating-in-order-keys
|
||||
response, err := c.client.CreateInOrder(c.semaphoreDirKey, c.value, EtcdLockTTL)
|
||||
if err != nil {
|
||||
return "", 0, err
|
||||
}
|
||||
return response.Node.Key, response.EtcdIndex, nil
|
||||
}
|
||||
|
||||
// getSemaphoreKey determines which semaphore key holder has aquired the lock
|
||||
// and its value.
|
||||
func (c *EtcdLock) getSemaphoreKey() (string, string, uint64, error) {
|
||||
// Get the list of waiters in order to see if we are next.
|
||||
response, err := c.client.Get(c.semaphoreDirKey, true, false)
|
||||
if err != nil {
|
||||
return "", "", 0, err
|
||||
}
|
||||
|
||||
// Make sure the list isn't empty.
|
||||
if response.Node.Nodes.Len() == 0 {
|
||||
return "", "", response.EtcdIndex, nil
|
||||
}
|
||||
return response.Node.Nodes[0].Key, response.Node.Nodes[0].Value, response.EtcdIndex, nil
|
||||
}
|
||||
|
||||
// isHeld determines if we are the current holders of the lock.
|
||||
func (c *EtcdLock) isHeld() (bool, error) {
|
||||
if c.semaphoreKey == "" {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Get the key of the curren holder of the lock.
|
||||
currentSemaphoreKey, _, _, err := c.getSemaphoreKey()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return c.semaphoreKey == currentSemaphoreKey, nil
|
||||
}
|
||||
|
||||
// assertHeld determines whether or not we are the current holders of the lock
|
||||
// and returns an EtcdLockNotHeldError if we are not.
|
||||
func (c *EtcdLock) assertHeld() error {
|
||||
held, err := c.isHeld()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if we don't hold the lock.
|
||||
if !held {
|
||||
return EtcdLockNotHeldError
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// assertNotHeld determines whether or not we are the current holders of the
|
||||
// lock and returns an EtcdLockHeldError if we are.
|
||||
func (c *EtcdLock) assertNotHeld() error {
|
||||
held, err := c.isHeld()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if we hold the lock.
|
||||
if held {
|
||||
return EtcdLockHeldError
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// watchForKeyRemoval continuously watches a single non-directory key starting
|
||||
// from the provided etcd index and closes the provided channel when it's
|
||||
// deleted, expires, or appears to be missing.
|
||||
func (c *EtcdLock) watchForKeyRemoval(key string, etcdIndex uint64, closeCh chan struct{}) {
|
||||
retries := EtcdWatchRetryMax
|
||||
|
||||
for {
|
||||
// Start a non-recursive watch of the given key.
|
||||
response, err := c.client.Watch(key, etcdIndex, false, nil, nil)
|
||||
if err != nil {
|
||||
|
||||
// If the key is just missing, we can exit the loop.
|
||||
if errorIsMissingKey(err) {
|
||||
break
|
||||
}
|
||||
|
||||
// If the error is something else, there's nothing we can do but retry
|
||||
// the watch. Check that we still have retries left.
|
||||
retries -= 1
|
||||
if retries == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
// Sleep for a period of time to avoid slamming etcd.
|
||||
time.Sleep(EtcdWatchRetryInterval)
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if the key we are concerned with has been removed. If it has, we
|
||||
// can exit the loop.
|
||||
if response.Node.Key == key &&
|
||||
(response.Action == "delete" || response.Action == "expire") {
|
||||
break
|
||||
}
|
||||
|
||||
// Update the etcd index.
|
||||
etcdIndex = response.EtcdIndex + 1
|
||||
}
|
||||
|
||||
// Regardless of what happened, we need to close the close channel.
|
||||
close(closeCh)
|
||||
}
|
||||
|
||||
// Lock attempts to aquire the lock by waiting for a new semaphore key in etcd
|
||||
// to become the first in the queue and will block until it is successful or
|
||||
// it recieves a signal on the provided channel. The returned channel will be
|
||||
// closed when the lock is lost, either by an explicit call to Unlock or by
|
||||
// the associated semaphore key in etcd otherwise being deleted or expiring.
|
||||
//
|
||||
// If the lock is currently held by this instance of EtcdLock, Lock will
|
||||
// return an EtcdLockHeldError error.
|
||||
func (c *EtcdLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
|
||||
// Get the local lock before interacting with etcd.
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
// Check if the lock is already held.
|
||||
if err := c.assertNotHeld(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Add a new semaphore key that we will track.
|
||||
semaphoreKey, _, err := c.addSemaphoreKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.semaphoreKey = semaphoreKey
|
||||
|
||||
// Get the current semaphore key.
|
||||
currentSemaphoreKey, _, currentEtcdIndex, err := c.getSemaphoreKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create an etcd-compatible boolean stop channel from the provided
|
||||
// interface stop channel.
|
||||
boolStopCh := make(chan bool)
|
||||
go func() {
|
||||
<-stopCh
|
||||
close(boolStopCh)
|
||||
}()
|
||||
|
||||
// Loop until the we current semaphore key matches ours.
|
||||
for semaphoreKey != currentSemaphoreKey {
|
||||
var err error
|
||||
|
||||
// Start a watch of the entire lock directory, providing the stop channel.
|
||||
response, err := c.client.Watch(c.semaphoreDirKey, currentEtcdIndex+1, true, nil, boolStopCh)
|
||||
if err != nil {
|
||||
|
||||
// If the error is not an etcd error, we can assume it's a notification
|
||||
// of the stop channel having closed. In this scenario, we also want to
|
||||
// remove our semaphore key as we are no longer waiting to aquire the
|
||||
// lock.
|
||||
if _, ok := err.(*etcd.EtcdError); !ok {
|
||||
_, err = c.client.Delete(c.semaphoreKey, false)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Make sure the index we are waiting for has not been removed. If it has,
|
||||
// this is an error and nothing else needs to be done.
|
||||
if response.Node.Key == semaphoreKey &&
|
||||
(response.Action == "delete" || response.Action == "expire") {
|
||||
return nil, EtcdSemaphoreKeyRemovedError
|
||||
}
|
||||
|
||||
// Get the current semaphore key and etcd index.
|
||||
currentSemaphoreKey, _, currentEtcdIndex, err = c.getSemaphoreKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Create a channel to signal when we lose the lock.
|
||||
done := make(chan struct{})
|
||||
go c.watchForKeyRemoval(c.semaphoreKey, currentEtcdIndex+1, done)
|
||||
return done, nil
|
||||
}
|
||||
|
||||
// Unlock releases the lock by deleting the associated semaphore key in etcd.
|
||||
//
|
||||
// If the lock is not currently held by this instance of EtcdLock, Unlock will
|
||||
// return an EtcdLockNotHeldError error.
|
||||
func (c *EtcdLock) Unlock() error {
|
||||
// Get the local lock before interacting with etcd.
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
// Check that the lock is held.
|
||||
if err := c.assertHeld(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete our semaphore key.
|
||||
if _, err := c.client.Delete(c.semaphoreKey, false); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Value checks whether or not the lock is held by any instance of EtcdLock,
|
||||
// including this one, and returns the current value.
|
||||
func (c *EtcdLock) Value() (bool, string, error) {
|
||||
semaphoreKey, semaphoreValue, _, err := c.getSemaphoreKey()
|
||||
if err != nil {
|
||||
return false, "", err
|
||||
}
|
||||
|
||||
if semaphoreKey == "" {
|
||||
return false, "", nil
|
||||
}
|
||||
return true, semaphoreValue, nil
|
||||
}
|
||||
|
|
|
@ -37,4 +37,10 @@ func TestEtcdBackend(t *testing.T) {
|
|||
|
||||
testBackend(t, b)
|
||||
testBackend_ListPrefix(t, b)
|
||||
|
||||
ha, ok := b.(HABackend)
|
||||
if !ok {
|
||||
t.Fatalf("etcd does not implement HABackend")
|
||||
}
|
||||
testHABackend(t, ha, ha)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue