107 lines
2.8 KiB
Go
107 lines
2.8 KiB
Go
|
// Package semaphore implements a simple semaphore that is based on
|
||
|
// golang.org/x/sync/semaphore but doesn't support weights. It's advantage over
|
||
|
// a simple buffered chan is that the capacity of the semaphore (i.e. the number
|
||
|
// of slots available) can be changed dynamically at runtime without waiting for
|
||
|
// all existing work to stop. This makes it easier to implement e.g. concurrency
|
||
|
// limits on certain operations that can be reconfigured at runtime.
|
||
|
package semaphore
|
||
|
|
||
|
import (
|
||
|
"container/list"
|
||
|
"context"
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
// Dynamic implements a semaphore whose capacity can be changed dynamically at
|
||
|
// run time.
|
||
|
type Dynamic struct {
|
||
|
size int64
|
||
|
cur int64
|
||
|
waiters list.List
|
||
|
mu sync.Mutex
|
||
|
}
|
||
|
|
||
|
// NewDynamic returns a dynamic semaphore with the given initial capacity. Note
|
||
|
// that this is for convenience and to match golang.org/x/sync/semaphore however
|
||
|
// it's possible to use a zero-value semaphore provided SetSize is called before
|
||
|
// use.
|
||
|
func NewDynamic(n int64) *Dynamic {
|
||
|
return &Dynamic{
|
||
|
size: n,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// SetSize dynamically updates the number of available slots. If there are more
|
||
|
// than n slots currently acquired, no further acquires will succeed until
|
||
|
// sufficient have been released to take the total outstanding below n again.
|
||
|
func (s *Dynamic) SetSize(n int64) error {
|
||
|
s.mu.Lock()
|
||
|
defer s.mu.Unlock()
|
||
|
s.size = n
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Acquire attempts to acquire one "slot" in the semaphore, blocking only until
|
||
|
// ctx is Done. On success, returns nil. On failure, returns ctx.Err() and leaves
|
||
|
// the semaphore unchanged.
|
||
|
//
|
||
|
// If ctx is already done, Acquire may still succeed without blocking.
|
||
|
func (s *Dynamic) Acquire(ctx context.Context) error {
|
||
|
s.mu.Lock()
|
||
|
if s.cur < s.size {
|
||
|
s.cur++
|
||
|
s.mu.Unlock()
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Need to wait, add to waiter list
|
||
|
ready := make(chan struct{})
|
||
|
elem := s.waiters.PushBack(ready)
|
||
|
s.mu.Unlock()
|
||
|
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
err := ctx.Err()
|
||
|
s.mu.Lock()
|
||
|
select {
|
||
|
case <-ready:
|
||
|
// Acquired the semaphore after we were canceled. Rather than trying to
|
||
|
// fix up the queue, just pretend we didn't notice the cancellation.
|
||
|
err = nil
|
||
|
default:
|
||
|
s.waiters.Remove(elem)
|
||
|
}
|
||
|
s.mu.Unlock()
|
||
|
return err
|
||
|
|
||
|
case <-ready:
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Release releases the semaphore. It will panic if release is called on an
|
||
|
// empty semphore.
|
||
|
func (s *Dynamic) Release() {
|
||
|
s.mu.Lock()
|
||
|
defer s.mu.Unlock()
|
||
|
|
||
|
if s.cur < 1 {
|
||
|
panic("semaphore: bad release")
|
||
|
}
|
||
|
|
||
|
next := s.waiters.Front()
|
||
|
|
||
|
// If there are no waiters, just decrement and we're done
|
||
|
if next == nil {
|
||
|
s.cur--
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Need to yield our slot to the next waiter.
|
||
|
// Remove them from the list
|
||
|
s.waiters.Remove(next)
|
||
|
// And trigger it's chan before we release the lock
|
||
|
close(next.Value.(chan struct{}))
|
||
|
// Note we _don't_ decrement inflight since the slot was yielded directly.
|
||
|
}
|