From d1fbac1aad95aeebcbdb3f5b3033eda98de83042 Mon Sep 17 00:00:00 2001 From: Danielle Tomlinson Date: Fri, 14 Dec 2018 16:02:47 +0100 Subject: [PATCH] allocrunner: Async shutdown and destroy This commit reduces the locking required to shutdown or destroy allocrunners, and allows parallel shutdown and destroy of allocrunners during shutdown. --- client/allocrunner/alloc_runner.go | 147 ++++++++++++++++++------ client/allocrunner/alloc_runner_test.go | 2 +- 2 files changed, 114 insertions(+), 35 deletions(-) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 3a5ab0f6d..9d852beaa 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -64,12 +64,32 @@ type allocRunner struct { // to access. destroyed bool + // destroyCh is closed when the Run() loop has exited, postrun hooks have + // run, and alloc runner has been destroyed. + destroyCh chan struct{} + + // shutdown is true when the Run() loop has exited, and shutdown hooks have + // run. Must acquire destroyedLock to access. + shutdown bool + + // shutdownCh is closed when the Run() loop has exited, and shutdown hooks + // have run. + shutdownCh chan struct{} + // runnersLaunched is true if TaskRunners were Run. Must acquire // destroyedLock to access. runnersLaunched bool - // destroyedLock guards destroyed, runnersLaunched, and serializes - // Shutdown/Destroy calls. + // destroyLaunched is true if Destroy has been called. Must acquire + // destroyedLock to access. + destroyLaunched bool + + // shutdownLaunched is true if Shutdown has been called. Must acquire + // destroyedLock to access. + shutdownLaunched bool + + // destroyedLock guards destroyed, runnersLaunched, destroyLaunched, + // shutdownLaunched, and serializes Shutdown/Destroy calls. destroyedLock sync.Mutex // Alloc captures the allocation being run. @@ -130,6 +150,8 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { vaultClient: config.Vault, tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)), waitCh: make(chan struct{}), + destroyCh: make(chan struct{}), + shutdownCh: make(chan struct{}), state: &state.State{}, stateDB: config.StateDB, stateUpdater: config.StateUpdater, @@ -615,19 +637,7 @@ func (ar *allocRunner) Listener() *cstructs.AllocListener { return ar.allocBroadcaster.Listen() } -// Destroy the alloc runner by synchronously stopping it if it is still running -// and cleaning up all of its resources. -// -// This method is safe for calling concurrently with Run() and will cause it to -// exit (thus closing WaitCh). -func (ar *allocRunner) Destroy() { - ar.destroyedLock.Lock() - defer ar.destroyedLock.Unlock() - if ar.destroyed { - // Only destroy once - return - } - +func (ar *allocRunner) destroyImpl() { // Stop any running tasks and persist states in case the client is // shutdown before Destroy finishes. states := ar.killTasks() @@ -652,7 +662,52 @@ func (ar *allocRunner) Destroy() { } // Mark alloc as destroyed + ar.destroyedLock.Lock() + + if !ar.shutdown { + ar.shutdown = true + close(ar.shutdownCh) + } + ar.destroyed = true + close(ar.destroyCh) + + ar.destroyedLock.Unlock() +} + +// Destroy the alloc runner by stopping it if it is still running and cleaning +// up all of its resources. +// +// This method is safe for calling concurrently with Run() and will cause it to +// exit (thus closing WaitCh). +func (ar *allocRunner) Destroy() { + ar.destroyedLock.Lock() + defer ar.destroyedLock.Unlock() + + if ar.destroyed { + // Only destroy once + return + } + + if ar.destroyLaunched { + // Only dispatch a destroy once + return + } + + ar.destroyLaunched = true + + // Synchronize calls to shutdown/destroy + if ar.shutdownLaunched { + go func() { + ar.logger.Debug("Waiting for shutdown before destroying runner") + <-ar.shutdownCh + ar.destroyImpl() + }() + + return + } + + go ar.destroyImpl() } // IsDestroyed returns true if the alloc runner has been destroyed (stopped and @@ -675,6 +730,14 @@ func (ar *allocRunner) IsWaiting() bool { return ar.prevAllocWatcher.IsWaiting() } +func (ar *allocRunner) DestroyCh() <-chan struct{} { + return ar.destroyCh +} + +func (ar *allocRunner) ShutdownCh() <-chan struct{} { + return ar.shutdownCh +} + // Shutdown AllocRunner gracefully. Blocks while shutting down all TaskRunners. // Tasks are unaffected and may be restored. func (ar *allocRunner) Shutdown() { @@ -687,29 +750,45 @@ func (ar *allocRunner) Shutdown() { return } - ar.logger.Trace("shutting down") - - // Shutdown tasks gracefully if they were run - if ar.runnersLaunched { - wg := sync.WaitGroup{} - for _, tr := range ar.tasks { - wg.Add(1) - go func(tr *taskrunner.TaskRunner) { - tr.Shutdown() - wg.Done() - }(tr) - } - wg.Wait() + // Destroy is a superset of Shutdown so if it's been marked for destruction, + // don't try and shutdown in parallel. If shutdown has been launched, don't + // try again. + if ar.destroyLaunched || ar.shutdownLaunched { + return } - // Wait for Run to exit - <-ar.waitCh + ar.shutdownLaunched = true - // Run shutdown hooks - ar.shutdownHooks() + go func() { + ar.logger.Trace("shutting down") - // Wait for updater to finish its final run - <-ar.taskStateUpdateHandlerCh + // Shutdown tasks gracefully if they were run + if ar.runnersLaunched { + wg := sync.WaitGroup{} + for _, tr := range ar.tasks { + wg.Add(1) + go func(tr *taskrunner.TaskRunner) { + tr.Shutdown() + wg.Done() + }(tr) + } + wg.Wait() + } + + // Wait for Run to exit + <-ar.waitCh + + // Run shutdown hooks + ar.shutdownHooks() + + // Wait for updater to finish its final run + <-ar.taskStateUpdateHandlerCh + + ar.destroyedLock.Lock() + ar.shutdown = true + close(ar.shutdownCh) + ar.destroyedLock.Unlock() + }() } // IsMigrating returns true if the alloc runner is migrating data from its diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 3c1464560..55fa0ea17 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -279,7 +279,7 @@ func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) { ar2.Destroy() select { - case <-ar2.WaitCh(): + case <-ar2.DestroyCh(): // exited as expected case <-time.After(10 * time.Second): t.Fatalf("timed out waiting for AR to GC")