From e07f73bfe0d1de71558870f68397410e9465a538 Mon Sep 17 00:00:00 2001
From: Michael Schurter
Date: Fri, 10 May 2019 08:51:06 -0700
Subject: [PATCH] client: do not restart dead tasks until server is contacted
 (try 2)

Refactoring of 104067bc2b2002a4e45ae7b667a476b89addc162

Switch the MarkLive method for a chan that is closed by the client.
Thanks to @notnoop for the idea!

The old approach called a method on most existing ARs and TRs on every
runAllocs call. The new approach does a once.Do call in runAllocs to
accomplish the same thing with less work. Able to remove the gate
abstraction that did much more than was needed.
---
 client/allocrunner/alloc_runner.go           |  16 +--
 client/allocrunner/config.go                 |   4 +
 client/allocrunner/taskrunner/task_runner.go |  70 +++++-----
 .../taskrunner/task_runner_test.go           |  48 +++----
 client/allocrunner/testing.go                |  23 ++--
 client/client.go                             |  38 ++----
 helper/gate/gate.go                          |  87 ------------
 helper/gate/gate_test.go                     | 126 ------------------
 8 files changed, 97 insertions(+), 315 deletions(-)
 delete mode 100644 helper/gate/gate.go
 delete mode 100644 helper/gate/gate_test.go

diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go
index a8612bb44..e9efa120f 100644
--- a/client/allocrunner/alloc_runner.go
+++ b/client/allocrunner/alloc_runner.go
@@ -136,6 +136,11 @@ type allocRunner struct {
 	// driverManager is responsible for dispensing driver plugins and registering
 	// event handlers
 	driverManager drivermanager.Manager
+
+	// serversContactedCh is passed to TaskRunners so they can detect when
+	// servers have been contacted for the first time in case of a failed
+	// restore.
+	serversContactedCh chan struct{}
 }
 
 // NewAllocRunner returns a new allocation runner.
@@ -167,6 +172,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
 		prevAllocMigrator: config.PrevAllocMigrator,
 		devicemanager: config.DeviceManager,
 		driverManager: config.DriverManager,
+		serversContactedCh: config.ServersContactedCh,
 	}
 
 	// Create the logger based on the allocation ID
@@ -205,6 +211,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
 			DeviceStatsReporter: ar.deviceStatsReporter,
 			DeviceManager: ar.devicemanager,
 			DriverManager: ar.driverManager,
+			ServersContactedCh: ar.serversContactedCh,
 		}
 
 		// Create, but do not Run, the task runner
@@ -721,15 +728,6 @@ func (ar *allocRunner) handleAllocUpdate(update *structs.Allocation) {
 
 }
 
-// MarkLive unblocks restored tasks that failed to reattach and are waiting to
-// contact a server before restarting the dead task. The Client will call this
-// method when the task should run, otherwise the task will be killed.
-func (ar *allocRunner) MarkLive() {
-	for _, tr := range ar.tasks {
-		tr.MarkLive()
-	}
-}
-
 func (ar *allocRunner) Listener() *cstructs.AllocListener {
 	return ar.allocBroadcaster.Listen()
 }
diff --git a/client/allocrunner/config.go b/client/allocrunner/config.go
index 16f3e27db..42cea978e 100644
--- a/client/allocrunner/config.go
+++ b/client/allocrunner/config.go
@@ -51,4 +51,8 @@ type Config struct {
 
 	// DriverManager handles dispensing of driver plugins
 	DriverManager drivermanager.Manager
+
+	// ServersContactedCh is closed when the first GetClientAllocs call to
+	// servers succeeds and allocs are synced.
+	ServersContactedCh chan struct{}
 }
diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go
index c8e2ccf08..e88487431 100644
--- a/client/allocrunner/taskrunner/task_runner.go
+++ b/client/allocrunner/taskrunner/task_runner.go
@@ -25,7 +25,6 @@ import (
 	cstructs "github.com/hashicorp/nomad/client/structs"
 	"github.com/hashicorp/nomad/client/taskenv"
 	"github.com/hashicorp/nomad/client/vaultclient"
-	"github.com/hashicorp/nomad/helper/gate"
 	"github.com/hashicorp/nomad/helper/pluginutils/hclspecutils"
 	"github.com/hashicorp/nomad/helper/pluginutils/hclutils"
 	"github.com/hashicorp/nomad/helper/uuid"
@@ -195,9 +194,15 @@ type TaskRunner struct {
 	// Defaults to defaultMaxEvents but overrideable for testing.
 	maxEvents int
 
-	// restoreGate is used to block restored tasks that failed to reattach
-	// from restarting until servers are contacted. #1795
-	restoreGate *gate.G
+	// serversContactedCh is passed to TaskRunners so they can detect when
+	// servers have been contacted for the first time in case of a failed
+	// restore.
+	serversContactedCh <-chan struct{}
+
+	// waitOnServers defaults to false but will be set true if a restore
+	// fails and the Run method should wait until serversContactedCh is
+	// closed.
+	waitOnServers bool
 }
 
 type Config struct {
@@ -227,6 +232,10 @@ type Config struct {
 	// DriverManager is used to dispense driver plugins and register event
 	// handlers
 	DriverManager drivermanager.Manager
+
+	// ServersContactedCh is closed when the first GetClientAllocs call to
+	// servers succeeds and allocs are synced.
+	ServersContactedCh chan struct{}
 }
 
 func NewTaskRunner(config *Config) (*TaskRunner, error) {
@@ -250,12 +259,6 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
 		tstate = ts.Copy()
 	}
 
-	// Initialize restoreGate as open. It will only be closed if Restore is
-	// called and fails to reconnect to the task handle. In that case the
-	// we must wait until contact with the server is made before restarting
-	// or killing the task. #1795
-	restoreGate := gate.NewOpen()
-
 	tr := &TaskRunner{
 		alloc: config.Alloc,
 		allocID: config.Alloc.ID,
@@ -281,7 +284,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
 		devicemanager: config.DeviceManager,
 		driverManager: config.DriverManager,
 		maxEvents: defaultMaxEvents,
-		restoreGate: restoreGate,
+		serversContactedCh: config.ServersContactedCh,
 	}
 
 	// Create the logger based on the allocation ID
@@ -393,14 +396,15 @@ func (tr *TaskRunner) Run() {
 	// - should be handled serially.
 	go tr.handleUpdates()
 
-	// If restore failed, don't proceed until servers are contacted
-	if tr.restoreGate.IsClosed() {
+	// If restore failed wait until servers are contacted before running.
+	// #1795
+	if tr.waitOnServers {
 		tr.logger.Info("task failed to restore; waiting to contact server before restarting")
 		select {
 		case <-tr.killCtx.Done():
 		case <-tr.shutdownCtx.Done():
 			return
-		case <-tr.restoreGate.Wait():
+		case <-tr.serversContactedCh:
 			tr.logger.Trace("server contacted; unblocking waiting task")
 		}
 	}
@@ -883,18 +887,27 @@ func (tr *TaskRunner) Restore() error {
 		//TODO if RecoverTask returned the DriverNetwork we wouldn't
 		// have to persist it at all!
 		restored := tr.restoreHandle(taskHandle, tr.localState.DriverNetwork)
-		if !restored && !tr.Alloc().TerminalStatus() {
-			// Restore failed, close the restore gate to block
-			// until server is contacted to prevent restarting
-			// terminal allocs. #1795
-			tr.logger.Trace("failed to reattach to task; will not run until server is contacted")
-			tr.restoreGate.Close()
-			ev := structs.NewTaskEvent(structs.TaskRestoreFailed).
-				SetDisplayMessage("failed to restore task; will not run until server is contacted")
-			tr.UpdateState(structs.TaskStatePending, ev)
+		// If the handle could not be restored, the alloc is
+		// non-terminal, and the task isn't a system job: wait until
+		// servers have been contacted before running. #1795
+		if restored {
+			return nil
 		}
+
+		alloc := tr.Alloc()
+		if alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
+			return nil
+		}
+
+		tr.logger.Trace("failed to reattach to task; will not run until server is contacted")
+		tr.waitOnServers = true
+
+		ev := structs.NewTaskEvent(structs.TaskRestoreFailed).
+			SetDisplayMessage("failed to restore task; will not run until server is contacted")
+		tr.UpdateState(structs.TaskStatePending, ev)
 	}
+
 	return nil
 }
@@ -1108,10 +1121,6 @@ func (tr *TaskRunner) Update(update *structs.Allocation) {
 	// Trigger update hooks if not terminal
 	if !update.TerminalStatus() {
 		tr.triggerUpdateHooks()
-
-		// MarkLive in case task had failed to restore and were waiting
-		// to hear from the server.
-		tr.MarkLive()
 	}
 }
@@ -1128,13 +1137,6 @@ func (tr *TaskRunner) triggerUpdateHooks() {
 	}
 }
 
-// MarkLive unblocks restored tasks that failed to reattach and are waiting to
-// contact a server before restarting the dead task. The Client will call this
-// method when the task should run, otherwise the task will be killed.
-func (tr *TaskRunner) MarkLive() {
-	tr.restoreGate.Open()
-}
-
 // Shutdown TaskRunner gracefully without affecting the state of the task.
 // Shutdown blocks until the main Run loop exits.
 func (tr *TaskRunner) Shutdown() {
diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go
index b8a1ad69a..f1589a1d1 100644
--- a/client/allocrunner/taskrunner/task_runner_test.go
+++ b/client/allocrunner/taskrunner/task_runner_test.go
@@ -95,17 +95,18 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
 	}
 
 	conf := &Config{
-		Alloc:         alloc,
-		ClientConfig:  clientConf,
-		Consul:        consulapi.NewMockConsulServiceClient(t, logger),
-		Task:          thisTask,
-		TaskDir:       taskDir,
-		Logger:        clientConf.Logger,
-		Vault:         vaultclient.NewMockVaultClient(),
-		StateDB:       cstate.NoopDB{},
-		StateUpdater:  NewMockTaskStateUpdater(),
-		DeviceManager: devicemanager.NoopMockManager(),
-		DriverManager: drivermanager.TestDriverManager(t),
+		Alloc:              alloc,
+		ClientConfig:       clientConf,
+		Consul:             consulapi.NewMockConsulServiceClient(t, logger),
+		Task:               thisTask,
+		TaskDir:            taskDir,
+		Logger:             clientConf.Logger,
+		Vault:              vaultclient.NewMockVaultClient(),
+		StateDB:            cstate.NoopDB{},
+		StateUpdater:       NewMockTaskStateUpdater(),
+		DeviceManager:      devicemanager.NoopMockManager(),
+		DriverManager:      drivermanager.TestDriverManager(t),
+		ServersContactedCh: make(chan struct{}),
 	}
 	return conf, trCleanup
 }
@@ -184,7 +185,7 @@ func TestTaskRunner_Restore_Running(t *testing.T) {
 // kills the task before restarting a new TaskRunner. The new TaskRunner is
 // returned once it is running and waiting in pending along with a cleanup
 // func.
-func setupRestoreFailureTest(t *testing.T) (*TaskRunner, func()) {
+func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) {
 	t.Parallel()
 
 	alloc := mock.Alloc()
@@ -234,10 +235,10 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, func()) {
 	// Create a new TaskRunner and Restore the task
 	newTR, err := NewTaskRunner(conf)
 	require.NoError(t, err)
-	require.NoError(t, newTR.Restore())
-	// Assert the restore gate is *closed* because reattachment failed
-	require.True(t, newTR.restoreGate.IsClosed())
+	// Assert the TR will wait on servers because reattachment failed
+	require.NoError(t, newTR.Restore())
+	require.True(t, newTR.waitOnServers)
 
 	// Start new TR
 	go newTR.Run()
@@ -253,17 +254,17 @@
 	ts := newTR.TaskState()
 	require.Equal(t, structs.TaskStatePending, ts.State)
 
-	return newTR, cleanup3
+	return newTR, conf, cleanup3
 }
 
 // TestTaskRunner_Restore_Restart asserts restoring a dead task blocks until
 // MarkAlive is called. #1795
 func TestTaskRunner_Restore_Restart(t *testing.T) {
-	newTR, cleanup := setupRestoreFailureTest(t)
+	newTR, conf, cleanup := setupRestoreFailureTest(t)
 	defer cleanup()
 
-	// Fake contacting the server by opening the restore gate
-	newTR.MarkLive()
+	// Fake contacting the server by closing the chan
+	close(conf.ServersContactedCh)
 
 	testutil.WaitForResult(func() (bool, error) {
 		ts := newTR.TaskState().State
@@ -276,10 +277,10 @@ func TestTaskRunner_Restore_Restart(t *testing.T) {
 // TestTaskRunner_Restore_Kill asserts restoring a dead task blocks until
 // the task is killed. #1795
 func TestTaskRunner_Restore_Kill(t *testing.T) {
-	newTR, cleanup := setupRestoreFailureTest(t)
+	newTR, _, cleanup := setupRestoreFailureTest(t)
 	defer cleanup()
 
-	// Sending the task a terminal update shouldn't kill it or mark it live
+	// Sending the task a terminal update shouldn't kill it or unblock it
 	alloc := newTR.Alloc().Copy()
 	alloc.DesiredStatus = structs.AllocDesiredStatusStop
 	newTR.Update(alloc)
@@ -301,12 +302,13 @@
 // TestTaskRunner_Restore_Update asserts restoring a dead task blocks until
 // Update is called. #1795
 func TestTaskRunner_Restore_Update(t *testing.T) {
-	newTR, cleanup := setupRestoreFailureTest(t)
+	newTR, conf, cleanup := setupRestoreFailureTest(t)
 	defer cleanup()
 
-	// Fake contacting the server by opening the restore gate
+	// Fake Client.runAllocs behavior by calling Update then closing chan
 	alloc := newTR.Alloc().Copy()
 	newTR.Update(alloc)
+	close(conf.ServersContactedCh)
 
 	testutil.WaitForResult(func() (bool, error) {
 		ts := newTR.TaskState().State
diff --git a/client/allocrunner/testing.go b/client/allocrunner/testing.go
index 5933019ea..75806644b 100644
--- a/client/allocrunner/testing.go
+++ b/client/allocrunner/testing.go
@@ -55,17 +55,18 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, fu
 	clientConf, cleanup := clientconfig.TestClientConfig(t)
 	conf := &Config{
 		// Copy the alloc in case the caller edits and reuses it
-		Alloc:             alloc.Copy(),
-		Logger:            clientConf.Logger,
-		ClientConfig:      clientConf,
-		StateDB:           state.NoopDB{},
-		Consul:            consul.NewMockConsulServiceClient(t, clientConf.Logger),
-		Vault:             vaultclient.NewMockVaultClient(),
-		StateUpdater:      &MockStateUpdater{},
-		PrevAllocWatcher:  allocwatcher.NoopPrevAlloc{},
-		PrevAllocMigrator: allocwatcher.NoopPrevAlloc{},
-		DeviceManager:     devicemanager.NoopMockManager(),
-		DriverManager:     drivermanager.TestDriverManager(t),
+		Alloc:              alloc.Copy(),
+		Logger:             clientConf.Logger,
+		ClientConfig:       clientConf,
+		StateDB:            state.NoopDB{},
+		Consul:             consul.NewMockConsulServiceClient(t, clientConf.Logger),
+		Vault:              vaultclient.NewMockVaultClient(),
+		StateUpdater:       &MockStateUpdater{},
+		PrevAllocWatcher:   allocwatcher.NoopPrevAlloc{},
+		PrevAllocMigrator:  allocwatcher.NoopPrevAlloc{},
+		DeviceManager:      devicemanager.NoopMockManager(),
+		DriverManager:      drivermanager.TestDriverManager(t),
+		ServersContactedCh: make(chan struct{}),
 	}
 	return conf, cleanup
 }
diff --git a/client/client.go b/client/client.go
index 030906172..51edb28ff 100644
--- a/client/client.go
+++ b/client/client.go
@@ -126,7 +126,6 @@ type AllocRunner interface {
 	Run()
 	StatsReporter() interfaces.AllocStatsReporter
 	Update(*structs.Allocation)
-	MarkLive()
 	WaitCh() <-chan struct{}
 	DestroyCh() <-chan struct{}
 	ShutdownCh() <-chan struct{}
@@ -260,6 +259,11 @@ type Client struct {
 	// fpInitialized chan is closed when the first batch of fingerprints are
 	// applied to the node and the server is updated
 	fpInitialized chan struct{}
+
+	// serversContactedCh is closed when GetClientAllocs and runAllocs have
+	// successfully run once.
+	serversContactedCh chan struct{}
+	serversContactedOnce sync.Once
 }
 
 var (
@@ -310,6 +314,8 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
 		triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8),
 		fpInitialized: make(chan struct{}),
 		invalidAllocs: make(map[string]struct{}),
+		serversContactedCh: make(chan struct{}),
+		serversContactedOnce: sync.Once{},
 	}
 
 	c.batchNodeUpdates = newBatchNodeUpdates(
@@ -2000,12 +2006,6 @@ func (c *Client) runAllocs(update *allocUpdates) {
 
 	errs := 0
 
-	// Mark existing allocations as live in case they failed to reattach on
-	// restore and are waiting to hear from the server before restarting.
-	for _, live := range diff.ignore {
-		c.markAllocLive(live)
-	}
-
 	// Remove the old allocations
 	for _, remove := range diff.removed {
 		c.removeAlloc(remove)
@@ -2037,6 +2037,12 @@
 		}
 	}
 
+	// Mark servers as having been contacted so blocked tasks that failed
+	// to restore can now restart.
+	c.serversContactedOnce.Do(func() {
+		close(c.serversContactedCh)
+	})
+
 	// Trigger the GC once more now that new allocs are started that could
 	// have caused thresholds to be exceeded
 	c.garbageCollector.Trigger()
@@ -2089,24 +2095,6 @@ func makeFailedAlloc(add *structs.Allocation, err error) *structs.Allocation {
 	return stripped
 }
 
-// markAllocLive is invoked when an alloc should be running but has not been
-// updated or just been added. This allows unblocking tasks that failed to
-// reattach on restored and are waiting to hear from the server.
-func (c *Client) markAllocLive(allocID string) {
-	c.allocLock.Lock()
-	defer c.allocLock.Unlock()
-
-	ar, ok := c.allocs[allocID]
-	if !ok {
-		// This should never happen as alloc diffing should cause
-		// unknown allocs to be added, not marked live.
-		c.logger.Warn("unknown alloc should be running but is not", "alloc_id", allocID)
-		return
-	}
-
-	ar.MarkLive()
-}
-
 // removeAlloc is invoked when we should remove an allocation because it has
 // been removed by the server.
 func (c *Client) removeAlloc(allocID string) {
diff --git a/helper/gate/gate.go b/helper/gate/gate.go
deleted file mode 100644
index 092eec92a..000000000
--- a/helper/gate/gate.go
+++ /dev/null
@@ -1,87 +0,0 @@
-// Package gate implements a simple on/off latch or gate: it blocks waiters
-// until opened. Waiters may receive on a chan which is closed when the gate is
-// open.
-package gate
-
-import "sync"
-
-// closedCh is a chan initialized as closed
-var closedCh chan struct{}
-
-func init() {
-	closedCh = make(chan struct{})
-	close(closedCh)
-}
-
-// G is a gate which blocks waiters until opened and is safe for concurrent
-// use. Must be created via New.
-type G struct {
-	// open is true if the gate is open and ch is closed.
-	open bool
-
-	// ch is closed if the gate is open.
-	ch chan struct{}
-
-	mu sync.Mutex
-}
-
-// NewClosed returns a closed gate. The chan returned by Wait will block until Open
-// is called.
-func NewClosed() *G {
-	return &G{
-		ch: make(chan struct{}),
-	}
-}
-
-// NewOpen returns an open gate. The chan returned by Wait is closed and
-// therefore will never block.
-func NewOpen() *G {
-	return &G{
-		open: true,
-		ch:   closedCh,
-	}
-}
-
-// Open the gate. Unblocks any Waiters. Opening an opened gate is a noop. Safe
-// for concurrent ues with Close and Wait.
-func (g *G) Open() {
-	g.mu.Lock()
-	defer g.mu.Unlock()
-
-	if g.open {
-		return
-	}
-
-	g.open = true
-	close(g.ch)
-}
-
-// Close the gate. Blocks subsequent Wait callers. Closing a closed gate is a
-// noop. Safe for concurrent use with Open and Wait.
-func (g *G) Close() {
-	g.mu.Lock()
-	defer g.mu.Unlock()
-
-	if !g.open {
-		return
-	}
-
-	g.open = false
-	g.ch = make(chan struct{})
-}
-
-// Wait returns a chan that blocks until the gate is open. Safe for concurrent
-// use with Open and Close, but the chan should not be reused between calls to
-// Open and Close.
-func (g *G) Wait() <-chan struct{} {
-	g.mu.Lock()
-	defer g.mu.Unlock()
-	return g.ch
-}
-
-// IsClosed returns true if the gate is closed.
-func (g *G) IsClosed() bool {
-	g.mu.Lock()
-	defer g.mu.Unlock()
-	return !g.open
-}
diff --git a/helper/gate/gate_test.go b/helper/gate/gate_test.go
deleted file mode 100644
index cceb9673c..000000000
--- a/helper/gate/gate_test.go
+++ /dev/null
@@ -1,126 +0,0 @@
-package gate
-
-import (
-	"math/rand"
-	"sync"
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/require"
-)
-
-func TestGate_NewClosed(t *testing.T) {
-	t.Parallel()
-
-	g := NewClosed()
-
-	assertClosed := func() {
-		require.True(t, g.IsClosed())
-		select {
-		case <-g.Wait():
-			require.Fail(t, "expected gate to be closed")
-		default:
-			// Ok!
-		}
-	}
-
-	assertClosed()
-	g.Close()
-	assertClosed()
-
-	// Close should be safe to call multiple times
-	g.Close()
-	assertClosed()
-
-	g.Open()
-	require.False(t, g.IsClosed())
-	select {
-	case <-g.Wait():
-		// Ok!
-	default:
-		require.Fail(t, "expected gate to be open")
-	}
-}
-
-func TestGate_NewOpen(t *testing.T) {
-	t.Parallel()
-
-	g := NewOpen()
-
-	assertOpen := func() {
-		require.False(t, g.IsClosed())
-		select {
-		case <-g.Wait():
-			// Ok!
-		default:
-			require.Fail(t, "expected gate to be open")
-		}
-	}
-
-	assertOpen()
-	g.Open()
-	assertOpen()
-
-	// Open should be safe to call multiple times
-	g.Open()
-	assertOpen()
-
-	g.Close()
-	select {
-	case <-g.Wait():
-		require.Fail(t, "expected gate to be closed")
-	default:
-		// Ok!
-	}
-}
-
-// TestGate_Concurrency is meant to be run with the race detector enabled to
-// find any races.
-func TestGate_Concurrency(t *testing.T) {
-	t.Parallel()
-
-	g := NewOpen()
-	wg := sync.WaitGroup{}
-
-	// Start closer
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		dice := rand.New(rand.NewSource(time.Now().UnixNano()))
-		for i := 0; i < 1000; i++ {
-			g.Close()
-			time.Sleep(time.Duration(dice.Int63n(100)))
-		}
-	}()
-
-	// Start opener
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		dice := rand.New(rand.NewSource(time.Now().UnixNano()))
-		for i := 0; i < 1000; i++ {
-			g.Open()
-			time.Sleep(time.Duration(dice.Int63n(100)))
-		}
-	}()
-
-	// Perform reads concurrently with writes
-	wgCh := make(chan struct{})
-	doneCh := make(chan struct{})
-	go func() {
-		defer close(doneCh)
-		for {
-			select {
-			case <-time.After(time.Millisecond):
-			case <-wgCh:
-				return
-			}
-			g.IsClosed()
-			g.Wait()
-		}
-	}()
-
-	wg.Wait()
-	close(wgCh)
-	<-doneCh
-}
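
A minimal, self-contained sketch of the coordination pattern this commit adopts: the client closes a channel exactly once (guarded by sync.Once) after its first successful alloc sync, and each task runner whose restore failed selects on that channel, alongside its shutdown signal, before running. The client and taskRunner types and the runAllocs and Run methods below are illustrative stand-ins, not excerpts from the Nomad codebase.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// client plays the role of the Nomad client: it owns serversContactedCh and
// closes it exactly once, after the first successful sync with the servers.
type client struct {
	serversContactedCh   chan struct{}
	serversContactedOnce sync.Once
}

// runAllocs stands in for Client.runAllocs: after processing an update from
// the servers it closes the channel; sync.Once makes repeated calls safe.
func (c *client) runAllocs() {
	// ... process alloc updates from the servers ...
	c.serversContactedOnce.Do(func() {
		close(c.serversContactedCh)
	})
}

// taskRunner plays the role of a TaskRunner whose Restore failed to reattach:
// waitOnServers is set, so Run blocks until the shared channel is closed or
// the runner is shut down.
type taskRunner struct {
	name               string
	waitOnServers      bool
	serversContactedCh <-chan struct{}
}

func (tr *taskRunner) Run(shutdownCtx context.Context) {
	if tr.waitOnServers {
		fmt.Println(tr.name, "failed to restore; waiting to contact server before restarting")
		select {
		case <-shutdownCtx.Done():
			return
		case <-tr.serversContactedCh:
			fmt.Println(tr.name, "server contacted; unblocking")
		}
	}
	// ... restart or kill the task based on the now-synced alloc ...
	fmt.Println(tr.name, "running")
}

func main() {
	c := &client{serversContactedCh: make(chan struct{})}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	var wg sync.WaitGroup
	for _, name := range []string{"tr1", "tr2"} {
		tr := &taskRunner{
			name:               name,
			waitOnServers:      true,
			serversContactedCh: c.serversContactedCh,
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			tr.Run(ctx)
		}()
	}

	// Simulate the first successful GetClientAllocs/runAllocs pass; closing
	// the channel is a broadcast, so every waiting runner unblocks at once.
	time.Sleep(100 * time.Millisecond)
	c.runAllocs()
	wg.Wait()
}

Closing a channel is observed by every receiver, so a single close unblocks all blocked task runners at once, and sync.Once makes the close idempotent, so runAllocs can run on every update without risking a double-close panic. That one-shot broadcast is what lets the patch delete the MarkLive plumbing and the helper/gate package entirely.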