2015-01-13 02:13:52 +00:00
|
|
|
package api
|
|
|
|
|
|
|
|
import (
|
|
|
|
"log"
|
|
|
|
"sync"
|
|
|
|
"testing"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
|
|
|
func TestSemaphore_AcquireRelease(t *testing.T) {
|
2015-05-08 17:27:24 +00:00
|
|
|
t.Parallel()
|
2015-01-13 02:13:52 +00:00
|
|
|
c, s := makeClient(t)
|
2015-03-03 02:18:38 +00:00
|
|
|
defer s.Stop()
|
2015-01-13 02:13:52 +00:00
|
|
|
|
|
|
|
sema, err := c.SemaphorePrefix("test/semaphore", 2)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Initial release should fail
|
|
|
|
err = sema.Release()
|
|
|
|
if err != ErrSemaphoreNotHeld {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Should work
|
|
|
|
lockCh, err := sema.Acquire(nil)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
if lockCh == nil {
|
|
|
|
t.Fatalf("not hold")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Double lock should fail
|
|
|
|
_, err = sema.Acquire(nil)
|
|
|
|
if err != ErrSemaphoreHeld {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Should be held
|
|
|
|
select {
|
|
|
|
case <-lockCh:
|
|
|
|
t.Fatalf("should be held")
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
|
|
|
// Initial release should work
|
|
|
|
err = sema.Release()
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Double unlock should fail
|
|
|
|
err = sema.Release()
|
|
|
|
if err != ErrSemaphoreNotHeld {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Should lose resource
|
|
|
|
select {
|
|
|
|
case <-lockCh:
|
|
|
|
case <-time.After(time.Second):
|
|
|
|
t.Fatalf("should not be held")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestSemaphore_ForceInvalidate(t *testing.T) {
|
2015-05-08 17:27:24 +00:00
|
|
|
t.Parallel()
|
2015-01-13 02:13:52 +00:00
|
|
|
c, s := makeClient(t)
|
2015-03-03 02:18:38 +00:00
|
|
|
defer s.Stop()
|
2015-01-13 02:13:52 +00:00
|
|
|
|
|
|
|
sema, err := c.SemaphorePrefix("test/semaphore", 2)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Should work
|
|
|
|
lockCh, err := sema.Acquire(nil)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
if lockCh == nil {
|
|
|
|
t.Fatalf("not acquired")
|
|
|
|
}
|
|
|
|
defer sema.Release()
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
// Nuke the session, simulator an operator invalidation
|
|
|
|
// or a health check failure
|
|
|
|
session := c.Session()
|
|
|
|
session.Destroy(sema.lockSession, nil)
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Should loose slot
|
|
|
|
select {
|
|
|
|
case <-lockCh:
|
|
|
|
case <-time.After(time.Second):
|
|
|
|
t.Fatalf("should not be locked")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestSemaphore_DeleteKey(t *testing.T) {
|
2015-05-08 17:27:24 +00:00
|
|
|
t.Parallel()
|
2015-01-13 02:13:52 +00:00
|
|
|
c, s := makeClient(t)
|
2015-03-03 02:18:38 +00:00
|
|
|
defer s.Stop()
|
2015-01-13 02:13:52 +00:00
|
|
|
|
|
|
|
sema, err := c.SemaphorePrefix("test/semaphore", 2)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Should work
|
|
|
|
lockCh, err := sema.Acquire(nil)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
if lockCh == nil {
|
|
|
|
t.Fatalf("not locked")
|
|
|
|
}
|
|
|
|
defer sema.Release()
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
// Nuke the key, simulate an operator intervention
|
|
|
|
kv := c.KV()
|
|
|
|
kv.DeleteTree("test/semaphore", nil)
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Should loose leadership
|
|
|
|
select {
|
|
|
|
case <-lockCh:
|
|
|
|
case <-time.After(time.Second):
|
|
|
|
t.Fatalf("should not be locked")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestSemaphore_Contend(t *testing.T) {
|
2015-05-08 17:27:24 +00:00
|
|
|
t.Parallel()
|
2015-01-13 02:13:52 +00:00
|
|
|
c, s := makeClient(t)
|
2015-03-03 02:18:38 +00:00
|
|
|
defer s.Stop()
|
2015-01-13 02:13:52 +00:00
|
|
|
|
|
|
|
wg := &sync.WaitGroup{}
|
|
|
|
acquired := make([]bool, 4)
|
|
|
|
for idx := range acquired {
|
|
|
|
wg.Add(1)
|
|
|
|
go func(idx int) {
|
|
|
|
defer wg.Done()
|
|
|
|
sema, err := c.SemaphorePrefix("test/semaphore", 2)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Should work eventually, will contend
|
|
|
|
lockCh, err := sema.Acquire(nil)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
if lockCh == nil {
|
|
|
|
t.Fatalf("not locked")
|
|
|
|
}
|
|
|
|
defer sema.Release()
|
|
|
|
log.Printf("Contender %d acquired", idx)
|
|
|
|
|
|
|
|
// Set acquired and then leave
|
|
|
|
acquired[idx] = true
|
|
|
|
}(idx)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Wait for termination
|
|
|
|
doneCh := make(chan struct{})
|
|
|
|
go func() {
|
|
|
|
wg.Wait()
|
|
|
|
close(doneCh)
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Wait for everybody to get a turn
|
|
|
|
select {
|
|
|
|
case <-doneCh:
|
|
|
|
case <-time.After(3 * DefaultLockRetryTime):
|
|
|
|
t.Fatalf("timeout")
|
|
|
|
}
|
|
|
|
|
|
|
|
for idx, did := range acquired {
|
|
|
|
if !did {
|
|
|
|
t.Fatalf("contender %d never acquired", idx)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2015-01-13 02:18:32 +00:00
|
|
|
|
|
|
|
func TestSemaphore_BadLimit(t *testing.T) {
|
2015-05-08 17:27:24 +00:00
|
|
|
t.Parallel()
|
2015-01-13 02:18:32 +00:00
|
|
|
c, s := makeClient(t)
|
2015-03-03 02:18:38 +00:00
|
|
|
defer s.Stop()
|
2015-01-13 02:18:32 +00:00
|
|
|
|
|
|
|
sema, err := c.SemaphorePrefix("test/semaphore", 0)
|
|
|
|
if err == nil {
|
|
|
|
t.Fatalf("should error")
|
|
|
|
}
|
|
|
|
|
|
|
|
sema, err = c.SemaphorePrefix("test/semaphore", 1)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
_, err = sema.Acquire(nil)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
sema2, err := c.SemaphorePrefix("test/semaphore", 2)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
_, err = sema2.Acquire(nil)
|
|
|
|
if err.Error() != "semaphore limit conflict (lock: 1, local: 2)" {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
}
|
2015-01-13 22:18:28 +00:00
|
|
|
|
|
|
|
func TestSemaphore_Destroy(t *testing.T) {
|
2015-05-08 17:27:24 +00:00
|
|
|
t.Parallel()
|
2015-01-13 22:18:28 +00:00
|
|
|
c, s := makeClient(t)
|
2015-03-03 02:18:38 +00:00
|
|
|
defer s.Stop()
|
2015-01-13 22:18:28 +00:00
|
|
|
|
|
|
|
sema, err := c.SemaphorePrefix("test/semaphore", 2)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
sema2, err := c.SemaphorePrefix("test/semaphore", 2)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
_, err = sema.Acquire(nil)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
_, err = sema2.Acquire(nil)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Destroy should fail, still held
|
|
|
|
if err := sema.Destroy(); err != ErrSemaphoreHeld {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
err = sema.Release()
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Destroy should fail, still in use
|
|
|
|
if err := sema.Destroy(); err != ErrSemaphoreInUse {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
err = sema2.Release()
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Destroy should work
|
|
|
|
if err := sema.Destroy(); err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Destroy should work
|
|
|
|
if err := sema2.Destroy(); err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
}
|
2015-01-20 01:32:19 +00:00
|
|
|
|
|
|
|
func TestSemaphore_Conflict(t *testing.T) {
|
2015-05-08 17:27:24 +00:00
|
|
|
t.Parallel()
|
2015-01-20 01:32:19 +00:00
|
|
|
c, s := makeClient(t)
|
2015-03-03 02:18:38 +00:00
|
|
|
defer s.Stop()
|
2015-01-20 01:32:19 +00:00
|
|
|
|
|
|
|
lock, err := c.LockKey("test/sema/.lock")
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Should work
|
|
|
|
leaderCh, err := lock.Lock(nil)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
if leaderCh == nil {
|
|
|
|
t.Fatalf("not leader")
|
|
|
|
}
|
|
|
|
defer lock.Unlock()
|
|
|
|
|
|
|
|
sema, err := c.SemaphorePrefix("test/sema/", 2)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Should conflict with lock
|
|
|
|
_, err = sema.Acquire(nil)
|
|
|
|
if err != ErrSemaphoreConflict {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Should conflict with lock
|
|
|
|
err = sema.Destroy()
|
|
|
|
if err != ErrSemaphoreConflict {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
}
|
2016-01-06 00:40:35 +00:00
|
|
|
|
|
|
|
func TestSemaphore_OneShot(t *testing.T) {
|
|
|
|
t.Parallel()
|
|
|
|
c, s := makeClient(t)
|
|
|
|
defer s.Stop()
|
|
|
|
|
|
|
|
// Set up a semaphore as a one-shot.
|
|
|
|
opts := &SemaphoreOptions{
|
|
|
|
Prefix: "test/sema/.lock",
|
|
|
|
Limit: 2,
|
|
|
|
SemaphoreTryOnce: true,
|
|
|
|
}
|
|
|
|
sema, err := c.SemaphoreOpts(opts)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Make sure the default got set.
|
|
|
|
if sema.opts.SemaphoreWaitTime != DefaultSemaphoreWaitTime {
|
|
|
|
t.Fatalf("bad: %d", sema.opts.SemaphoreWaitTime)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Now set a custom time for the test.
|
|
|
|
opts.SemaphoreWaitTime = 250 * time.Millisecond
|
|
|
|
sema, err = c.SemaphoreOpts(opts)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
if sema.opts.SemaphoreWaitTime != 250*time.Millisecond {
|
|
|
|
t.Fatalf("bad: %d", sema.opts.SemaphoreWaitTime)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Should acquire the semaphore.
|
|
|
|
ch, err := sema.Acquire(nil)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
if ch == nil {
|
|
|
|
t.Fatalf("should have acquired the semaphore")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Try with another session.
|
|
|
|
another, err := c.SemaphoreOpts(opts)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
ch, err = another.Acquire(nil)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
if ch == nil {
|
|
|
|
t.Fatalf("should have acquired the semaphore")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Try with a third one that shouldn't get it.
|
|
|
|
contender, err := c.SemaphoreOpts(opts)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
start := time.Now()
|
|
|
|
ch, err = contender.Acquire(nil)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
if ch != nil {
|
|
|
|
t.Fatalf("should not have acquired the semaphore")
|
|
|
|
}
|
|
|
|
if diff := time.Now().Sub(start); diff > 2*contender.opts.SemaphoreWaitTime {
|
|
|
|
t.Fatalf("took too long: %9.6f", diff.Seconds())
|
|
|
|
}
|
|
|
|
|
|
|
|
// Give up a slot and make sure the third one can get it.
|
|
|
|
if err := another.Release(); err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
ch, err = contender.Acquire(nil)
|
|
|
|
if err != nil {
|
|
|
|
t.Fatalf("err: %v", err)
|
|
|
|
}
|
|
|
|
if ch == nil {
|
|
|
|
t.Fatalf("should have acquired the semaphore")
|
|
|
|
}
|
|
|
|
}
|