client: do not restart dead tasks until server is contacted (try 2)

Refactoring of 104067bc2b2002a4e45ae7b667a476b89addc162

Replace the MarkLive method with a chan that is closed by the client.
Thanks to @notnoop for the idea!

The old approach called a method on most existing ARs and TRs on every
runAllocs call. The new approach uses a single once.Do call in runAllocs to
accomplish the same thing with less work, and it allows removing the gate
abstraction, which did far more than was needed.
Michael Schurter 2019-05-10 08:51:06 -07:00
parent 8589233a0e
commit e07f73bfe0
8 changed files with 97 additions and 315 deletions
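The resulting pattern, as a minimal sketch with hypothetical, simplified names rather than the actual Nomad types: the client closes a single latch channel exactly once after the first successful alloc sync, and a runner whose restore failed blocks on that channel (or on shutdown) before doing anything else.

package main

import (
	"fmt"
	"sync"
	"time"
)

// client owns the one-shot "servers contacted" latch.
type client struct {
	serversContactedCh   chan struct{}
	serversContactedOnce sync.Once
}

// runAllocs may run many times; the latch is closed exactly once.
func (c *client) runAllocs() {
	// ... sync allocs with the server ...
	c.serversContactedOnce.Do(func() {
		close(c.serversContactedCh)
	})
}

// runner stands in for a task runner whose restore failed.
type runner struct {
	waitOnServers      bool
	serversContactedCh <-chan struct{}
	shutdownCh         <-chan struct{}
}

func (r *runner) run() {
	if r.waitOnServers {
		select {
		case <-r.shutdownCh:
			return
		case <-r.serversContactedCh:
			fmt.Println("server contacted; unblocking waiting task")
		}
	}
	// ... normal run loop ...
}

func main() {
	c := &client{serversContactedCh: make(chan struct{})}
	shutdown := make(chan struct{})
	r := &runner{waitOnServers: true, serversContactedCh: c.serversContactedCh, shutdownCh: shutdown}

	go r.run()
	time.Sleep(10 * time.Millisecond) // runner is blocked on the latch
	c.runAllocs()                     // first successful sync releases it
	time.Sleep(10 * time.Millisecond)
}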


@@ -136,6 +136,11 @@ type allocRunner struct {
// driverManager is responsible for dispensing driver plugins and registering
// event handlers
driverManager drivermanager.Manager
// serversContactedCh is passed to TaskRunners so they can detect when
// servers have been contacted for the first time in case of a failed
// restore.
serversContactedCh chan struct{}
}
// NewAllocRunner returns a new allocation runner.
@@ -167,6 +172,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
prevAllocMigrator: config.PrevAllocMigrator,
devicemanager: config.DeviceManager,
driverManager: config.DriverManager,
serversContactedCh: config.ServersContactedCh,
}
// Create the logger based on the allocation ID
@@ -205,6 +211,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
DeviceStatsReporter: ar.deviceStatsReporter,
DeviceManager: ar.devicemanager,
DriverManager: ar.driverManager,
ServersContactedCh: ar.serversContactedCh,
}
// Create, but do not Run, the task runner
@@ -721,15 +728,6 @@ func (ar *allocRunner) handleAllocUpdate(update *structs.Allocation) {
}
// MarkLive unblocks restored tasks that failed to reattach and are waiting to
// contact a server before restarting the dead task. The Client will call this
// method when the task should run, otherwise the task will be killed.
func (ar *allocRunner) MarkLive() {
for _, tr := range ar.tasks {
tr.MarkLive()
}
}
func (ar *allocRunner) Listener() *cstructs.AllocListener {
return ar.allocBroadcaster.Listen()
}


@@ -51,4 +51,8 @@ type Config struct {
// DriverManager handles dispensing of driver plugins
DriverManager drivermanager.Manager
// ServersContactedCh is closed when the first GetClientAllocs call to
// servers succeeds and allocs are synced.
ServersContactedCh chan struct{}
}
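A detail that is easy to miss in the plumbing: the Config structs carry ServersContactedCh as a plain chan struct{} so its owner, the client, can close it, while the TaskRunner (further down in this diff) stores it as a receive-only <-chan struct{}, documenting at the type level that runners may only wait on it. A minimal sketch of the narrowing, with hypothetical names:

package sketch

// taskRunnerConfig stands in for the real Config struct.
type taskRunnerConfig struct {
	// Bidirectional in the config so the owning client can close it.
	ServersContactedCh chan struct{}
}

// taskRunner stands in for the real TaskRunner.
type taskRunner struct {
	// Stored receive-only: the runner can wait on the channel but not close it.
	serversContactedCh <-chan struct{}
}

func newTaskRunner(cfg *taskRunnerConfig) *taskRunner {
	// Assigning a chan struct{} to a <-chan struct{} field narrows the
	// direction implicitly; no conversion is needed.
	return &taskRunner{serversContactedCh: cfg.ServersContactedCh}
}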


@@ -25,7 +25,6 @@ import (
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/helper/gate"
"github.com/hashicorp/nomad/helper/pluginutils/hclspecutils"
"github.com/hashicorp/nomad/helper/pluginutils/hclutils"
"github.com/hashicorp/nomad/helper/uuid"
@@ -195,9 +194,15 @@ type TaskRunner struct {
// Defaults to defaultMaxEvents but overrideable for testing.
maxEvents int
// restoreGate is used to block restored tasks that failed to reattach
// from restarting until servers are contacted. #1795
restoreGate *gate.G
// serversContactedCh is passed to TaskRunners so they can detect when
// servers have been contacted for the first time in case of a failed
// restore.
serversContactedCh <-chan struct{}
// waitOnServers defaults to false but will be set true if a restore
// fails and the Run method should wait until serversContactedCh is
// closed.
waitOnServers bool
}
type Config struct {
@@ -227,6 +232,10 @@ type Config struct {
// DriverManager is used to dispense driver plugins and register event
// handlers
DriverManager drivermanager.Manager
// ServersContactedCh is closed when the first GetClientAllocs call to
// servers succeeds and allocs are synced.
ServersContactedCh chan struct{}
}
func NewTaskRunner(config *Config) (*TaskRunner, error) {
@@ -250,12 +259,6 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
tstate = ts.Copy()
}
// Initialize restoreGate as open. It will only be closed if Restore is
// called and fails to reconnect to the task handle. In that case the
// we must wait until contact with the server is made before restarting
// or killing the task. #1795
restoreGate := gate.NewOpen()
tr := &TaskRunner{
alloc: config.Alloc,
allocID: config.Alloc.ID,
@@ -281,7 +284,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
devicemanager: config.DeviceManager,
driverManager: config.DriverManager,
maxEvents: defaultMaxEvents,
restoreGate: restoreGate,
serversContactedCh: config.ServersContactedCh,
}
// Create the logger based on the allocation ID
@@ -393,14 +396,15 @@ func (tr *TaskRunner) Run() {
// - should be handled serially.
go tr.handleUpdates()
// If restore failed, don't proceed until servers are contacted
if tr.restoreGate.IsClosed() {
// If restore failed wait until servers are contacted before running.
// #1795
if tr.waitOnServers {
tr.logger.Info("task failed to restore; waiting to contact server before restarting")
select {
case <-tr.killCtx.Done():
case <-tr.shutdownCtx.Done():
return
case <-tr.restoreGate.Wait():
case <-tr.serversContactedCh:
tr.logger.Trace("server contacted; unblocking waiting task")
}
}
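Part of why one channel can replace the per-runner MarkLive calls: closing a channel is a broadcast, releasing every current and future receiver at once. A tiny, self-contained illustration (not Nomad code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	contacted := make(chan struct{})
	var wg sync.WaitGroup

	// Several "task runners" blocked on the same latch.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-contacted // blocks until the channel is closed
			fmt.Printf("runner %d unblocked\n", id)
		}(i)
	}

	close(contacted) // one close releases all of them
	wg.Wait()

	// A closed channel never blocks again, so late waiters pass through too.
	<-contacted
}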
@@ -883,18 +887,27 @@ func (tr *TaskRunner) Restore() error {
//TODO if RecoverTask returned the DriverNetwork we wouldn't
// have to persist it at all!
restored := tr.restoreHandle(taskHandle, tr.localState.DriverNetwork)
if !restored && !tr.Alloc().TerminalStatus() {
// Restore failed, close the restore gate to block
// until server is contacted to prevent restarting
// terminal allocs. #1795
tr.logger.Trace("failed to reattach to task; will not run until server is contacted")
tr.restoreGate.Close()
ev := structs.NewTaskEvent(structs.TaskRestoreFailed).
SetDisplayMessage("failed to restore task; will not run until server is contacted")
tr.UpdateState(structs.TaskStatePending, ev)
// If the handle could not be restored, the alloc is
// non-terminal, and the task isn't a system job: wait until
// servers have been contacted before running. #1795
if restored {
return nil
}
alloc := tr.Alloc()
if alloc.TerminalStatus() || alloc.Job.Type == structs.JobTypeSystem {
return nil
}
tr.logger.Trace("failed to reattach to task; will not run until server is contacted")
tr.waitOnServers = true
ev := structs.NewTaskEvent(structs.TaskRestoreFailed).
SetDisplayMessage("failed to restore task; will not run until server is contacted")
tr.UpdateState(structs.TaskStatePending, ev)
}
return nil
}
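Condensed, the decision above is a small predicate. The helper below is only a hypothetical restatement for clarity, not code from the commit:

package sketch

// shouldWaitOnServers: wait for server contact only when reattachment
// failed, the alloc is still meant to run, and the job is not a system
// job (system jobs skip the wait and restart immediately).
func shouldWaitOnServers(restored, terminal bool, jobType string) bool {
	if restored {
		return false // handle reattached; nothing to wait for
	}
	if terminal || jobType == "system" {
		return false // alloc will never run again, or it is a system job
	}
	return true
}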
@@ -1108,10 +1121,6 @@ func (tr *TaskRunner) Update(update *structs.Allocation) {
// Trigger update hooks if not terminal
if !update.TerminalStatus() {
tr.triggerUpdateHooks()
// MarkLive in case task had failed to restore and were waiting
// to hear from the server.
tr.MarkLive()
}
}
@@ -1128,13 +1137,6 @@ func (tr *TaskRunner) triggerUpdateHooks() {
}
}
// MarkLive unblocks restored tasks that failed to reattach and are waiting to
// contact a server before restarting the dead task. The Client will call this
// method when the task should run, otherwise the task will be killed.
func (tr *TaskRunner) MarkLive() {
tr.restoreGate.Open()
}
// Shutdown TaskRunner gracefully without affecting the state of the task.
// Shutdown blocks until the main Run loop exits.
func (tr *TaskRunner) Shutdown() {


@@ -95,17 +95,18 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri
}
conf := &Config{
Alloc: alloc,
ClientConfig: clientConf,
Consul: consulapi.NewMockConsulServiceClient(t, logger),
Task: thisTask,
TaskDir: taskDir,
Logger: clientConf.Logger,
Vault: vaultclient.NewMockVaultClient(),
StateDB: cstate.NoopDB{},
StateUpdater: NewMockTaskStateUpdater(),
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
Alloc: alloc,
ClientConfig: clientConf,
Consul: consulapi.NewMockConsulServiceClient(t, logger),
Task: thisTask,
TaskDir: taskDir,
Logger: clientConf.Logger,
Vault: vaultclient.NewMockVaultClient(),
StateDB: cstate.NoopDB{},
StateUpdater: NewMockTaskStateUpdater(),
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
ServersContactedCh: make(chan struct{}),
}
return conf, trCleanup
}
@@ -184,7 +185,7 @@ func TestTaskRunner_Restore_Running(t *testing.T) {
// kills the task before restarting a new TaskRunner. The new TaskRunner is
// returned once it is running and waiting in pending along with a cleanup
// func.
func setupRestoreFailureTest(t *testing.T) (*TaskRunner, func()) {
func setupRestoreFailureTest(t *testing.T) (*TaskRunner, *Config, func()) {
t.Parallel()
alloc := mock.Alloc()
@@ -234,10 +235,10 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, func()) {
// Create a new TaskRunner and Restore the task
newTR, err := NewTaskRunner(conf)
require.NoError(t, err)
require.NoError(t, newTR.Restore())
// Assert the restore gate is *closed* because reattachment failed
require.True(t, newTR.restoreGate.IsClosed())
// Assert the TR will wait on servers because reattachment failed
require.NoError(t, newTR.Restore())
require.True(t, newTR.waitOnServers)
// Start new TR
go newTR.Run()
@@ -253,17 +254,17 @@ func setupRestoreFailureTest(t *testing.T) (*TaskRunner, func()) {
ts := newTR.TaskState()
require.Equal(t, structs.TaskStatePending, ts.State)
return newTR, cleanup3
return newTR, conf, cleanup3
}
// TestTaskRunner_Restore_Restart asserts restoring a dead task blocks until
// MarkAlive is called. #1795
func TestTaskRunner_Restore_Restart(t *testing.T) {
newTR, cleanup := setupRestoreFailureTest(t)
newTR, conf, cleanup := setupRestoreFailureTest(t)
defer cleanup()
// Fake contacting the server by opening the restore gate
newTR.MarkLive()
// Fake contacting the server by closing the chan
close(conf.ServersContactedCh)
testutil.WaitForResult(func() (bool, error) {
ts := newTR.TaskState().State
@@ -276,10 +277,10 @@ func TestTaskRunner_Restore_Restart(t *testing.T) {
// TestTaskRunner_Restore_Kill asserts restoring a dead task blocks until
// the task is killed. #1795
func TestTaskRunner_Restore_Kill(t *testing.T) {
newTR, cleanup := setupRestoreFailureTest(t)
newTR, _, cleanup := setupRestoreFailureTest(t)
defer cleanup()
// Sending the task a terminal update shouldn't kill it or mark it live
// Sending the task a terminal update shouldn't kill it or unblock it
alloc := newTR.Alloc().Copy()
alloc.DesiredStatus = structs.AllocDesiredStatusStop
newTR.Update(alloc)
@@ -301,12 +302,13 @@ func TestTaskRunner_Restore_Kill(t *testing.T) {
// TestTaskRunner_Restore_Update asserts restoring a dead task blocks until
// Update is called. #1795
func TestTaskRunner_Restore_Update(t *testing.T) {
newTR, cleanup := setupRestoreFailureTest(t)
newTR, conf, cleanup := setupRestoreFailureTest(t)
defer cleanup()
// Fake contacting the server by opening the restore gate
// Fake Client.runAllocs behavior by calling Update then closing chan
alloc := newTR.Alloc().Copy()
newTR.Update(alloc)
close(conf.ServersContactedCh)
testutil.WaitForResult(func() (bool, error) {
ts := newTR.TaskState().State


@@ -55,17 +55,18 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, fu
clientConf, cleanup := clientconfig.TestClientConfig(t)
conf := &Config{
// Copy the alloc in case the caller edits and reuses it
Alloc: alloc.Copy(),
Logger: clientConf.Logger,
ClientConfig: clientConf,
StateDB: state.NoopDB{},
Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger),
Vault: vaultclient.NewMockVaultClient(),
StateUpdater: &MockStateUpdater{},
PrevAllocWatcher: allocwatcher.NoopPrevAlloc{},
PrevAllocMigrator: allocwatcher.NoopPrevAlloc{},
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
Alloc: alloc.Copy(),
Logger: clientConf.Logger,
ClientConfig: clientConf,
StateDB: state.NoopDB{},
Consul: consul.NewMockConsulServiceClient(t, clientConf.Logger),
Vault: vaultclient.NewMockVaultClient(),
StateUpdater: &MockStateUpdater{},
PrevAllocWatcher: allocwatcher.NoopPrevAlloc{},
PrevAllocMigrator: allocwatcher.NoopPrevAlloc{},
DeviceManager: devicemanager.NoopMockManager(),
DriverManager: drivermanager.TestDriverManager(t),
ServersContactedCh: make(chan struct{}),
}
return conf, cleanup
}


@@ -126,7 +126,6 @@ type AllocRunner interface {
Run()
StatsReporter() interfaces.AllocStatsReporter
Update(*structs.Allocation)
MarkLive()
WaitCh() <-chan struct{}
DestroyCh() <-chan struct{}
ShutdownCh() <-chan struct{}
@@ -260,6 +259,11 @@ type Client struct {
// fpInitialized chan is closed when the first batch of fingerprints are
// applied to the node and the server is updated
fpInitialized chan struct{}
// serversContactedCh is closed when GetClientAllocs and runAllocs have
// successfully run once.
serversContactedCh chan struct{}
serversContactedOnce sync.Once
}
var (
@@ -310,6 +314,8 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
triggerEmitNodeEvent: make(chan *structs.NodeEvent, 8),
fpInitialized: make(chan struct{}),
invalidAllocs: make(map[string]struct{}),
serversContactedCh: make(chan struct{}),
serversContactedOnce: sync.Once{},
}
c.batchNodeUpdates = newBatchNodeUpdates(
@@ -2000,12 +2006,6 @@ func (c *Client) runAllocs(update *allocUpdates) {
errs := 0
// Mark existing allocations as live in case they failed to reattach on
// restore and are waiting to hear from the server before restarting.
for _, live := range diff.ignore {
c.markAllocLive(live)
}
// Remove the old allocations
for _, remove := range diff.removed {
c.removeAlloc(remove)
@@ -2037,6 +2037,12 @@
}
}
// Mark servers as having been contacted so blocked tasks that failed
// to restore can now restart.
c.serversContactedOnce.Do(func() {
close(c.serversContactedCh)
})
// Trigger the GC once more now that new allocs are started that could
// have caused thresholds to be exceeded
c.garbageCollector.Trigger()
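The once.Do guard matters because runAllocs runs on every alloc update and closing an already-closed channel panics; sync.Once makes the latch idempotent. A minimal illustration:

package sketch

import "sync"

var (
	contacted = make(chan struct{})
	once      sync.Once
)

// markContacted is safe to call on every runAllocs pass; only the first
// call closes the channel. Without the guard, a second close(contacted)
// would panic with "close of closed channel".
func markContacted() {
	once.Do(func() { close(contacted) })
}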
@@ -2089,24 +2095,6 @@ func makeFailedAlloc(add *structs.Allocation, err error) *structs.Allocation {
return stripped
}
// markAllocLive is invoked when an alloc should be running but has not been
// updated or just been added. This allows unblocking tasks that failed to
// reattach on restore and are waiting to hear from the server.
func (c *Client) markAllocLive(allocID string) {
c.allocLock.Lock()
defer c.allocLock.Unlock()
ar, ok := c.allocs[allocID]
if !ok {
// This should never happen as alloc diffing should cause
// unknown allocs to be added, not marked live.
c.logger.Warn("unknown alloc should be running but is not", "alloc_id", allocID)
return
}
ar.MarkLive()
}
// removeAlloc is invoked when we should remove an allocation because it has
// been removed by the server.
func (c *Client) removeAlloc(allocID string) {


@@ -1,87 +0,0 @@
// Package gate implements a simple on/off latch or gate: it blocks waiters
// until opened. Waiters may receive on a chan which is closed when the gate is
// open.
package gate
import "sync"
// closedCh is a chan initialized as closed
var closedCh chan struct{}
func init() {
closedCh = make(chan struct{})
close(closedCh)
}
// G is a gate which blocks waiters until opened and is safe for concurrent
// use. Must be created via New.
type G struct {
// open is true if the gate is open and ch is closed.
open bool
// ch is closed if the gate is open.
ch chan struct{}
mu sync.Mutex
}
// NewClosed returns a closed gate. The chan returned by Wait will block until Open
// is called.
func NewClosed() *G {
return &G{
ch: make(chan struct{}),
}
}
// NewOpen returns an open gate. The chan returned by Wait is closed and
// therefore will never block.
func NewOpen() *G {
return &G{
open: true,
ch: closedCh,
}
}
// Open the gate. Unblocks any Waiters. Opening an opened gate is a noop. Safe
// for concurrent use with Close and Wait.
func (g *G) Open() {
g.mu.Lock()
defer g.mu.Unlock()
if g.open {
return
}
g.open = true
close(g.ch)
}
// Close the gate. Blocks subsequent Wait callers. Closing a closed gate is a
// noop. Safe for concurrent use with Open and Wait.
func (g *G) Close() {
g.mu.Lock()
defer g.mu.Unlock()
if !g.open {
return
}
g.open = false
g.ch = make(chan struct{})
}
// Wait returns a chan that blocks until the gate is open. Safe for concurrent
// use with Open and Close, but the chan should not be reused between calls to
// Open and Close.
func (g *G) Wait() <-chan struct{} {
g.mu.Lock()
defer g.mu.Unlock()
return g.ch
}
// IsClosed returns true if the gate is closed.
func (g *G) IsClosed() bool {
g.mu.Lock()
defer g.mu.Unlock()
return !g.open
}
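The Wait comment above hints at why the gate is heavier than needed: because Close replaces the channel, a waiter holding an old snapshot can fall through even while the gate is closed. A hypothetical illustration against the gate API shown here (the package still exists before this commit):

package sketch

import "github.com/hashicorp/nomad/helper/gate"

// staleWait assumes g starts closed (blocking).
func staleWait(g *gate.G) {
	ch := g.Wait() // snapshot of the current channel, taken while closed

	g.Open()  // closes that channel; receives on ch now succeed forever
	g.Close() // the gate blocks again, but behind a brand-new channel

	// Stale: ch still refers to the old channel, which stays closed, so
	// this "wait" returns immediately even though the gate is closed.
	<-ch
}

The one-shot serversContactedCh avoids the problem entirely: the channel is created once, closed once, and never replaced, so any snapshot a waiter holds stays valid.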


@@ -1,126 +0,0 @@
package gate
import (
"math/rand"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestGate_NewClosed(t *testing.T) {
t.Parallel()
g := NewClosed()
assertClosed := func() {
require.True(t, g.IsClosed())
select {
case <-g.Wait():
require.Fail(t, "expected gate to be closed")
default:
// Ok!
}
}
assertClosed()
g.Close()
assertClosed()
// Close should be safe to call multiple times
g.Close()
assertClosed()
g.Open()
require.False(t, g.IsClosed())
select {
case <-g.Wait():
// Ok!
default:
require.Fail(t, "expected gate to be open")
}
}
func TestGate_NewOpen(t *testing.T) {
t.Parallel()
g := NewOpen()
assertOpen := func() {
require.False(t, g.IsClosed())
select {
case <-g.Wait():
// Ok!
default:
require.Fail(t, "expected gate to be open")
}
}
assertOpen()
g.Open()
assertOpen()
// Open should be safe to call multiple times
g.Open()
assertOpen()
g.Close()
select {
case <-g.Wait():
require.Fail(t, "expected gate to be closed")
default:
// Ok!
}
}
// TestGate_Concurrency is meant to be run with the race detector enabled to
// find any races.
func TestGate_Concurrency(t *testing.T) {
t.Parallel()
g := NewOpen()
wg := sync.WaitGroup{}
// Start closer
wg.Add(1)
go func() {
defer wg.Done()
dice := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < 1000; i++ {
g.Close()
time.Sleep(time.Duration(dice.Int63n(100)))
}
}()
// Start opener
wg.Add(1)
go func() {
defer wg.Done()
dice := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < 1000; i++ {
g.Open()
time.Sleep(time.Duration(dice.Int63n(100)))
}
}()
// Perform reads concurrently with writes
wgCh := make(chan struct{})
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
for {
select {
case <-time.After(time.Millisecond):
case <-wgCh:
return
}
g.IsClosed()
g.Wait()
}
}()
wg.Wait()
close(wgCh)
<-doneCh
}