Alternative approach: avoid restoring
This uses an alternative approach where we avoid restoring the alloc runner in the first place, if we suspect that the alloc may have been completed already.
This commit is contained in:
parent
647c1457cb
commit
90c5eefbab
|
@ -141,11 +141,6 @@ type allocRunner struct {
|
||||||
// servers have been contacted for the first time in case of a failed
|
// servers have been contacted for the first time in case of a failed
|
||||||
// restore.
|
// restore.
|
||||||
serversContactedCh chan struct{}
|
serversContactedCh chan struct{}
|
||||||
|
|
||||||
// waitOnServers defaults to false but will be set true if a restore
|
|
||||||
// fails and the Run method should wait until serversContactedCh is
|
|
||||||
// closed.
|
|
||||||
waitOnServers bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAllocRunner returns a new allocation runner.
|
// NewAllocRunner returns a new allocation runner.
|
||||||
|
@ -248,16 +243,6 @@ func (ar *allocRunner) Run() {
|
||||||
// Start the alloc update handler
|
// Start the alloc update handler
|
||||||
go ar.handleAllocUpdates()
|
go ar.handleAllocUpdates()
|
||||||
|
|
||||||
if ar.waitOnServers {
|
|
||||||
ar.logger.Info(" waiting to contact server before restarting")
|
|
||||||
select {
|
|
||||||
case <-ar.taskStateUpdateHandlerCh:
|
|
||||||
return
|
|
||||||
case <-ar.serversContactedCh:
|
|
||||||
ar.logger.Info("server contacted; unblocking waiting alloc")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If task update chan has been closed, that means we've been shutdown.
|
// If task update chan has been closed, that means we've been shutdown.
|
||||||
select {
|
select {
|
||||||
case <-ar.taskStateUpdateHandlerCh:
|
case <-ar.taskStateUpdateHandlerCh:
|
||||||
|
@ -368,50 +353,9 @@ func (ar *allocRunner) Restore() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ar.waitOnServers = ar.shouldWaitForServers(ds)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// shouldWaitForServers returns true if we suspect the alloc
|
|
||||||
// is potentially a completed alloc that got resurrected after AR was destroyed.
|
|
||||||
// In such cases, rerunning the alloc can lead to process and task exhaustion.
|
|
||||||
//
|
|
||||||
// The heaurstic used here is an alloc is suspect if it's in a pending state
|
|
||||||
// and no other task/status info is found.
|
|
||||||
//
|
|
||||||
// See:
|
|
||||||
// * https://github.com/hashicorp/nomad/pull/6207
|
|
||||||
// * https://github.com/hashicorp/nomad/issues/5984
|
|
||||||
//
|
|
||||||
// COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported
|
|
||||||
func (ar *allocRunner) shouldWaitForServers(ds *structs.AllocDeploymentStatus) bool {
|
|
||||||
alloc := ar.Alloc()
|
|
||||||
|
|
||||||
if alloc.ClientStatus != structs.AllocClientStatusPending {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if we restore a task but see no other data
|
|
||||||
if ds != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
|
||||||
if tg == nil {
|
|
||||||
// corrupt alloc?!
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, task := range tg.Tasks {
|
|
||||||
ls, tr, _ := ar.stateDB.GetTaskRunnerState(alloc.ID, task.Name)
|
|
||||||
if ls != nil || tr != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// persistDeploymentStatus stores AllocDeploymentStatus.
|
// persistDeploymentStatus stores AllocDeploymentStatus.
|
||||||
func (ar *allocRunner) persistDeploymentStatus(ds *structs.AllocDeploymentStatus) {
|
func (ar *allocRunner) persistDeploymentStatus(ds *structs.AllocDeploymentStatus) {
|
||||||
if err := ar.stateDB.PutDeploymentStatus(ar.id, ds); err != nil {
|
if err := ar.stateDB.PutDeploymentStatus(ar.id, ds); err != nil {
|
||||||
|
|
|
@ -1059,44 +1059,3 @@ func TestAllocRunner_PersistState_Destroyed(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Nil(t, ts)
|
require.Nil(t, ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported
|
|
||||||
func TestAllocRunner_WaitForServer_Detects_Suspicious_Allocs(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
alloc := mock.BatchAlloc()
|
|
||||||
|
|
||||||
conf, cleanup := testAllocRunnerConfig(t, alloc)
|
|
||||||
conf.StateDB = state.NewMemDB(conf.Logger)
|
|
||||||
|
|
||||||
defer cleanup()
|
|
||||||
ar, err := NewAllocRunner(conf)
|
|
||||||
require.NoError(t, err)
|
|
||||||
defer destroy(ar)
|
|
||||||
|
|
||||||
defer destroy(ar)
|
|
||||||
go ar.Run()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ar.WaitCh():
|
|
||||||
case <-time.After(10 * time.Second):
|
|
||||||
require.Fail(t, "timed out waiting for alloc to complete")
|
|
||||||
}
|
|
||||||
|
|
||||||
// shouldn't wait after successful completion
|
|
||||||
require.False(t, ar.shouldWaitForServers(nil))
|
|
||||||
|
|
||||||
// new alloc runner shouldn't restore completed alloc
|
|
||||||
ar, err = NewAllocRunner(conf)
|
|
||||||
require.NoError(t, err)
|
|
||||||
ar.Restore()
|
|
||||||
require.False(t, ar.shouldWaitForServers(nil))
|
|
||||||
|
|
||||||
// simulate 0.9.5 behavior
|
|
||||||
require.NoError(t, conf.StateDB.DeleteAllocationBucket(alloc.ID))
|
|
||||||
require.NoError(t, conf.StateDB.PutAllocation(alloc))
|
|
||||||
|
|
||||||
ar, err = NewAllocRunner(conf)
|
|
||||||
require.NoError(t, err)
|
|
||||||
ar.Restore()
|
|
||||||
require.True(t, ar.shouldWaitForServers(nil))
|
|
||||||
}
|
|
||||||
|
|
|
@ -14,11 +14,11 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
metrics "github.com/armon/go-metrics"
|
||||||
consulapi "github.com/hashicorp/consul/api"
|
consulapi "github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/lib"
|
"github.com/hashicorp/consul/lib"
|
||||||
"github.com/hashicorp/go-hclog"
|
hclog "github.com/hashicorp/go-hclog"
|
||||||
"github.com/hashicorp/go-multierror"
|
multierror "github.com/hashicorp/go-multierror"
|
||||||
"github.com/hashicorp/nomad/client/allocdir"
|
"github.com/hashicorp/nomad/client/allocdir"
|
||||||
"github.com/hashicorp/nomad/client/allocrunner"
|
"github.com/hashicorp/nomad/client/allocrunner"
|
||||||
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
|
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
|
||||||
|
@ -1006,6 +1006,15 @@ func (c *Client) restoreState() error {
|
||||||
// Load each alloc back
|
// Load each alloc back
|
||||||
for _, alloc := range allocs {
|
for _, alloc := range allocs {
|
||||||
|
|
||||||
|
// COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported
|
||||||
|
// See isPotentiallyCompletedAlloc for details. Skipping suspicious allocs
|
||||||
|
// now. If allocs should be run, they will be started when the client
|
||||||
|
// gets allocs from servers.
|
||||||
|
if c.isPotentiallyCompletedAlloc(alloc) {
|
||||||
|
c.logger.Warn("found a alloc that may have been completed already, skipping restore", "alloc_id", alloc.ID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
//XXX On Restore we give up on watching previous allocs because
|
//XXX On Restore we give up on watching previous allocs because
|
||||||
// we need the local AllocRunners initialized first. We could
|
// we need the local AllocRunners initialized first. We could
|
||||||
// add a second loop to initialize just the alloc watcher.
|
// add a second loop to initialize just the alloc watcher.
|
||||||
|
@ -1062,6 +1071,44 @@ func (c *Client) restoreState() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isPotentiallyCompletedAlloc returns true if we suspect the alloc
|
||||||
|
// is potentially a completed alloc that got resurrected after AR was destroyed.
|
||||||
|
// In such cases, rerunning the alloc can lead to process and task exhaustion.
|
||||||
|
//
|
||||||
|
// The heuristic used here is an alloc is suspect if we see no other information
|
||||||
|
// and no other task/status info is found.
|
||||||
|
//
|
||||||
|
// See:
|
||||||
|
// * https://github.com/hashicorp/nomad/pull/6207
|
||||||
|
// * https://github.com/hashicorp/nomad/issues/5984
|
||||||
|
//
|
||||||
|
// COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported
|
||||||
|
func (c *Client) isPotentiallyCompletedAlloc(alloc *structs.Allocation) bool {
|
||||||
|
if alloc.ClientStatus != structs.AllocClientStatusPending {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
ds, _ := c.stateDB.GetDeploymentStatus(alloc.ID)
|
||||||
|
if ds != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
||||||
|
if tg == nil {
|
||||||
|
// corrupt alloc?!
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, task := range tg.Tasks {
|
||||||
|
ls, tr, _ := c.stateDB.GetTaskRunnerState(alloc.ID, task.Name)
|
||||||
|
if ls != nil || tr != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) handleInvalidAllocs(alloc *structs.Allocation, err error) {
|
func (c *Client) handleInvalidAllocs(alloc *structs.Allocation, err error) {
|
||||||
c.invalidAllocsLock.Lock()
|
c.invalidAllocsLock.Lock()
|
||||||
c.invalidAllocs[alloc.ID] = struct{}{}
|
c.invalidAllocs[alloc.ID] = struct{}{}
|
||||||
|
|
|
@ -10,10 +10,12 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/go-memdb"
|
memdb "github.com/hashicorp/go-memdb"
|
||||||
|
trstate "github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
|
||||||
"github.com/hashicorp/nomad/client/config"
|
"github.com/hashicorp/nomad/client/config"
|
||||||
consulApi "github.com/hashicorp/nomad/client/consul"
|
consulApi "github.com/hashicorp/nomad/client/consul"
|
||||||
"github.com/hashicorp/nomad/client/fingerprint"
|
"github.com/hashicorp/nomad/client/fingerprint"
|
||||||
|
"github.com/hashicorp/nomad/client/state"
|
||||||
"github.com/hashicorp/nomad/command/agent/consul"
|
"github.com/hashicorp/nomad/command/agent/consul"
|
||||||
"github.com/hashicorp/nomad/helper/pluginutils/catalog"
|
"github.com/hashicorp/nomad/helper/pluginutils/catalog"
|
||||||
"github.com/hashicorp/nomad/helper/testlog"
|
"github.com/hashicorp/nomad/helper/testlog"
|
||||||
|
@ -27,7 +29,7 @@ import (
|
||||||
"github.com/hashicorp/nomad/testutil"
|
"github.com/hashicorp/nomad/testutil"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
"github.com/hashicorp/go-hclog"
|
hclog "github.com/hashicorp/go-hclog"
|
||||||
cstate "github.com/hashicorp/nomad/client/state"
|
cstate "github.com/hashicorp/nomad/client/state"
|
||||||
ctestutil "github.com/hashicorp/nomad/client/testutil"
|
ctestutil "github.com/hashicorp/nomad/client/testutil"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -1644,3 +1646,44 @@ func TestClient_updateNodeFromDriverUpdatesAll(t *testing.T) {
|
||||||
assert.EqualValues(t, n, un)
|
assert.EqualValues(t, n, un)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// COMPAT(0.12): remove once upgrading from 0.9.5 is no longer supported
|
||||||
|
func TestClient_Restore_PotentiallyCompletedAlloc(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
c, cleanup := TestClient(t, nil)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
c.stateDB = state.NewMemDB(c.logger)
|
||||||
|
|
||||||
|
t.Run("plain alloc", func(t *testing.T) {
|
||||||
|
alloc := mock.BatchAlloc()
|
||||||
|
c.stateDB.PutAllocation(alloc)
|
||||||
|
|
||||||
|
require.True(t, c.isPotentiallyCompletedAlloc(alloc))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("alloc with a task with local state", func(t *testing.T) {
|
||||||
|
alloc := mock.BatchAlloc()
|
||||||
|
taskName := alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks[0].Name
|
||||||
|
ls := &trstate.LocalState{}
|
||||||
|
|
||||||
|
c.stateDB.PutAllocation(alloc)
|
||||||
|
c.stateDB.PutTaskRunnerLocalState(alloc.ID, taskName, ls)
|
||||||
|
|
||||||
|
require.False(t, c.isPotentiallyCompletedAlloc(alloc))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("alloc with a task with local state", func(t *testing.T) {
|
||||||
|
alloc := mock.BatchAlloc()
|
||||||
|
taskName := alloc.Job.LookupTaskGroup(alloc.TaskGroup).Tasks[0].Name
|
||||||
|
ts := &structs.TaskState{
|
||||||
|
State: structs.TaskStateRunning,
|
||||||
|
}
|
||||||
|
|
||||||
|
c.stateDB.PutAllocation(alloc)
|
||||||
|
c.stateDB.PutTaskState(alloc.ID, taskName, ts)
|
||||||
|
|
||||||
|
require.False(t, c.isPotentiallyCompletedAlloc(alloc))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue