Use a semaphore to block until watcher exits
parent a73162c977
commit a2b696c4cf
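In isolation, the pattern works like this: a buffered channel of capacity one acts as the semaphore, the watcher goroutine holds the single slot for its entire lifetime, and wait() acquires then immediately releases, so it blocks exactly until the current watcher has let go of the slot, i.e. has exited. Below is a minimal standalone sketch of that idea; it is illustrative only, not part of this diff, and the names (sema, watcher) are made up for the example.

package main

import (
	"context"
	"fmt"
	"time"
)

// sema is a one-slot channel used as a semaphore, mirroring watchSema below.
type sema struct{ ch chan int }

func newSema() *sema     { return &sema{ch: make(chan int, 1)} }
func (s *sema) acquire() { s.ch <- 1 }
func (s *sema) release() { <-s.ch }

// wait blocks until the semaphore is free, i.e. until the current holder
// has released it by exiting.
func (s *sema) wait() {
	s.acquire()
	s.release()
}

// watcher holds the semaphore for its whole lifetime, the way watchHealth
// acquires watchLock on entry and releases it on return.
func watcher(ctx context.Context, s *sema) {
	s.acquire()
	defer s.release()

	<-ctx.Done()                      // stand-in for the real health-watching loop
	time.Sleep(20 * time.Millisecond) // pretend there is cleanup after cancellation
}

func main() {
	s := newSema()
	ctx, cancel := context.WithCancel(context.Background())

	go watcher(ctx, s)
	time.Sleep(10 * time.Millisecond) // give the watcher time to start and acquire

	cancel() // stop the watcher...
	s.wait() // ...and block until it has actually exited
	fmt.Println("watcher exited")
}

The wait() helper replaces the h.watchLock.Lock(); h.watchLock.Unlock() pairing that Update and Destroy previously used as an ad-hoc done channel.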
@@ -14,6 +14,31 @@ import (
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
+// watchSema acts as a semaphore to allow one watcher at a time.
+type watchSema struct {
+	ch chan int
+}
+
+func newWatchSema() *watchSema {
+	return &watchSema{ch: make(chan int, 1)}
+}
+
+// acquire the semaphore.
+func (w *watchSema) acquire() {
+	w.ch <- 1
+}
+
+// release the semaphore.
+func (w *watchSema) release() {
+	<-w.ch
+}
+
+// wait blocks until the semaphore is free to be acquired.
+func (w *watchSema) wait() {
+	w.acquire()
+	w.release()
+}
+
 // healthMutator is able to set/clear alloc health.
 type healthSetter interface {
 	// Set health via the mutator
@@ -40,17 +65,17 @@ type allocHealthWatcherHook struct {
 	// Update and synchronous hooks.
 	hookLock sync.Mutex
 
-	// watchLock is held by the health watching/setting goroutine to ensure
-	// only one health watching goroutine is running at a time.
-	watchLock sync.Mutex
+	// watchLock is acquired by the health watching/setting goroutine to
+	// ensure only one health watching goroutine is running at a time.
+	watchLock *watchSema
 
 	// ranOnce is set once Prerun or Update have run at least once. This
 	// prevents Prerun from running if an Update has already been
 	// processed. Must hold hookLock to access.
 	ranOnce bool
 
-	// cancelFn stops the health watching/setting goroutine. Grab the
-	// watchLock to block until it exits.
+	// cancelFn stops the health watching/setting goroutine. Wait on
+	// watchLock to block until the watcher exits.
 	cancelFn context.CancelFunc
 
 	// alloc set by new func or Update. Must hold hookLock to access.
@@ -152,10 +177,8 @@ func (h *allocHealthWatcherHook) Update(req *interfaces.RunnerUpdateRequest) err
 	// Cancel the old watcher and create a new one
 	h.cancelFn()
 
-	// Acquire the watchLock to ensure the old watcher has exited before
-	// continuing. Kind of an ugly/easy-to-reuse done chan.
-	h.watchLock.Lock()
-	h.watchLock.Unlock()
+	// Wait until the watcher exits
+	h.watchLock.wait()
 
 	// Deployment has changed, reset status
 	if req.Alloc.DeploymentID != h.alloc.DeploymentID {
@@ -175,10 +198,8 @@ func (h *allocHealthWatcherHook) Destroy() error {
 	h.cancelFn()
 	h.listener.Close()
 
-	// Acquire the watchLock to ensure any existing watcher has exited
-	// before exiting. Kind of an ugly/easy-to-reuse done chan.
-	h.watchLock.Lock()
-	h.watchLock.Unlock()
+	// Wait until the watcher exits
+	h.watchLock.wait()
 
 	return nil
 }
@@ -187,8 +208,8 @@ func (h *allocHealthWatcherHook) Destroy() error {
 // the context is canceled. watchHealth will be canceled and restarted on
 // Updates so calls are serialized with a lock.
 func (h *allocHealthWatcherHook) watchHealth(ctx context.Context, tracker *allochealth.Tracker) {
-	h.watchLock.Lock()
-	defer h.watchLock.Unlock()
+	h.watchLock.acquire()
+	defer h.watchLock.release()
 
 	select {
 	case <-ctx.Done():