Merge pull request #606 from tsilen/renew-etcd-semaphore-key

Renew the semaphore key periodically
This commit is contained in:
Jeff Mitchell 2015-09-17 10:00:06 -04:00
commit 91bcf83e5f

View file

@ -28,6 +28,9 @@ const (
// The lock TTL matches the default that Consul API uses, 15 seconds.
EtcdLockTTL = uint64(15)
// The amount of time to wait between the semaphore key renewals
EtcdLockRenewInterval = 5 * time.Second
// The ammount of time to wait if a watch fails before trying again.
EtcdWatchRetryInterval = time.Second
@ -217,6 +220,15 @@ func (c *EtcdLock) addSemaphoreKey() (string, uint64, error) {
return response.Node.Key, response.EtcdIndex, nil
}
// renewSemaphoreKey renews an existing semaphore key.
func (c *EtcdLock) renewSemaphoreKey() (string, uint64, error) {
response, err := c.client.Update(c.semaphoreKey, c.value, EtcdLockTTL)
if err != nil {
return "", 0, err
}
return response.Node.Key, response.EtcdIndex, nil
}
// getSemaphoreKey determines which semaphore key holder has aquired the lock
// and its value.
func (c *EtcdLock) getSemaphoreKey() (string, string, uint64, error) {
@ -277,6 +289,17 @@ func (c *EtcdLock) assertNotHeld() error {
return nil
}
// periodically renew our semaphore key so that it doesn't expire
func (c *EtcdLock) periodicallyRenewSemaphoreKey(stopCh chan struct{}) {
for {
select {
case <-time.After(EtcdLockRenewInterval):
c.renewSemaphoreKey()
case <-stopCh:
return
}
}
}
// watchForKeyRemoval continuously watches a single non-directory key starting
// from the provided etcd index and closes the provided channel when it's
// deleted, expires, or appears to be missing.
@ -328,7 +351,7 @@ func (c *EtcdLock) watchForKeyRemoval(key string, etcdIndex uint64, closeCh chan
//
// If the lock is currently held by this instance of EtcdLock, Lock will
// return an EtcdLockHeldError error.
func (c *EtcdLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
func (c *EtcdLock) Lock(stopCh <-chan struct{}) (doneCh <-chan struct{}, retErr error) {
// Get the local lock before interacting with etcd.
c.lock.Lock()
defer c.lock.Unlock()
@ -359,6 +382,16 @@ func (c *EtcdLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
close(boolStopCh)
}()
// Create a channel to signal when we lose the semaphore key.
done := make(chan struct{})
defer func() {
if retErr != nil {
close(done)
}
}()
go c.periodicallyRenewSemaphoreKey(done)
// Loop until the we current semaphore key matches ours.
for semaphoreKey != currentSemaphoreKey {
var err error
@ -391,8 +424,6 @@ func (c *EtcdLock) Lock(stopCh <-chan struct{}) (<-chan struct{}, error) {
}
}
// Create a channel to signal when we lose the lock.
done := make(chan struct{})
go c.watchForKeyRemoval(c.semaphoreKey, currentEtcdIndex+1, done)
return done, nil
}