Merge pull request #12476 from hashicorp/f-disconnected-client-allocation-handling

disconnected clients: Feature branch merge
Derek Strickland 2022-04-06 10:11:57 -04:00 committed by GitHub
commit 0ab89b1728
40 changed files with 3156 additions and 294 deletions

View File

@ -1015,6 +1015,7 @@ type TaskGroupSummary struct {
Running int
Starting int
Lost int
Unknown int
}
// JobListStub is used to return a subset of information about

View File

@ -431,6 +431,7 @@ type TaskGroup struct {
Services []*Service `hcl:"service,block"`
ShutdownDelay *time.Duration `mapstructure:"shutdown_delay" hcl:"shutdown_delay,optional"`
StopAfterClientDisconnect *time.Duration `mapstructure:"stop_after_client_disconnect" hcl:"stop_after_client_disconnect,optional"`
MaxClientDisconnect *time.Duration `mapstructure:"max_client_disconnect" hcl:"max_client_disconnect,optional"`
Scaling *ScalingPolicy `hcl:"scaling,block"`
Consul *Consul `hcl:"consul,block"`
}
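For orientation (illustrative only, not part of the diff): a minimal sketch of how the new field could be set through the Go API package, assuming the standard github.com/hashicorp/nomad/api import and its NewTaskGroup constructor.

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Allow allocations in this group to resume after a client disconnect
	// of up to 30 minutes instead of being rescheduled immediately.
	maxDisconnect := 30 * time.Minute

	tg := api.NewTaskGroup("web", 1)
	tg.MaxClientDisconnect = &maxDisconnect

	fmt.Println(*tg.MaxClientDisconnect) // 30m0s
}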
@ -971,6 +972,7 @@ const (
TaskRestartSignal = "Restart Signaled"
TaskLeaderDead = "Leader Task Dead"
TaskBuildingTaskDir = "Building Task Directory"
TaskClientReconnected = "Reconnected"
)
// TaskEvent is an event that affects the state of a task and contains meta-data

View File

@ -784,6 +784,16 @@ func (ar *allocRunner) NetworkStatus() *structs.AllocNetworkStatus {
return ar.state.NetworkStatus.Copy()
}
// setIndexes is a helper for forcing alloc state on the alloc runner. This is
// used during reconnect when the task has been marked unknown by the server.
func (ar *allocRunner) setIndexes(update *structs.Allocation) {
ar.allocLock.Lock()
defer ar.allocLock.Unlock()
ar.alloc.AllocModifyIndex = update.AllocModifyIndex
ar.alloc.ModifyIndex = update.ModifyIndex
ar.alloc.ModifyTime = update.ModifyTime
}
// AllocState returns a copy of allocation state including a snapshot of task
// states.
func (ar *allocRunner) AllocState() *state.State {
@ -1240,6 +1250,42 @@ func (ar *allocRunner) Signal(taskName, signal string) error {
return err.ErrorOrNil()
}
// Reconnect logs a reconnect event for each task in the allocation and syncs the current alloc state with the server.
func (ar *allocRunner) Reconnect(update *structs.Allocation) (err error) {
event := structs.NewTaskEvent(structs.TaskClientReconnected)
event.Time = time.Now().UnixNano()
for _, tr := range ar.tasks {
tr.AppendEvent(event)
}
// Update the client alloc with the server side indexes.
ar.setIndexes(update)
// Calculate alloc state to get the final state with the new events.
// Cannot rely on AllocStates as it won't recompute TaskStates once they are set.
states := make(map[string]*structs.TaskState, len(ar.tasks))
for name, tr := range ar.tasks {
states[name] = tr.TaskState()
}
// Build the client allocation
alloc := ar.clientAlloc(states)
// Update the client state store.
err = ar.stateUpdater.PutAllocation(alloc)
if err != nil {
return
}
// Update the server.
ar.stateUpdater.AllocStateUpdated(alloc)
// Broadcast client alloc to listeners.
err = ar.allocBroadcaster.Send(alloc)
return
}
func (ar *allocRunner) GetTaskExecHandler(taskName string) drivermanager.TaskExecHandler {
tr, ok := ar.tasks[taskName]
if !ok {

View File

@ -1,6 +1,7 @@
package allocrunner
import (
"errors"
"fmt"
"io/ioutil"
"os"
@ -1575,3 +1576,110 @@ func TestAllocRunner_PersistState_Destroyed(t *testing.T) {
require.NoError(t, err)
require.Nil(t, ts)
}
func TestAllocRunner_Reconnect(t *testing.T) {
t.Parallel()
type tcase struct {
clientStatus string
taskState string
taskEvent *structs.TaskEvent
}
tcases := []tcase{
{
structs.AllocClientStatusRunning,
structs.TaskStateRunning,
structs.NewTaskEvent(structs.TaskStarted),
},
{
structs.AllocClientStatusComplete,
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskTerminated),
},
{
structs.AllocClientStatusFailed,
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskDriverFailure).SetFailsTask(),
},
{
structs.AllocClientStatusPending,
structs.TaskStatePending,
structs.NewTaskEvent(structs.TaskReceived),
},
}
for _, tc := range tcases {
t.Run(tc.clientStatus, func(t *testing.T) {
// create a running alloc
alloc := mock.BatchAlloc()
alloc.AllocModifyIndex = 10
alloc.ModifyIndex = 10
alloc.ModifyTime = time.Now().UnixNano()
// Ensure task takes some time
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.Config["run_for"] = "30s"
original := alloc.Copy()
conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)
go ar.Run()
for _, taskRunner := range ar.tasks {
taskRunner.UpdateState(tc.taskState, tc.taskEvent)
}
update := ar.Alloc().Copy()
update.ClientStatus = structs.AllocClientStatusUnknown
update.AllocModifyIndex = original.AllocModifyIndex + 10
update.ModifyIndex = original.ModifyIndex + 10
update.ModifyTime = original.ModifyTime + 10
err = ar.Reconnect(update)
require.NoError(t, err)
require.Equal(t, tc.clientStatus, ar.AllocState().ClientStatus)
// Make sure the runner's alloc indexes match the update.
require.Equal(t, update.AllocModifyIndex, ar.Alloc().AllocModifyIndex)
require.Equal(t, update.ModifyIndex, ar.Alloc().ModifyIndex)
require.Equal(t, update.ModifyTime, ar.Alloc().ModifyTime)
found := false
updater := conf.StateUpdater.(*MockStateUpdater)
var last *structs.Allocation
testutil.WaitForResult(func() (bool, error) {
last = updater.Last()
if last == nil {
return false, errors.New("last update nil")
}
states := last.TaskStates
for _, s := range states {
for _, e := range s.Events {
if e.Type == structs.TaskClientReconnected {
found = true
return true, nil
}
}
}
return false, errors.New("no reconnect event found")
}, func(err error) {
require.NoError(t, err)
})
require.True(t, found, "no reconnect event found")
})
}
}

View File

@ -36,6 +36,11 @@ func (m *MockStateUpdater) AllocStateUpdated(alloc *structs.Allocation) {
m.mu.Unlock()
}
// PutAllocation satisfies the AllocStateHandler interface.
func (m *MockStateUpdater) PutAllocation(alloc *structs.Allocation) (err error) {
return
}
// Last returns a copy of the last alloc update (or nil). Safe for concurrent
// access with updates.
func (m *MockStateUpdater) Last() *structs.Allocation {

View File

@ -404,8 +404,7 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error {
p.logger.Debug("blocking alloc was GC'd")
return nil
}
if resp.Alloc.Terminated() {
// Terminated!
if resp.Alloc.Terminated() || resp.Alloc.ClientStatus == structs.AllocClientStatusUnknown {
p.nodeID = resp.Alloc.NodeID
return nil
}

View File

@ -158,6 +158,7 @@ type AllocRunner interface {
RestartTask(taskName string, taskEvent *structs.TaskEvent) error
RestartAll(taskEvent *structs.TaskEvent) error
Reconnect(update *structs.Allocation) error
GetTaskExecHandler(taskName string) drivermanager.TaskExecHandler
GetTaskDriverCapabilities(taskName string) (*drivers.Capabilities, error)
@ -1979,6 +1980,11 @@ func (c *Client) AllocStateUpdated(alloc *structs.Allocation) {
}
}
// PutAllocation stores an allocation or returns an error if it could not be stored.
func (c *Client) PutAllocation(alloc *structs.Allocation) error {
return c.stateDB.PutAllocation(alloc)
}
// allocSync is a long lived function that batches allocation updates to the
// server.
func (c *Client) allocSync() {
@ -2421,6 +2427,15 @@ func (c *Client) updateAlloc(update *structs.Allocation) {
return
}
// Reconnect unknown allocations
if update.ClientStatus == structs.AllocClientStatusUnknown && update.AllocModifyIndex > ar.Alloc().AllocModifyIndex {
err = ar.Reconnect(update)
if err != nil {
c.logger.Error("error reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex, "err", err)
}
return
}
// Update local copy of alloc
if err := c.stateDB.PutAllocation(update); err != nil {
c.logger.Error("error persisting updated alloc locally", "error", err, "alloc_id", update.ID)

View File

@ -1713,3 +1713,89 @@ func Test_verifiedTasks(t *testing.T) {
try(t, alloc(tgTasks), tasks, tasks, "")
})
}
func TestClient_ReconnectAllocs(t *testing.T) {
t.Parallel()
s1, _, cleanupS1 := testServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
c1, cleanupC1 := TestClient(t, func(c *config.Config) {
c.DevMode = false
c.RPCHandler = s1
})
defer cleanupC1()
waitTilNodeReady(c1, t)
job := mock.Job()
runningAlloc := mock.Alloc()
runningAlloc.NodeID = c1.Node().ID
runningAlloc.Job = job
runningAlloc.JobID = job.ID
runningAlloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
runningAlloc.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "10s",
}
runningAlloc.ClientStatus = structs.AllocClientStatusPending
state := s1.State()
err := state.UpsertJob(structs.MsgTypeTestSetup, 100, job)
require.NoError(t, err)
err = state.UpsertJobSummary(101, mock.JobSummary(runningAlloc.JobID))
require.NoError(t, err)
err = state.UpsertAllocs(structs.MsgTypeTestSetup, 102, []*structs.Allocation{runningAlloc})
require.NoError(t, err)
// Ensure allocation gets upserted with desired status.
testutil.WaitForResult(func() (bool, error) {
upsertResult, stateErr := state.AllocByID(nil, runningAlloc.ID)
return upsertResult.ClientStatus == structs.AllocClientStatusRunning, stateErr
}, func(err error) {
require.NoError(t, err, "allocation query failed")
})
// Create the unknown version of the alloc from the running one, update state
// to simulate what the reconciler would have done, and then send it to the client.
unknownAlloc, err := state.AllocByID(nil, runningAlloc.ID)
require.Equal(t, structs.AllocClientStatusRunning, unknownAlloc.ClientStatus)
require.NoError(t, err)
unknownAlloc.ClientStatus = structs.AllocClientStatusUnknown
unknownAlloc.AppendState(structs.AllocStateFieldClientStatus, structs.AllocClientStatusUnknown)
err = state.UpsertAllocs(structs.MsgTypeTestSetup, runningAlloc.AllocModifyIndex+1, []*structs.Allocation{unknownAlloc})
require.NoError(t, err)
updates := &allocUpdates{
pulled: map[string]*structs.Allocation{
unknownAlloc.ID: unknownAlloc,
},
}
c1.runAllocs(updates)
invalid := false
var runner AllocRunner
var finalAlloc *structs.Allocation
// Ensure the allocation is not invalid on the client and has been marked
// running on the server with the new modify index
testutil.WaitForResult(func() (result bool, stateErr error) {
c1.allocLock.RLock()
runner = c1.allocs[unknownAlloc.ID]
_, invalid = c1.invalidAllocs[unknownAlloc.ID]
c1.allocLock.RUnlock()
finalAlloc, stateErr = state.AllocByID(nil, unknownAlloc.ID)
result = structs.AllocClientStatusRunning == finalAlloc.ClientStatus
return
}, func(err error) {
require.NoError(t, err, "allocation server check failed")
})
require.NotNil(t, runner, "expected alloc runner")
require.False(t, invalid, "expected alloc to not be marked invalid")
require.Equal(t, unknownAlloc.AllocModifyIndex, finalAlloc.AllocModifyIndex)
}

View File

@ -14,6 +14,9 @@ type AllocStateHandler interface {
// AllocStateUpdated is used to emit an updated allocation. This allocation
// is stripped to only include client settable fields.
AllocStateUpdated(alloc *structs.Allocation)
// PutAllocation is used to persist an updated allocation in the local state store.
PutAllocation(*structs.Allocation) error
}
// DeviceStatsReporter gives access to the latest resource usage

View File

@ -1012,6 +1012,10 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta
tg.StopAfterClientDisconnect = taskGroup.StopAfterClientDisconnect
}
if taskGroup.MaxClientDisconnect != nil {
tg.MaxClientDisconnect = taskGroup.MaxClientDisconnect
}
if taskGroup.ReschedulePolicy != nil {
tg.ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: *taskGroup.ReschedulePolicy.Attempts,

View File

@ -2558,6 +2558,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
},
},
},
MaxClientDisconnect: helper.TimeToPtr(30 * time.Second),
Tasks: []*api.Task{
{
Name: "task1",
@ -2955,6 +2956,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
},
},
},
MaxClientDisconnect: helper.TimeToPtr(30 * time.Second),
Tasks: []*structs.Task{
{
Name: "task1",

View File

@ -538,6 +538,8 @@ func buildDisplayMessage(event *api.TaskEvent) string {
desc = event.DriverMessage
case api.TaskLeaderDead:
desc = "Leader Task in Group dead"
case api.TaskClientReconnected:
desc = "Client reconnected"
default:
desc = event.Message
}

View File

@ -540,7 +540,7 @@ func (c *JobStatusCommand) outputJobSummary(client *api.Client, job *api.Job) er
if !periodic && !parameterizedJob {
c.Ui.Output(c.Colorize().Color("\n[bold]Summary[reset]"))
summaries := make([]string, len(summary.Summary)+1)
summaries[0] = "Task Group|Queued|Starting|Running|Failed|Complete|Lost"
summaries[0] = "Task Group|Queued|Starting|Running|Failed|Complete|Lost|Unknown"
taskGroups := make([]string, 0, len(summary.Summary))
for taskGroup := range summary.Summary {
taskGroups = append(taskGroups, taskGroup)
@ -548,10 +548,10 @@ func (c *JobStatusCommand) outputJobSummary(client *api.Client, job *api.Job) er
sort.Strings(taskGroups)
for idx, taskGroup := range taskGroups {
tgs := summary.Summary[taskGroup]
summaries[idx+1] = fmt.Sprintf("%s|%d|%d|%d|%d|%d|%d",
summaries[idx+1] = fmt.Sprintf("%s|%d|%d|%d|%d|%d|%d|%d",
taskGroup, tgs.Queued, tgs.Starting,
tgs.Running, tgs.Failed,
tgs.Complete, tgs.Lost,
tgs.Complete, tgs.Lost, tgs.Unknown,
)
}
c.Ui.Output(formatList(summaries))
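For reference, the summary table with the added column would render roughly as follows (row values are hypothetical):

Task Group  Queued  Starting  Running  Failed  Complete  Lost  Unknown
cache       0       0         2        0       0         0     1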

View File

@ -352,6 +352,29 @@ func CopyMapStringInterface(m map[string]interface{}) map[string]interface{} {
return c
}
// MergeMapStringString will merge two maps into one. If a duplicate key exists
// the value in the second map will replace the value in the first map. If both
// maps are empty or nil this returns an empty map.
func MergeMapStringString(m map[string]string, n map[string]string) map[string]string {
if len(m) == 0 && len(n) == 0 {
return map[string]string{}
}
if len(m) == 0 {
return n
}
if len(n) == 0 {
return m
}
result := CopyMapStringString(m)
for k, v := range n {
result[k] = v
}
return result
}
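A quick usage sketch for the new helper (illustrative only; assumes the github.com/hashicorp/nomad/helper import path of the surrounding package):

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/helper"
)

func main() {
	defaults := map[string]string{"region": "global", "dc": "dc1"}
	overrides := map[string]string{"dc": "dc2"}

	// Duplicate keys resolve in favor of the second map.
	merged := helper.MergeMapStringString(defaults, overrides)
	fmt.Println(merged) // map[dc:dc2 region:global]
}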
func CopyMapStringInt(m map[string]int) map[string]int {
l := len(m)
if l == 0 {

View File

@ -207,6 +207,27 @@ func TestCopyMapSliceInterface(t *testing.T) {
require.False(t, reflect.DeepEqual(m, c))
}
func TestMergeMapStringString(t *testing.T) {
type testCase struct {
map1 map[string]string
map2 map[string]string
expected map[string]string
}
cases := []testCase{
{map[string]string{"foo": "bar"}, map[string]string{"baz": "qux"}, map[string]string{"foo": "bar", "baz": "qux"}},
{map[string]string{"foo": "bar"}, nil, map[string]string{"foo": "bar"}},
{nil, map[string]string{"baz": "qux"}, map[string]string{"baz": "qux"}},
{nil, nil, map[string]string{}},
}
for _, c := range cases {
if output := MergeMapStringString(c.map1, c.map2); !CompareMapStringString(output, c.expected) {
t.Errorf("MergeMapStringString(%q, %q) -> %q != %q", c.map1, c.map2, output, c.expected)
}
}
}
func TestCleanEnvVar(t *testing.T) {
type testCase struct {
input string

View File

@ -161,12 +161,34 @@ func (h *nodeHeartbeater) invalidateHeartbeat(id string) {
Region: h.config.Region,
},
}
if h.shouldDisconnect(id) {
req.Status = structs.NodeStatusDisconnected
}
var resp structs.NodeUpdateResponse
if err := h.staticEndpoints.Node.UpdateStatus(&req, &resp); err != nil {
h.logger.Error("update node status failed", "error", err)
}
}
func (h *nodeHeartbeater) shouldDisconnect(id string) bool {
allocs, err := h.State().AllocsByNode(nil, id)
if err != nil {
h.logger.Error("error retrieving allocs by node", "error", err)
return false
}
now := time.Now().UTC()
for _, alloc := range allocs {
if alloc.DisconnectTimeout(now).After(now) {
return true
}
}
return false
}
// clearHeartbeatTimer is used to clear the heartbeat time for
// a single heartbeat. This is used when a heartbeat is destroyed
// explicitly and no longer needed.

View File

@ -1033,6 +1033,8 @@ func (s *Server) iterateJobSummaryMetrics(summary *structs.JobSummary) {
float32(tgSummary.Starting), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "lost"},
float32(tgSummary.Lost), labels)
metrics.SetGaugeWithLabels([]string{"nomad", "job_summary", "unknown"},
float32(tgSummary.Unknown), labels)
}
}

View File

@ -487,7 +487,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
// Check if we should trigger evaluations
transitionToReady := transitionedToReady(args.Status, node.Status)
if structs.ShouldDrainNode(args.Status) || transitionToReady {
if structs.ShouldDrainNode(args.Status) || transitionToReady || args.Status == structs.NodeStatusDisconnected {
evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
if err != nil {
n.logger.Error("eval creation failed", "error", err)
@ -546,6 +546,9 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
}
}
case structs.NodeStatusDisconnected:
n.logger.Trace(fmt.Sprintf("heartbeat reset skipped for disconnected node %q", args.NodeID))
default:
ttl, err := n.srv.resetHeartbeatTimer(args.NodeID)
if err != nil {
@ -572,7 +575,8 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct
func transitionedToReady(newStatus, oldStatus string) bool {
initToReady := oldStatus == structs.NodeStatusInit && newStatus == structs.NodeStatusReady
terminalToReady := oldStatus == structs.NodeStatusDown && newStatus == structs.NodeStatusReady
return initToReady || terminalToReady
disconnectedToReady := oldStatus == structs.NodeStatusDisconnected && newStatus == structs.NodeStatusReady
return initToReady || terminalToReady || disconnectedToReady
}
// UpdateDrain is used to update the drain mode of a client node
@ -1147,48 +1151,85 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
var evals []*structs.Evaluation
for _, allocToUpdate := range args.Alloc {
evalTriggerBy := ""
allocToUpdate.ModifyTime = now.UTC().UnixNano()
if !allocToUpdate.TerminalStatus() {
continue
}
alloc, _ := n.srv.State().AllocByID(nil, allocToUpdate.ID)
if alloc == nil {
continue
}
// if the job has been purged, this will always return error
job, err := n.srv.State().JobByID(nil, alloc.Namespace, alloc.JobID)
if !allocToUpdate.TerminalStatus() && alloc.ClientStatus != structs.AllocClientStatusUnknown {
continue
}
var job *structs.Job
var jobType string
var jobPriority int
job, err = n.srv.State().JobByID(nil, alloc.Namespace, alloc.JobID)
if err != nil {
n.logger.Debug("UpdateAlloc unable to find job", "job", alloc.JobID, "error", err)
continue
}
// If the job is nil it means it has been de-registered.
if job == nil {
n.logger.Debug("UpdateAlloc unable to find job", "job", alloc.JobID)
continue
jobType = alloc.Job.Type
jobPriority = alloc.Job.Priority
evalTriggerBy = structs.EvalTriggerJobDeregister
allocToUpdate.DesiredStatus = structs.AllocDesiredStatusStop
n.logger.Debug("UpdateAlloc unable to find job - shutting down alloc", "job", alloc.JobID)
}
taskGroup := job.LookupTaskGroup(alloc.TaskGroup)
if taskGroup == nil {
continue
var taskGroup *structs.TaskGroup
if job != nil {
jobType = job.Type
jobPriority = job.Priority
taskGroup = job.LookupTaskGroup(alloc.TaskGroup)
}
// Add an evaluation if this is a failed alloc that is eligible for rescheduling
if allocToUpdate.ClientStatus == structs.AllocClientStatusFailed && alloc.FollowupEvalID == "" && alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: alloc.Namespace,
TriggeredBy: structs.EvalTriggerRetryFailedAlloc,
JobID: alloc.JobID,
Type: job.Type,
Priority: job.Priority,
Status: structs.EvalStatusPending,
CreateTime: now.UTC().UnixNano(),
ModifyTime: now.UTC().UnixNano(),
// If we cannot find the task group for a failed alloc we cannot continue, unless it is an orphan.
if evalTriggerBy != structs.EvalTriggerJobDeregister &&
allocToUpdate.ClientStatus == structs.AllocClientStatusFailed &&
alloc.FollowupEvalID == "" {
if taskGroup == nil {
n.logger.Debug("UpdateAlloc unable to find task group for job", "job", alloc.JobID, "alloc", alloc.ID, "task_group", alloc.TaskGroup)
continue
}
// Set trigger by failed if not an orphan.
if alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
evalTriggerBy = structs.EvalTriggerRetryFailedAlloc
}
evals = append(evals, eval)
}
var eval *structs.Evaluation
// If the alloc is unknown and not an orphan, set the trigger to reconnect.
if evalTriggerBy != structs.EvalTriggerJobDeregister &&
alloc.ClientStatus == structs.AllocClientStatusUnknown {
evalTriggerBy = structs.EvalTriggerReconnect
}
// If we weren't able to determine one of our expected eval triggers,
// continue and don't create an eval.
if evalTriggerBy == "" {
continue
}
eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: alloc.Namespace,
TriggeredBy: evalTriggerBy,
JobID: alloc.JobID,
Type: jobType,
Priority: jobPriority,
Status: structs.EvalStatusPending,
CreateTime: now.UTC().UnixNano(),
ModifyTime: now.UTC().UnixNano(),
}
evals = append(evals, eval)
}
// Add this to the batch
@ -1236,6 +1277,7 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
// batchUpdate is used to update all the allocations
func (n *Node) batchUpdate(future *structs.BatchFuture, updates []*structs.Allocation, evals []*structs.Evaluation) {
var mErr multierror.Error
// Group pending evals by jobID to prevent creating unnecessary evals
evalsByJobId := make(map[structs.NamespacedID]struct{})
var trimmedEvals []*structs.Evaluation
@ -1265,7 +1307,6 @@ func (n *Node) batchUpdate(future *structs.BatchFuture, updates []*structs.Alloc
}
// Commit this update via Raft
var mErr multierror.Error
_, index, err := n.srv.raftApply(structs.AllocClientUpdateRequestType, batch)
if err != nil {
n.logger.Error("alloc update failed", "error", err)
@ -1439,6 +1480,7 @@ func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint6
CreateTime: now,
ModifyTime: now,
}
evals = append(evals, eval)
evalIDs = append(evalIDs, eval.ID)
}

View File

@ -3765,3 +3765,178 @@ func TestClientEndpoint_ShouldCreateNodeEval(t *testing.T) {
})
}
}
func TestClientEndpoint_UpdateAlloc_Evals_ByTrigger(t *testing.T) {
t.Parallel()
type testCase struct {
name string
clientStatus string
serverClientStatus string
triggerBy string
missingJob bool
missingAlloc bool
invalidTaskGroup bool
}
testCases := []testCase{
{
name: "failed-alloc",
clientStatus: structs.AllocClientStatusFailed,
serverClientStatus: structs.AllocClientStatusRunning,
triggerBy: structs.EvalTriggerRetryFailedAlloc,
missingJob: false,
missingAlloc: false,
invalidTaskGroup: false,
},
{
name: "unknown-alloc",
clientStatus: structs.AllocClientStatusRunning,
serverClientStatus: structs.AllocClientStatusUnknown,
triggerBy: structs.EvalTriggerReconnect,
missingJob: false,
missingAlloc: false,
invalidTaskGroup: false,
},
{
name: "orphaned-unknown-alloc",
clientStatus: structs.AllocClientStatusRunning,
serverClientStatus: structs.AllocClientStatusUnknown,
triggerBy: structs.EvalTriggerJobDeregister,
missingJob: true,
missingAlloc: false,
invalidTaskGroup: false,
},
{
name: "running-job",
clientStatus: structs.AllocClientStatusRunning,
serverClientStatus: structs.AllocClientStatusRunning,
triggerBy: "",
missingJob: false,
missingAlloc: false,
invalidTaskGroup: false,
},
{
name: "complete-job",
clientStatus: structs.AllocClientStatusComplete,
serverClientStatus: structs.AllocClientStatusComplete,
triggerBy: "",
missingJob: false,
missingAlloc: false,
invalidTaskGroup: false,
},
{
name: "no-alloc-at-server",
clientStatus: structs.AllocClientStatusUnknown,
serverClientStatus: "",
triggerBy: "",
missingJob: false,
missingAlloc: true,
invalidTaskGroup: false,
},
{
name: "invalid-task-group",
clientStatus: structs.AllocClientStatusUnknown,
serverClientStatus: structs.AllocClientStatusRunning,
triggerBy: "",
missingJob: false,
missingAlloc: false,
invalidTaskGroup: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
s1, cleanupS1 := TestServer(t, func(c *Config) {
// Disabling scheduling in this test so that we can
// ensure that the state store doesn't accumulate more evals
// than what we expect the unit test to add
c.NumSchedulers = 0
})
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var nodeResp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &nodeResp)
require.NoError(t, err)
fsmState := s1.fsm.State()
job := mock.Job()
job.ID = tc.name + "-test-job"
if !tc.missingJob {
err = fsmState.UpsertJob(structs.MsgTypeTestSetup, 101, job)
require.NoError(t, err)
}
serverAlloc := mock.Alloc()
serverAlloc.JobID = job.ID
serverAlloc.NodeID = node.ID
serverAlloc.ClientStatus = tc.serverClientStatus
serverAlloc.TaskGroup = job.TaskGroups[0].Name
// Create the incoming client alloc.
clientAlloc := serverAlloc.Copy()
clientAlloc.ClientStatus = tc.clientStatus
err = fsmState.UpsertJobSummary(99, mock.JobSummary(serverAlloc.JobID))
require.NoError(t, err)
if tc.invalidTaskGroup {
serverAlloc.TaskGroup = "invalid"
}
if !tc.missingAlloc {
err = fsmState.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{serverAlloc})
require.NoError(t, err)
}
updateReq := &structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{clientAlloc},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeAllocResp structs.NodeAllocsResponse
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", updateReq, &nodeAllocResp)
require.NoError(t, err)
require.NotEqual(t, uint64(0), nodeAllocResp.Index)
// If no eval should be created, validate that none were created and return.
if tc.triggerBy == "" {
evaluations, err := fsmState.EvalsByJob(nil, job.Namespace, job.ID)
require.NoError(t, err)
require.Len(t, evaluations, 0)
return
}
// Lookup the alloc
updatedAlloc, err := fsmState.AllocByID(nil, serverAlloc.ID)
require.NoError(t, err)
require.Equal(t, tc.clientStatus, updatedAlloc.ClientStatus)
// Assert that exactly one eval with test case TriggeredBy exists
evaluations, err := fsmState.EvalsByJob(nil, job.Namespace, job.ID)
require.NoError(t, err)
require.Equal(t, 1, len(evaluations))
foundCount := 0
for _, resultEval := range evaluations {
if resultEval.TriggeredBy == tc.triggerBy && resultEval.WaitUntil.IsZero() {
foundCount++
}
}
require.Equal(t, 1, foundCount, "Should create exactly one eval for trigger by %s", tc.triggerBy)
})
}
}

View File

@ -655,6 +655,11 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
// the Raft commit happens.
if node == nil {
return false, "node does not exist", nil
} else if node.Status == structs.NodeStatusDisconnected {
if isValidForDisconnectedNode(plan, node.ID) {
return true, "", nil
}
return false, "node is disconnected and contains invalid updates", nil
} else if node.Status != structs.NodeStatusReady {
return false, "node is not ready for placements", nil
} else if node.SchedulingEligibility == structs.NodeSchedulingIneligible {
@ -690,6 +695,22 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
return fit, reason, err
}
// The plan is only valid for disconnected nodes if it only contains
// updates to mark allocations as unknown.
func isValidForDisconnectedNode(plan *structs.Plan, nodeID string) bool {
if len(plan.NodeUpdate[nodeID]) != 0 || len(plan.NodePreemptions[nodeID]) != 0 {
return false
}
for _, alloc := range plan.NodeAllocation[nodeID] {
if alloc.ClientStatus != structs.AllocClientStatusUnknown {
return false
}
}
return true
}
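To make the rule concrete, a hypothetical test-style sketch, written as if it lived in the nomad package so it can reach the unexported helper (not part of this change):

package nomad

import (
	"testing"

	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/stretchr/testify/require"
)

func TestIsValidForDisconnectedNode_Sketch(t *testing.T) {
	nodeID := "node-1"

	// Only "unknown" allocation updates are allowed for a disconnected node.
	valid := &structs.Plan{
		NodeAllocation: map[string][]*structs.Allocation{
			nodeID: {{ClientStatus: structs.AllocClientStatusUnknown}},
		},
	}
	require.True(t, isValidForDisconnectedNode(valid, nodeID))

	// Any other client status (or any NodeUpdate/NodePreemptions entry for the
	// node) invalidates the plan while the node is disconnected.
	invalid := &structs.Plan{
		NodeAllocation: map[string][]*structs.Allocation{
			nodeID: {{ClientStatus: structs.AllocClientStatusRunning}},
		},
	}
	require.False(t, isValidForDisconnectedNode(invalid, nodeID))
}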
func max(a, b uint64) uint64 {
if a > b {
return a

View File

@ -3528,9 +3528,10 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
// Keep the clients task states
alloc.TaskStates = exist.TaskStates
// If the scheduler is marking this allocation as lost we do not
// If the scheduler is marking this allocation as lost or unknown we do not
// want to reuse the status of the existing allocation.
if alloc.ClientStatus != structs.AllocClientStatusLost {
if alloc.ClientStatus != structs.AllocClientStatusLost &&
alloc.ClientStatus != structs.AllocClientStatusUnknown {
alloc.ClientStatus = exist.ClientStatus
alloc.ClientDescription = exist.ClientDescription
}
@ -4732,6 +4733,8 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error {
tg.Failed += 1
case structs.AllocClientStatusLost:
tg.Lost += 1
case structs.AllocClientStatusUnknown:
tg.Unknown += 1
case structs.AllocClientStatusComplete:
tg.Complete += 1
case structs.AllocClientStatusRunning:
@ -5289,6 +5292,8 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
tgSummary.Complete += 1
case structs.AllocClientStatusLost:
tgSummary.Lost += 1
case structs.AllocClientStatusUnknown:
tgSummary.Unknown += 1
}
// Decrementing the count of the bin of the last state
@ -5305,6 +5310,10 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
if tgSummary.Lost > 0 {
tgSummary.Lost -= 1
}
case structs.AllocClientStatusUnknown:
if tgSummary.Unknown > 0 {
tgSummary.Unknown -= 1
}
case structs.AllocClientStatusFailed, structs.AllocClientStatusComplete:
default:
s.logger.Error("invalid old client status for allocation",

View File

@ -5793,7 +5793,13 @@ func TestStateStore_ReconcileJobSummary(t *testing.T) {
alloc11 := alloc10.Copy()
alloc11.ClientStatus = structs.AllocClientStatusLost
state.UpsertAllocs(structs.MsgTypeTestSetup, 130, []*structs.Allocation{alloc4, alloc6, alloc8, alloc10})
alloc12 := mock.Alloc()
alloc12.JobID = alloc.JobID
alloc12.Job = alloc.Job
alloc12.TaskGroup = "db"
alloc12.ClientStatus = structs.AllocClientStatusUnknown
state.UpsertAllocs(structs.MsgTypeTestSetup, 130, []*structs.Allocation{alloc4, alloc6, alloc8, alloc10, alloc12})
state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 150, []*structs.Allocation{alloc5, alloc7, alloc9, alloc11})
@ -5817,6 +5823,7 @@ func TestStateStore_ReconcileJobSummary(t *testing.T) {
Failed: 1,
Complete: 1,
Lost: 1,
Unknown: 1,
},
},
CreateIndex: 100,

View File

@ -265,6 +265,20 @@ func (tg *TaskGroup) Diff(other *TaskGroup, contextual bool) (*TaskGroupDiff, er
}
}
// MaxClientDisconnect diff
if oldPrimitiveFlat != nil && newPrimitiveFlat != nil {
if tg.MaxClientDisconnect == nil {
oldPrimitiveFlat["MaxClientDisconnect"] = ""
} else {
oldPrimitiveFlat["MaxClientDisconnect"] = fmt.Sprintf("%d", *tg.MaxClientDisconnect)
}
if other.MaxClientDisconnect == nil {
newPrimitiveFlat["MaxClientDisconnect"] = ""
} else {
newPrimitiveFlat["MaxClientDisconnect"] = fmt.Sprintf("%d", *other.MaxClientDisconnect)
}
}
// Diff the primitive fields.
diff.Fields = fieldDiffs(oldPrimitiveFlat, newPrimitiveFlat, false)

View File

@ -3910,6 +3910,75 @@ func TestTaskGroupDiff(t *testing.T) {
},
},
},
{
TestCase: "MaxClientDisconnect added",
Old: &TaskGroup{
Name: "foo",
MaxClientDisconnect: nil,
},
New: &TaskGroup{
Name: "foo",
MaxClientDisconnect: helper.TimeToPtr(20 * time.Second),
},
Expected: &TaskGroupDiff{
Type: DiffTypeEdited,
Name: "foo",
Fields: []*FieldDiff{
{
Type: DiffTypeAdded,
Name: "MaxClientDisconnect",
Old: "",
New: "20000000000",
},
},
},
},
{
TestCase: "MaxClientDisconnect updated",
Old: &TaskGroup{
Name: "foo",
MaxClientDisconnect: helper.TimeToPtr(10 * time.Second),
},
New: &TaskGroup{
Name: "foo",
MaxClientDisconnect: helper.TimeToPtr(20 * time.Second),
},
Expected: &TaskGroupDiff{
Type: DiffTypeEdited,
Name: "foo",
Fields: []*FieldDiff{
{
Type: DiffTypeEdited,
Name: "MaxClientDisconnect",
Old: "10000000000",
New: "20000000000",
},
},
},
},
{
TestCase: "MaxClientDisconnect deleted",
Old: &TaskGroup{
Name: "foo",
MaxClientDisconnect: helper.TimeToPtr(10 * time.Second),
},
New: &TaskGroup{
Name: "foo",
MaxClientDisconnect: nil,
},
Expected: &TaskGroupDiff{
Type: DiffTypeEdited,
Name: "foo",
Fields: []*FieldDiff{
{
Type: DiffTypeDeleted,
Name: "MaxClientDisconnect",
Old: "10000000000",
New: "",
},
},
},
},
}
for i, c := range cases {

View File

@ -1694,16 +1694,17 @@ func (ne *NodeEvent) AddDetail(k, v string) *NodeEvent {
}
const (
NodeStatusInit = "initializing"
NodeStatusReady = "ready"
NodeStatusDown = "down"
NodeStatusInit = "initializing"
NodeStatusReady = "ready"
NodeStatusDown = "down"
NodeStatusDisconnected = "disconnected"
)
// ShouldDrainNode checks if a given node status should trigger an
// evaluation. Some states don't require any further action.
func ShouldDrainNode(status string) bool {
switch status {
case NodeStatusInit, NodeStatusReady:
case NodeStatusInit, NodeStatusReady, NodeStatusDisconnected:
return false
case NodeStatusDown:
return true
@ -1715,7 +1716,7 @@ func ShouldDrainNode(status string) bool {
// ValidNodeStatus is used to check if a node status is valid
func ValidNodeStatus(status string) bool {
switch status {
case NodeStatusInit, NodeStatusReady, NodeStatusDown:
case NodeStatusInit, NodeStatusReady, NodeStatusDown, NodeStatusDisconnected:
return true
default:
return false
@ -4793,6 +4794,7 @@ type TaskGroupSummary struct {
Running int
Starting int
Lost int
Unknown int
}
const (
@ -6168,6 +6170,10 @@ type TaskGroup struct {
// StopAfterClientDisconnect, if set, configures the client to stop the task group
// after this duration since the last known good heartbeat
StopAfterClientDisconnect *time.Duration
// MaxClientDisconnect, if set, configures the client to allow placed
// allocations for tasks in this group to attempt to resume running without a restart.
MaxClientDisconnect *time.Duration
}
func (tg *TaskGroup) Copy() *TaskGroup {
@ -6224,6 +6230,10 @@ func (tg *TaskGroup) Copy() *TaskGroup {
ntg.StopAfterClientDisconnect = tg.StopAfterClientDisconnect
}
if tg.MaxClientDisconnect != nil {
ntg.MaxClientDisconnect = tg.MaxClientDisconnect
}
return ntg
}
@ -6287,6 +6297,14 @@ func (tg *TaskGroup) Validate(j *Job) error {
mErr.Errors = append(mErr.Errors, errors.New("Missing tasks for task group"))
}
if tg.MaxClientDisconnect != nil && tg.StopAfterClientDisconnect != nil {
mErr.Errors = append(mErr.Errors, errors.New("Task group cannot be configured with both max_client_disconnect and stop_after_client_disconnect"))
}
if tg.MaxClientDisconnect != nil && *tg.MaxClientDisconnect < 0 {
mErr.Errors = append(mErr.Errors, errors.New("max_client_disconnect cannot be negative"))
}
for idx, constr := range tg.Constraints {
if err := constr.Validate(); err != nil {
outer := fmt.Errorf("Constraint %d validation failed: %s", idx+1, err)
@ -7976,6 +7994,9 @@ const (
// TaskPluginHealthy indicates that a plugin managed by Nomad became healthy
TaskPluginHealthy = "Plugin became healthy"
// TaskClientReconnected indicates that the client running the task reconnected.
TaskClientReconnected = "Reconnected"
)
// TaskEvent is an event that affects the state of a task and contains meta-data
@ -8187,6 +8208,8 @@ func (e *TaskEvent) PopulateEventDisplayMessage() {
desc = "Leader Task in Group dead"
case TaskMainDead:
desc = "Main tasks in the group died"
case TaskClientReconnected:
desc = "Client reconnected"
default:
desc = e.Message
}
@ -9427,6 +9450,7 @@ const (
AllocClientStatusComplete = "complete"
AllocClientStatusFailed = "failed"
AllocClientStatusLost = "lost"
AllocClientStatusUnknown = "unknown"
)
// Allocation is used to allocate the placement of a task group to a node.
@ -9823,6 +9847,10 @@ func (a *Allocation) NextRescheduleTime() (time.Time, bool) {
return time.Time{}, false
}
return a.nextRescheduleTime(failTime, reschedulePolicy)
}
func (a *Allocation) nextRescheduleTime(failTime time.Time, reschedulePolicy *ReschedulePolicy) (time.Time, bool) {
nextDelay := a.NextDelay()
nextRescheduleTime := failTime.Add(nextDelay)
rescheduleEligible := reschedulePolicy.Unlimited || (reschedulePolicy.Attempts > 0 && a.RescheduleTracker == nil)
@ -9834,6 +9862,18 @@ func (a *Allocation) NextRescheduleTime() (time.Time, bool) {
return nextRescheduleTime, rescheduleEligible
}
// NextRescheduleTimeByFailTime works like NextRescheduleTime but allows callers
// to specify a failure time. Useful for things like determining whether to reschedule
// an alloc on a disconnected node.
func (a *Allocation) NextRescheduleTimeByFailTime(failTime time.Time) (time.Time, bool) {
reschedulePolicy := a.ReschedulePolicy()
if reschedulePolicy == nil {
return time.Time{}, false
}
return a.nextRescheduleTime(failTime, reschedulePolicy)
}
// ShouldClientStop tests an alloc for StopAfterClientDisconnect configuration
func (a *Allocation) ShouldClientStop() bool {
tg := a.Job.LookupTaskGroup(a.TaskGroup)
@ -9877,6 +9917,24 @@ func (a *Allocation) WaitClientStop() time.Time {
return t.Add(*tg.StopAfterClientDisconnect + kill)
}
// DisconnectTimeout uses the MaxClientDisconnect to compute when the allocation
// should transition to lost.
func (a *Allocation) DisconnectTimeout(now time.Time) time.Time {
if a == nil || a.Job == nil {
return now
}
tg := a.Job.LookupTaskGroup(a.TaskGroup)
timeout := tg.MaxClientDisconnect
if timeout == nil {
return now
}
return now.Add(*timeout)
}
// NextDelay returns a duration after which the allocation can be rescheduled.
// It is calculated according to the delay function and previous reschedule attempts.
func (a *Allocation) NextDelay() time.Duration {
@ -10112,6 +10170,76 @@ func (a *Allocation) AllocationDiff() *AllocationDiff {
return (*AllocationDiff)(a)
}
// Expired determines whether an allocation has exceeded its MaxClientDisconnect
// duration relative to the passed timestamp.
func (a *Allocation) Expired(now time.Time) bool {
if a == nil || a.Job == nil {
return false
}
// If alloc is not Unknown it cannot be expired.
if a.ClientStatus != AllocClientStatusUnknown {
return false
}
lastUnknown := a.LastUnknown()
if lastUnknown.IsZero() {
return false
}
tg := a.Job.LookupTaskGroup(a.TaskGroup)
if tg == nil {
return false
}
if tg.MaxClientDisconnect == nil {
return false
}
expiry := lastUnknown.Add(*tg.MaxClientDisconnect)
return now.UTC().After(expiry) || now.UTC().Equal(expiry)
}
// LastUnknown returns the timestamp for the last time the allocation
// transitioned into the unknown client status.
func (a *Allocation) LastUnknown() time.Time {
var lastUnknown time.Time
for _, s := range a.AllocStates {
if s.Field == AllocStateFieldClientStatus &&
s.Value == AllocClientStatusUnknown {
if lastUnknown.IsZero() || lastUnknown.Before(s.Time) {
lastUnknown = s.Time
}
}
}
return lastUnknown.UTC()
}
// Reconnected determines whether a reconnect event has occurred for any task
// and whether that event occurred within the allowable duration specified by MaxClientDisconnect.
func (a *Allocation) Reconnected() (bool, bool) {
var lastReconnect time.Time
for _, taskState := range a.TaskStates {
for _, taskEvent := range taskState.Events {
if taskEvent.Type != TaskClientReconnected {
continue
}
eventTime := time.Unix(0, taskEvent.Time).UTC()
if lastReconnect.IsZero() || lastReconnect.Before(eventTime) {
lastReconnect = eventTime
}
}
}
if lastReconnect.IsZero() {
return false, false
}
return true, a.Expired(lastReconnect)
}
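To make the timeline concrete (illustrative only, not part of the diff): a self-contained sketch of how DisconnectTimeout, LastUnknown, and Expired interact for a group with a 5-second max_client_disconnect, using the structs types as modified above.

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	maxDisconnect := 5 * time.Second

	job := &structs.Job{
		TaskGroups: []*structs.TaskGroup{{
			Name:                "web",
			MaxClientDisconnect: &maxDisconnect,
		}},
	}

	disconnectedAt := time.Now().UTC()

	alloc := &structs.Allocation{
		Job:          job,
		TaskGroup:    "web",
		ClientStatus: structs.AllocClientStatusUnknown,
		AllocStates: []*structs.AllocState{{
			Field: structs.AllocStateFieldClientStatus,
			Value: structs.AllocClientStatusUnknown,
			Time:  disconnectedAt,
		}},
	}

	// The heartbeater uses this to decide "disconnected" vs "down": with
	// max_client_disconnect set, the timeout lands in the future.
	fmt.Println(alloc.DisconnectTimeout(disconnectedAt).After(disconnectedAt)) // true

	// Inside the window the alloc is still eligible to resume...
	fmt.Println(alloc.Expired(disconnectedAt.Add(3 * time.Second))) // false
	// ...past the window it is treated as expired and will be stopped.
	fmt.Println(alloc.Expired(disconnectedAt.Add(10 * time.Second))) // true
}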
// AllocationDiff is another named type for Allocation (to use the same fields),
// which is used to represent the delta for an Allocation. If you need a method
// defined on the al
@ -10377,6 +10505,15 @@ func (a *AllocMetric) PopulateScoreMetaData() {
}
}
// MaxNormScore returns the ScoreMetaData entry with the highest normalized
// score.
func (a *AllocMetric) MaxNormScore() *NodeScoreMeta {
if a == nil || len(a.ScoreMetaData) == 0 {
return nil
}
return a.ScoreMetaData[0]
}
// NodeScoreMeta captures scoring meta data derived from
// different scoring factors.
type NodeScoreMeta struct {
@ -10505,21 +10642,23 @@ const (
)
const (
EvalTriggerJobRegister = "job-register"
EvalTriggerJobDeregister = "job-deregister"
EvalTriggerPeriodicJob = "periodic-job"
EvalTriggerNodeDrain = "node-drain"
EvalTriggerNodeUpdate = "node-update"
EvalTriggerAllocStop = "alloc-stop"
EvalTriggerScheduled = "scheduled"
EvalTriggerRollingUpdate = "rolling-update"
EvalTriggerDeploymentWatcher = "deployment-watcher"
EvalTriggerFailedFollowUp = "failed-follow-up"
EvalTriggerMaxPlans = "max-plan-attempts"
EvalTriggerRetryFailedAlloc = "alloc-failure"
EvalTriggerQueuedAllocs = "queued-allocs"
EvalTriggerPreemption = "preemption"
EvalTriggerScaling = "job-scaling"
EvalTriggerJobRegister = "job-register"
EvalTriggerJobDeregister = "job-deregister"
EvalTriggerPeriodicJob = "periodic-job"
EvalTriggerNodeDrain = "node-drain"
EvalTriggerNodeUpdate = "node-update"
EvalTriggerAllocStop = "alloc-stop"
EvalTriggerScheduled = "scheduled"
EvalTriggerRollingUpdate = "rolling-update"
EvalTriggerDeploymentWatcher = "deployment-watcher"
EvalTriggerFailedFollowUp = "failed-follow-up"
EvalTriggerMaxPlans = "max-plan-attempts"
EvalTriggerRetryFailedAlloc = "alloc-failure"
EvalTriggerQueuedAllocs = "queued-allocs"
EvalTriggerPreemption = "preemption"
EvalTriggerScaling = "job-scaling"
EvalTriggerMaxDisconnectTimeout = "max-disconnect-timeout"
EvalTriggerReconnect = "reconnect"
)
const (
@ -10621,7 +10760,8 @@ type Evaluation struct {
Wait time.Duration
// WaitUntil is the time when this eval should be run. This is used to
// supported delayed rescheduling of failed allocations
// support delayed rescheduling of failed allocations, and delayed
// stopping of allocations that are configured with max_client_disconnect.
WaitUntil time.Time
// NextEval is the evaluation ID for the eval created to do a followup.
@ -11136,6 +11276,17 @@ func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, preemptingAllocID string)
p.NodePreemptions[node] = append(existing, newAlloc)
}
// AppendUnknownAlloc marks an allocation as unknown.
func (p *Plan) AppendUnknownAlloc(alloc *Allocation) {
// Strip the job as it's set once on the ApplyPlanResultRequest.
alloc.Job = nil
// Strip the resources as they can be rebuilt.
alloc.Resources = nil
existing := p.NodeAllocation[alloc.NodeID]
p.NodeAllocation[alloc.NodeID] = append(existing, alloc)
}
func (p *Plan) PopUpdate(alloc *Allocation) {
existing := p.NodeUpdate[alloc.NodeID]
n := len(existing)

View File

@ -5365,6 +5365,332 @@ func TestAllocation_WaitClientStop(t *testing.T) {
}
}
func TestAllocation_DisconnectTimeout(t *testing.T) {
type testCase struct {
desc string
maxDisconnect *time.Duration
}
testCases := []testCase{
{
desc: "no max_client_disconnect",
maxDisconnect: nil,
},
{
desc: "has max_client_disconnect",
maxDisconnect: helper.TimeToPtr(30 * time.Second),
},
{
desc: "zero max_client_disconnect",
maxDisconnect: helper.TimeToPtr(0 * time.Second),
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
j := testJob()
a := &Allocation{
Job: j,
}
j.TaskGroups[0].MaxClientDisconnect = tc.maxDisconnect
a.TaskGroup = j.TaskGroups[0].Name
now := time.Now()
reschedTime := a.DisconnectTimeout(now)
if tc.maxDisconnect == nil {
require.Equal(t, now, reschedTime, "expected to be now")
} else {
difference := reschedTime.Sub(now)
require.Equal(t, *tc.maxDisconnect, difference, "expected durations to be equal")
}
})
}
}
func TestAllocation_Expired(t *testing.T) {
type testCase struct {
name string
maxDisconnect string
ellapsed int
expected bool
nilJob bool
badTaskGroup bool
mixedUTC bool
noReconnectEvent bool
status string
}
testCases := []testCase{
{
name: "has-expired",
maxDisconnect: "5s",
ellapsed: 10,
expected: true,
},
{
name: "has-not-expired",
maxDisconnect: "5s",
ellapsed: 3,
expected: false,
},
{
name: "are-equal",
maxDisconnect: "5s",
ellapsed: 5,
expected: true,
},
{
name: "nil-job",
maxDisconnect: "5s",
ellapsed: 10,
expected: false,
nilJob: true,
},
{
name: "wrong-status",
maxDisconnect: "5s",
ellapsed: 10,
expected: false,
status: AllocClientStatusRunning,
},
{
name: "bad-task-group",
maxDisconnect: "",
badTaskGroup: true,
ellapsed: 10,
expected: false,
},
{
name: "no-max-disconnect",
maxDisconnect: "",
ellapsed: 10,
expected: false,
},
{
name: "mixed-utc-has-expired",
maxDisconnect: "5s",
ellapsed: 10,
mixedUTC: true,
expected: true,
},
{
name: "mixed-utc-has-not-expired",
maxDisconnect: "5s",
ellapsed: 3,
mixedUTC: true,
expected: false,
},
{
name: "no-reconnect-event",
maxDisconnect: "5s",
ellapsed: 2,
expected: false,
noReconnectEvent: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
alloc := MockAlloc()
var err error
var maxDisconnect time.Duration
if tc.maxDisconnect != "" {
maxDisconnect, err = time.ParseDuration(tc.maxDisconnect)
require.NoError(t, err)
alloc.Job.TaskGroups[0].MaxClientDisconnect = &maxDisconnect
}
if tc.nilJob {
alloc.Job = nil
}
if tc.badTaskGroup {
alloc.TaskGroup = "bad"
}
alloc.ClientStatus = AllocClientStatusUnknown
if tc.status != "" {
alloc.ClientStatus = tc.status
}
alloc.AllocStates = []*AllocState{{
Field: AllocStateFieldClientStatus,
Value: AllocClientStatusUnknown,
Time: time.Now(),
}}
require.NoError(t, err)
now := time.Now().UTC()
if tc.mixedUTC {
now = time.Now()
}
if !tc.noReconnectEvent {
event := NewTaskEvent(TaskClientReconnected)
event.Time = now.UnixNano()
alloc.TaskStates = map[string]*TaskState{
"web": {
Events: []*TaskEvent{event},
},
}
}
ellapsedDuration := time.Duration(tc.ellapsed) * time.Second
now = now.Add(ellapsedDuration)
require.Equal(t, tc.expected, alloc.Expired(now))
})
}
}
func TestAllocation_Reconnected(t *testing.T) {
type testCase struct {
name string
maxDisconnect string
elapsed int
reconnected bool
expired bool
nilJob bool
badTaskGroup bool
mixedTZ bool
noReconnectEvent bool
status string
}
testCases := []testCase{
{
name: "has-expired",
maxDisconnect: "5s",
elapsed: 10,
reconnected: true,
expired: true,
},
{
name: "has-not-expired",
maxDisconnect: "5s",
elapsed: 3,
reconnected: true,
expired: false,
},
{
name: "are-equal",
maxDisconnect: "5s",
elapsed: 5,
reconnected: true,
expired: true,
},
{
name: "nil-job",
maxDisconnect: "5s",
elapsed: 10,
reconnected: true,
expired: false,
nilJob: true,
},
{
name: "bad-task-group",
maxDisconnect: "",
elapsed: 10,
reconnected: true,
expired: false,
badTaskGroup: true,
},
{
name: "no-max-disconnect",
maxDisconnect: "",
elapsed: 10,
reconnected: true,
expired: false,
},
{
name: "mixed-utc-has-expired",
maxDisconnect: "5s",
elapsed: 10,
reconnected: true,
expired: true,
mixedTZ: true,
},
{
name: "mixed-utc-has-not-expired",
maxDisconnect: "5s",
elapsed: 3,
reconnected: true,
expired: false,
mixedTZ: true,
},
{
name: "no-reconnect-event",
maxDisconnect: "5s",
elapsed: 2,
reconnected: false,
expired: false,
noReconnectEvent: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
alloc := MockAlloc()
var err error
var maxDisconnect time.Duration
if tc.maxDisconnect != "" {
maxDisconnect, err = time.ParseDuration(tc.maxDisconnect)
require.NoError(t, err)
alloc.Job.TaskGroups[0].MaxClientDisconnect = &maxDisconnect
}
if tc.nilJob {
alloc.Job = nil
}
if tc.badTaskGroup {
alloc.TaskGroup = "bad"
}
alloc.ClientStatus = AllocClientStatusUnknown
if tc.status != "" {
alloc.ClientStatus = tc.status
}
alloc.AllocStates = []*AllocState{{
Field: AllocStateFieldClientStatus,
Value: AllocClientStatusUnknown,
Time: time.Now().UTC(),
}}
now := time.Now().UTC()
if tc.mixedTZ {
var loc *time.Location
loc, err = time.LoadLocation("America/New_York")
require.NoError(t, err)
now = time.Now().In(loc)
}
ellapsedDuration := time.Duration(tc.elapsed) * time.Second
now = now.Add(ellapsedDuration)
if !tc.noReconnectEvent {
event := NewTaskEvent(TaskClientReconnected)
event.Time = now.UnixNano()
alloc.TaskStates = map[string]*TaskState{
"web": {
Events: []*TaskEvent{event},
},
}
}
reconnected, expired := alloc.Reconnected()
require.Equal(t, tc.reconnected, reconnected)
require.Equal(t, tc.expired, expired)
})
}
}
func TestAllocation_Canonicalize_Old(t *testing.T) {
ci.Parallel(t)
@ -5553,7 +5879,6 @@ func TestParameterizedJobConfig_Validate_NonBatch(t *testing.T) {
func TestJobConfig_Validate_StopAferClientDisconnect(t *testing.T) {
ci.Parallel(t)
// Setup a system Job with stop_after_client_disconnect set, which is invalid
job := testJob()
job.Type = JobTypeSystem
@ -5580,6 +5905,26 @@ func TestJobConfig_Validate_StopAferClientDisconnect(t *testing.T) {
require.NoError(t, err)
}
func TestJobConfig_Validate_MaxClientDisconnect(t *testing.T) {
// Set up a job with an invalid max_client_disconnect value
job := testJob()
timeout := -1 * time.Minute
job.TaskGroups[0].MaxClientDisconnect = &timeout
job.TaskGroups[0].StopAfterClientDisconnect = &timeout
err := job.Validate()
require.Error(t, err)
require.Contains(t, err.Error(), "max_client_disconnect cannot be negative")
require.Contains(t, err.Error(), "Task group cannot be configured with both max_client_disconnect and stop_after_client_disconnect")
// Modify the job with a valid max_client_disconnect value
timeout = 1 * time.Minute
job.TaskGroups[0].MaxClientDisconnect = &timeout
job.TaskGroups[0].StopAfterClientDisconnect = nil
err = job.Validate()
require.NoError(t, err)
}
func TestParameterizedJobConfig_Canonicalize(t *testing.T) {
ci.Parallel(t)
@ -5961,7 +6306,8 @@ func TestTaskEventPopulate(t *testing.T) {
{NewTaskEvent(TaskSignaling).SetTaskSignal(os.Interrupt).SetTaskSignalReason("process interrupted"), "Task being sent signal interrupt: process interrupted"},
{NewTaskEvent(TaskRestartSignal), "Task signaled to restart"},
{NewTaskEvent(TaskRestartSignal).SetRestartReason("Chaos Monkey restarted it"), "Chaos Monkey restarted it"},
{NewTaskEvent(TaskDriverMessage).SetDriverMessage("YOLO"), "YOLO"},
{NewTaskEvent(TaskClientReconnected), "Client reconnected"},
{NewTaskEvent(TaskLeaderDead), "Leader Task in Group dead"},
{NewTaskEvent("Unknown Type, No message"), ""},
{NewTaskEvent("Unknown Type").SetMessage("Hello world"), "Hello world"},
}

View File

@ -193,6 +193,44 @@ func TestServersMeetMinimumVersionIncludingFailed(t *testing.T) {
}
}
func TestServersMeetMinimumVersionSuffix(t *testing.T) {
t.Parallel()
cases := []struct {
members []serf.Member
ver *version.Version
expected bool
}{
// Multiple servers, meets req version
{
members: []serf.Member{
makeMember("1.3.0", serf.StatusAlive),
makeMember("1.2.6", serf.StatusAlive),
makeMember("1.2.6-dev", serf.StatusFailed),
},
ver: version.Must(version.NewVersion("1.2.6-dev")),
expected: true,
},
// Multiple servers, doesn't meet req version
{
members: []serf.Member{
makeMember("1.1.18", serf.StatusAlive),
makeMember("1.2.6-dev", serf.StatusAlive),
makeMember("1.0.11", serf.StatusFailed),
},
ver: version.Must(version.NewVersion("1.2.6-dev")),
expected: false,
},
}
for _, tc := range cases {
result := ServersMeetMinimumVersion(tc.members, tc.ver, true)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
}
}
}
func makeMember(version string, status serf.MemberStatus) serf.Member {
return serf.Member{
Name: "foo",

View File

@ -11,6 +11,7 @@ import (
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@ -580,6 +581,13 @@ func (w *Worker) invokeScheduler(snap *state.StateSnapshot, eval *structs.Evalua
return nil
}
// ServersMeetMinimumVersion allows implementations of the Scheduler interface in
// other packages to perform server version checks without direct references to
// the Nomad server.
func (w *Worker) ServersMeetMinimumVersion(minVersion *version.Version, checkFailedServers bool) bool {
return ServersMeetMinimumVersion(w.srv.Members(), minVersion, checkFailedServers)
}
// SubmitPlan is used to submit a plan for consideration. This allows
// the worker to act as the planner for the scheduler.
func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler.State, error) {

View File

@ -8,6 +8,7 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -24,6 +25,10 @@ const (
// allocNotNeeded is the status used when a job no longer requires an allocation
allocNotNeeded = "alloc not needed due to job update"
// allocReconnected is the status to use when a replacement allocation is stopped
// because a disconnected node reconnects.
allocReconnected = "alloc not needed due to disconnected client reconnect"
// allocMigrating is the status used when we must migrate an allocation
allocMigrating = "alloc is being migrated"
@ -33,6 +38,9 @@ const (
// allocLost is the status used when an allocation is lost
allocLost = "alloc is lost since its node is down"
// allocUnknown is the status used when an allocation is unknown
allocUnknown = "alloc is unknown since its node is disconnected"
// allocInPlace is the status used when speculating on an in-place update
allocInPlace = "alloc updating in-place"
@ -55,11 +63,19 @@ const (
// up evals for delayed rescheduling
reschedulingFollowupEvalDesc = "created for delayed rescheduling"
// disconnectTimeoutFollowupEvalDesc is the description used when creating
// follow-up evals for allocations that should be stopped after their
// disconnect timeout has passed.
disconnectTimeoutFollowupEvalDesc = "created for delayed disconnect timeout"
// maxPastRescheduleEvents is the maximum number of past reschedule events
// that we track when unlimited rescheduling is enabled
maxPastRescheduleEvents = 5
)
// minVersionMaxClientDisconnect is the minimum version that supports max_client_disconnect.
var minVersionMaxClientDisconnect = version.Must(version.NewVersion("1.2.6"))
// SetStatusError is used to set the status of the evaluation to the given error
type SetStatusError struct {
Err error
@ -148,7 +164,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) (err error) {
structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans,
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc,
structs.EvalTriggerFailedFollowUp, structs.EvalTriggerPreemption,
structs.EvalTriggerScaling:
structs.EvalTriggerScaling, structs.EvalTriggerMaxDisconnectTimeout, structs.EvalTriggerReconnect:
default:
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)
@ -361,7 +377,9 @@ func (s *GenericScheduler) computeJobAllocs() error {
reconciler := NewAllocReconciler(s.logger,
genericAllocUpdateFn(s.ctx, s.stack, s.eval.ID),
s.batch, s.eval.JobID, s.job, s.deployment, allocs, tainted, s.eval.ID, s.eval.Priority)
s.batch, s.eval.JobID, s.job, s.deployment, allocs, tainted, s.eval.ID,
s.eval.Priority, s.planner.ServersMeetMinimumVersion(minVersionMaxClientDisconnect, true))
results := reconciler.Compute()
s.logger.Debug("reconciled current state with desired state", "results", log.Fmt("%#v", results))
@ -392,6 +410,11 @@ func (s *GenericScheduler) computeJobAllocs() error {
s.plan.AppendStoppedAlloc(stop.alloc, stop.statusDescription, stop.clientStatus, stop.followupEvalID)
}
// Handle disconnect updates
for _, update := range results.disconnectUpdates {
s.plan.AppendUnknownAlloc(update)
}
// Handle the in-place updates
for _, update := range results.inplaceUpdate {
if update.DeploymentID != s.deployment.GetID() {
@ -406,6 +429,11 @@ func (s *GenericScheduler) computeJobAllocs() error {
s.ctx.Plan().AppendAlloc(update, nil)
}
// Log reconnect updates. They will be pulled by the client when it reconnects.
for _, update := range results.reconnectUpdates {
s.logger.Trace("reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex)
}
// Nothing remaining to do if placement is not required
if len(results.place)+len(results.destructiveUpdate) == 0 {
// If the job has been purged we don't have access to the job. Otherwise

View File

@ -6514,3 +6514,115 @@ func TestPropagateTaskState(t *testing.T) {
})
}
}
// Tests that a client disconnect generates attribute updates and follow up evals.
func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T) {
h := NewHarness(t)
count := 1
maxClientDisconnect := 10 * time.Minute
disconnectedNode, job, unknownAllocs := initNodeAndAllocs(t, h, count, maxClientDisconnect,
structs.NodeStatusReady, structs.AllocClientStatusRunning)
// Now disconnect the node
disconnectedNode.Status = structs.NodeStatusDisconnected
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), disconnectedNode))
// Create an evaluation triggered by the disconnect
evals := []*structs.Evaluation{{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: disconnectedNode.ID,
Status: structs.EvalStatusPending,
}}
nodeStatusUpdateEval := evals[0]
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))
// Process the evaluation
err := h.Process(NewServiceScheduler, nodeStatusUpdateEval)
require.NoError(t, err)
require.Equal(t, structs.EvalStatusComplete, h.Evals[0].Status)
require.Len(t, h.Plans, 1, "plan")
// One followup delayed eval created
require.Len(t, h.CreateEvals, 1)
followUpEval := h.CreateEvals[0]
require.Equal(t, nodeStatusUpdateEval.ID, followUpEval.PreviousEval)
require.Equal(t, "pending", followUpEval.Status)
require.NotEmpty(t, followUpEval.WaitUntil)
// Wait for the followup eval to be persisted in the state store
testutil.WaitForResult(func() (bool, error) {
found, err := h.State.EvalByID(nil, followUpEval.ID)
if err != nil {
return false, err
}
if found == nil {
return false, nil
}
require.Equal(t, nodeStatusUpdateEval.ID, found.PreviousEval)
require.Equal(t, "pending", found.Status)
require.NotEmpty(t, found.WaitUntil)
return true, nil
}, func(err error) {
require.NoError(t, err)
})
// Validate that the ClientStatus updates are part of the plan.
require.Len(t, h.Plans[0].NodeAllocation[disconnectedNode.ID], count)
// Pending update should have unknown status.
for _, nodeAlloc := range h.Plans[0].NodeAllocation[disconnectedNode.ID] {
require.Equal(t, nodeAlloc.ClientStatus, structs.AllocClientStatusUnknown)
}
// Simulate that NodeAllocation got processed.
err = h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), h.Plans[0].NodeAllocation[disconnectedNode.ID])
require.NoError(t, err, "plan.NodeUpdate")
// Validate that the StateStore Upsert applied the ClientStatus we specified.
for _, alloc := range unknownAllocs {
alloc, err = h.State.AllocByID(nil, alloc.ID)
require.NoError(t, err)
require.Equal(t, alloc.ClientStatus, structs.AllocClientStatusUnknown)
// Allocations have been transitioned to unknown
require.Equal(t, structs.AllocDesiredStatusRun, alloc.DesiredStatus)
require.Equal(t, structs.AllocClientStatusUnknown, alloc.ClientStatus)
}
}
func initNodeAndAllocs(t *testing.T, h *Harness, allocCount int,
maxClientDisconnect time.Duration, nodeStatus, clientStatus string) (*structs.Node, *structs.Job, []*structs.Allocation) {
// Node with the given status
node := mock.Node()
node.Status = nodeStatus
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
// Job with allocations and max_client_disconnect
job := mock.Job()
job.TaskGroups[0].Count = allocCount
job.TaskGroups[0].MaxClientDisconnect = &maxClientDisconnect
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))
allocs := make([]*structs.Allocation, allocCount)
for i := 0; i < allocCount; i++ {
// Alloc for the running group
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = clientStatus
allocs[i] = alloc
}
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
return node, job, allocs
}

View File

@ -78,6 +78,10 @@ type allocReconciler struct {
evalID string
evalPriority int
// supportsDisconnectedClients indicates whether all servers meet the required
// minimum version to allow application of max_client_disconnect configuration.
supportsDisconnectedClients bool
// now is the time used when determining rescheduling eligibility
// defaults to time.Now, and overridden in unit tests
now time.Time
@ -114,6 +118,14 @@ type reconcileResults struct {
// jobspec change.
attributeUpdates map[string]*structs.Allocation
// disconnectUpdates is the set of allocations that are on disconnected nodes, but
// have not yet had their ClientStatus set to AllocClientStatusUnknown.
disconnectUpdates map[string]*structs.Allocation
// reconnectUpdates is the set of allocations that have ClientStatus set to
// AllocClientStatusUnknown, but the associated Node has reconnected.
reconnectUpdates map[string]*structs.Allocation
// desiredTGUpdates captures the desired set of changes to make for each
// task group.
desiredTGUpdates map[string]*structs.DesiredUpdates
@ -137,8 +149,8 @@ type delayedRescheduleInfo struct {
}
func (r *reconcileResults) GoString() string {
base := fmt.Sprintf("Total changes: (place %d) (destructive %d) (inplace %d) (stop %d)",
len(r.place), len(r.destructiveUpdate), len(r.inplaceUpdate), len(r.stop))
base := fmt.Sprintf("Total changes: (place %d) (destructive %d) (inplace %d) (stop %d) (disconnect %d) (reconnect %d)",
len(r.place), len(r.destructiveUpdate), len(r.inplaceUpdate), len(r.stop), len(r.disconnectUpdates), len(r.reconnectUpdates))
if r.deployment != nil {
base += fmt.Sprintf("\nCreated Deployment: %q", r.deployment.ID)
@ -163,21 +175,24 @@ func (r *reconcileResults) Changes() int {
func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch bool,
jobID string, job *structs.Job, deployment *structs.Deployment,
existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node, evalID string,
evalPriority int) *allocReconciler {
evalPriority int, supportsDisconnectedClients bool) *allocReconciler {
return &allocReconciler{
logger: logger.Named("reconciler"),
allocUpdateFn: allocUpdateFn,
batch: batch,
jobID: jobID,
job: job,
deployment: deployment.Copy(),
existingAllocs: existingAllocs,
taintedNodes: taintedNodes,
evalID: evalID,
evalPriority: evalPriority,
now: time.Now(),
logger: logger.Named("reconciler"),
allocUpdateFn: allocUpdateFn,
batch: batch,
jobID: jobID,
job: job,
deployment: deployment.Copy(),
existingAllocs: existingAllocs,
taintedNodes: taintedNodes,
evalID: evalID,
evalPriority: evalPriority,
supportsDisconnectedClients: supportsDisconnectedClients,
now: time.Now(),
result: &reconcileResults{
attributeUpdates: make(map[string]*structs.Allocation),
disconnectUpdates: make(map[string]*structs.Allocation),
reconnectUpdates: make(map[string]*structs.Allocation),
desiredTGUpdates: make(map[string]*structs.DesiredUpdates),
desiredFollowupEvals: make(map[string][]*structs.Evaluation),
},
@ -326,11 +341,15 @@ func (a *allocReconciler) handleStop(m allocMatrix) {
}
}
// filterAndStopAll stops all allocations in an allocSet. This is useful when
// stopping an entire job or task group.
func (a *allocReconciler) filterAndStopAll(set allocSet) uint64 {
untainted, migrate, lost := set.filterByTainted(a.taintedNodes)
untainted, migrate, lost, disconnecting, reconnecting, _ := set.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now)
a.markStop(untainted, "", allocNotNeeded)
a.markStop(migrate, "", allocNotNeeded)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
a.markStop(disconnecting, "", allocNotNeeded)
a.markStop(reconnecting, "", allocNotNeeded)
return uint64(len(set))
}
@ -387,16 +406,29 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
canaries, all := a.cancelUnneededCanaries(all, desiredChanges)
// Determine what set of allocations are on tainted nodes
untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
untainted, migrate, lost, disconnecting, reconnecting, ignore := all.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now)
desiredChanges.Ignore += uint64(len(ignore))
// Determine what set of terminal allocations need to be rescheduled
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, a.now, a.evalID, a.deployment)
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment)
// Determine what set of disconnecting allocations need to be rescheduled
_, rescheduleDisconnecting, _ := disconnecting.filterByRescheduleable(a.batch, true, a.now, a.evalID, a.deployment)
rescheduleNow = rescheduleNow.union(rescheduleDisconnecting)
// Find delays for any lost allocs that have stop_after_client_disconnect
lostLater := lost.delayByStopAfterClientDisconnect()
lostLaterEvals := a.createLostLaterEvals(lostLater, all, tg.Name)
lostLaterEvals := a.createLostLaterEvals(lostLater, tg.Name)
// Create batched follow up evaluations for allocations that are
// Find delays for any disconnecting allocs that have max_client_disconnect,
// create followup evals, and update the ClientStatus to unknown.
timeoutLaterEvals := a.createTimeoutLaterEvals(disconnecting, tg.Name)
// Merge disconnecting with the stop_after_client_disconnect set into the
// lostLaterEvals so that computeStop can add them to the stop set.
lostLaterEvals = helper.MergeMapStringString(lostLaterEvals, timeoutLaterEvals)
// Create batched follow-up evaluations for allocations that are
// reschedulable later and mark the allocations for in place updating
a.createRescheduleLaterEvals(rescheduleLater, all, tg.Name)
@ -408,10 +440,14 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// Stop any unneeded allocations and update the untainted set to not
// include stopped allocations.
isCanarying := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, isCanarying, lostLaterEvals)
stop, reconnecting := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, reconnecting, isCanarying, lostLaterEvals)
desiredChanges.Stop += uint64(len(stop))
untainted = untainted.difference(stop)
// Validate and add reconnecting allocs to the plan so that they will be logged.
a.computeReconnecting(reconnecting)
desiredChanges.Ignore += uint64(len(a.result.reconnectUpdates))
// Do inplace upgrades where possible and capture the set of upgrades that
// need to be done destructively.
ignore, inplace, destructive := a.computeUpdates(tg, untainted)
@ -442,9 +478,10 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
// * If there are any canaries that they have been promoted
// * There is no delayed stop_after_client_disconnect alloc, which delays scheduling for the whole group
// * An alloc was lost
// * There is not a corresponding reconnecting alloc.
var place []allocPlaceResult
if len(lostLater) == 0 {
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, isCanarying)
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, reconnecting, isCanarying)
if !existingDeployment {
dstate.DesiredTotal += len(place)
}
@ -541,10 +578,12 @@ func (a *allocReconciler) filterOldTerminalAllocs(all allocSet) (filtered, ignor
// cancelUnneededCanaries handles the canaries for the group by stopping the
// unneeded ones and returning the current set of canaries and the updated total
// set of allocs for the group
func (a *allocReconciler) cancelUnneededCanaries(all allocSet, desiredChanges *structs.DesiredUpdates) (canaries, newAll allocSet) {
func (a *allocReconciler) cancelUnneededCanaries(original allocSet, desiredChanges *structs.DesiredUpdates) (canaries, all allocSet) {
// Stop any canary from an older deployment or from a failed one
var stop []string
all = original
// Cancel any non-promoted canaries from the older deployment
if a.oldDeployment != nil {
for _, dstate := range a.oldDeployment.TaskGroups {
@ -579,7 +618,7 @@ func (a *allocReconciler) cancelUnneededCanaries(all allocSet, desiredChanges *s
}
canaries = all.fromKeys(canaryIDs)
untainted, migrate, lost := canaries.filterByTainted(a.taintedNodes)
untainted, migrate, lost, _, _, _ := canaries.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now)
a.markStop(migrate, "", allocMigrating)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
@ -587,7 +626,7 @@ func (a *allocReconciler) cancelUnneededCanaries(all allocSet, desiredChanges *s
all = all.difference(migrate, lost)
}
return canaries, all
return
}
// computeUnderProvisionedBy returns the number of allocs that still need to be
@ -639,7 +678,7 @@ func (a *allocReconciler) computeUnderProvisionedBy(group *structs.TaskGroup, un
//
// Placements will meet or exceed group count.
func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
nameIndex *allocNameIndex, untainted, migrate, reschedule, lost allocSet,
nameIndex *allocNameIndex, untainted, migrate, reschedule, lost, reconnecting allocSet,
isCanarying bool) []allocPlaceResult {
// Add rescheduled placement results
@ -658,9 +697,10 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
})
}
// Add replacements for lost allocs up to group.Count
existing := len(untainted) + len(migrate) + len(reschedule)
// Add replacements for disconnected and lost allocs up to group.Count
existing := len(untainted) + len(migrate) + len(reschedule) + len(reconnecting) - len(reconnecting.filterByFailedReconnect())
// Add replacements for lost
for _, alloc := range lost {
if existing >= group.Count {
// Reached desired count, do not replace remaining lost
@ -701,7 +741,17 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
// The input deploymentPlaceReady is calculated as the deployment is not paused, failed, or canarying.
// It returns the number of allocs still needed.
func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desiredChanges *structs.DesiredUpdates,
place []allocPlaceResult, failed, lost allocSet, underProvisionedBy int) int {
place []allocPlaceResult, rescheduleNow, lost allocSet, underProvisionedBy int) int {
// Disconnecting allocs are not failing, but are included in rescheduleNow.
// Create a new set that only includes the actual failures and compute
// replacements based off that.
failed := make(allocSet)
for id, alloc := range rescheduleNow {
if _, ok := a.result.disconnectUpdates[id]; !ok {
failed[id] = alloc
}
}
// If the deployment is place ready, apply all placements and return
if deploymentPlaceReady {
@ -709,6 +759,7 @@ func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desired
// This relies on the computePlacements having built this set, which in
// turn relies on len(lostLater) == 0.
a.result.place = append(a.result.place, place...)
a.markStop(failed, "", allocRescheduled)
desiredChanges.Stop += uint64(len(failed))
@ -730,13 +781,13 @@ func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desired
}
// if no failures or there are no pending placements return.
if len(failed) == 0 || len(place) == 0 {
if len(rescheduleNow) == 0 || len(place) == 0 {
return underProvisionedBy
}
// Handle rescheduling of failed allocations even if the deployment is failed.
// If the placement is rescheduling, and not part of a failed deployment, add
// to the place set, and add the previous alloc to the stop set.
// to the place set. Add the previous alloc to the stop set unless it is disconnecting.
for _, p := range place {
prev := p.PreviousAllocation()
partOfFailedDeployment := a.deploymentFailed && prev != nil && a.deployment.ID == prev.DeploymentID
@ -745,6 +796,11 @@ func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desired
a.result.place = append(a.result.place, p)
desiredChanges.Place++
_, prevIsDisconnecting := a.result.disconnectUpdates[prev.ID]
if prevIsDisconnecting {
continue
}
a.result.stop = append(a.result.stop, allocStopResult{
alloc: prev,
statusDescription: allocRescheduled,
@ -843,9 +899,7 @@ func (a *allocReconciler) isDeploymentComplete(groupName string, destructive, in
}
// Final check to see if the deployment is complete is to ensure everything is healthy
var ok bool
var dstate *structs.DeploymentState
if dstate, ok = a.deployment.TaskGroups[groupName]; ok {
if dstate, ok := a.deployment.TaskGroups[groupName]; ok {
if dstate.HealthyAllocs < helper.IntMax(dstate.DesiredTotal, dstate.DesiredCanaries) || // Make sure we have enough healthy allocs
(dstate.DesiredCanaries > 0 && !dstate.Promoted) { // Make sure we are promoted if we have canaries
complete = false
@ -859,22 +913,28 @@ func (a *allocReconciler) isDeploymentComplete(groupName string, destructive, in
// the group definition, the set of allocations in various states and whether we
// are canarying.
func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
untainted, migrate, lost, canaries allocSet, isCanarying bool, followupEvals map[string]string) allocSet {
untainted, migrate, lost, canaries, reconnecting allocSet, isCanarying bool, followupEvals map[string]string) (allocSet, allocSet) {
// Mark all lost allocations for stop.
var stop allocSet
stop = stop.union(lost)
a.markDelayed(lost, structs.AllocClientStatusLost, allocLost, followupEvals)
// Mark all failed reconnects for stop.
failedReconnects := reconnecting.filterByFailedReconnect()
stop = stop.union(failedReconnects)
a.markStop(failedReconnects, structs.AllocClientStatusFailed, allocRescheduled)
reconnecting = reconnecting.difference(failedReconnects)
// If we are still deploying or creating canaries, don't stop them
if isCanarying {
untainted = untainted.difference(canaries)
}
// Hot path the nothing to do case
remove := len(untainted) + len(migrate) - group.Count
remove := len(untainted) + len(migrate) + len(reconnecting) - group.Count
if remove <= 0 {
return stop
return stop, reconnecting
}
// Filter out any terminal allocations from the untainted set
@ -896,7 +956,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
remove--
if remove == 0 {
return stop
return stop, reconnecting
}
}
}
@ -920,11 +980,19 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
remove--
if remove == 0 {
return stop
return stop, reconnecting
}
}
}
// Handle allocs that might be able to reconnect.
if len(reconnecting) != 0 {
remove = a.computeStopByReconnecting(untainted, reconnecting, stop, remove)
if remove == 0 {
return stop, reconnecting
}
}
// Select the allocs with the highest count to remove
removeNames := nameIndex.Highest(uint(remove))
for id, alloc := range untainted {
@ -938,7 +1006,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
remove--
if remove == 0 {
return stop
return stop, reconnecting
}
}
}
@ -955,11 +1023,95 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
remove--
if remove == 0 {
return stop
return stop, reconnecting
}
}
return stop
return stop, reconnecting
}
// computeStopByReconnecting moves allocations from either the untainted or reconnecting
// sets to the stop set and returns the number of allocations that still need to be removed.
func (a *allocReconciler) computeStopByReconnecting(untainted, reconnecting, stop allocSet, remove int) int {
if remove == 0 {
return remove
}
for _, reconnectingAlloc := range reconnecting {
// If the desired status is not run, if the user has requested a migration or
// reschedule, or if the alloc is from an older job, stop the reconnecting allocation.
if reconnectingAlloc.DesiredStatus != structs.AllocDesiredStatusRun ||
reconnectingAlloc.DesiredTransition.ShouldMigrate() ||
reconnectingAlloc.DesiredTransition.ShouldReschedule() ||
reconnectingAlloc.DesiredTransition.ShouldForceReschedule() ||
reconnectingAlloc.Job.Version < a.job.Version ||
reconnectingAlloc.Job.CreateIndex < a.job.CreateIndex {
stop[reconnectingAlloc.ID] = reconnectingAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: reconnectingAlloc,
statusDescription: allocNotNeeded,
})
delete(reconnecting, reconnectingAlloc.ID)
remove--
// if we've removed all we need to, stop iterating and return.
if remove == 0 {
return remove
}
continue
}
// Compare reconnecting to untainted and decide which to keep.
for _, untaintedAlloc := range untainted {
// If the names do not match, this untainted alloc is not a candidate replacement.
if reconnectingAlloc.Name != untaintedAlloc.Name {
continue
}
// By default, we prefer stopping the replacement alloc unless the replacement
// has a newer job version, a newer job create index, or a higher placement score.
stopAlloc := untaintedAlloc
deleteSet := untainted
untaintedMaxScoreMeta := untaintedAlloc.Metrics.MaxNormScore()
reconnectingMaxScoreMeta := reconnectingAlloc.Metrics.MaxNormScore()
if untaintedMaxScoreMeta == nil {
a.logger.Error("error computing stop: replacement allocation metrics not available", "alloc_name", untaintedAlloc.Name, "alloc_id", untaintedAlloc.ID)
continue
}
if reconnectingMaxScoreMeta == nil {
a.logger.Error("error computing stop: reconnecting allocation metrics not available", "alloc_name", reconnectingAlloc.Name, "alloc_id", reconnectingAlloc.ID)
continue
}
statusDescription := allocNotNeeded
if untaintedAlloc.Job.Version > reconnectingAlloc.Job.Version ||
untaintedAlloc.Job.CreateIndex > reconnectingAlloc.Job.CreateIndex ||
untaintedMaxScoreMeta.NormScore > reconnectingMaxScoreMeta.NormScore {
stopAlloc = reconnectingAlloc
deleteSet = reconnecting
} else {
statusDescription = allocReconnected
}
stop[stopAlloc.ID] = stopAlloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: stopAlloc,
statusDescription: statusDescription,
})
delete(deleteSet, stopAlloc.ID)
remove--
// if we've removed all we need to, stop iterating and return.
if remove == 0 {
return remove
}
}
}
return remove
}
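The preference rule in the inner loop above can be isolated into a short sketch (hypothetical helper, not part of the patch): the replacement is kept, and the reconnecting alloc stopped, only when the replacement is from a newer job or was placed with a higher normalized score.

package main

import "fmt"

// keepReplacement reports whether the replacement (untainted) alloc should be
// kept and the reconnecting alloc stopped. It mirrors the comparison above:
// newer job version, newer job create index, or higher normalized score wins.
func keepReplacement(replacementJobVersion, reconnectingJobVersion uint64,
	replacementCreateIndex, reconnectingCreateIndex uint64,
	replacementNormScore, reconnectingNormScore float64) bool {
	return replacementJobVersion > reconnectingJobVersion ||
		replacementCreateIndex > reconnectingCreateIndex ||
		replacementNormScore > reconnectingNormScore
}

func main() {
	// Same job version and create index; the replacement scored higher, so the
	// reconnecting alloc is the one that gets stopped.
	fmt.Println(keepReplacement(2, 2, 100, 100, 0.95, 0.80)) // true
}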
// computeUpdates determines which allocations for the passed group require
@ -994,7 +1146,7 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all
// the followupEvalID
func (a *allocReconciler) createRescheduleLaterEvals(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) {
// followupEvals are created in the same way as for delayed lost allocs
allocIDToFollowupEvalID := a.createLostLaterEvals(rescheduleLater, all, tgName)
allocIDToFollowupEvalID := a.createLostLaterEvals(rescheduleLater, tgName)
// Create updates that will be applied to the allocs to mark the FollowupEvalID
for allocID, evalID := range allocIDToFollowupEvalID {
@ -1005,10 +1157,45 @@ func (a *allocReconciler) createRescheduleLaterEvals(rescheduleLater []*delayedR
}
}
// createLostLaterEvals creates batched followup evaluations with the WaitUntil field set for
// computeReconnecting copies existing allocations that are in the unknown state
// but whose nodes have been identified as ready. The allocations' DesiredStatus is
// set to running, and these allocs are appended to the Plan as non-destructive
// updates. Clients are responsible for reconciling the DesiredState with the
// actual state as the node comes back online.
func (a *allocReconciler) computeReconnecting(reconnecting allocSet) {
if len(reconnecting) == 0 {
return
}
// Create updates that will be appended to the plan.
for _, alloc := range reconnecting {
// If the user has defined a DesiredTransition don't resume the alloc.
if alloc.DesiredTransition.ShouldMigrate() ||
alloc.DesiredTransition.ShouldReschedule() ||
alloc.DesiredTransition.ShouldForceReschedule() ||
alloc.Job.Version < a.job.Version ||
alloc.Job.CreateIndex < a.job.CreateIndex {
continue
}
// If the scheduler has defined a terminal DesiredStatus don't resume the alloc.
if alloc.DesiredStatus != structs.AllocDesiredStatusRun {
continue
}
// If the alloc has failed don't reconnect.
if alloc.ClientStatus != structs.AllocClientStatusRunning {
continue
}
a.result.reconnectUpdates[alloc.ID] = alloc
}
}
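A compact sketch of the filter above (hypothetical helper, not part of the patch; the string constants stand in for structs.AllocDesiredStatusRun and structs.AllocClientStatusRunning): a reconnecting alloc is only resumed when it is still running, still wanted, and still on the current job.

package main

import "fmt"

// shouldResume mirrors the checks in computeReconnecting: no user-requested
// transition, a current job version and create index, a desired status of run,
// and a running client status.
func shouldResume(migrate, reschedule, forceReschedule bool,
	allocJobVersion, jobVersion, allocJobCreateIndex, jobCreateIndex uint64,
	desiredStatus, clientStatus string) bool {
	if migrate || reschedule || forceReschedule ||
		allocJobVersion < jobVersion || allocJobCreateIndex < jobCreateIndex {
		return false
	}
	if desiredStatus != "run" {
		return false
	}
	return clientStatus == "running"
}

func main() {
	fmt.Println(shouldResume(false, false, false, 3, 3, 10, 10, "run", "running")) // true
	fmt.Println(shouldResume(false, false, false, 2, 3, 10, 10, "run", "running")) // false: older job version
}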
// createLostLaterEvals creates batched followup evaluations with the WaitUntil field set for
// lost allocations. followupEvals are appended to a.result as a side effect; it returns a
// map of alloc IDs to their followupEval IDs.
func (a *allocReconciler) createLostLaterEvals(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) map[string]string {
func (a *allocReconciler) createLostLaterEvals(rescheduleLater []*delayedRescheduleInfo, tgName string) map[string]string {
if len(rescheduleLater) == 0 {
return map[string]string{}
}
@ -1062,12 +1249,104 @@ func (a *allocReconciler) createLostLaterEvals(rescheduleLater []*delayedResched
emitRescheduleInfo(allocReschedInfo.alloc, eval)
}
a.result.desiredFollowupEvals[tgName] = evals
a.appendFollowupEvals(tgName, evals)
return allocIDToFollowupEvalID
}
// emitRescheduleInfo emits metrics about the reschedule decision of an evaluation. If a followup evaluation is
// createTimeoutLaterEvals creates followup evaluations with the
// WaitUntil field set for allocations in an unknown state on disconnected nodes.
// Followup Evals are appended to a.result as a side effect. It returns a map of
// allocIDs to their associated followUpEvalIDs.
func (a *allocReconciler) createTimeoutLaterEvals(disconnecting allocSet, tgName string) map[string]string {
if len(disconnecting) == 0 {
return map[string]string{}
}
timeoutDelays, err := disconnecting.delayByMaxClientDisconnect(a.now)
if err != nil || len(timeoutDelays) != len(disconnecting) {
a.logger.Error("error computing disconnecting timeouts for task_group", "task_group", tgName, "err", err)
return map[string]string{}
}
// Sort by time
sort.Slice(timeoutDelays, func(i, j int) bool {
return timeoutDelays[i].rescheduleTime.Before(timeoutDelays[j].rescheduleTime)
})
var evals []*structs.Evaluation
nextReschedTime := timeoutDelays[0].rescheduleTime
allocIDToFollowupEvalID := make(map[string]string, len(timeoutDelays))
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: a.job.Namespace,
Priority: a.evalPriority,
Type: a.job.Type,
TriggeredBy: structs.EvalTriggerMaxDisconnectTimeout,
JobID: a.job.ID,
JobModifyIndex: a.job.ModifyIndex,
Status: structs.EvalStatusPending,
StatusDescription: disconnectTimeoutFollowupEvalDesc,
WaitUntil: nextReschedTime,
}
evals = append(evals, eval)
// Important to remember that these are sorted. The rescheduleTime can only
// get farther into the future. If this loop detects the next delay is greater
// than the batch window (5s) it creates another batch.
for _, timeoutInfo := range timeoutDelays {
if timeoutInfo.rescheduleTime.Sub(nextReschedTime) < batchedFailedAllocWindowSize {
allocIDToFollowupEvalID[timeoutInfo.allocID] = eval.ID
} else {
// Start a new batch
nextReschedTime = timeoutInfo.rescheduleTime
// Create a new eval for the new batch
eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: a.job.Namespace,
Priority: a.evalPriority,
Type: a.job.Type,
TriggeredBy: structs.EvalTriggerMaxDisconnectTimeout,
JobID: a.job.ID,
JobModifyIndex: a.job.ModifyIndex,
Status: structs.EvalStatusPending,
StatusDescription: disconnectTimeoutFollowupEvalDesc,
WaitUntil: timeoutInfo.rescheduleTime,
}
evals = append(evals, eval)
allocIDToFollowupEvalID[timeoutInfo.allocID] = eval.ID
}
emitRescheduleInfo(timeoutInfo.alloc, eval)
// Create updates that will be applied to the allocs to mark the FollowupEvalID
// and the unknown ClientStatus and AllocState.
updatedAlloc := timeoutInfo.alloc.Copy()
updatedAlloc.ClientStatus = structs.AllocClientStatusUnknown
updatedAlloc.AppendState(structs.AllocStateFieldClientStatus, structs.AllocClientStatusUnknown)
updatedAlloc.ClientDescription = allocUnknown
updatedAlloc.FollowupEvalID = eval.ID
a.result.disconnectUpdates[updatedAlloc.ID] = updatedAlloc
}
a.appendFollowupEvals(tgName, evals)
return allocIDToFollowupEvalID
}
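The batching rule in the loop above can be illustrated with a short, self-contained sketch (the 5s window mirrors batchedFailedAllocWindowSize; the timeout values are illustrative):

package main

import (
	"fmt"
	"time"
)

func main() {
	const batchWindow = 5 * time.Second // analogue of batchedFailedAllocWindowSize

	// Disconnect timeouts, already sorted ascending as in createTimeoutLaterEvals.
	timeouts := []time.Duration{10 * time.Second, 12 * time.Second, 20 * time.Second}

	batch := 1
	batchStart := timeouts[0]
	for _, d := range timeouts {
		// A timeout farther than the window from the current batch start opens a new batch.
		if d-batchStart >= batchWindow {
			batch++
			batchStart = d
		}
		fmt.Printf("timeout +%s -> followup eval batch %d\n", d, batch)
	}
	// Output: +10s and +12s share batch 1; +20s gets batch 2.
}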
// appendFollowupEvals appends a set of followup evals for a task group to the
// desiredFollowupEvals map which is later added to the scheduler's followUpEvals set.
func (a *allocReconciler) appendFollowupEvals(tgName string, evals []*structs.Evaluation) {
// Merge with any existing followup evals for this task group.
if existingFollowUpEvals, ok := a.result.desiredFollowupEvals[tgName]; ok {
evals = append(existingFollowUpEvals, evals...)
}
a.result.desiredFollowupEvals[tgName] = evals
}
// emitRescheduleInfo emits metrics about the rescheduling decision of an evaluation. If a followup evaluation is
// provided, the waitUntil time is emitted.
func emitRescheduleInfo(alloc *structs.Allocation, followupEval *structs.Evaluation) {
// Emit short-lived metrics data point. Note, these expire and stop emitting after about a minute.

File diff suppressed because it is too large Load Diff

View File

@ -209,17 +209,46 @@ func (a allocSet) fromKeys(keys ...[]string) allocSet {
}
// filterByTainted takes a set of tainted nodes and filters the allocation set
// into three groups:
// into the following groups:
// 1. Those that exist on untainted nodes
// 2. Those that exist on nodes that are draining
// 3. Those that exist on lost nodes
func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node) (untainted, migrate, lost allocSet) {
// 3. Those that exist on lost nodes or have expired
// 4. Those that are on nodes that are disconnected, but have not had their ClientState set to unknown
// 5. Those that are on a node that has reconnected.
// 6. Those that are in a state that results in a noop.
func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, supportsDisconnectedClients bool, now time.Time) (untainted, migrate, lost, disconnecting, reconnecting, ignore allocSet) {
untainted = make(map[string]*structs.Allocation)
migrate = make(map[string]*structs.Allocation)
lost = make(map[string]*structs.Allocation)
disconnecting = make(map[string]*structs.Allocation)
reconnecting = make(map[string]*structs.Allocation)
ignore = make(map[string]*structs.Allocation)
for _, alloc := range a {
// Terminal allocs are always untainted as they should never be migrated
if alloc.TerminalStatus() {
reconnected := false
expired := false
// Only compute reconnected for unknown, running, and failed since they need to go through the reconnect logic.
if supportsDisconnectedClients &&
(alloc.ClientStatus == structs.AllocClientStatusUnknown ||
alloc.ClientStatus == structs.AllocClientStatusRunning ||
alloc.ClientStatus == structs.AllocClientStatusFailed) {
reconnected, expired = alloc.Reconnected()
}
// Failed reconnected allocs need to be added to reconnecting so that they
// can be handled as a failed reconnect.
if supportsDisconnectedClients &&
reconnected &&
alloc.DesiredStatus == structs.AllocDesiredStatusRun &&
alloc.ClientStatus == structs.AllocClientStatusFailed {
reconnecting[alloc.ID] = alloc
continue
}
// Terminal allocs, if not reconnected, are always untainted as they
// should never be migrated.
if alloc.TerminalStatus() && !reconnected {
untainted[alloc.ID] = alloc
continue
}
@ -230,13 +259,74 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node) (untain
continue
}
// Expired unknown allocs are lost
if supportsDisconnectedClients && alloc.Expired(now) {
lost[alloc.ID] = alloc
continue
}
// Ignore unknown allocs that we want to reconnect eventually.
if supportsDisconnectedClients &&
alloc.ClientStatus == structs.AllocClientStatusUnknown &&
alloc.DesiredStatus == structs.AllocDesiredStatusRun {
ignore[alloc.ID] = alloc
continue
}
// Ignore reconnected failed allocs that have been marked stop by the server.
if supportsDisconnectedClients &&
reconnected &&
alloc.ClientStatus == structs.AllocClientStatusFailed &&
alloc.DesiredStatus == structs.AllocDesiredStatusStop {
ignore[alloc.ID] = alloc
continue
}
taintedNode, ok := taintedNodes[alloc.NodeID]
if !ok {
// Node is untainted so alloc is untainted
// Filter allocs on a node that is now reconnected to be resumed.
if reconnected {
if expired {
lost[alloc.ID] = alloc
continue
}
reconnecting[alloc.ID] = alloc
continue
}
// Otherwise, Node is untainted so alloc is untainted
untainted[alloc.ID] = alloc
continue
}
if taintedNode != nil {
// Group disconnecting/reconnecting
switch taintedNode.Status {
case structs.NodeStatusDisconnected:
// Filter running allocs on a node that is disconnected to be marked as unknown.
if supportsDisconnectedClients && alloc.ClientStatus == structs.AllocClientStatusRunning {
disconnecting[alloc.ID] = alloc
continue
}
// Filter pending allocs on a node that is disconnected to be marked as lost.
if alloc.ClientStatus == structs.AllocClientStatusPending {
lost[alloc.ID] = alloc
continue
}
case structs.NodeStatusReady:
// Filter reconnecting allocs with replacements on a node that is now connected.
if reconnected {
if expired {
lost[alloc.ID] = alloc
continue
}
reconnecting[alloc.ID] = alloc
continue
}
default:
}
}
// Allocs on GC'd (nil) or lost nodes are Lost
if taintedNode == nil || taintedNode.TerminalStatus() {
lost[alloc.ID] = alloc
@ -246,6 +336,7 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node) (untain
// All other allocs are untainted
untainted[alloc.ID] = alloc
}
return
}
@ -253,23 +344,25 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node) (untain
// untainted or a set of allocations that must be rescheduled now. Allocations that can be rescheduled
// at a future time are also returned so that we can create follow up evaluations for them. Allocs are
// skipped or considered untainted according to logic defined in shouldFilter method.
func (a allocSet) filterByRescheduleable(isBatch bool, now time.Time, evalID string, deployment *structs.Deployment) (untainted, rescheduleNow allocSet, rescheduleLater []*delayedRescheduleInfo) {
func (a allocSet) filterByRescheduleable(isBatch, isDisconnecting bool, now time.Time, evalID string, deployment *structs.Deployment) (untainted, rescheduleNow allocSet, rescheduleLater []*delayedRescheduleInfo) {
untainted = make(map[string]*structs.Allocation)
rescheduleNow = make(map[string]*structs.Allocation)
// When filtering disconnected sets, the untainted set is never populated.
// It has no purpose in that context.
for _, alloc := range a {
var eligibleNow, eligibleLater bool
var rescheduleTime time.Time
// Ignore failing allocs that have already been rescheduled
// only failed allocs should be rescheduled, but protect against a bug allowing rescheduling
// running allocs
// Ignore failing allocs that have already been rescheduled.
// Only failed or disconnecting allocs should be rescheduled.
// Protects against a bug allowing rescheduling running allocs.
if alloc.NextAllocation != "" && alloc.TerminalStatus() {
continue
}
isUntainted, ignore := shouldFilter(alloc, isBatch)
if isUntainted {
if isUntainted && !isDisconnecting {
untainted[alloc.ID] = alloc
}
if isUntainted || ignore {
@ -277,9 +370,11 @@ func (a allocSet) filterByRescheduleable(isBatch bool, now time.Time, evalID str
}
// Only failed allocs with desired state run get to this point
// If the failed alloc is not eligible for rescheduling now we add it to the untainted set
eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, now, evalID, deployment)
if !eligibleNow {
// If the failed alloc is not eligible for rescheduling now we
// add it to the untainted set. Disconnecting delay evals are
// handled by allocReconciler.createTimeoutLaterEvals
eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, now, evalID, deployment, isDisconnecting)
if !isDisconnecting && !eligibleNow {
untainted[alloc.ID] = alloc
if eligibleLater {
rescheduleLater = append(rescheduleLater, &delayedRescheduleInfo{alloc.ID, alloc, rescheduleTime})
@ -341,7 +436,7 @@ func shouldFilter(alloc *structs.Allocation, isBatch bool) (untainted, ignore bo
// updateByReschedulable is a helper method that encapsulates logic for whether a failed allocation
// should be rescheduled now, later or left in the untainted set
func updateByReschedulable(alloc *structs.Allocation, now time.Time, evalID string, d *structs.Deployment) (rescheduleNow, rescheduleLater bool, rescheduleTime time.Time) {
func updateByReschedulable(alloc *structs.Allocation, now time.Time, evalID string, d *structs.Deployment, isDisconnecting bool) (rescheduleNow, rescheduleLater bool, rescheduleTime time.Time) {
// If the allocation is part of an ongoing active deployment, we only allow it to reschedule
// if it has been marked eligible
if d != nil && alloc.DeploymentID == d.ID && d.Active() && !alloc.DesiredTransition.ShouldReschedule() {
@ -354,7 +449,13 @@ func updateByReschedulable(alloc *structs.Allocation, now time.Time, evalID stri
}
// Reschedule if the eval ID matches the alloc's followup evalID or if it's close to its reschedule time
rescheduleTime, eligible := alloc.NextRescheduleTime()
var eligible bool
if isDisconnecting {
rescheduleTime, eligible = alloc.NextRescheduleTimeByFailTime(now)
} else {
rescheduleTime, eligible = alloc.NextRescheduleTime()
}
if eligible && (alloc.FollowupEvalID == evalID || rescheduleTime.Sub(now) <= rescheduleWindowSize) {
rescheduleNow = true
return
@ -391,6 +492,19 @@ func (a allocSet) filterByDeployment(id string) (match, nonmatch allocSet) {
return
}
// filterByFailedReconnect filters allocations into a set of allocs that have
// failed on the client but do not have a terminal status at the server, so
// that they can be marked as stop at the server.
func (a allocSet) filterByFailedReconnect() allocSet {
failed := make(allocSet)
for _, alloc := range a {
if !alloc.ServerTerminalStatus() && alloc.ClientStatus == structs.AllocClientStatusFailed {
failed[alloc.ID] = alloc
}
}
return failed
}
// delayByStopAfterClientDisconnect returns a delay for any lost allocation that has
// stop_after_client_disconnect configured
func (a allocSet) delayByStopAfterClientDisconnect() (later []*delayedRescheduleInfo) {
@ -413,6 +527,26 @@ func (a allocSet) delayByStopAfterClientDisconnect() (later []*delayedReschedule
return later
}
// delayByMaxClientDisconnect returns a delay for any unknown allocation
// that has max_client_disconnect configured
func (a allocSet) delayByMaxClientDisconnect(now time.Time) (later []*delayedRescheduleInfo, err error) {
for _, alloc := range a {
timeout := alloc.DisconnectTimeout(now)
if !timeout.After(now) {
continue
}
later = append(later, &delayedRescheduleInfo{
allocID: alloc.ID,
alloc: alloc,
rescheduleTime: timeout,
})
}
return
}
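A minimal sketch of the timeout arithmetic assumed here (assuming DisconnectTimeout is the moment the alloc was marked unknown plus the group's max_client_disconnect; not part of the patch):

package main

import (
	"fmt"
	"time"
)

func main() {
	maxClientDisconnect := 5 * time.Minute

	// The alloc transitioned to unknown two minutes ago.
	markedUnknownAt := time.Now().Add(-2 * time.Minute)

	deadline := markedUnknownAt.Add(maxClientDisconnect) // analogue of alloc.DisconnectTimeout(now)
	expired := !deadline.After(time.Now())               // analogue of !timeout.After(now)

	fmt.Printf("reschedule deadline in %s, expired: %v\n", time.Until(deadline).Round(time.Second), expired)
	// With three minutes remaining, the alloc gets a delayed reschedule entry;
	// an already-expired alloc would be skipped by the loop above.
}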
// allocNameIndex is used to select allocation names for placement or removal
// given an existing set of placed allocations.
type allocNameIndex struct {

View File

@ -8,6 +8,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
"time"
)
// Test that we properly create the bitmap even when the alloc set includes an
@ -39,8 +40,6 @@ func TestBitmapFrom(t *testing.T) {
func TestAllocSet_filterByTainted(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
nodes := map[string]*structs.Node{
"draining": {
ID: "draining",
@ -55,82 +54,602 @@ func TestAllocSet_filterByTainted(t *testing.T) {
ID: "normal",
Status: structs.NodeStatusReady,
},
}
batchJob := &structs.Job{
Type: structs.JobTypeBatch,
}
allocs := allocSet{
// Non-terminal alloc with migrate=true should migrate on a draining node
"migrating1": {
ID: "migrating1",
ClientStatus: structs.AllocClientStatusRunning,
DesiredTransition: structs.DesiredTransition{Migrate: helper.BoolToPtr(true)},
Job: batchJob,
NodeID: "draining",
},
// Non-terminal alloc with migrate=true should migrate on an unknown node
"migrating2": {
ID: "migrating2",
ClientStatus: structs.AllocClientStatusRunning,
DesiredTransition: structs.DesiredTransition{Migrate: helper.BoolToPtr(true)},
Job: batchJob,
NodeID: "nil",
},
"untainted1": {
ID: "untainted1",
ClientStatus: structs.AllocClientStatusRunning,
Job: batchJob,
NodeID: "normal",
},
// Terminal allocs are always untainted
"untainted2": {
ID: "untainted2",
ClientStatus: structs.AllocClientStatusComplete,
Job: batchJob,
NodeID: "normal",
},
// Terminal allocs are always untainted, even on draining nodes
"untainted3": {
ID: "untainted3",
ClientStatus: structs.AllocClientStatusComplete,
Job: batchJob,
NodeID: "draining",
},
// Terminal allocs are always untainted, even on lost nodes
"untainted4": {
ID: "untainted4",
ClientStatus: structs.AllocClientStatusComplete,
Job: batchJob,
NodeID: "lost",
},
// Non-terminal allocs on lost nodes are lost
"lost1": {
ID: "lost1",
ClientStatus: structs.AllocClientStatusPending,
Job: batchJob,
NodeID: "lost",
},
// Non-terminal allocs on lost nodes are lost
"lost2": {
ID: "lost2",
ClientStatus: structs.AllocClientStatusRunning,
Job: batchJob,
NodeID: "lost",
"disconnected": {
ID: "disconnected",
Status: structs.NodeStatusDisconnected,
},
}
untainted, migrate, lost := allocs.filterByTainted(nodes)
require.Len(untainted, 4)
require.Contains(untainted, "untainted1")
require.Contains(untainted, "untainted2")
require.Contains(untainted, "untainted3")
require.Contains(untainted, "untainted4")
require.Len(migrate, 2)
require.Contains(migrate, "migrating1")
require.Contains(migrate, "migrating2")
require.Len(lost, 2)
require.Contains(lost, "lost1")
require.Contains(lost, "lost2")
testJob := mock.Job()
testJob.TaskGroups[0].MaxClientDisconnect = helper.TimeToPtr(5 * time.Second)
now := time.Now()
unknownAllocState := []*structs.AllocState{{
Field: structs.AllocStateFieldClientStatus,
Value: structs.AllocClientStatusUnknown,
Time: now,
}}
expiredAllocState := []*structs.AllocState{{
Field: structs.AllocStateFieldClientStatus,
Value: structs.AllocClientStatusUnknown,
Time: now.Add(-60 * time.Second),
}}
reconnectedEvent := structs.NewTaskEvent(structs.TaskClientReconnected)
reconnectedEvent.Time = time.Now().UnixNano()
reconnectTaskState := map[string]*structs.TaskState{
testJob.TaskGroups[0].Tasks[0].Name: {
Events: []*structs.TaskEvent{reconnectedEvent},
},
}
type testCase struct {
name string
all allocSet
taintedNodes map[string]*structs.Node
supportsDisconnectedClients bool
skipNilNodeTest bool
now time.Time
// expected results
untainted allocSet
migrate allocSet
lost allocSet
disconnecting allocSet
reconnecting allocSet
ignore allocSet
}
testCases := []testCase{
// These two cases test that we maintain parity with pre-disconnected-clients behavior.
{
name: "lost-client",
supportsDisconnectedClients: false,
now: time.Now(),
taintedNodes: nodes,
skipNilNodeTest: false,
all: allocSet{
"untainted1": {
ID: "untainted1",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
},
// Terminal allocs are always untainted
"untainted2": {
ID: "untainted2",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "normal",
},
// Terminal allocs are always untainted, even on draining nodes
"untainted3": {
ID: "untainted3",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "draining",
},
// Terminal allocs are always untainted, even on lost nodes
"untainted4": {
ID: "untainted4",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "lost",
},
// Non-terminal alloc with migrate=true should migrate on a draining node
"migrating1": {
ID: "migrating1",
ClientStatus: structs.AllocClientStatusRunning,
DesiredTransition: structs.DesiredTransition{Migrate: helper.BoolToPtr(true)},
Job: testJob,
NodeID: "draining",
},
// Non-terminal alloc with migrate=true should migrate on an unknown node
"migrating2": {
ID: "migrating2",
ClientStatus: structs.AllocClientStatusRunning,
DesiredTransition: structs.DesiredTransition{Migrate: helper.BoolToPtr(true)},
Job: testJob,
NodeID: "nil",
},
},
untainted: allocSet{
"untainted1": {
ID: "untainted1",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
},
// Terminal allocs are always untainted
"untainted2": {
ID: "untainted2",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "normal",
},
// Terminal allocs are always untainted, even on draining nodes
"untainted3": {
ID: "untainted3",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "draining",
},
// Terminal allocs are always untainted, even on lost nodes
"untainted4": {
ID: "untainted4",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "lost",
},
},
migrate: allocSet{
// Non-terminal alloc with migrate=true should migrate on a draining node
"migrating1": {
ID: "migrating1",
ClientStatus: structs.AllocClientStatusRunning,
DesiredTransition: structs.DesiredTransition{Migrate: helper.BoolToPtr(true)},
Job: testJob,
NodeID: "draining",
},
// Non-terminal alloc with migrate=true should migrate on an unknown node
"migrating2": {
ID: "migrating2",
ClientStatus: structs.AllocClientStatusRunning,
DesiredTransition: structs.DesiredTransition{Migrate: helper.BoolToPtr(true)},
Job: testJob,
NodeID: "nil",
},
},
disconnecting: allocSet{},
reconnecting: allocSet{},
ignore: allocSet{},
lost: allocSet{},
},
{
name: "lost-client-only-tainted-nodes",
supportsDisconnectedClients: false,
now: time.Now(),
taintedNodes: nodes,
// The logic associated with this test case can only trigger if there
// is a tainted node. Therefore, testing with a nil node set produces
// false failures, so don't perform that test in this case.
skipNilNodeTest: true,
all: allocSet{
// Non-terminal allocs on lost nodes are lost
"lost1": {
ID: "lost1",
ClientStatus: structs.AllocClientStatusPending,
Job: testJob,
NodeID: "lost",
},
// Non-terminal allocs on lost nodes are lost
"lost2": {
ID: "lost2",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "lost",
},
},
untainted: allocSet{},
migrate: allocSet{},
disconnecting: allocSet{},
reconnecting: allocSet{},
ignore: allocSet{},
lost: allocSet{
// Non-terminal allocs on lost nodes are lost
"lost1": {
ID: "lost1",
ClientStatus: structs.AllocClientStatusPending,
Job: testJob,
NodeID: "lost",
},
// Non-terminal allocs on lost nodes are lost
"lost2": {
ID: "lost2",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "lost",
},
},
},
// Everything below this line tests the disconnected client mode.
{
name: "disco-client-untainted-reconnect-failed-and-replaced",
supportsDisconnectedClients: true,
now: time.Now(),
taintedNodes: nodes,
skipNilNodeTest: false,
all: allocSet{
"running-replacement": {
ID: "running-replacement",
Name: "web",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
PreviousAllocation: "failed-original",
},
// Failed-then-replaced allocs on reconnected nodes go to reconnecting so they can be handled as failed reconnects
"failed-original": {
ID: "failed-original",
Name: "web",
ClientStatus: structs.AllocClientStatusFailed,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
},
untainted: allocSet{
"running-replacement": {
ID: "running-replacement",
Name: "web",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
PreviousAllocation: "failed-original",
},
},
migrate: allocSet{},
disconnecting: allocSet{},
reconnecting: allocSet{
"failed-original": {
ID: "failed-original",
Name: "web",
ClientStatus: structs.AllocClientStatusFailed,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
},
ignore: allocSet{},
lost: allocSet{},
},
{
name: "disco-client-reconnecting-running-no-replacement",
supportsDisconnectedClients: true,
now: time.Now(),
taintedNodes: nodes,
skipNilNodeTest: false,
all: allocSet{
// Running allocs on reconnected nodes with no replacement are reconnecting.
// Node.UpdateStatus has already handled syncing client state so this
// should be a noop.
"reconnecting-running-no-replacement": {
ID: "reconnecting-running-no-replacement",
Name: "web",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
},
untainted: allocSet{},
migrate: allocSet{},
disconnecting: allocSet{},
reconnecting: allocSet{
"reconnecting-running-no-replacement": {
ID: "reconnecting-running-no-replacement",
Name: "web",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
},
ignore: allocSet{},
lost: allocSet{},
},
{
name: "disco-client-terminal",
supportsDisconnectedClients: true,
now: time.Now(),
taintedNodes: nodes,
skipNilNodeTest: false,
all: allocSet{
// Allocs on reconnected nodes that are complete are untainted
"untainted-reconnect-complete": {
ID: "untainted-reconnect-complete",
Name: "untainted-reconnect-complete",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
// Failed allocs on reconnected nodes are in reconnecting so that
// they can be marked with desired status stop at the server.
"reconnecting-failed": {
ID: "reconnecting-failed",
Name: "reconnecting-failed",
ClientStatus: structs.AllocClientStatusFailed,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
// Lost allocs on reconnected nodes don't get restarted
"untainted-reconnect-lost": {
ID: "untainted-reconnect-lost",
Name: "untainted-reconnect-lost",
ClientStatus: structs.AllocClientStatusLost,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
// Replacement allocs that are complete are untainted
"untainted-reconnect-complete-replacement": {
ID: "untainted-reconnect-complete-replacement",
Name: "untainted-reconnect-complete",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
PreviousAllocation: "untainted-reconnect-complete",
},
// Replacement allocs on reconnected nodes that are failed are untainted
"untainted-reconnect-failed-replacement": {
ID: "untainted-reconnect-failed-replacement",
Name: "untainted-reconnect-failed",
ClientStatus: structs.AllocClientStatusFailed,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
PreviousAllocation: "reconnecting-failed",
},
// Lost replacement allocs on reconnected nodes don't get restarted
"untainted-reconnect-lost-replacement": {
ID: "untainted-reconnect-lost-replacement",
Name: "untainted-reconnect-lost",
ClientStatus: structs.AllocClientStatusLost,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
PreviousAllocation: "untainted-reconnect-lost",
},
},
untainted: allocSet{
"untainted-reconnect-complete": {
ID: "untainted-reconnect-complete",
Name: "untainted-reconnect-complete",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
"untainted-reconnect-lost": {
ID: "untainted-reconnect-lost",
Name: "untainted-reconnect-lost",
ClientStatus: structs.AllocClientStatusLost,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
"untainted-reconnect-complete-replacement": {
ID: "untainted-reconnect-complete-replacement",
Name: "untainted-reconnect-complete",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
PreviousAllocation: "untainted-reconnect-complete",
},
"untainted-reconnect-failed-replacement": {
ID: "untainted-reconnect-failed-replacement",
Name: "untainted-reconnect-failed",
ClientStatus: structs.AllocClientStatusFailed,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
PreviousAllocation: "reconnecting-failed",
},
"untainted-reconnect-lost-replacement": {
ID: "untainted-reconnect-lost-replacement",
Name: "untainted-reconnect-lost",
ClientStatus: structs.AllocClientStatusLost,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
PreviousAllocation: "untainted-reconnect-lost",
},
},
migrate: allocSet{},
disconnecting: allocSet{},
reconnecting: allocSet{
"reconnecting-failed": {
ID: "reconnecting-failed",
Name: "reconnecting-failed",
ClientStatus: structs.AllocClientStatusFailed,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
},
ignore: allocSet{},
lost: allocSet{},
},
{
name: "disco-client-disconnect",
supportsDisconnectedClients: true,
now: time.Now(),
taintedNodes: nodes,
skipNilNodeTest: true,
all: allocSet{
// Non-terminal allocs on disconnected nodes are disconnecting
"disconnect-running": {
ID: "disconnect-running",
Name: "disconnect-running",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "disconnected",
TaskGroup: "web",
},
// Unknown allocs on disconnected nodes are ignored
"ignore-unknown": {
ID: "ignore-unknown",
Name: "ignore-unknown",
ClientStatus: structs.AllocClientStatusUnknown,
DesiredStatus: structs.AllocDesiredStatusRun,
Job: testJob,
NodeID: "disconnected",
TaskGroup: "web",
AllocStates: unknownAllocState,
},
// Unknown allocs on disconnected nodes are lost when expired
"lost-unknown": {
ID: "lost-unknown",
Name: "lost-unknown",
ClientStatus: structs.AllocClientStatusUnknown,
Job: testJob,
NodeID: "disconnected",
TaskGroup: "web",
AllocStates: expiredAllocState,
},
},
untainted: allocSet{},
migrate: allocSet{},
disconnecting: allocSet{
"disconnect-running": {
ID: "disconnect-running",
Name: "disconnect-running",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "disconnected",
TaskGroup: "web",
},
},
reconnecting: allocSet{},
ignore: allocSet{
// Unknown allocs on disconnected nodes are ignored
"ignore-unknown": {
ID: "ignore-unknown",
Name: "ignore-unknown",
ClientStatus: structs.AllocClientStatusUnknown,
DesiredStatus: structs.AllocDesiredStatusRun,
Job: testJob,
NodeID: "disconnected",
TaskGroup: "web",
AllocStates: unknownAllocState,
},
},
lost: allocSet{
"lost-unknown": {
ID: "lost-unknown",
Name: "lost-unknown",
ClientStatus: structs.AllocClientStatusUnknown,
Job: testJob,
NodeID: "disconnected",
TaskGroup: "web",
AllocStates: expiredAllocState,
},
},
},
{
name: "disco-client-running-reconnecting-and-replacement-untainted",
supportsDisconnectedClients: true,
now: time.Now(),
taintedNodes: nodes,
skipNilNodeTest: false,
all: allocSet{
"running-replacement": {
ID: "running-replacement",
Name: "web",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
PreviousAllocation: "running-original",
},
// Running and replaced allocs on reconnected nodes are reconnecting
"running-original": {
ID: "running-original",
Name: "web",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
},
untainted: allocSet{
"running-replacement": {
ID: "running-replacement",
Name: "web",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
PreviousAllocation: "running-original",
},
},
migrate: allocSet{},
disconnecting: allocSet{},
reconnecting: allocSet{
"running-original": {
ID: "running-original",
Name: "web",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
},
ignore: allocSet{},
lost: allocSet{},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// With tainted nodes
untainted, migrate, lost, disconnecting, reconnecting, ignore := tc.all.filterByTainted(tc.taintedNodes, tc.supportsDisconnectedClients, tc.now)
require.Equal(t, tc.untainted, untainted, "with-nodes", "untainted")
require.Equal(t, tc.migrate, migrate, "with-nodes", "migrate")
require.Equal(t, tc.lost, lost, "with-nodes", "lost")
require.Equal(t, tc.disconnecting, disconnecting, "with-nodes", "disconnecting")
require.Equal(t, tc.reconnecting, reconnecting, "with-nodes", "reconnecting")
require.Equal(t, tc.ignore, ignore, "with-nodes", "ignore")
if tc.skipNilNodeTest {
return
}
// Now again with nodes nil
untainted, migrate, lost, disconnecting, reconnecting, ignore = tc.all.filterByTainted(nil, tc.supportsDisconnectedClients, tc.now)
require.Equal(t, tc.untainted, untainted, "nodes-nil", "untainted")
require.Equal(t, tc.migrate, migrate, "nodes-nil", "migrate")
require.Equal(t, tc.lost, lost, "nodes-nil", "lost")
require.Equal(t, tc.disconnecting, disconnecting, "nodes-nil", "disconnecting")
require.Equal(t, tc.reconnecting, reconnecting, "nodes-nil", "reconnecting")
require.Equal(t, tc.ignore, ignore, "nodes-nil", "ignore")
})
}
}

View File

@ -6,6 +6,7 @@ import (
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -132,4 +133,9 @@ type Planner interface {
// evaluation must exist in a blocked state prior to this being called such
// that on leader changes, the evaluation will be reblocked properly.
ReblockEval(*structs.Evaluation) error
// ServersMeetMinimumVersion returns whether the Nomad servers are at least on the
// given Nomad version. The checkFailedServers parameter specifies whether the
// versions of failed servers should also be verified.
ServersMeetMinimumVersion(minVersion *version.Version, checkFailedServers bool) bool
}
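For orientation, a minimal sketch (hypothetical consumer code, not part of the patch) of how a scheduler might use this method to gate the disconnected-client behavior on server versions:

package main

import (
	"fmt"

	"github.com/hashicorp/go-version"
)

// planner is a narrowed, hypothetical view of the Planner interface above.
type planner interface {
	ServersMeetMinimumVersion(minVersion *version.Version, checkFailedServers bool) bool
}

// staticPlanner is a stub used only for this illustration.
type staticPlanner struct{ meetsMinimum bool }

func (s staticPlanner) ServersMeetMinimumVersion(_ *version.Version, _ bool) bool {
	return s.meetsMinimum
}

// supportsDisconnectedClients mirrors the gate used by the generic scheduler:
// max_client_disconnect handling is enabled only when every server, including
// failed ones, is at least version 1.2.6.
func supportsDisconnectedClients(p planner) bool {
	return p.ServersMeetMinimumVersion(version.Must(version.NewVersion("1.2.6")), true)
}

func main() {
	fmt.Println(supportsDisconnectedClients(staticPlanner{meetsMinimum: true}))  // true
	fmt.Println(supportsDisconnectedClients(staticPlanner{meetsMinimum: false})) // false
}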

View File

@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@ -19,6 +20,10 @@ type RejectPlan struct {
Harness *Harness
}
func (r *RejectPlan) ServersMeetMinimumVersion(minVersion *version.Version, checkFailedServers bool) bool {
return r.Harness.serversMeetMinimumVersion
}
func (r *RejectPlan) SubmitPlan(*structs.Plan) (*structs.PlanResult, State, error) {
result := new(structs.PlanResult)
result.RefreshIndex = r.Harness.NextIndex()
@ -55,16 +60,18 @@ type Harness struct {
nextIndex uint64
nextIndexLock sync.Mutex
optimizePlan bool
optimizePlan bool
serversMeetMinimumVersion bool
}
// NewHarness is used to make a new testing harness
func NewHarness(t testing.TB) *Harness {
state := state.TestStateStore(t)
h := &Harness{
t: t,
State: state,
nextIndex: 1,
t: t,
State: state,
nextIndex: 1,
serversMeetMinimumVersion: true,
}
return h
}
@ -243,6 +250,10 @@ func (h *Harness) ReblockEval(eval *structs.Evaluation) error {
return nil
}
func (h *Harness) ServersMeetMinimumVersion(_ *version.Version, _ bool) bool {
return h.serversMeetMinimumVersion
}
// NextIndex returns the next index
func (h *Harness) NextIndex() uint64 {
h.nextIndexLock.Lock()

View File

@ -350,8 +350,9 @@ func progressMade(result *structs.PlanResult) bool {
}
// taintedNodes is used to scan the allocations and then check if the
// underlying nodes are tainted, and should force a migration of the allocation.
// All the nodes returned in the map are tainted.
// underlying nodes are tainted, and should force a migration of the allocation,
// or if the underlying nodes are disconnected, and should be used to calculate
// the reconnect timeout of its allocations. All the nodes returned in the map are tainted.
func taintedNodes(state State, allocs []*structs.Allocation) (map[string]*structs.Node, error) {
out := make(map[string]*structs.Node)
for _, alloc := range allocs {
@ -373,7 +374,15 @@ func taintedNodes(state State, allocs []*structs.Allocation) (map[string]*struct
if structs.ShouldDrainNode(node.Status) || node.DrainStrategy != nil {
out[alloc.NodeID] = node
}
// Disconnected nodes are included in the tainted set so that the
// max_client_disconnect configuration of their allocations' task groups
// can be included in the timeout calculation.
if node.Status == structs.NodeStatusDisconnected {
out[alloc.NodeID] = node
}
}
return out, nil
}
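
For orientation, here is a rough sketch of the timeout calculation the new comment refers to: an allocation on a disconnected node stays "unknown" until the client has been disconnected for longer than its group's `max_client_disconnect`. All names below are assumptions for illustration, not Nomad's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// disconnectExpired reports whether a disconnected client has been away longer
// than the group's max_client_disconnect. A nil value means the group did not
// opt in, so its allocations are handled as "lost" as before.
func disconnectExpired(disconnectedAt time.Time, maxClientDisconnect *time.Duration, now time.Time) bool {
	if maxClientDisconnect == nil {
		return true
	}
	return now.After(disconnectedAt.Add(*maxClientDisconnect))
}

func main() {
	max := 6 * time.Hour
	disconnectedAt := time.Now().Add(-8 * time.Hour) // client went away 8 hours ago
	fmt.Println(disconnectExpired(disconnectedAt, &max, time.Now())) // true: 8h > 6h
}
```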

View File

@ -614,6 +614,7 @@ $ curl \
"SizeMB": 300,
"Sticky": false
},
"MaxClientDisconnect": 300000000000,
"Meta": null,
"Migrate": {
"HealthCheck": "checks",
@ -827,6 +828,7 @@ $ curl \
"SizeMB": 300,
"Sticky": false
},
"MaxClientDisconnect": null,
"Meta": null,
"Migrate": {
"HealthCheck": "checks",
@ -994,6 +996,7 @@ $ curl \
"SizeMB": 300,
"Sticky": false
},
"MaxClientDisconnect": null,
"Meta": null,
"Migrate": {
"HealthCheck": "checks",

View File

@ -81,19 +81,24 @@ job "docs" {
own [`shutdown_delay`](/docs/job-specification/task#shutdown_delay)
which waits between deregistering task services and stopping the task.
- `stop_after_client_disconnect` `(string: "")` - Specifies a duration
after which a Nomad client that cannot communicate with the servers
will stop allocations based on this task group. By default, a client
will not stop an allocation until explicitly told to by a server. A
client that fails to heartbeat to a server within the
[`heartbeat_grace`] window and any allocations running on it will be
marked "lost" and Nomad will schedule replacement
allocations. However, these replaced allocations will continue to
run on the non-responsive client; an operator may desire that these
replaced allocations are also stopped in this case — for example,
allocations requiring exclusive access to an external resource. When
specified, the Nomad client will stop them after this duration. The
Nomad client process must be running for this to occur.
- `stop_after_client_disconnect` `(string: "")` - Specifies a duration after
which a Nomad client will stop allocations if it cannot communicate with the
servers. By default, a client will not stop an allocation until explicitly
told to by a server. A client that fails to heartbeat to a server within the
[`heartbeat_grace`] window, and any allocations running on it, will be marked
"lost" and Nomad will schedule replacement allocations. The replaced
allocations will normally continue to run on the non-responsive client; you
may want them to stop instead, for example, for allocations that require
exclusive access to an external resource. When specified, the Nomad client
will stop them after this duration.
The Nomad client process must be running for this to occur. This setting
cannot be used with [`max_client_disconnect`].
- `max_client_disconnect` `(string: "")` - Specifies a duration during which a
Nomad client will attempt to reconnect allocations after it fails to heartbeat
in the [`heartbeat_grace`] window. See [the example code
below][max-client-disconnect] for more details. This setting cannot be used
with [`stop_after_client_disconnect`].
- `task` <code>([Task][]: &lt;required&gt;)</code> - Specifies one or more tasks to run
within this group. This can be specified multiple times, to add a task as part
@ -255,6 +260,75 @@ group "second" {
}
```
### Max Client Disconnect
`max_client_disconnect` specifies a duration during which a Nomad client will
attempt to reconnect allocations after it fails to heartbeat in the
[`heartbeat_grace`] window.
By default, allocations running on a client that fails to heartbeat will be
marked "lost". When a client reconnects, its allocations, which may still be
healthy, will restart because they have been marked "lost". This can cause
issues with stateful tasks or tasks with long restart times.
Instead, an operator may desire that these allocations reconnect without a
restart. When `max_client_disconnect` is specified, the Nomad server will mark
clients that fail to heartbeat as "disconnected" rather than "down", and will
mark allocations on a disconnected client as "unknown" rather than "lost". These
allocations may continue to run on the disconnected client. Replacement
allocations will be scheduled according to the allocations' reschedule policy
until the disconnected client reconnects. Once a disconnected client reconnects,
Nomad will compare the "unknown" allocations with their replacements and keep
the one with the best node score. If the `max_client_disconnect` duration
expires before the client reconnects, the allocations will be marked "lost".
Clients that contain "unknown" allocations will transition to "disconnected"
rather than "down" until the last `max_client_disconnect` duration has expired.
In the example code below, if both of these task groups were placed on the same
client and that client experienced a network outage, the client would be marked
"disconnected" and both groups' allocations would be marked "unknown" at two
minutes, because of the client's `heartbeat_grace` value of "2m". If the network
outage continued for eight hours and the client continued to fail to heartbeat,
the client would remain "disconnected", as the first group's
`max_client_disconnect` is twelve hours. Once every group's
`max_client_disconnect` duration has been exceeded, in this case after twelve
hours, the client node will be marked "down" and its allocations will be marked
"lost". If the client had reconnected before twelve hours had passed, the
allocations would gracefully reconnect without a restart.
`max_client_disconnect` is useful for edge deployments, or for scenarios where
operators want to avoid on-client downtime caused by temporary node
connectivity issues. This setting cannot be used with
[`stop_after_client_disconnect`].
```hcl
# client_config.hcl
client {
enabled = true
heartbeat_grace = "2m"
}
```
```hcl
# jobspec.nomad
group "first" {
max_client_disconnect = "12h"
task "first-task" {
...
}
}
group "second" {
max_client_disconnect = "6h"
task "second-task" {
...
}
}
```
[task]: /docs/job-specification/task 'Nomad task Job Specification'
[job]: /docs/job-specification/job 'Nomad job Job Specification'
[constraint]: /docs/job-specification/constraint 'Nomad constraint Job Specification'
@ -264,6 +338,9 @@ group "second" {
[affinity]: /docs/job-specification/affinity 'Nomad affinity Job Specification'
[ephemeraldisk]: /docs/job-specification/ephemeral_disk 'Nomad ephemeral_disk Job Specification'
[`heartbeat_grace`]: /docs/configuration/server#heartbeat_grace
[`max_client_disconnect`]: /docs/job-specification/group#max_client_disconnect
[max-client-disconnect]: /docs/job-specification/group#max-client-disconnect 'the example code below'
[`stop_after_client_disconnect`]: /docs/job-specification/group#stop_after_client_disconnect
[meta]: /docs/job-specification/meta 'Nomad meta Job Specification'
[migrate]: /docs/job-specification/migrate 'Nomad migrate Job Specification'
[network]: /docs/job-specification/network 'Nomad network Job Specification'

View File

@ -211,6 +211,7 @@ Job summary metrics are emitted by the Nomad leader server.
| `nomad.nomad.job_summary.complete` | Number of complete allocations for a job | Integer | Gauge | host, job, namespace, task_group |
| `nomad.nomad.job_summary.failed` | Number of failed allocations for a job | Integer | Gauge | host, job, namespace, task_group |
| `nomad.nomad.job_summary.lost` | Number of lost allocations for a job | Integer | Gauge | host, job, namespace, task_group |
| `nomad.nomad.job_summary.unknown` | Number of unknown allocations for a job | Integer | Gauge | host, job, namespace, task_group |
| `nomad.nomad.job_summary.queued` | Number of queued allocations for a job | Integer | Gauge | host, job, namespace, task_group |
| `nomad.nomad.job_summary.running` | Number of running allocations for a job | Integer | Gauge | host, job, namespace, task_group |
| `nomad.nomad.job_summary.starting` | Number of starting allocations for a job | Integer | Gauge | host, job, namespace, task_group |