open-consul/lib/semaphore/semaphore.go

107 lines
2.8 KiB
Go

// Package semaphore implements a simple semaphore that is based on
// golang.org/x/sync/semaphore but doesn't support weights. Its advantage over
// a simple buffered chan is that the capacity of the semaphore (i.e. the number
// of slots available) can be changed dynamically at runtime without waiting for
// all existing work to stop. This makes it easier to implement, for example,
// concurrency limits on certain operations that can be reconfigured at runtime.
package semaphore
import (
"container/list"
"context"
"sync"
)
// Dynamic implements a semaphore whose capacity can be changed dynamically at
// run time.
//
// The zero value has no capacity, so SetSize must be called before it is used.
// A Dynamic contains a sync.Mutex and must not be copied after first use.
type Dynamic struct {
	// size is the current capacity: the maximum number of slots that may be
	// held at the same time.
	size int64
	// cur is the number of slots currently held.
	cur int64
	// waiters is a FIFO queue holding one chan struct{} per blocked Acquire
	// call. A waiter's channel is closed once a slot has been assigned to it.
	waiters list.List
	// mu guards every other field of the struct.
	mu sync.Mutex
}

// NewDynamic returns a dynamic semaphore with the given initial capacity. Note
// that this is for convenience and to match golang.org/x/sync/semaphore however
// it's possible to use a zero-value semaphore provided SetSize is called before
// use.
func NewDynamic(n int64) *Dynamic {
	return &Dynamic{
		size: n,
	}
}

// SetSize dynamically updates the number of available slots.
//
// If the capacity grows, callers currently blocked in Acquire are granted the
// newly available slots immediately, in FIFO order. If there are more than n
// slots currently acquired, no further acquires will succeed until sufficient
// have been released to take the total outstanding below n again.
//
// The returned error is always nil.
func (s *Dynamic) SetSize(n int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.size = n
	// Growing the semaphore may have made room for callers that are already
	// queued; without this they would stay blocked until an unrelated Release.
	s.notifyWaiters()
	return nil
}

// Acquire attempts to acquire one "slot" in the semaphore, blocking only until
// ctx is Done. On success, returns nil. On failure, returns ctx.Err() and leaves
// the semaphore unchanged.
//
// If ctx is already done, Acquire may still succeed without blocking.
func (s *Dynamic) Acquire(ctx context.Context) error {
	s.mu.Lock()
	if s.cur < s.size {
		// Fast path: a slot is free, take it without queueing.
		s.cur++
		s.mu.Unlock()
		return nil
	}

	// Need to wait, add to waiter list. The channel is closed by whoever
	// assigns a slot to this waiter.
	ready := make(chan struct{})
	elem := s.waiters.PushBack(ready)
	s.mu.Unlock()

	select {
	case <-ctx.Done():
		err := ctx.Err()
		s.mu.Lock()
		select {
		case <-ready:
			// Acquired the semaphore after we were canceled. Rather than trying to
			// fix up the queue, just pretend we didn't notice the cancellation.
			err = nil
		default:
			// Still queued: withdraw so no slot is ever assigned to us.
			s.waiters.Remove(elem)
		}
		s.mu.Unlock()
		return err

	case <-ready:
		// A slot was assigned to us; it is already counted in cur.
		return nil
	}
}

// Release releases one slot of the semaphore. It will panic if Release is
// called on an empty semaphore, i.e. when no slot is currently held.
//
// The freed slot is passed on to the longest-waiting Acquire call, but only if
// the number of held slots is below the current capacity. After the capacity
// has been reduced by SetSize, waiters therefore remain blocked until enough
// slots have been released.
func (s *Dynamic) Release() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.cur < 1 {
		panic("semaphore: bad release")
	}

	s.cur--
	// Re-grant the slot to a waiter if, and only if, capacity allows it.
	s.notifyWaiters()
}

// notifyWaiters assigns free slots to blocked Acquire calls in FIFO order until
// either the semaphore is full or nobody is waiting. The caller must hold s.mu.
func (s *Dynamic) notifyWaiters() {
	for s.cur < s.size {
		next := s.waiters.Front()
		if next == nil {
			return
		}
		// Count the slot on the waiter's behalf before waking it, so the
		// bookkeeping is consistent by the time the lock is released.
		s.cur++
		s.waiters.Remove(next)
		close(next.Value.(chan struct{}))
	}
}