api: Adding Destroy to cleanup a semaphore
This commit is contained in:
parent
a12883a5be
commit
af186ac54a
|
@ -36,9 +36,13 @@ var (
|
|||
// ErrSemaphoreHeld is returned if we attempt to double lock
|
||||
ErrSemaphoreHeld = fmt.Errorf("Semaphore already held")
|
||||
|
||||
// ErrSemaphoreNotHeld is returned if we attempt to unlock a lock
|
||||
// ErrSemaphoreNotHeld is returned if we attempt to unlock a semaphore
|
||||
// that we do not hold.
|
||||
ErrSemaphoreNotHeld = fmt.Errorf("Semaphore not held")
|
||||
|
||||
// ErrSemaphoreInUse is returned if we attempt to destroy a semaphore
|
||||
// that is in use.
|
||||
ErrSemaphoreInUse = fmt.Errorf("Semaphore in use")
|
||||
)
|
||||
|
||||
// Semaphore is used to implement a distributed semaphore
|
||||
|
@ -300,6 +304,56 @@ READ:
|
|||
return nil
|
||||
}
|
||||
|
||||
// Destroy is used to cleanup the semaphore entry. It is not necessary
|
||||
// to invoke. It will fail if the semaphore is in use.
|
||||
func (s *Semaphore) Destroy() error {
|
||||
// Hold the lock as we try to acquire
|
||||
s.l.Lock()
|
||||
defer s.l.Unlock()
|
||||
|
||||
// Check if we already hold the semaphore
|
||||
if s.isHeld {
|
||||
return ErrSemaphoreHeld
|
||||
}
|
||||
|
||||
// List for the semaphore
|
||||
kv := s.c.KV()
|
||||
pairs, _, err := kv.List(s.opts.Prefix, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read prefix: %v", err)
|
||||
}
|
||||
|
||||
// Find the lock pair, bail if it doesn't exist
|
||||
lockPair := s.findLock(pairs)
|
||||
if lockPair.ModifyIndex == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Decode the lock
|
||||
lock, err := s.decodeLock(lockPair)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Prune the dead holders
|
||||
s.pruneDeadHolders(lock, pairs)
|
||||
|
||||
// Check if there are any holders
|
||||
if len(lock.Holders) > 0 {
|
||||
return ErrSemaphoreInUse
|
||||
}
|
||||
|
||||
// Attempt the delete
|
||||
didRemove, _, err := kv.DeleteCAS(lockPair, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to remove semaphore: %v", err)
|
||||
}
|
||||
if !didRemove {
|
||||
return ErrSemaphoreInUse
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// createSession is used to create a new managed session
|
||||
func (s *Semaphore) createSession() (string, error) {
|
||||
session := s.c.Session()
|
||||
|
|
|
@ -212,3 +212,58 @@ func TestSemaphore_BadLimit(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSemaphore_Destroy(t *testing.T) {
|
||||
c, s := makeClient(t)
|
||||
defer s.stop()
|
||||
|
||||
sema, err := c.SemaphorePrefix("test/semaphore", 2)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
sema2, err := c.SemaphorePrefix("test/semaphore", 2)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
_, err = sema.Acquire(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
_, err = sema2.Acquire(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Destroy should fail, still held
|
||||
if err := sema.Destroy(); err != ErrSemaphoreHeld {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
err = sema.Release()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Destroy should fail, still in use
|
||||
if err := sema.Destroy(); err != ErrSemaphoreInUse {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
err = sema2.Release()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Destroy should work
|
||||
if err := sema.Destroy(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Destroy should work
|
||||
if err := sema2.Destroy(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue