From af186ac54a3de67ac6a22d58ecc9a72ba7ba3901 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 13 Jan 2015 14:18:28 -0800 Subject: [PATCH] api: Adding Destroy to cleanup a semaphore --- api/semaphore.go | 56 ++++++++++++++++++++++++++++++++++++++++++- api/semaphore_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 1 deletion(-) diff --git a/api/semaphore.go b/api/semaphore.go index c1c467a95..17b6396c3 100644 --- a/api/semaphore.go +++ b/api/semaphore.go @@ -36,9 +36,13 @@ var ( // ErrSemaphoreHeld is returned if we attempt to double lock ErrSemaphoreHeld = fmt.Errorf("Semaphore already held") - // ErrSemaphoreNotHeld is returned if we attempt to unlock a lock + // ErrSemaphoreNotHeld is returned if we attempt to unlock a semaphore // that we do not hold. ErrSemaphoreNotHeld = fmt.Errorf("Semaphore not held") + + // ErrSemaphoreInUse is returned if we attempt to destroy a semaphore + // that is in use. + ErrSemaphoreInUse = fmt.Errorf("Semaphore in use") ) // Semaphore is used to implement a distributed semaphore @@ -300,6 +304,56 @@ READ: return nil } +// Destroy is used to cleanup the semaphore entry. It is not necessary +// to invoke. It will fail if the semaphore is in use. +func (s *Semaphore) Destroy() error { + // Hold the lock as we try to acquire + s.l.Lock() + defer s.l.Unlock() + + // Check if we already hold the semaphore + if s.isHeld { + return ErrSemaphoreHeld + } + + // List for the semaphore + kv := s.c.KV() + pairs, _, err := kv.List(s.opts.Prefix, nil) + if err != nil { + return fmt.Errorf("failed to read prefix: %v", err) + } + + // Find the lock pair, bail if it doesn't exist + lockPair := s.findLock(pairs) + if lockPair.ModifyIndex == 0 { + return nil + } + + // Decode the lock + lock, err := s.decodeLock(lockPair) + if err != nil { + return err + } + + // Prune the dead holders + s.pruneDeadHolders(lock, pairs) + + // Check if there are any holders + if len(lock.Holders) > 0 { + return ErrSemaphoreInUse + } + + // Attempt the delete + didRemove, _, err := kv.DeleteCAS(lockPair, nil) + if err != nil { + return fmt.Errorf("failed to remove semaphore: %v", err) + } + if !didRemove { + return ErrSemaphoreInUse + } + return nil +} + // createSession is used to create a new managed session func (s *Semaphore) createSession() (string, error) { session := s.c.Session() diff --git a/api/semaphore_test.go b/api/semaphore_test.go index d2d1487ef..e57f2b457 100644 --- a/api/semaphore_test.go +++ b/api/semaphore_test.go @@ -212,3 +212,58 @@ func TestSemaphore_BadLimit(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestSemaphore_Destroy(t *testing.T) { + c, s := makeClient(t) + defer s.stop() + + sema, err := c.SemaphorePrefix("test/semaphore", 2) + if err != nil { + t.Fatalf("err: %v", err) + } + + sema2, err := c.SemaphorePrefix("test/semaphore", 2) + if err != nil { + t.Fatalf("err: %v", err) + } + + _, err = sema.Acquire(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + _, err = sema2.Acquire(nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Destroy should fail, still held + if err := sema.Destroy(); err != ErrSemaphoreHeld { + t.Fatalf("err: %v", err) + } + + err = sema.Release() + if err != nil { + t.Fatalf("err: %v", err) + } + + // Destroy should fail, still in use + if err := sema.Destroy(); err != ErrSemaphoreInUse { + t.Fatalf("err: %v", err) + } + + err = sema2.Release() + if err != nil { + t.Fatalf("err: %v", err) + } + + // Destroy should work + if err := sema.Destroy(); err != nil { + t.Fatalf("err: %v", err) + } + + // Destroy should work + if err := sema2.Destroy(); err != nil { + t.Fatalf("err: %v", err) + } +}