allocrunner: Async shutdown and destroy

This commit reduces the locking required to shutdown or destroy
allocrunners, and allows parallel shutdown and destroy of allocrunners during
shutdown.
This commit is contained in:
Danielle Tomlinson 2018-12-14 16:02:47 +01:00
parent 6c51232f55
commit d1fbac1aad
2 changed files with 114 additions and 35 deletions

View File

@ -64,12 +64,32 @@ type allocRunner struct {
// to access.
destroyed bool
// destroyCh is closed when the Run() loop has exited, postrun hooks have
// run, and alloc runner has been destroyed.
destroyCh chan struct{}
// shutdown is true when the Run() loop has exited, and shutdown hooks have
// run. Must acquire destroyedLock to access.
shutdown bool
// shutdownCh is closed when the Run() loop has exited, and shutdown hooks
// have run.
shutdownCh chan struct{}
// runnersLaunched is true if TaskRunners were Run. Must acquire
// destroyedLock to access.
runnersLaunched bool
// destroyedLock guards destroyed, runnersLaunched, and serializes
// Shutdown/Destroy calls.
// destroyLaunched is true if Destroy has been called. Must acquire
// destroyedLock to access.
destroyLaunched bool
// shutdownLaunched is true if Shutdown has been called. Must acquire
// destroyedLock to access.
shutdownLaunched bool
// destroyedLock guards destroyed, runnersLaunched, destroyLaunched,
// shutdownLaunched, and serializes Shutdown/Destroy calls.
destroyedLock sync.Mutex
// Alloc captures the allocation being run.
@ -130,6 +150,8 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
vaultClient: config.Vault,
tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)),
waitCh: make(chan struct{}),
destroyCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
state: &state.State{},
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
@ -615,19 +637,7 @@ func (ar *allocRunner) Listener() *cstructs.AllocListener {
return ar.allocBroadcaster.Listen()
}
// Destroy the alloc runner by synchronously stopping it if it is still running
// and cleaning up all of its resources.
//
// This method is safe for calling concurrently with Run() and will cause it to
// exit (thus closing WaitCh).
func (ar *allocRunner) Destroy() {
ar.destroyedLock.Lock()
defer ar.destroyedLock.Unlock()
if ar.destroyed {
// Only destroy once
return
}
func (ar *allocRunner) destroyImpl() {
// Stop any running tasks and persist states in case the client is
// shutdown before Destroy finishes.
states := ar.killTasks()
@ -652,7 +662,52 @@ func (ar *allocRunner) Destroy() {
}
// Mark alloc as destroyed
ar.destroyedLock.Lock()
if !ar.shutdown {
ar.shutdown = true
close(ar.shutdownCh)
}
ar.destroyed = true
close(ar.destroyCh)
ar.destroyedLock.Unlock()
}
// Destroy the alloc runner by stopping it if it is still running and cleaning
// up all of its resources.
//
// This method is safe for calling concurrently with Run() and will cause it to
// exit (thus closing WaitCh).
func (ar *allocRunner) Destroy() {
ar.destroyedLock.Lock()
defer ar.destroyedLock.Unlock()
if ar.destroyed {
// Only destroy once
return
}
if ar.destroyLaunched {
// Only dispatch a destroy once
return
}
ar.destroyLaunched = true
// Synchronize calls to shutdown/destroy
if ar.shutdownLaunched {
go func() {
ar.logger.Debug("Waiting for shutdown before destroying runner")
<-ar.shutdownCh
ar.destroyImpl()
}()
return
}
go ar.destroyImpl()
}
// IsDestroyed returns true if the alloc runner has been destroyed (stopped and
@ -675,6 +730,14 @@ func (ar *allocRunner) IsWaiting() bool {
return ar.prevAllocWatcher.IsWaiting()
}
func (ar *allocRunner) DestroyCh() <-chan struct{} {
return ar.destroyCh
}
func (ar *allocRunner) ShutdownCh() <-chan struct{} {
return ar.shutdownCh
}
// Shutdown AllocRunner gracefully. Blocks while shutting down all TaskRunners.
// Tasks are unaffected and may be restored.
func (ar *allocRunner) Shutdown() {
@ -687,29 +750,45 @@ func (ar *allocRunner) Shutdown() {
return
}
ar.logger.Trace("shutting down")
// Shutdown tasks gracefully if they were run
if ar.runnersLaunched {
wg := sync.WaitGroup{}
for _, tr := range ar.tasks {
wg.Add(1)
go func(tr *taskrunner.TaskRunner) {
tr.Shutdown()
wg.Done()
}(tr)
}
wg.Wait()
// Destroy is a superset of Shutdown so if it's been marked for destruction,
// don't try and shutdown in parallel. If shutdown has been launched, don't
// try again.
if ar.destroyLaunched || ar.shutdownLaunched {
return
}
// Wait for Run to exit
<-ar.waitCh
ar.shutdownLaunched = true
// Run shutdown hooks
ar.shutdownHooks()
go func() {
ar.logger.Trace("shutting down")
// Wait for updater to finish its final run
<-ar.taskStateUpdateHandlerCh
// Shutdown tasks gracefully if they were run
if ar.runnersLaunched {
wg := sync.WaitGroup{}
for _, tr := range ar.tasks {
wg.Add(1)
go func(tr *taskrunner.TaskRunner) {
tr.Shutdown()
wg.Done()
}(tr)
}
wg.Wait()
}
// Wait for Run to exit
<-ar.waitCh
// Run shutdown hooks
ar.shutdownHooks()
// Wait for updater to finish its final run
<-ar.taskStateUpdateHandlerCh
ar.destroyedLock.Lock()
ar.shutdown = true
close(ar.shutdownCh)
ar.destroyedLock.Unlock()
}()
}
// IsMigrating returns true if the alloc runner is migrating data from its

View File

@ -279,7 +279,7 @@ func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) {
ar2.Destroy()
select {
case <-ar2.WaitCh():
case <-ar2.DestroyCh():
// exited as expected
case <-time.After(10 * time.Second):
t.Fatalf("timed out waiting for AR to GC")