package api import ( "log" "sync" "testing" "time" ) func TestSemaphore_AcquireRelease(t *testing.T) { t.Parallel() c, s := makeClient(t) defer s.Stop() sema, err := c.SemaphorePrefix("test/semaphore", 2) if err != nil { t.Fatalf("err: %v", err) } // Initial release should fail err = sema.Release() if err != ErrSemaphoreNotHeld { t.Fatalf("err: %v", err) } // Should work lockCh, err := sema.Acquire(nil) if err != nil { t.Fatalf("err: %v", err) } if lockCh == nil { t.Fatalf("not hold") } // Double lock should fail _, err = sema.Acquire(nil) if err != ErrSemaphoreHeld { t.Fatalf("err: %v", err) } // Should be held select { case <-lockCh: t.Fatalf("should be held") default: } // Initial release should work err = sema.Release() if err != nil { t.Fatalf("err: %v", err) } // Double unlock should fail err = sema.Release() if err != ErrSemaphoreNotHeld { t.Fatalf("err: %v", err) } // Should lose resource select { case <-lockCh: case <-time.After(time.Second): t.Fatalf("should not be held") } } func TestSemaphore_ForceInvalidate(t *testing.T) { t.Parallel() c, s := makeClient(t) defer s.Stop() sema, err := c.SemaphorePrefix("test/semaphore", 2) if err != nil { t.Fatalf("err: %v", err) } // Should work lockCh, err := sema.Acquire(nil) if err != nil { t.Fatalf("err: %v", err) } if lockCh == nil { t.Fatalf("not acquired") } defer sema.Release() go func() { // Nuke the session, simulator an operator invalidation // or a health check failure session := c.Session() session.Destroy(sema.lockSession, nil) }() // Should loose slot select { case <-lockCh: case <-time.After(time.Second): t.Fatalf("should not be locked") } } func TestSemaphore_DeleteKey(t *testing.T) { t.Parallel() c, s := makeClient(t) defer s.Stop() sema, err := c.SemaphorePrefix("test/semaphore", 2) if err != nil { t.Fatalf("err: %v", err) } // Should work lockCh, err := sema.Acquire(nil) if err != nil { t.Fatalf("err: %v", err) } if lockCh == nil { t.Fatalf("not locked") } defer sema.Release() go func() { // Nuke the key, simulate an operator intervention kv := c.KV() kv.DeleteTree("test/semaphore", nil) }() // Should loose leadership select { case <-lockCh: case <-time.After(time.Second): t.Fatalf("should not be locked") } } func TestSemaphore_Contend(t *testing.T) { t.Parallel() c, s := makeClient(t) defer s.Stop() wg := &sync.WaitGroup{} acquired := make([]bool, 4) for idx := range acquired { wg.Add(1) go func(idx int) { defer wg.Done() sema, err := c.SemaphorePrefix("test/semaphore", 2) if err != nil { t.Fatalf("err: %v", err) } // Should work eventually, will contend lockCh, err := sema.Acquire(nil) if err != nil { t.Fatalf("err: %v", err) } if lockCh == nil { t.Fatalf("not locked") } defer sema.Release() log.Printf("Contender %d acquired", idx) // Set acquired and then leave acquired[idx] = true }(idx) } // Wait for termination doneCh := make(chan struct{}) go func() { wg.Wait() close(doneCh) }() // Wait for everybody to get a turn select { case <-doneCh: case <-time.After(3 * DefaultLockRetryTime): t.Fatalf("timeout") } for idx, did := range acquired { if !did { t.Fatalf("contender %d never acquired", idx) } } } func TestSemaphore_BadLimit(t *testing.T) { t.Parallel() c, s := makeClient(t) defer s.Stop() sema, err := c.SemaphorePrefix("test/semaphore", 0) if err == nil { t.Fatalf("should error") } sema, err = c.SemaphorePrefix("test/semaphore", 1) if err != nil { t.Fatalf("err: %v", err) } _, err = sema.Acquire(nil) if err != nil { t.Fatalf("err: %v", err) } sema2, err := c.SemaphorePrefix("test/semaphore", 2) if err != nil { t.Fatalf("err: %v", err) } _, err = sema2.Acquire(nil) if err.Error() != "semaphore limit conflict (lock: 1, local: 2)" { t.Fatalf("err: %v", err) } } func TestSemaphore_Destroy(t *testing.T) { t.Parallel() c, s := makeClient(t) defer s.Stop() sema, err := c.SemaphorePrefix("test/semaphore", 2) if err != nil { t.Fatalf("err: %v", err) } sema2, err := c.SemaphorePrefix("test/semaphore", 2) if err != nil { t.Fatalf("err: %v", err) } _, err = sema.Acquire(nil) if err != nil { t.Fatalf("err: %v", err) } _, err = sema2.Acquire(nil) if err != nil { t.Fatalf("err: %v", err) } // Destroy should fail, still held if err := sema.Destroy(); err != ErrSemaphoreHeld { t.Fatalf("err: %v", err) } err = sema.Release() if err != nil { t.Fatalf("err: %v", err) } // Destroy should fail, still in use if err := sema.Destroy(); err != ErrSemaphoreInUse { t.Fatalf("err: %v", err) } err = sema2.Release() if err != nil { t.Fatalf("err: %v", err) } // Destroy should work if err := sema.Destroy(); err != nil { t.Fatalf("err: %v", err) } // Destroy should work if err := sema2.Destroy(); err != nil { t.Fatalf("err: %v", err) } } func TestSemaphore_Conflict(t *testing.T) { t.Parallel() c, s := makeClient(t) defer s.Stop() lock, err := c.LockKey("test/sema/.lock") if err != nil { t.Fatalf("err: %v", err) } // Should work leaderCh, err := lock.Lock(nil) if err != nil { t.Fatalf("err: %v", err) } if leaderCh == nil { t.Fatalf("not leader") } defer lock.Unlock() sema, err := c.SemaphorePrefix("test/sema/", 2) if err != nil { t.Fatalf("err: %v", err) } // Should conflict with lock _, err = sema.Acquire(nil) if err != ErrSemaphoreConflict { t.Fatalf("err: %v", err) } // Should conflict with lock err = sema.Destroy() if err != ErrSemaphoreConflict { t.Fatalf("err: %v", err) } }