// Package semaphore implements a simple semaphore that is based on // golang.org/x/sync/semaphore but doesn't support weights. It's advantage over // a simple buffered chan is that the capacity of the semaphore (i.e. the number // of slots available) can be changed dynamically at runtime without waiting for // all existing work to stop. This makes it easier to implement e.g. concurrency // limits on certain operations that can be reconfigured at runtime. package semaphore import ( "container/list" "context" "sync" ) // Dynamic implements a semaphore whose capacity can be changed dynamically at // run time. type Dynamic struct { size int64 cur int64 waiters list.List mu sync.Mutex } // NewDynamic returns a dynamic semaphore with the given initial capacity. Note // that this is for convenience and to match golang.org/x/sync/semaphore however // it's possible to use a zero-value semaphore provided SetSize is called before // use. func NewDynamic(n int64) *Dynamic { return &Dynamic{ size: n, } } // SetSize dynamically updates the number of available slots. If there are more // than n slots currently acquired, no further acquires will succeed until // sufficient have been released to take the total outstanding below n again. func (s *Dynamic) SetSize(n int64) error { s.mu.Lock() defer s.mu.Unlock() s.size = n return nil } // Acquire attempts to acquire one "slot" in the semaphore, blocking only until // ctx is Done. On success, returns nil. On failure, returns ctx.Err() and leaves // the semaphore unchanged. // // If ctx is already done, Acquire may still succeed without blocking. func (s *Dynamic) Acquire(ctx context.Context) error { s.mu.Lock() if s.cur < s.size { s.cur++ s.mu.Unlock() return nil } // Need to wait, add to waiter list ready := make(chan struct{}) elem := s.waiters.PushBack(ready) s.mu.Unlock() select { case <-ctx.Done(): err := ctx.Err() s.mu.Lock() select { case <-ready: // Acquired the semaphore after we were canceled. Rather than trying to // fix up the queue, just pretend we didn't notice the cancellation. err = nil default: s.waiters.Remove(elem) } s.mu.Unlock() return err case <-ready: return nil } } // Release releases the semaphore. It will panic if release is called on an // empty semphore. func (s *Dynamic) Release() { s.mu.Lock() defer s.mu.Unlock() if s.cur < 1 { panic("semaphore: bad release") } next := s.waiters.Front() // If there are no waiters, just decrement and we're done if next == nil { s.cur-- return } // Need to yield our slot to the next waiter. // Remove them from the list s.waiters.Remove(next) // And trigger it's chan before we release the lock close(next.Value.(chan struct{})) // Note we _don't_ decrement inflight since the slot was yielded directly. }