34871f89be
* changes necessary to support oss licesning shims revert nomad fmt changes update test to work with enterprise changes update tests to work with new ent enforcements make check update cas test to use scheduler algorithm back out preemption changes add comments * remove unused method
145 lines
3.6 KiB
Go
145 lines
3.6 KiB
Go
package memdb
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
)
|
|
|
|
// WatchSet is a collection of watch channels.
|
|
type WatchSet map[<-chan struct{}]struct{}
|
|
|
|
// NewWatchSet constructs a new watch set.
|
|
func NewWatchSet() WatchSet {
|
|
return make(map[<-chan struct{}]struct{})
|
|
}
|
|
|
|
// Add appends a watchCh to the WatchSet if non-nil.
|
|
func (w WatchSet) Add(watchCh <-chan struct{}) {
|
|
if w == nil {
|
|
return
|
|
}
|
|
|
|
if _, ok := w[watchCh]; !ok {
|
|
w[watchCh] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// AddWithLimit appends a watchCh to the WatchSet if non-nil, and if the given
|
|
// softLimit hasn't been exceeded. Otherwise, it will watch the given alternate
|
|
// channel. It's expected that the altCh will be the same on many calls to this
|
|
// function, so you will exceed the soft limit a little bit if you hit this, but
|
|
// not by much.
|
|
//
|
|
// This is useful if you want to track individual items up to some limit, after
|
|
// which you watch a higher-level channel (usually a channel from start start of
|
|
// an iterator higher up in the radix tree) that will watch a superset of items.
|
|
func (w WatchSet) AddWithLimit(softLimit int, watchCh <-chan struct{}, altCh <-chan struct{}) {
|
|
// This is safe for a nil WatchSet so we don't need to check that here.
|
|
if len(w) < softLimit {
|
|
w.Add(watchCh)
|
|
} else {
|
|
w.Add(altCh)
|
|
}
|
|
}
|
|
|
|
// Watch is used to wait for either the watch set to trigger or a timeout.
|
|
// Returns true on timeout.
|
|
func (w WatchSet) Watch(timeoutCh <-chan time.Time) bool {
|
|
if w == nil {
|
|
return false
|
|
}
|
|
|
|
// Create a context that gets cancelled when the timeout is triggered
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
go func() {
|
|
select {
|
|
case <-timeoutCh:
|
|
cancel()
|
|
case <-ctx.Done():
|
|
}
|
|
}()
|
|
|
|
return w.WatchCtx(ctx) == context.Canceled
|
|
}
|
|
|
|
// WatchCtx is used to wait for either the watch set to trigger or for the
|
|
// context to be cancelled. Watch with a timeout channel can be mimicked by
|
|
// creating a context with a deadline. WatchCtx should be preferred over Watch.
|
|
func (w WatchSet) WatchCtx(ctx context.Context) error {
|
|
if w == nil {
|
|
return nil
|
|
}
|
|
|
|
if n := len(w); n <= aFew {
|
|
idx := 0
|
|
chunk := make([]<-chan struct{}, aFew)
|
|
for watchCh := range w {
|
|
chunk[idx] = watchCh
|
|
idx++
|
|
}
|
|
return watchFew(ctx, chunk)
|
|
}
|
|
|
|
return w.watchMany(ctx)
|
|
}
|
|
|
|
// watchMany is used if there are many watchers.
|
|
func (w WatchSet) watchMany(ctx context.Context) error {
|
|
// Set up a goroutine for each watcher.
|
|
triggerCh := make(chan struct{}, 1)
|
|
watcher := func(chunk []<-chan struct{}) {
|
|
if err := watchFew(ctx, chunk); err == nil {
|
|
select {
|
|
case triggerCh <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// Apportion the watch channels into chunks we can feed into the
|
|
// watchFew helper.
|
|
idx := 0
|
|
chunk := make([]<-chan struct{}, aFew)
|
|
for watchCh := range w {
|
|
subIdx := idx % aFew
|
|
chunk[subIdx] = watchCh
|
|
idx++
|
|
|
|
// Fire off this chunk and start a fresh one.
|
|
if idx%aFew == 0 {
|
|
go watcher(chunk)
|
|
chunk = make([]<-chan struct{}, aFew)
|
|
}
|
|
}
|
|
|
|
// Make sure to watch any residual channels in the last chunk.
|
|
if idx%aFew != 0 {
|
|
go watcher(chunk)
|
|
}
|
|
|
|
// Wait for a channel to trigger or timeout.
|
|
select {
|
|
case <-triggerCh:
|
|
return nil
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// WatchCh returns a channel that is used to wait for either the watch set to trigger
|
|
// or for the context to be cancelled. WatchCh creates a new goroutine each call, so
|
|
// callers may need to cache the returned channel to avoid creating extra goroutines.
|
|
func (w WatchSet) WatchCh(ctx context.Context) <-chan error {
|
|
// Create the outgoing channel
|
|
triggerCh := make(chan error, 1)
|
|
|
|
// Create a goroutine to collect the error from WatchCtx
|
|
go func() {
|
|
triggerCh <- w.WatchCtx(ctx)
|
|
}()
|
|
|
|
return triggerCh
|
|
}
|