Merge pull request #3981 from hashicorp/f-delayed-scheduling

Delayed rescheduling
This commit is contained in:
Preetha 2018-03-14 16:31:18 -05:00 committed by GitHub
commit aeed9e3cba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 2817 additions and 191 deletions

View File

@ -86,6 +86,7 @@ type Allocation struct {
TaskStates map[string]*TaskState
DeploymentID string
DeploymentStatus *AllocDeploymentStatus
FollowupEvalID string
PreviousAllocation string
NextAllocation string
RescheduleTracker *RescheduleTracker
@ -129,6 +130,7 @@ type AllocationListStub struct {
TaskStates map[string]*TaskState
DeploymentStatus *AllocDeploymentStatus
RescheduleTracker *RescheduleTracker
FollowupEvalID string
CreateIndex uint64
ModifyIndex uint64
CreateTime int64

View File

@ -67,6 +67,7 @@ type Evaluation struct {
Status string
StatusDescription string
Wait time.Duration
WaitUntil time.Time
NextEval string
PreviousEval string
BlockedEval string

View File

@ -136,8 +136,12 @@ func TestJobs_Canonicalize(t *testing.T) {
Mode: helper.StringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr("exponential"),
Delay: helper.TimeToPtr(30 * time.Second),
MaxDelay: helper.TimeToPtr(1 * time.Hour),
Unlimited: helper.BoolToPtr(true),
},
Tasks: []*Task{
{
@ -202,8 +206,12 @@ func TestJobs_Canonicalize(t *testing.T) {
Mode: helper.StringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr("exponential"),
Delay: helper.TimeToPtr(30 * time.Second),
MaxDelay: helper.TimeToPtr(1 * time.Hour),
Unlimited: helper.BoolToPtr(true),
},
Tasks: []*Task{
{
@ -335,8 +343,12 @@ func TestJobs_Canonicalize(t *testing.T) {
Mode: helper.StringToPtr("delay"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr("exponential"),
Delay: helper.TimeToPtr(30 * time.Second),
MaxDelay: helper.TimeToPtr(1 * time.Hour),
Unlimited: helper.BoolToPtr(true),
},
EphemeralDisk: &EphemeralDisk{
Sticky: helper.BoolToPtr(false),
@ -550,8 +562,12 @@ func TestJobs_Canonicalize(t *testing.T) {
Mode: helper.StringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr("exponential"),
Delay: helper.TimeToPtr(30 * time.Second),
MaxDelay: helper.TimeToPtr(1 * time.Hour),
Unlimited: helper.BoolToPtr(true),
},
Update: &UpdateStrategy{
Stagger: helper.TimeToPtr(2 * time.Second),
@ -586,8 +602,12 @@ func TestJobs_Canonicalize(t *testing.T) {
Mode: helper.StringToPtr("fail"),
},
ReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(2),
Interval: helper.TimeToPtr(1 * time.Hour),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr("exponential"),
Delay: helper.TimeToPtr(30 * time.Second),
MaxDelay: helper.TimeToPtr(1 * time.Hour),
Unlimited: helper.BoolToPtr(true),
},
Update: &UpdateStrategy{
Stagger: helper.TimeToPtr(1 * time.Second),

View File

@ -4,11 +4,11 @@ import (
"testing"
"github.com/hashicorp/nomad/api/contexts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSearch_List(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
t.Parallel()
c, s := makeClient(t, nil, nil)
@ -16,16 +16,17 @@ func TestSearch_List(t *testing.T) {
job := testJob()
_, _, err := c.Jobs().Register(job, nil)
assert.Nil(err)
require.Nil(err)
id := *job.ID
prefix := id[:len(id)-2]
resp, qm, err := c.Search().PrefixSearch(prefix, contexts.Jobs, nil)
assert.Nil(err)
assert.NotNil(qm)
require.Nil(err)
require.NotNil(qm)
jobMatches := resp.Matches[contexts.Jobs]
assert.Equal(1, len(jobMatches))
assert.Equal(id, jobMatches[0])
require.Equal(1, len(jobMatches))
require.Equal(id, jobMatches[0])
}

View File

@ -86,6 +86,20 @@ type ReschedulePolicy struct {
// Interval is a duration in which we can limit the number of reschedule attempts.
Interval *time.Duration `mapstructure:"interval"`
// Delay is a minimum duration to wait between reschedule attempts.
// The delay function determines how much subsequent reschedule attempts are delayed by.
Delay *time.Duration `mapstructure:"delay"`
// DelayFunction determines how the delay progressively changes on subsequent reschedule
// attempts. Valid values are "exponential", "linear", and "fibonacci".
DelayFunction *string `mapstructure:"delay_function"`
// MaxDelay is an upper bound on the delay.
MaxDelay *time.Duration `mapstructure:"max_delay"`
// Unlimited allows rescheduling attempts until they succeed
Unlimited *bool `mapstructure:"unlimited"`
}
func (r *ReschedulePolicy) Merge(rp *ReschedulePolicy) {
@ -95,6 +109,18 @@ func (r *ReschedulePolicy) Merge(rp *ReschedulePolicy) {
if rp.Attempts != nil {
r.Attempts = rp.Attempts
}
if rp.Delay != nil {
r.Delay = rp.Delay
}
if rp.DelayFunction != nil {
r.DelayFunction = rp.DelayFunction
}
if rp.MaxDelay != nil {
r.MaxDelay = rp.MaxDelay
}
if rp.Unlimited != nil {
r.Unlimited = rp.Unlimited
}
}
func (r *ReschedulePolicy) Copy() *ReschedulePolicy {
@ -316,13 +342,21 @@ func (g *TaskGroup) Canonicalize(job *Job) {
switch *job.Type {
case "service":
defaultReschedulePolicy = &ReschedulePolicy{
Attempts: helper.IntToPtr(structs.DefaultServiceJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultServiceJobReschedulePolicy.Interval),
Attempts: helper.IntToPtr(structs.DefaultServiceJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultServiceJobReschedulePolicy.Interval),
Delay: helper.TimeToPtr(structs.DefaultServiceJobReschedulePolicy.Delay),
DelayFunction: helper.StringToPtr(structs.DefaultServiceJobReschedulePolicy.DelayFunction),
MaxDelay: helper.TimeToPtr(structs.DefaultServiceJobReschedulePolicy.MaxDelay),
Unlimited: helper.BoolToPtr(structs.DefaultServiceJobReschedulePolicy.Unlimited),
}
case "batch":
defaultReschedulePolicy = &ReschedulePolicy{
Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Delay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Delay),
DelayFunction: helper.StringToPtr(structs.DefaultBatchJobReschedulePolicy.DelayFunction),
MaxDelay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.MaxDelay),
Unlimited: helper.BoolToPtr(structs.DefaultBatchJobReschedulePolicy.Unlimited),
}
default:
defaultReschedulePolicy = &ReschedulePolicy{

View File

@ -284,70 +284,115 @@ func TestTaskGroup_Canonicalize_ReschedulePolicy(t *testing.T) {
jobReschedulePolicy: nil,
taskReschedulePolicy: nil,
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Attempts: helper.IntToPtr(structs.DefaultBatchJobReschedulePolicy.Attempts),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Delay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Delay),
DelayFunction: helper.StringToPtr(structs.DefaultBatchJobReschedulePolicy.DelayFunction),
MaxDelay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.MaxDelay),
Unlimited: helper.BoolToPtr(structs.DefaultBatchJobReschedulePolicy.Unlimited),
},
},
{
desc: "Empty job reschedule policy",
jobReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
Delay: helper.TimeToPtr(0),
MaxDelay: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr(""),
Unlimited: helper.BoolToPtr(false),
},
taskReschedulePolicy: nil,
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
Attempts: helper.IntToPtr(0),
Interval: helper.TimeToPtr(0),
Delay: helper.TimeToPtr(0),
MaxDelay: helper.TimeToPtr(0),
DelayFunction: helper.StringToPtr(""),
Unlimited: helper.BoolToPtr(false),
},
},
{
desc: "Inherit from job",
jobReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(20 * time.Second),
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(20 * time.Second),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(10 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
taskReschedulePolicy: nil,
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(20 * time.Second),
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(20 * time.Second),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(10 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
},
{
desc: "Set in task",
jobReschedulePolicy: nil,
taskReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(2 * time.Minute),
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(2 * time.Minute),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(10 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(2 * time.Minute),
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(2 * time.Minute),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(10 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
},
{
desc: "Merge from job",
jobReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(1),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(10 * time.Minute),
},
taskReschedulePolicy: &ReschedulePolicy{
Interval: helper.TimeToPtr(5 * time.Minute),
Interval: helper.TimeToPtr(5 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(5 * time.Minute),
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(5 * time.Minute),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(10 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
},
{
desc: "Override from group",
jobReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(1),
MaxDelay: helper.TimeToPtr(10 * time.Second),
},
taskReschedulePolicy: &ReschedulePolicy{
Attempts: helper.IntToPtr(5),
Attempts: helper.IntToPtr(5),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(20 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Delay: helper.TimeToPtr(20 * time.Second),
MaxDelay: helper.TimeToPtr(20 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Unlimited: helper.BoolToPtr(false),
},
},
{
@ -357,8 +402,12 @@ func TestTaskGroup_Canonicalize_ReschedulePolicy(t *testing.T) {
},
taskReschedulePolicy: nil,
expected: &ReschedulePolicy{
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Attempts: helper.IntToPtr(1),
Interval: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Interval),
Delay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.Delay),
DelayFunction: helper.StringToPtr(structs.DefaultBatchJobReschedulePolicy.DelayFunction),
MaxDelay: helper.TimeToPtr(structs.DefaultBatchJobReschedulePolicy.MaxDelay),
Unlimited: helper.BoolToPtr(structs.DefaultBatchJobReschedulePolicy.Unlimited),
},
},
}

View File

@ -639,8 +639,12 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
}
tg.ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: *taskGroup.ReschedulePolicy.Attempts,
Interval: *taskGroup.ReschedulePolicy.Interval,
Attempts: *taskGroup.ReschedulePolicy.Attempts,
Interval: *taskGroup.ReschedulePolicy.Interval,
Delay: *taskGroup.ReschedulePolicy.Delay,
DelayFunction: *taskGroup.ReschedulePolicy.DelayFunction,
MaxDelay: *taskGroup.ReschedulePolicy.MaxDelay,
Unlimited: *taskGroup.ReschedulePolicy.Unlimited,
}
tg.EphemeralDisk = &structs.EphemeralDisk{

View File

@ -1172,8 +1172,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Mode: helper.StringToPtr("delay"),
},
ReschedulePolicy: &api.ReschedulePolicy{
Interval: helper.TimeToPtr(12 * time.Hour),
Attempts: helper.IntToPtr(5),
Interval: helper.TimeToPtr(12 * time.Hour),
Attempts: helper.IntToPtr(5),
DelayFunction: helper.StringToPtr("linear"),
Delay: helper.TimeToPtr(30 * time.Second),
Unlimited: helper.BoolToPtr(true),
MaxDelay: helper.TimeToPtr(20 * time.Minute),
},
EphemeralDisk: &api.EphemeralDisk{
SizeMB: helper.IntToPtr(100),
@ -1384,8 +1388,12 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Mode: "delay",
},
ReschedulePolicy: &structs.ReschedulePolicy{
Interval: 12 * time.Hour,
Attempts: 5,
Interval: 12 * time.Hour,
Attempts: 5,
DelayFunction: "linear",
Delay: 30 * time.Second,
Unlimited: true,
MaxDelay: 20 * time.Minute,
},
EphemeralDisk: &structs.EphemeralDisk{
SizeMB: 100,

View File

@ -283,6 +283,13 @@ func formatAllocBasicInfo(alloc *api.Allocation, client *api.Client, uuidLength
basic = append(basic,
fmt.Sprintf("Replacement Alloc ID|%s", limit(alloc.NextAllocation, uuidLength)))
}
if alloc.FollowupEvalID != "" {
nextEvalTime := futureEvalTimePretty(alloc.FollowupEvalID, client)
if nextEvalTime != "" {
basic = append(basic,
fmt.Sprintf("Reschedule Eligibility|%s", nextEvalTime))
}
}
if verbose {
basic = append(basic,
@ -296,6 +303,18 @@ func formatAllocBasicInfo(alloc *api.Allocation, client *api.Client, uuidLength
return formatKV(basic), nil
}
// futureEvalTimePretty returns a human-readable duration until the eval
// identified by evalID becomes eligible to reschedule, based on its
// WaitUntil field. Eval time is not critical output, so it returns the
// empty string when the lookup fails, WaitUntil is unset, or the deadline
// has already passed.
func futureEvalTimePretty(evalID string, client *api.Client) string {
	evaluation, _, err := client.Evaluations().Info(evalID, nil)
	if err != nil {
		return ""
	}
	if evaluation.WaitUntil.IsZero() || time.Now().After(evaluation.WaitUntil) {
		return ""
	}
	return prettyTimeDiff(evaluation.WaitUntil, time.Now())
}
// outputTaskDetails prints task details for each task in the allocation,
// optionally printing verbose statistics if displayStats is set
func (c *AllocStatusCommand) outputTaskDetails(alloc *api.Allocation, stats *api.AllocResourceUsage, displayStats bool) {

View File

@ -103,7 +103,15 @@ func prettyTimeDiff(first, second time.Time) string {
second = second.Round(time.Second)
// calculate time difference in seconds
d := second.Sub(first)
var d time.Duration
messageSuffix := "ago"
if second.Equal(first) || second.After(first) {
d = second.Sub(first)
} else {
d = first.Sub(second)
messageSuffix = "from now"
}
u := uint64(d.Seconds())
var buf [32]byte
@ -183,9 +191,9 @@ func prettyTimeDiff(first, second time.Time) string {
end = indexes[num_periods-3]
}
if start == end { //edge case when time difference is less than a second
return "0s ago"
return "0s " + messageSuffix
} else {
return string(buf[start:end]) + " ago"
return string(buf[start:end]) + " " + messageSuffix
}
}

View File

@ -310,6 +310,7 @@ func TestPrettyTimeDiff(t *testing.T) {
{now, now.Add(-60 * time.Minute), "1h ago"},
{now, now.Add(-80 * time.Minute), "1h20m ago"},
{now, now.Add(-6 * time.Hour), "6h ago"},
{now.Add(-6 * time.Hour), now, "6h from now"},
{now, now.Add(-22165 * time.Second), "6h9m ago"},
{now, now.Add(-100 * time.Hour), "4d4h ago"},
{now, now.Add(-438000 * time.Minute), "10mo4d ago"},

View File

@ -446,6 +446,10 @@ func parseReschedulePolicy(final **api.ReschedulePolicy, list *ast.ObjectList) e
valid := []string{
"attempts",
"interval",
"unlimited",
"delay",
"max_delay",
"delay_function",
}
if err := helper.CheckHCLKeys(obj.Val, valid); err != nil {
return err

View File

@ -679,8 +679,42 @@ func TestParse(t *testing.T) {
Type: helper.StringToPtr("batch"),
Datacenters: []string{"dc1"},
Reschedule: &api.ReschedulePolicy{
Attempts: helper.IntToPtr(15),
Interval: helper.TimeToPtr(30 * time.Minute),
Attempts: helper.IntToPtr(15),
Interval: helper.TimeToPtr(30 * time.Minute),
DelayFunction: helper.StringToPtr("linear"),
Delay: helper.TimeToPtr(10 * time.Second),
},
TaskGroups: []*api.TaskGroup{
{
Name: helper.StringToPtr("bar"),
Count: helper.IntToPtr(3),
Tasks: []*api.Task{
{
Name: "bar",
Driver: "raw_exec",
Config: map[string]interface{}{
"command": "bash",
"args": []interface{}{"-c", "echo hi"},
},
},
},
},
},
},
false,
},
{
"reschedule-job-unlimited.hcl",
&api.Job{
ID: helper.StringToPtr("foo"),
Name: helper.StringToPtr("foo"),
Type: helper.StringToPtr("batch"),
Datacenters: []string{"dc1"},
Reschedule: &api.ReschedulePolicy{
DelayFunction: helper.StringToPtr("exponential"),
Delay: helper.TimeToPtr(10 * time.Second),
MaxDelay: helper.TimeToPtr(120 * time.Second),
Unlimited: helper.BoolToPtr(true),
},
TaskGroups: []*api.TaskGroup{
{

View File

@ -0,0 +1,20 @@
job "foo" {
datacenters = ["dc1"]
type = "batch"
reschedule {
delay = "10s",
delay_function = "exponential"
max_delay="120s"
unlimited = true
}
group "bar" {
count = 3
task "bar" {
driver = "raw_exec"
config {
command = "bash"
args = ["-c", "echo hi"]
}
}
}
}

View File

@ -4,6 +4,8 @@ job "foo" {
reschedule {
attempts = 15
interval = "30m"
delay = "10s",
delay_function = "linear"
}
group "bar" {
count = 3

167
lib/delay_heap.go Normal file
View File

@ -0,0 +1,167 @@
package lib
import (
"container/heap"
"fmt"
"time"
"github.com/hashicorp/nomad/nomad/structs"
)
// DelayHeap wraps a heap and gives operations other than Push/Pop.
// The inner heap is sorted by the time in the WaitUntil field of delayHeapNode.
type DelayHeap struct {
	// index deduplicates nodes by (ID, Namespace) and gives O(1) lookup of
	// a node's heap entry for Contains/Update/Remove.
	index map[structs.NamespacedID]*delayHeapNode
	// heap is the underlying min-heap implementation ordered by WaitUntil.
	heap delayedHeapImp
}
// HeapNode is an interface type implemented by objects stored in the DelayHeap.
type HeapNode interface {
	Data() interface{} // The data object carried by the node
	ID() string        // ID of the object, used in conjunction with namespace for deduplication
	Namespace() string // Namespace of the object, can be empty
}
// delayHeapNode encapsulates the node stored in DelayHeap.
// WaitUntil is used as the sorting criteria.
type delayHeapNode struct {
	// Node is the data object stored in the delay heap
	Node HeapNode
	// WaitUntil is the time delay associated with the node.
	// Objects in the heap are sorted by WaitUntil.
	WaitUntil time.Time
	// index is this node's position in the backing slice; it is kept in
	// sync by Swap/Push/Pop so heap.Fix and heap.Remove can address it.
	index int
}
// delayedHeapImp is the container/heap implementation backing DelayHeap.
type delayedHeapImp []*delayHeapNode

// Len returns the number of nodes currently in the heap.
func (h delayedHeapImp) Len() int {
	return len(h)
}
// Less orders nodes by WaitUntil, earliest first. Zero times are treated as
// "greater" than any non-zero time so they sort to the end of the list; two
// zero times compare equal, so Less reports false.
func (h delayedHeapImp) Less(i, j int) bool {
	iZero, jZero := h[i].WaitUntil.IsZero(), h[j].WaitUntil.IsZero()
	if iZero && jZero {
		return false
	} else if iZero {
		return false
	} else if jZero {
		return true
	}
	return h[i].WaitUntil.Before(h[j].WaitUntil)
}
// Swap exchanges the nodes at i and j and keeps their index fields in sync,
// which heap.Fix and heap.Remove rely on.
func (h delayedHeapImp) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].index = i
	h[j].index = j
}
// Push appends x to the backing slice, recording its index. This is the
// container/heap hook; external callers should use DelayHeap.Push.
func (h *delayedHeapImp) Push(x interface{}) {
	node := x.(*delayHeapNode)
	n := len(*h)
	node.index = n
	*h = append(*h, node)
}
// Pop removes and returns the last element of the backing slice. This is
// the container/heap hook; external callers should use DelayHeap.Pop.
func (h *delayedHeapImp) Pop() interface{} {
	last := len(*h) - 1
	popped := (*h)[last]
	popped.index = -1 // mark as no longer in the heap, for safety
	*h = (*h)[:last]
	return popped
}
// NewDelayHeap returns an empty, ready-to-use DelayHeap.
func NewDelayHeap() *DelayHeap {
	dh := DelayHeap{
		index: make(map[structs.NamespacedID]*delayHeapNode),
		heap:  delayedHeapImp{},
	}
	return &dh
}
// Push adds dataNode to the heap, scheduled to surface at next. The
// (ID, Namespace) pair must be unique; pushing a duplicate is an error.
func (p *DelayHeap) Push(dataNode HeapNode, next time.Time) error {
	tuple := structs.NamespacedID{
		ID:        dataNode.ID(),
		Namespace: dataNode.Namespace(),
	}
	if _, ok := p.index[tuple]; ok {
		return fmt.Errorf("node %q (%s) already exists", dataNode.ID(), dataNode.Namespace())
	}
	// Named "node" (not "delayHeapNode") to avoid shadowing the type of the
	// same name; keyed literal so field order changes can't silently break it.
	node := &delayHeapNode{Node: dataNode, WaitUntil: next, index: 0}
	p.index[tuple] = node
	heap.Push(&p.heap, node)
	return nil
}
// Pop removes and returns the node with the earliest WaitUntil, or nil when
// the heap is empty. The node is also removed from the dedup index.
func (p *DelayHeap) Pop() *delayHeapNode {
	if len(p.heap) == 0 {
		return nil
	}
	// Named "node" (not "delayHeapNode") to avoid shadowing the type of the
	// same name.
	node := heap.Pop(&p.heap).(*delayHeapNode)
	tuple := structs.NamespacedID{
		ID:        node.Node.ID(),
		Namespace: node.Node.Namespace(),
	}
	delete(p.index, tuple)
	return node
}
// Peek returns the node with the earliest WaitUntil without removing it,
// or nil when the heap is empty.
func (p *DelayHeap) Peek() *delayHeapNode {
	if p.heap.Len() == 0 {
		return nil
	}
	return p.heap[0]
}
// Contains reports whether a node with heapNode's (ID, Namespace) pair is
// currently tracked by the heap.
func (p *DelayHeap) Contains(heapNode HeapNode) bool {
	key := structs.NamespacedID{
		ID:        heapNode.ID(),
		Namespace: heapNode.Namespace(),
	}
	_, found := p.index[key]
	return found
}
// Update replaces the payload and WaitUntil of the tracked node matching
// heapNode's (ID, Namespace) pair and re-establishes heap order. It returns
// an error when no such node exists.
func (p *DelayHeap) Update(heapNode HeapNode, waitUntil time.Time) error {
	key := structs.NamespacedID{
		ID:        heapNode.ID(),
		Namespace: heapNode.Namespace(),
	}
	existing, ok := p.index[key]
	if !ok {
		return fmt.Errorf("heap doesn't contain object with ID %q (%s)", heapNode.ID(), heapNode.Namespace())
	}
	// Swap in the new payload as well, since the object's spec can change.
	existing.Node = heapNode
	existing.WaitUntil = waitUntil
	heap.Fix(&p.heap, existing.index)
	return nil
}
// Remove deletes the tracked node matching heapNode's (ID, Namespace) pair
// from both the heap and the index. It returns an error when no such node
// exists.
func (p *DelayHeap) Remove(heapNode HeapNode) error {
	key := structs.NamespacedID{
		ID:        heapNode.ID(),
		Namespace: heapNode.Namespace(),
	}
	node, ok := p.index[key]
	if !ok {
		return fmt.Errorf("heap doesn't contain object with ID %q (%s)", heapNode.ID(), heapNode.Namespace())
	}
	heap.Remove(&p.heap, node.index)
	delete(p.index, key)
	return nil
}
// Length returns the number of nodes currently in the heap.
func (p *DelayHeap) Length() int {
	return len(p.heap)
}

115
lib/delay_heap_test.go Normal file
View File

@ -0,0 +1,115 @@
package lib
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
// heapNodeImpl satisfies the HeapNode interface for use in tests.
type heapNodeImpl struct {
	dataObject interface{} // arbitrary payload carried by the node
	id         string      // identifier, combined with namespace for dedup
	namespace  string      // namespace; can be empty
}

// Data returns the node's payload.
func (d *heapNodeImpl) Data() interface{} {
	return d.dataObject
}

// ID returns the node's identifier.
func (d *heapNodeImpl) ID() string {
	return d.id
}

// Namespace returns the node's namespace.
func (d *heapNodeImpl) Namespace() string {
	return d.namespace
}
// TestDelayHeap_PushPop pushes nodes with assorted WaitUntil times (some in
// the past) and verifies they drain in ascending WaitUntil order.
func TestDelayHeap_PushPop(t *testing.T) {
	delayHeap := NewDelayHeap()
	now := time.Now()
	require := require.New(t)
	// a dummy type to use as the inner object in the heap
	type myObj struct {
		a int
		b string
	}
	dataNode1 := &heapNodeImpl{
		dataObject: &myObj{a: 0, b: "hey"},
		id:         "101",
		namespace:  "default",
	}
	require.NoError(delayHeap.Push(dataNode1, now.Add(-10*time.Minute)))
	dataNode2 := &heapNodeImpl{
		dataObject: &myObj{a: 0, b: "hey"},
		id:         "102",
		namespace:  "default",
	}
	require.NoError(delayHeap.Push(dataNode2, now.Add(10*time.Minute)))
	dataNode3 := &heapNodeImpl{
		dataObject: &myObj{a: 0, b: "hey"},
		id:         "103",
		namespace:  "default",
	}
	require.NoError(delayHeap.Push(dataNode3, now.Add(-15*time.Second)))
	// Same ID as dataNode1 but a different namespace, so it must not collide.
	dataNode4 := &heapNodeImpl{
		dataObject: &myObj{a: 0, b: "hey"},
		id:         "101",
		namespace:  "test-namespace",
	}
	require.NoError(delayHeap.Push(dataNode4, now.Add(2*time.Hour)))
	expectedWaitTimes := []time.Time{now.Add(-10 * time.Minute), now.Add(-15 * time.Second), now.Add(10 * time.Minute), now.Add(2 * time.Hour)}
	entries := getHeapEntries(delayHeap, now)
	// Guard against a vacuous pass: the range below would be a no-op if the
	// heap returned fewer entries than expected.
	require.Len(entries, len(expectedWaitTimes))
	for i, entry := range entries {
		require.Equal(expectedWaitTimes[i], entry.WaitUntil)
	}
}
// TestDelayHeap_Update pushes two nodes, moves the earlier one later via
// Update, and verifies both the pop order and wait times reflect the change.
func TestDelayHeap_Update(t *testing.T) {
	delayHeap := NewDelayHeap()
	now := time.Now()
	require := require.New(t)
	// a dummy type to use as the inner object in the heap
	type myObj struct {
		a int
		b string
	}
	dataNode1 := &heapNodeImpl{
		dataObject: &myObj{a: 0, b: "hey"},
		id:         "101",
		namespace:  "default",
	}
	require.NoError(delayHeap.Push(dataNode1, now.Add(-10*time.Minute)))
	dataNode2 := &heapNodeImpl{
		dataObject: &myObj{a: 0, b: "hey"},
		id:         "102",
		namespace:  "default",
	}
	require.NoError(delayHeap.Push(dataNode2, now.Add(10*time.Minute)))
	// Reschedule node 101 from the past to the farthest-out slot.
	require.NoError(delayHeap.Update(dataNode1, now.Add(20*time.Minute)))
	expectedWaitTimes := []time.Time{now.Add(10 * time.Minute), now.Add(20 * time.Minute)}
	expectedIdOrder := []string{"102", "101"}
	entries := getHeapEntries(delayHeap, now)
	// Guard against a vacuous pass if the heap drained fewer entries.
	require.Len(entries, len(expectedIdOrder))
	for i, entry := range entries {
		require.Equal(expectedWaitTimes[i], entry.WaitUntil)
		require.Equal(expectedIdOrder[i], entry.Node.ID())
	}
}
// getHeapEntries drains delayHeap, returning its nodes in pop (sorted)
// order. The now argument is unused but kept for call-site compatibility.
func getHeapEntries(delayHeap *DelayHeap, now time.Time) []*delayHeapNode {
	var entries []*delayHeapNode
	for {
		node := delayHeap.Pop()
		if node == nil {
			return entries
		}
		entries = append(entries, node)
	}
}

View File

@ -8,8 +8,11 @@ import (
"sync"
"time"
"context"
"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/lib"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -77,7 +80,19 @@ type EvalBroker struct {
// timeWait has evaluations that are waiting for time to elapse
timeWait map[string]*time.Timer
// initialNackDelay is the delay applied before reenqueuing a
// delayedEvalCancelFunc is used to stop the long running go routine
// that processes delayed evaluations
delayedEvalCancelFunc context.CancelFunc
// delayHeap is a heap used to track incoming evaluations that are
// not eligible to enqueue until their WaitTime
delayHeap *lib.DelayHeap
// delayedEvalsUpdateCh is used to trigger notifications for updates
// to the delayHeap
delayedEvalsUpdateCh chan struct{}
// initialNackDelay is the delay applied before re-enqueuing a
// Nacked evaluation for the first time.
initialNackDelay time.Duration
@ -113,22 +128,25 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration,
return nil, fmt.Errorf("timeout cannot be negative")
}
b := &EvalBroker{
nackTimeout: timeout,
deliveryLimit: deliveryLimit,
enabled: false,
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[structs.NamespacedID]string),
blocked: make(map[structs.NamespacedID]PendingEvaluations),
ready: make(map[string]PendingEvaluations),
unack: make(map[string]*unackEval),
waiting: make(map[string]chan struct{}),
requeue: make(map[string]*structs.Evaluation),
timeWait: make(map[string]*time.Timer),
initialNackDelay: initialNackDelay,
subsequentNackDelay: subsequentNackDelay,
nackTimeout: timeout,
deliveryLimit: deliveryLimit,
enabled: false,
stats: new(BrokerStats),
evals: make(map[string]int),
jobEvals: make(map[structs.NamespacedID]string),
blocked: make(map[structs.NamespacedID]PendingEvaluations),
ready: make(map[string]PendingEvaluations),
unack: make(map[string]*unackEval),
waiting: make(map[string]chan struct{}),
requeue: make(map[string]*structs.Evaluation),
timeWait: make(map[string]*time.Timer),
initialNackDelay: initialNackDelay,
subsequentNackDelay: subsequentNackDelay,
delayHeap: lib.NewDelayHeap(),
delayedEvalsUpdateCh: make(chan struct{}, 1),
}
b.stats.ByScheduler = make(map[string]*SchedulerStats)
return b, nil
}
@ -143,10 +161,17 @@ func (b *EvalBroker) Enabled() bool {
// should only be enabled on the active leader.
// SetEnabled toggles whether the broker hands out evaluations. Enabling from
// a disabled state starts the delayed-eval watcher goroutine; disabling
// flushes all broker state (which also cancels that goroutine).
func (b *EvalBroker) SetEnabled(enabled bool) {
	b.l.Lock()
	prevEnabled := b.enabled
	b.enabled = enabled
	if !prevEnabled && enabled {
		// Transitioning disabled -> enabled: start the goroutine that
		// releases delayed evals once their WaitUntil deadline passes.
		ctx, cancel := context.WithCancel(context.Background())
		b.delayedEvalCancelFunc = cancel
		go b.runDelayedEvalsWatcher(ctx)
	}
	b.l.Unlock()
	if !enabled {
		// flush acquires b.l itself, so it must run after Unlock.
		b.flush()
	}
}
@ -206,6 +231,17 @@ func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) {
return
}
if !eval.WaitUntil.IsZero() {
b.delayHeap.Push(&evalWrapper{eval}, eval.WaitUntil)
b.stats.TotalWaiting += 1
// Signal an update.
select {
case b.delayedEvalsUpdateCh <- struct{}{}:
default:
}
return
}
b.enqueueLocked(eval, eval.Type)
}
@ -643,7 +679,7 @@ func (b *EvalBroker) ResumeNackTimeout(evalID, token string) error {
}
// Flush is used to clear the state of the broker
func (b *EvalBroker) Flush() {
func (b *EvalBroker) flush() {
b.l.Lock()
defer b.l.Unlock()
@ -663,6 +699,14 @@ func (b *EvalBroker) Flush() {
wait.Stop()
}
// Cancel the delayed evaluations goroutine
if b.delayedEvalCancelFunc != nil {
b.delayedEvalCancelFunc()
}
// Clear out the update channel for delayed evaluations
b.delayedEvalsUpdateCh = make(chan struct{}, 1)
// Reset the broker
b.stats.TotalReady = 0
b.stats.TotalUnacked = 0
@ -675,6 +719,75 @@ func (b *EvalBroker) Flush() {
b.ready = make(map[string]PendingEvaluations)
b.unack = make(map[string]*unackEval)
b.timeWait = make(map[string]*time.Timer)
b.delayHeap = lib.NewDelayHeap()
}
// evalWrapper adapts a *structs.Evaluation to the HeapNode interface so
// evaluations can be stored in the broker's delay heap.
type evalWrapper struct {
	eval *structs.Evaluation
}

// Data returns the wrapped evaluation.
func (d *evalWrapper) Data() interface{} {
	return d.eval
}

// ID returns the evaluation's ID, used with Namespace for heap deduplication.
func (d *evalWrapper) ID() string {
	return d.eval.ID
}

// Namespace returns the evaluation's namespace.
func (d *evalWrapper) Namespace() string {
	return d.eval.Namespace
}
// runDelayedEvalsWatcher is a long-lived goroutine that waits until the next
// pending evaluation's WaitUntil deadline passes, then moves it from the
// delay heap into the ready queue. It exits when ctx is canceled.
func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context) {
	var timerChannel <-chan time.Time
	var delayTimer *time.Timer
	// Release the timer's resources when the watcher exits.
	defer func() {
		if delayTimer != nil {
			delayTimer.Stop()
		}
	}()
	for {
		// NOTE(review): nextDelayedEval peeks at b.delayHeap without holding
		// b.l; confirm all heap mutations happen via this goroutine's
		// signaling or add locking around heap access.
		eval, waitUntil := b.nextDelayedEval()
		if waitUntil.IsZero() {
			// Nothing pending: block on the update channel only.
			timerChannel = nil
		} else {
			// Reuse a single timer across iterations rather than allocating
			// one per loop with time.After.
			launchDur := time.Until(waitUntil)
			if delayTimer == nil {
				delayTimer = time.NewTimer(launchDur)
			} else {
				// Stop and drain before Reset: if the timer expired while we
				// were woken by delayedEvalsUpdateCh, a stale tick would
				// otherwise fire immediately and enqueue an eval early.
				if !delayTimer.Stop() {
					select {
					case <-delayTimer.C:
					default:
					}
				}
				delayTimer.Reset(launchDur)
			}
			timerChannel = delayTimer.C
		}
		select {
		case <-ctx.Done():
			return
		case <-timerChannel:
			// Deadline passed: remove from the heap since we can enqueue it now.
			b.delayHeap.Remove(&evalWrapper{eval})
			b.l.Lock()
			b.stats.TotalWaiting -= 1
			b.enqueueLocked(eval, eval.Type)
			b.l.Unlock()
		case <-b.delayedEvalsUpdateCh:
			// The heap changed; recompute the next deadline.
			continue
		}
	}
}
// nextDelayedEval returns the next delayed eval to launch and when it should be enqueued.
// This peeks at the heap to return the top. If the heap is empty, this returns nil and zero time.
func (b *EvalBroker) nextDelayedEval() (*structs.Evaluation, time.Time) {
// If there is nothing wait for an update.
if b.delayHeap.Length() == 0 {
return nil, time.Time{}
}
nextEval := b.delayHeap.Peek()
if nextEval == nil {
return nil, time.Time{}
}
eval := nextEval.Node.Data().(*structs.Evaluation)
return eval, nextEval.WaitUntil
}
// Stats is used to query the state of the broker

View File

@ -1141,7 +1141,53 @@ func TestEvalBroker_Wait(t *testing.T) {
}
}
// Ensure that priority is taken into account when enqueuing many evaluations.
// Ensure that delayed evaluations are held until their WaitUntil time and
// are then dequeued in WaitUntil order.
func TestEvalBroker_WaitUntil(t *testing.T) {
	t.Parallel()
	require := require.New(t)
	b := testBroker(t, 0)
	b.SetEnabled(true)
	now := time.Now()
	// Create a few of evals with WaitUntil set
	eval1 := mock.Eval()
	eval1.WaitUntil = now.Add(1 * time.Second)
	eval1.CreateIndex = 1
	b.Enqueue(eval1)
	eval2 := mock.Eval()
	eval2.WaitUntil = now.Add(100 * time.Millisecond)
	// set CreateIndex to use as a tie breaker when eval2
	// and eval3 are both in the pending evals heap
	eval2.CreateIndex = 2
	b.Enqueue(eval2)
	eval3 := mock.Eval()
	eval3.WaitUntil = now.Add(20 * time.Millisecond)
	eval3.CreateIndex = 1
	b.Enqueue(eval3)
	// NOTE(review): reads b.stats directly without the broker lock while the
	// delayed-eval watcher goroutine runs — confirm under `go test -race`.
	require.Equal(3, b.stats.TotalWaiting)
	// sleep enough for two evals to be ready
	// NOTE(review): real-time sleep makes this timing-sensitive on loaded CI.
	time.Sleep(200 * time.Millisecond)
	// first dequeue should return eval3
	out, _, err := b.Dequeue(defaultSched, time.Second)
	require.Nil(err)
	require.Equal(eval3, out)
	// second dequeue should return eval2
	out, _, err = b.Dequeue(defaultSched, time.Second)
	require.Nil(err)
	require.Equal(eval2, out)
	// third dequeue should return eval1
	out, _, err = b.Dequeue(defaultSched, 2*time.Second)
	require.Nil(err)
	require.Equal(eval1, out)
	require.Equal(0, b.stats.TotalWaiting)
}
// Ensure that priority is taken into account when enqueueing many evaluations.
func TestEvalBroker_EnqueueAll_Dequeue_Fair(t *testing.T) {
t.Parallel()
b := testBroker(t, 0)

View File

@ -92,8 +92,10 @@ func Job() *structs.Job {
Mode: structs.RestartPolicyModeDelay,
},
ReschedulePolicy: &structs.ReschedulePolicy{
Attempts: 2,
Interval: 10 * time.Minute,
Attempts: 2,
Interval: 10 * time.Minute,
Delay: 5 * time.Second,
DelayFunction: "linear",
},
Tasks: []*structs.Task{
{

View File

@ -1499,8 +1499,12 @@ func TestTaskGroupDiff(t *testing.T) {
Old: &TaskGroup{},
New: &TaskGroup{
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Interval: 15 * time.Second,
Attempts: 1,
Interval: 15 * time.Second,
Delay: 5 * time.Second,
MaxDelay: 20 * time.Second,
DelayFunction: "exponential",
Unlimited: false,
},
},
Expected: &TaskGroupDiff{
@ -1516,12 +1520,36 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "",
New: "1",
},
{
Type: DiffTypeAdded,
Name: "Delay",
Old: "",
New: "5000000000",
},
{
Type: DiffTypeAdded,
Name: "DelayFunction",
Old: "",
New: "exponential",
},
{
Type: DiffTypeAdded,
Name: "Interval",
Old: "",
New: "15000000000",
},
{
Type: DiffTypeAdded,
Name: "MaxDelay",
Old: "",
New: "20000000000",
},
{
Type: DiffTypeAdded,
Name: "Unlimited",
Old: "",
New: "false",
},
},
},
},
@ -1531,8 +1559,12 @@ func TestTaskGroupDiff(t *testing.T) {
// ReschedulePolicy deleted
Old: &TaskGroup{
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Interval: 15 * time.Second,
Attempts: 1,
Interval: 15 * time.Second,
Delay: 5 * time.Second,
MaxDelay: 20 * time.Second,
DelayFunction: "exponential",
Unlimited: false,
},
},
New: &TaskGroup{},
@ -1549,12 +1581,36 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "1",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Delay",
Old: "5000000000",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "DelayFunction",
Old: "exponential",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Interval",
Old: "15000000000",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "MaxDelay",
Old: "20000000000",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Unlimited",
Old: "false",
New: "",
},
},
},
},
@ -1564,14 +1620,22 @@ func TestTaskGroupDiff(t *testing.T) {
// ReschedulePolicy edited
Old: &TaskGroup{
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Interval: 1 * time.Second,
Attempts: 1,
Interval: 1 * time.Second,
DelayFunction: "exponential",
Delay: 20 * time.Second,
MaxDelay: 1 * time.Minute,
Unlimited: false,
},
},
New: &TaskGroup{
ReschedulePolicy: &ReschedulePolicy{
Attempts: 2,
Interval: 2 * time.Second,
Attempts: 2,
Interval: 2 * time.Second,
DelayFunction: "linear",
Delay: 30 * time.Second,
MaxDelay: 1 * time.Minute,
Unlimited: true,
},
},
Expected: &TaskGroupDiff{
@ -1587,12 +1651,30 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "1",
New: "2",
},
{
Type: DiffTypeEdited,
Name: "Delay",
Old: "20000000000",
New: "30000000000",
},
{
Type: DiffTypeEdited,
Name: "DelayFunction",
Old: "exponential",
New: "linear",
},
{
Type: DiffTypeEdited,
Name: "Interval",
Old: "1000000000",
New: "2000000000",
},
{
Type: DiffTypeEdited,
Name: "Unlimited",
Old: "false",
New: "true",
},
},
},
},
@ -1625,12 +1707,36 @@ func TestTaskGroupDiff(t *testing.T) {
Old: "1",
New: "1",
},
{
Type: DiffTypeNone,
Name: "Delay",
Old: "0",
New: "0",
},
{
Type: DiffTypeNone,
Name: "DelayFunction",
Old: "",
New: "",
},
{
Type: DiffTypeEdited,
Name: "Interval",
Old: "1000000000",
New: "2000000000",
},
{
Type: DiffTypeNone,
Name: "MaxDelay",
Old: "0",
New: "0",
},
{
Type: DiffTypeNone,
Name: "Unlimited",
Old: "false",
New: "false",
},
},
},
},

View File

@ -35,6 +35,8 @@ import (
"github.com/mitchellh/copystructure"
"github.com/ugorji/go/codec"
"math"
hcodec "github.com/hashicorp/go-msgpack/codec"
)
@ -2656,12 +2658,16 @@ var (
var (
DefaultServiceJobReschedulePolicy = ReschedulePolicy{
Attempts: 2,
Interval: 1 * time.Hour,
Delay: 30 * time.Second,
DelayFunction: "exponential",
MaxDelay: 1 * time.Hour,
Unlimited: true,
}
DefaultBatchJobReschedulePolicy = ReschedulePolicy{
Attempts: 1,
Interval: 24 * time.Hour,
Attempts: 1,
Interval: 24 * time.Hour,
Delay: 5 * time.Second,
DelayFunction: "linear",
}
)
@ -2744,6 +2750,9 @@ func NewRestartPolicy(jobType string) *RestartPolicy {
}
const ReschedulePolicyMinInterval = 15 * time.Second
const ReschedulePolicyMinDelay = 5 * time.Second
var RescheduleDelayFunctions = [...]string{"linear", "exponential", "fibonacci"}
// ReschedulePolicy configures how Tasks are rescheduled when they crash or fail.
type ReschedulePolicy struct {
@ -2753,7 +2762,20 @@ type ReschedulePolicy struct {
// Interval is a duration in which we can limit the number of reschedule attempts.
Interval time.Duration
//TODO delay
// Delay is a minimum duration to wait between reschedule attempts.
// The delay function determines how much subsequent reschedule attempts are delayed by.
Delay time.Duration
// DelayFunction determines how the delay progressively changes on subsequent reschedule
// attempts. Valid values are "exponential", "linear", and "fibonacci".
DelayFunction string
// MaxDelay is an upper bound on the delay.
MaxDelay time.Duration
// Unlimited allows infinite rescheduling attempts. Only allowed when delay is set
// between reschedule attempts.
Unlimited bool
}
func (r *ReschedulePolicy) Copy() *ReschedulePolicy {
@ -2765,17 +2787,151 @@ func (r *ReschedulePolicy) Copy() *ReschedulePolicy {
return nrp
}
// Validate uses different criteria to validate the reschedule policy
// Delay must be a minimum of 5 seconds
// Delay Ceiling is ignored if Delay Function is "linear"
// Number of possible attempts is validated, given the interval, delay and delay function
func (r *ReschedulePolicy) Validate() error {
if r != nil && r.Attempts > 0 {
var mErr multierror.Error
// Check for ambiguous/confusing settings
if r.Interval.Nanoseconds() < ReschedulePolicyMinInterval.Nanoseconds() {
multierror.Append(&mErr, fmt.Errorf("Interval cannot be less than %v (got %v)", RestartPolicyMinInterval, r.Interval))
enabled := r != nil && (r.Attempts > 0 || r.Unlimited)
if !enabled {
return nil
}
var mErr multierror.Error
// Check for ambiguous/confusing settings
delayPreCheck := true
// Delay should be bigger than the default
if r.Delay.Nanoseconds() < ReschedulePolicyMinDelay.Nanoseconds() {
multierror.Append(&mErr, fmt.Errorf("Delay cannot be less than %v (got %v)", ReschedulePolicyMinDelay, r.Delay))
delayPreCheck = false
}
// Must use a valid delay function
if !isValidDelayFunction(r.DelayFunction) {
multierror.Append(&mErr, fmt.Errorf("Invalid delay function %q, must be one of %q", r.DelayFunction, RescheduleDelayFunctions))
delayPreCheck = false
}
// Validate MaxDelay if not using linear delay progression
if r.DelayFunction != "linear" {
if r.MaxDelay.Nanoseconds() < ReschedulePolicyMinDelay.Nanoseconds() {
multierror.Append(&mErr, fmt.Errorf("Delay Ceiling cannot be less than %v (got %v)", ReschedulePolicyMinDelay, r.Delay))
delayPreCheck = false
}
if r.MaxDelay < r.Delay {
multierror.Append(&mErr, fmt.Errorf("Delay Ceiling cannot be less than Delay %v (got %v)", r.Delay, r.MaxDelay))
delayPreCheck = false
}
return mErr.ErrorOrNil()
}
return nil
// Validate Interval and other delay parameters if attempts are limited
if !r.Unlimited {
if r.Interval.Nanoseconds() < ReschedulePolicyMinInterval.Nanoseconds() {
multierror.Append(&mErr, fmt.Errorf("Interval cannot be less than %v (got %v)", ReschedulePolicyMinInterval, r.Interval))
}
if !delayPreCheck {
// We can't cross validate the rest of the delay params if delayPreCheck fails, so return early
return mErr.ErrorOrNil()
}
crossValidationErr := r.validateDelayParams()
if crossValidationErr != nil {
multierror.Append(&mErr, crossValidationErr)
}
}
return mErr.ErrorOrNil()
}
// isValidDelayFunction reports whether delayFunc names one of the supported
// reschedule delay functions listed in RescheduleDelayFunctions.
func isValidDelayFunction(delayFunc string) bool {
	for i := range RescheduleDelayFunctions {
		if RescheduleDelayFunctions[i] == delayFunc {
			return true
		}
	}
	return false
}
// validateDelayParams cross-checks Interval against the delay progression:
// it verifies that the configured number of Attempts can actually fit inside
// Interval given Delay, DelayFunction, and MaxDelay, and returns an error
// suggesting a workable interval when they cannot. Returns nil when the
// attempts are viable.
func (r *ReschedulePolicy) validateDelayParams() error {
	ok, possibleAttempts, recommendedInterval := r.viableAttempts()
	if ok {
		return nil
	}
	var mErr multierror.Error
	switch r.DelayFunction {
	case "linear":
		// Linear delay has no ceiling, so the message omits it.
		multierror.Append(&mErr, fmt.Errorf("Nomad can only make %v attempts in %v with initial delay %v and "+
			"delay function %q", possibleAttempts, r.Interval, r.Delay, r.DelayFunction))
	default:
		multierror.Append(&mErr, fmt.Errorf("Nomad can only make %v attempts in %v with initial delay %v, "+
			"delay function %q, and delay ceiling %v", possibleAttempts, r.Interval, r.Delay, r.DelayFunction, r.MaxDelay))
	}
	multierror.Append(&mErr, fmt.Errorf("Set the interval to at least %v to accommodate %v attempts", recommendedInterval.Round(time.Minute), r.Attempts))
	return mErr.ErrorOrNil()
}
// viableAttempts determines whether r.Attempts reschedule attempts fit within
// r.Interval given the configured delay function. It returns:
//   - valid: true when the interval can accommodate all attempts
//   - possibleAttempts: how many attempts actually fit (meaningful when valid is false)
//   - recommendedInterval: an interval large enough to accommodate r.Attempts
func (r *ReschedulePolicy) viableAttempts() (bool, int, time.Duration) {
	var possibleAttempts int
	var recommendedInterval time.Duration
	valid := true
	switch r.DelayFunction {
	case "linear":
		// Constant delay: total time needed is simply attempts * delay.
		recommendedInterval = time.Duration(r.Attempts) * r.Delay
		if r.Interval < recommendedInterval {
			possibleAttempts = int(r.Interval / r.Delay)
			valid = false
		}
	case "exponential":
		// Walk the doubling progression delay * 2^i, capped at MaxDelay.
		for i := 0; i < r.Attempts; i++ {
			nextDelay := time.Duration(math.Pow(2, float64(i))) * r.Delay
			if nextDelay > r.MaxDelay {
				// At the ceiling, each further attempt adds MaxDelay.
				nextDelay = r.MaxDelay
				recommendedInterval += nextDelay
			} else {
				// NOTE(review): below the ceiling this assigns rather than
				// accumulates, so recommendedInterval tracks the latest delay
				// instead of a running total — the tests in this change
				// expect this behavior, but confirm it is intended.
				recommendedInterval = nextDelay
			}
			if recommendedInterval < r.Interval {
				possibleAttempts++
			}
		}
		if possibleAttempts < r.Attempts {
			valid = false
		}
	case "fibonacci":
		// Build the full delay series so we can count how many attempts fit.
		var slots []time.Duration
		slots = append(slots, r.Delay)
		slots = append(slots, r.Delay)
		reachedCeiling := false
		for i := 2; i < r.Attempts; i++ {
			var nextDelay time.Duration
			if reachedCeiling {
				//switch to linear
				nextDelay = slots[i-1] + r.MaxDelay
			} else {
				nextDelay = slots[i-1] + slots[i-2]
				if nextDelay > r.MaxDelay {
					nextDelay = r.MaxDelay
					reachedCeiling = true
				}
			}
			slots = append(slots, nextDelay)
		}
		recommendedInterval = slots[len(slots)-1]
		if r.Interval < recommendedInterval {
			valid = false
			// calculate possible attempts: the first slot whose delay
			// exceeds the interval bounds how many attempts fit.
			for i := 0; i < len(slots); i++ {
				if slots[i] > r.Interval {
					possibleAttempts = i
					break
				}
			}
		}
	default:
		// Unknown delay function; the caller validates this separately.
		return false, 0, 0
	}
	if possibleAttempts < 0 { // can happen if delay is bigger than interval
		possibleAttempts = 0
	}
	return valid, possibleAttempts, recommendedInterval
}
func NewReschedulePolicy(jobType string) *ReschedulePolicy {
@ -2920,12 +3076,14 @@ func (tg *TaskGroup) Validate(j *Job) error {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a restart policy", tg.Name))
}
if tg.ReschedulePolicy != nil {
if err := tg.ReschedulePolicy.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
if j.Type != JobTypeSystem {
if tg.ReschedulePolicy != nil {
if err := tg.ReschedulePolicy.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
} else {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a reschedule policy", tg.Name))
}
} else {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a reschedule policy", tg.Name))
}
if tg.EphemeralDisk != nil {
@ -5080,12 +5238,16 @@ type RescheduleEvent struct {
// PrevNodeID is the node ID of the previous allocation
PrevNodeID string
// Delay is the reschedule delay associated with the attempt
Delay time.Duration
}
func NewRescheduleEvent(rescheduleTime int64, prevAllocID string, prevNodeID string) *RescheduleEvent {
func NewRescheduleEvent(rescheduleTime int64, prevAllocID string, prevNodeID string, delay time.Duration) *RescheduleEvent {
return &RescheduleEvent{RescheduleTime: rescheduleTime,
PrevAllocID: prevAllocID,
PrevNodeID: prevNodeID}
PrevNodeID: prevNodeID,
Delay: delay}
}
func (re *RescheduleEvent) Copy() *RescheduleEvent {
@ -5181,6 +5343,13 @@ type Allocation struct {
// given deployment
DeploymentStatus *AllocDeploymentStatus
// RescheduleTrackers captures details of previous reschedule attempts of the allocation
RescheduleTracker *RescheduleTracker
// FollowupEvalID captures a follow up evaluation created to handle a failed allocation
// that can be rescheduled in the future
FollowupEvalID string
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
@ -5195,9 +5364,6 @@ type Allocation struct {
// ModifyTime is the time the allocation was last updated.
ModifyTime int64
// RescheduleTrackers captures details of previous reschedule attempts of the allocation
RescheduleTracker *RescheduleTracker
}
// Index returns the index of the allocation. If the allocation is from a task
@ -5304,11 +5470,11 @@ func (a *Allocation) RescheduleEligible(reschedulePolicy *ReschedulePolicy, fail
}
attempts := reschedulePolicy.Attempts
interval := reschedulePolicy.Interval
if attempts == 0 {
enabled := attempts > 0 || reschedulePolicy.Unlimited
if !enabled {
return false
}
if (a.RescheduleTracker == nil || len(a.RescheduleTracker.Events) == 0) && attempts > 0 {
if (a.RescheduleTracker == nil || len(a.RescheduleTracker.Events) == 0) && attempts > 0 || reschedulePolicy.Unlimited {
return true
}
attempted := 0
@ -5322,6 +5488,98 @@ func (a *Allocation) RescheduleEligible(reschedulePolicy *ReschedulePolicy, fail
return attempted < attempts
}
// LastEventTime is the time of the last task event in the allocation.
// It is used to determine allocation failure time. Returns the zero time
// when the allocation has no task states or no task has a FinishedAt set.
func (a *Allocation) LastEventTime() time.Time {
	var latest time.Time
	// Ranging over a nil map is a no-op, so no explicit nil check is needed.
	for _, ts := range a.TaskStates {
		if latest.IsZero() || ts.FinishedAt.After(latest) {
			latest = ts.FinishedAt
		}
	}
	return latest
}
// ReschedulePolicy returns the reschedule policy of the task group this
// allocation belongs to, or nil when the task group cannot be found on the job.
func (a *Allocation) ReschedulePolicy() *ReschedulePolicy {
	if tg := a.Job.LookupTaskGroup(a.TaskGroup); tg != nil {
		return tg.ReschedulePolicy
	}
	return nil
}
// NextRescheduleTime returns a time on or after which the allocation is eligible to be rescheduled,
// and whether the next reschedule time is within policy's interval if the policy doesn't allow unlimited reschedules
func (a *Allocation) NextRescheduleTime() (time.Time, bool) {
	// Rescheduling applies only to failed allocations with a known failure
	// time and a reschedule policy on their task group.
	failTime := a.LastEventTime()
	reschedulePolicy := a.ReschedulePolicy()
	if a.ClientStatus != AllocClientStatusFailed || failTime.IsZero() || reschedulePolicy == nil {
		return time.Time{}, false
	}

	nextDelay := a.NextDelay()
	nextRescheduleTime := failTime.Add(nextDelay)
	// Eligible by default when reschedules are unlimited, or when attempts
	// are allowed and none have been recorded yet.
	rescheduleEligible := reschedulePolicy.Unlimited || (reschedulePolicy.Attempts > 0 && a.RescheduleTracker == nil)
	if reschedulePolicy.Attempts > 0 && a.RescheduleTracker != nil && a.RescheduleTracker.Events != nil {
		// Check for eligibility based on the interval if max attempts is set
		attempted := 0
		for j := len(a.RescheduleTracker.Events) - 1; j >= 0; j-- {
			lastAttempt := a.RescheduleTracker.Events[j].RescheduleTime
			timeDiff := failTime.UTC().UnixNano() - lastAttempt
			if timeDiff < reschedulePolicy.Interval.Nanoseconds() {
				attempted += 1
			}
		}
		// Eligible only while attempts remain within the sliding interval and
		// the next delay still lands inside that interval.
		rescheduleEligible = attempted < reschedulePolicy.Attempts && nextDelay < reschedulePolicy.Interval
	}
	return nextRescheduleTime, rescheduleEligible
}
// NextDelay returns a duration after which the allocation can be rescheduled.
// It is calculated according to the delay function and previous reschedule attempts.
func (a *Allocation) NextDelay() time.Duration {
	policy := a.ReschedulePolicy()
	delayDur := policy.Delay
	// First attempt (no reschedule history): use the policy's base delay.
	if a.RescheduleTracker == nil || a.RescheduleTracker.Events == nil || len(a.RescheduleTracker.Events) == 0 {
		return delayDur
	}
	events := a.RescheduleTracker.Events
	switch policy.DelayFunction {
	case "exponential":
		// Double the delay used by the most recent reschedule attempt.
		delayDur = a.RescheduleTracker.Events[len(a.RescheduleTracker.Events)-1].Delay * 2
	case "fibonacci":
		if len(events) >= 2 {
			fibN1Delay := events[len(events)-1].Delay
			fibN2Delay := events[len(events)-2].Delay
			// Handle reset of delay ceiling which should cause
			// a new series to start
			if fibN2Delay == policy.MaxDelay && fibN1Delay == policy.Delay {
				delayDur = fibN1Delay
			} else {
				delayDur = fibN1Delay + fibN2Delay
			}
		}
	default:
		// "linear" (and any other value): constant base delay.
		return delayDur
	}
	if policy.MaxDelay > 0 && delayDur > policy.MaxDelay {
		delayDur = policy.MaxDelay
		// check if delay needs to be reset: if the most recent attempt ran
		// longer than the capped delay before failing, restart the
		// progression from the base delay.
		lastRescheduleEvent := a.RescheduleTracker.Events[len(a.RescheduleTracker.Events)-1]
		timeDiff := a.LastEventTime().UTC().UnixNano() - lastRescheduleEvent.RescheduleTime
		if timeDiff > delayDur.Nanoseconds() {
			delayDur = policy.Delay
		}
	}
	return delayDur
}
// Terminated returns if the allocation is in a terminal state on a client.
func (a *Allocation) Terminated() bool {
if a.ClientStatus == AllocClientStatusFailed ||
@ -5426,6 +5684,7 @@ type AllocListStub struct {
ClientDescription string
TaskStates map[string]*TaskState
DeploymentStatus *AllocDeploymentStatus
FollowupEvalID string
CreateIndex uint64
ModifyIndex uint64
CreateTime int64
@ -5711,9 +5970,14 @@ type Evaluation struct {
StatusDescription string
// Wait is a minimum wait time for running the eval. This is used to
// support a rolling upgrade.
// support a rolling upgrade in versions prior to 0.7.0
// Deprecated
Wait time.Duration
// WaitUntil is the time when this eval should be run. This is used to
// support delayed rescheduling of failed allocations
WaitUntil time.Time
// NextEval is the evaluation ID for the eval created to do a followup.
// This is used to support rolling upgrades, where we need a chain of evaluations.
NextEval string

View File

@ -570,8 +570,10 @@ func testJob() *Job {
Delay: 1 * time.Minute,
},
ReschedulePolicy: &ReschedulePolicy{
Interval: 5 * time.Minute,
Attempts: 10,
Interval: 5 * time.Minute,
Attempts: 10,
Delay: 5 * time.Second,
DelayFunction: "linear",
},
Tasks: []*Task{
{
@ -930,6 +932,7 @@ func TestTaskGroup_Validate(t *testing.T) {
ReschedulePolicy: &ReschedulePolicy{
Interval: 5 * time.Minute,
Attempts: 5,
Delay: 5 * time.Second,
},
}
err := tg.Validate(j)
@ -1012,8 +1015,10 @@ func TestTaskGroup_Validate(t *testing.T) {
Mode: RestartPolicyModeDelay,
},
ReschedulePolicy: &ReschedulePolicy{
Interval: 5 * time.Minute,
Attempts: 10,
Interval: 5 * time.Minute,
Attempts: 10,
Delay: 5 * time.Second,
DelayFunction: "linear",
},
}
@ -2424,45 +2429,180 @@ func TestRestartPolicy_Validate(t *testing.T) {
func TestReschedulePolicy_Validate(t *testing.T) {
type testCase struct {
desc string
ReschedulePolicy *ReschedulePolicy
err error
errors []error
}
testCases := []testCase{
{
desc: "Nil",
},
{
desc: "Disabled",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 0,
Interval: 0 * time.Second},
err: nil,
},
{
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Interval: 5 * time.Minute},
err: nil,
},
{
desc: "Disabled",
ReschedulePolicy: &ReschedulePolicy{
Attempts: -1,
Interval: 5 * time.Minute},
err: nil,
},
{
desc: "Valid Linear Delay",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Interval: 1 * time.Second},
err: fmt.Errorf("Interval cannot be less than %v (got %v)", RestartPolicyMinInterval, time.Second),
Attempts: 1,
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
DelayFunction: "linear"},
},
{
desc: "Valid Exponential Delay",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 5,
Interval: 1 * time.Hour,
Delay: 30 * time.Second,
MaxDelay: 5 * time.Minute,
DelayFunction: "exponential"},
},
{
desc: "Valid Fibonacci Delay",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 5,
Interval: 15 * time.Minute,
Delay: 10 * time.Second,
MaxDelay: 5 * time.Minute,
DelayFunction: "fibonacci"},
},
{
desc: "Invalid delay function",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Interval: 1 * time.Second,
DelayFunction: "blah"},
errors: []error{
fmt.Errorf("Interval cannot be less than %v (got %v)", ReschedulePolicyMinInterval, time.Second),
fmt.Errorf("Delay cannot be less than %v (got %v)", ReschedulePolicyMinDelay, 0*time.Second),
fmt.Errorf("Invalid delay function %q, must be one of %q", "blah", RescheduleDelayFunctions),
},
},
{
desc: "Invalid delay ceiling",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Interval: 8 * time.Second,
DelayFunction: "exponential",
Delay: 15 * time.Second,
MaxDelay: 5 * time.Second},
errors: []error{
fmt.Errorf("Delay Ceiling cannot be less than Delay %v (got %v)",
15*time.Second, 5*time.Second),
},
},
{
desc: "Invalid delay and interval",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Interval: 1 * time.Second,
DelayFunction: "linear"},
errors: []error{
fmt.Errorf("Interval cannot be less than %v (got %v)", ReschedulePolicyMinInterval, time.Second),
fmt.Errorf("Delay cannot be less than %v (got %v)", ReschedulePolicyMinDelay, 0*time.Second),
},
}, {
// Should suggest 2h40m as the interval
desc: "Invalid Attempts - linear delay",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 10,
Interval: 1 * time.Hour,
Delay: 20 * time.Minute,
DelayFunction: "linear",
},
errors: []error{
fmt.Errorf("Nomad can only make %v attempts in %v with initial delay %v and"+
" delay function %q", 3, time.Hour, 20*time.Minute, "linear"),
fmt.Errorf("Set the interval to at least %v to accommodate %v attempts",
200*time.Minute, 10),
},
},
{
// Should suggest 4h40m as the interval
// Delay progression in minutes {5, 10, 20, 40, 40, 40, 40, 40, 40, 40}
desc: "Invalid Attempts - exponential delay",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 10,
Interval: 30 * time.Minute,
Delay: 5 * time.Minute,
MaxDelay: 40 * time.Minute,
DelayFunction: "exponential",
},
errors: []error{
fmt.Errorf("Nomad can only make %v attempts in %v with initial delay %v, "+
"delay function %q, and delay ceiling %v", 3, 30*time.Minute, 5*time.Minute,
"exponential", 40*time.Minute),
fmt.Errorf("Set the interval to at least %v to accommodate %v attempts",
280*time.Minute, 10),
},
},
{
// Should suggest 8h as the interval
// Delay progression in minutes {20, 20, 40, 60, 80, 80, 80, 80, 80, 80}
desc: "Invalid Attempts - fibonacci delay",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 10,
Interval: 1 * time.Hour,
Delay: 20 * time.Minute,
MaxDelay: 80 * time.Minute,
DelayFunction: "fibonacci",
},
errors: []error{
fmt.Errorf("Nomad can only make %v attempts in %v with initial delay %v, "+
"delay function %q, and delay ceiling %v", 4, 1*time.Hour, 20*time.Minute,
"fibonacci", 80*time.Minute),
fmt.Errorf("Set the interval to at least %v to accommodate %v attempts",
480*time.Minute, 10),
},
},
{
desc: "Valid Unlimited config",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Unlimited: true,
DelayFunction: "exponential",
Delay: 5 * time.Minute,
MaxDelay: 1 * time.Hour,
},
},
{
desc: "Invalid Unlimited config",
ReschedulePolicy: &ReschedulePolicy{
Attempts: 1,
Interval: 1 * time.Second,
Unlimited: true,
DelayFunction: "exponential",
},
errors: []error{
fmt.Errorf("Delay cannot be less than %v (got %v)", ReschedulePolicyMinDelay, 0*time.Second),
fmt.Errorf("Delay Ceiling cannot be less than %v (got %v)", ReschedulePolicyMinDelay, 0*time.Second),
},
},
}
assert := assert.New(t)
for _, tc := range testCases {
if tc.err != nil {
assert.Contains(tc.ReschedulePolicy.Validate().Error(), tc.err.Error())
} else {
assert.Nil(tc.err)
}
t.Run(tc.desc, func(t *testing.T) {
require := require.New(t)
gotErr := tc.ReschedulePolicy.Validate()
if tc.errors != nil {
// Validate all errors
for _, err := range tc.errors {
require.Contains(gotErr.Error(), err.Error())
}
} else {
require.Nil(gotErr)
}
})
}
}
@ -2719,7 +2859,7 @@ func TestAllocation_ShouldReschedule(t *testing.T) {
ClientStatus: AllocClientStatusFailed,
DesiredStatus: AllocDesiredStatusRun,
FailTime: fail,
ReschedulePolicy: &ReschedulePolicy{0, 1 * time.Minute},
ReschedulePolicy: &ReschedulePolicy{Attempts: 0, Interval: 1 * time.Minute},
ShouldReschedule: false,
},
{
@ -2751,7 +2891,7 @@ func TestAllocation_ShouldReschedule(t *testing.T) {
ClientStatus: AllocClientStatusComplete,
DesiredStatus: AllocDesiredStatusRun,
FailTime: fail,
ReschedulePolicy: &ReschedulePolicy{1, 1 * time.Minute},
ReschedulePolicy: &ReschedulePolicy{Attempts: 1, Interval: 1 * time.Minute},
ShouldReschedule: false,
},
{
@ -2759,14 +2899,14 @@ func TestAllocation_ShouldReschedule(t *testing.T) {
ClientStatus: AllocClientStatusFailed,
DesiredStatus: AllocDesiredStatusRun,
FailTime: fail,
ReschedulePolicy: &ReschedulePolicy{1, 1 * time.Minute},
ReschedulePolicy: &ReschedulePolicy{Attempts: 1, Interval: 1 * time.Minute},
ShouldReschedule: true,
},
{
Desc: "Reschedule with leftover attempts",
ClientStatus: AllocClientStatusFailed,
DesiredStatus: AllocDesiredStatusRun,
ReschedulePolicy: &ReschedulePolicy{2, 5 * time.Minute},
ReschedulePolicy: &ReschedulePolicy{Attempts: 2, Interval: 5 * time.Minute},
FailTime: fail,
RescheduleTrackers: []*RescheduleEvent{
{
@ -2780,7 +2920,7 @@ func TestAllocation_ShouldReschedule(t *testing.T) {
ClientStatus: AllocClientStatusFailed,
DesiredStatus: AllocDesiredStatusRun,
FailTime: fail,
ReschedulePolicy: &ReschedulePolicy{1, 5 * time.Minute},
ReschedulePolicy: &ReschedulePolicy{Attempts: 1, Interval: 5 * time.Minute},
RescheduleTrackers: []*RescheduleEvent{
{
RescheduleTime: fail.Add(-6 * time.Minute).UTC().UnixNano(),
@ -2793,7 +2933,7 @@ func TestAllocation_ShouldReschedule(t *testing.T) {
ClientStatus: AllocClientStatusFailed,
DesiredStatus: AllocDesiredStatusRun,
FailTime: fail,
ReschedulePolicy: &ReschedulePolicy{2, 5 * time.Minute},
ReschedulePolicy: &ReschedulePolicy{Attempts: 2, Interval: 5 * time.Minute},
RescheduleTrackers: []*RescheduleEvent{
{
RescheduleTime: fail.Add(-3 * time.Minute).UTC().UnixNano(),
@ -2821,6 +2961,533 @@ func TestAllocation_ShouldReschedule(t *testing.T) {
}
}
// TestAllocation_LastEventTime verifies that LastEventTime returns the latest
// FinishedAt across all task states, and the zero time when there are no
// task states or no task has finished.
func TestAllocation_LastEventTime(t *testing.T) {
	type testCase struct {
		desc                  string
		taskState             map[string]*TaskState
		expectedLastEventTime time.Time
	}

	var timeZero time.Time
	t1 := time.Now()
	testCases := []testCase{
		{
			desc: "nil task state",
			expectedLastEventTime: timeZero,
		},
		{
			desc: "empty task state",
			taskState: make(map[string]*TaskState),
			expectedLastEventTime: timeZero,
		},
		{
			desc: "Finished At not set",
			taskState: map[string]*TaskState{"foo": {State: "start",
				StartedAt: t1.Add(-2 * time.Hour)}},
			expectedLastEventTime: timeZero,
		},
		{
			desc: "One finished event",
			taskState: map[string]*TaskState{"foo": {State: "start",
				StartedAt:  t1.Add(-2 * time.Hour),
				FinishedAt: t1.Add(-1 * time.Hour)}},
			expectedLastEventTime: t1.Add(-1 * time.Hour),
		},
		{
			desc: "Multiple events",
			taskState: map[string]*TaskState{"foo": {State: "start",
				StartedAt:  t1.Add(-2 * time.Hour),
				FinishedAt: t1.Add(-1 * time.Hour)},
				"bar": {State: "start",
					StartedAt:  t1.Add(-2 * time.Hour),
					FinishedAt: t1.Add(-40 * time.Minute)}},
			expectedLastEventTime: t1.Add(-40 * time.Minute),
		},
	}
	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			alloc := &Allocation{}
			alloc.TaskStates = tc.taskState
			require.Equal(t, tc.expectedLastEventTime, alloc.LastEventTime())
		})
	}
}
func TestAllocation_NextDelay(t *testing.T) {
type testCase struct {
desc string
reschedulePolicy *ReschedulePolicy
alloc *Allocation
expectedRescheduleTime time.Time
expectedRescheduleEligible bool
}
now := time.Now()
testCases := []testCase{
{
desc: "Allocation hasn't failed yet",
reschedulePolicy: &ReschedulePolicy{
DelayFunction: "linear",
Delay: 5 * time.Second,
},
alloc: &Allocation{},
expectedRescheduleTime: time.Time{},
expectedRescheduleEligible: false,
},
{
desc: "Allocation lacks task state",
reschedulePolicy: &ReschedulePolicy{
DelayFunction: "linear",
Delay: 5 * time.Second,
},
alloc: &Allocation{ClientStatus: AllocClientStatusFailed},
expectedRescheduleTime: time.Time{},
expectedRescheduleEligible: false,
},
{
desc: "linear delay, unlimited restarts, no reschedule tracker",
reschedulePolicy: &ReschedulePolicy{
DelayFunction: "linear",
Delay: 5 * time.Second,
Unlimited: true,
},
alloc: &Allocation{
ClientStatus: AllocClientStatusFailed,
TaskStates: map[string]*TaskState{"foo": {State: "dead",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-2 * time.Second)}},
},
expectedRescheduleTime: now.Add(-2 * time.Second).Add(5 * time.Second),
expectedRescheduleEligible: true,
},
{
desc: "linear delay with reschedule tracker",
reschedulePolicy: &ReschedulePolicy{
DelayFunction: "linear",
Delay: 5 * time.Second,
Interval: 10 * time.Minute,
Attempts: 2,
},
alloc: &Allocation{
ClientStatus: AllocClientStatusFailed,
TaskStates: map[string]*TaskState{"foo": {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-2 * time.Second)}},
RescheduleTracker: &RescheduleTracker{
Events: []*RescheduleEvent{{
RescheduleTime: now.Add(-2 * time.Minute).UTC().UnixNano(),
Delay: 5 * time.Second,
}},
}},
expectedRescheduleTime: now.Add(-2 * time.Second).Add(5 * time.Second),
expectedRescheduleEligible: true,
},
{
desc: "linear delay with reschedule tracker, attempts exhausted",
reschedulePolicy: &ReschedulePolicy{
DelayFunction: "linear",
Delay: 5 * time.Second,
Interval: 10 * time.Minute,
Attempts: 2,
},
alloc: &Allocation{
ClientStatus: AllocClientStatusFailed,
TaskStates: map[string]*TaskState{"foo": {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-2 * time.Second)}},
RescheduleTracker: &RescheduleTracker{
Events: []*RescheduleEvent{
{
RescheduleTime: now.Add(-3 * time.Minute).UTC().UnixNano(),
Delay: 5 * time.Second,
},
{
RescheduleTime: now.Add(-2 * time.Minute).UTC().UnixNano(),
Delay: 5 * time.Second,
},
},
}},
expectedRescheduleTime: now.Add(-2 * time.Second).Add(5 * time.Second),
expectedRescheduleEligible: false,
},
{
desc: "exponential delay - no reschedule tracker",
reschedulePolicy: &ReschedulePolicy{
DelayFunction: "exponential",
Delay: 5 * time.Second,
MaxDelay: 90 * time.Second,
Unlimited: true,
},
alloc: &Allocation{
ClientStatus: AllocClientStatusFailed,
TaskStates: map[string]*TaskState{"foo": {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-2 * time.Second)}},
},
expectedRescheduleTime: now.Add(-2 * time.Second).Add(5 * time.Second),
expectedRescheduleEligible: true,
},
{
desc: "exponential delay with reschedule tracker",
reschedulePolicy: &ReschedulePolicy{
DelayFunction: "exponential",
Delay: 5 * time.Second,
MaxDelay: 90 * time.Second,
Unlimited: true,
},
alloc: &Allocation{
ClientStatus: AllocClientStatusFailed,
TaskStates: map[string]*TaskState{"foo": {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-2 * time.Second)}},
RescheduleTracker: &RescheduleTracker{
Events: []*RescheduleEvent{
{
RescheduleTime: now.Add(-2 * time.Hour).UTC().UnixNano(),
Delay: 5 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 10 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 20 * time.Second,
},
},
}},
expectedRescheduleTime: now.Add(-2 * time.Second).Add(40 * time.Second),
expectedRescheduleEligible: true,
},
{
desc: "exponential delay with delay ceiling reached",
reschedulePolicy: &ReschedulePolicy{
DelayFunction: "exponential",
Delay: 5 * time.Second,
MaxDelay: 90 * time.Second,
Unlimited: true,
},
alloc: &Allocation{
ClientStatus: AllocClientStatusFailed,
TaskStates: map[string]*TaskState{"foo": {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-15 * time.Second)}},
RescheduleTracker: &RescheduleTracker{
Events: []*RescheduleEvent{
{
RescheduleTime: now.Add(-2 * time.Hour).UTC().UnixNano(),
Delay: 5 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 10 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 20 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 40 * time.Second,
},
{
RescheduleTime: now.Add(-40 * time.Second).UTC().UnixNano(),
Delay: 80 * time.Second,
},
},
}},
expectedRescheduleTime: now.Add(-15 * time.Second).Add(90 * time.Second),
expectedRescheduleEligible: true,
},
{
// Test case where most recent reschedule ran longer than delay ceiling
desc: "exponential delay, delay ceiling reset condition met",
reschedulePolicy: &ReschedulePolicy{
DelayFunction: "exponential",
Delay: 5 * time.Second,
MaxDelay: 90 * time.Second,
Unlimited: true,
},
alloc: &Allocation{
ClientStatus: AllocClientStatusFailed,
TaskStates: map[string]*TaskState{"foo": {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-15 * time.Minute)}},
RescheduleTracker: &RescheduleTracker{
Events: []*RescheduleEvent{
{
RescheduleTime: now.Add(-2 * time.Hour).UTC().UnixNano(),
Delay: 5 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 10 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 20 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 40 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 80 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 90 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 90 * time.Second,
},
},
}},
expectedRescheduleTime: now.Add(-15 * time.Minute).Add(5 * time.Second),
expectedRescheduleEligible: true,
},
{
desc: "fibonacci delay - no reschedule tracker",
reschedulePolicy: &ReschedulePolicy{
DelayFunction: "fibonacci",
Delay: 5 * time.Second,
MaxDelay: 90 * time.Second,
Unlimited: true,
},
alloc: &Allocation{
ClientStatus: AllocClientStatusFailed,
TaskStates: map[string]*TaskState{"foo": {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-2 * time.Second)}}},
expectedRescheduleTime: now.Add(-2 * time.Second).Add(5 * time.Second),
expectedRescheduleEligible: true,
},
{
desc: "fibonacci delay with reschedule tracker",
reschedulePolicy: &ReschedulePolicy{
DelayFunction: "fibonacci",
Delay: 5 * time.Second,
MaxDelay: 90 * time.Second,
Unlimited: true,
},
alloc: &Allocation{
ClientStatus: AllocClientStatusFailed,
TaskStates: map[string]*TaskState{"foo": {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-2 * time.Second)}},
RescheduleTracker: &RescheduleTracker{
Events: []*RescheduleEvent{
{
RescheduleTime: now.Add(-2 * time.Hour).UTC().UnixNano(),
Delay: 5 * time.Second,
},
{
RescheduleTime: now.Add(-5 * time.Second).UTC().UnixNano(),
Delay: 5 * time.Second,
},
},
}},
expectedRescheduleTime: now.Add(-2 * time.Second).Add(10 * time.Second),
expectedRescheduleEligible: true,
},
{
desc: "fibonacci delay with more events",
reschedulePolicy: &ReschedulePolicy{
DelayFunction: "fibonacci",
Delay: 5 * time.Second,
MaxDelay: 90 * time.Second,
Unlimited: true,
},
alloc: &Allocation{
ClientStatus: AllocClientStatusFailed,
TaskStates: map[string]*TaskState{"foo": {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-2 * time.Second)}},
RescheduleTracker: &RescheduleTracker{
Events: []*RescheduleEvent{
{
RescheduleTime: now.Add(-2 * time.Hour).UTC().UnixNano(),
Delay: 5 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 5 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 10 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 15 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 25 * time.Second,
},
},
}},
expectedRescheduleTime: now.Add(-2 * time.Second).Add(40 * time.Second),
expectedRescheduleEligible: true,
},
{
desc: "fibonacci delay with delay ceiling reached",
reschedulePolicy: &ReschedulePolicy{
DelayFunction: "fibonacci",
Delay: 5 * time.Second,
MaxDelay: 50 * time.Second,
Unlimited: true,
},
alloc: &Allocation{
ClientStatus: AllocClientStatusFailed,
TaskStates: map[string]*TaskState{"foo": {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-15 * time.Second)}},
RescheduleTracker: &RescheduleTracker{
Events: []*RescheduleEvent{
{
RescheduleTime: now.Add(-2 * time.Hour).UTC().UnixNano(),
Delay: 5 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 5 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 10 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 15 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 25 * time.Second,
},
{
RescheduleTime: now.Add(-40 * time.Second).UTC().UnixNano(),
Delay: 40 * time.Second,
},
},
}},
expectedRescheduleTime: now.Add(-15 * time.Second).Add(50 * time.Second),
expectedRescheduleEligible: true,
},
{
desc: "fibonacci delay with delay reset condition met",
reschedulePolicy: &ReschedulePolicy{
DelayFunction: "fibonacci",
Delay: 5 * time.Second,
MaxDelay: 50 * time.Second,
Unlimited: true,
},
alloc: &Allocation{
ClientStatus: AllocClientStatusFailed,
TaskStates: map[string]*TaskState{"foo": {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-5 * time.Minute)}},
RescheduleTracker: &RescheduleTracker{
Events: []*RescheduleEvent{
{
RescheduleTime: now.Add(-2 * time.Hour).UTC().UnixNano(),
Delay: 5 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 5 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 10 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 15 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 25 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 40 * time.Second,
},
},
}},
expectedRescheduleTime: now.Add(-5 * time.Minute).Add(5 * time.Second),
expectedRescheduleEligible: true,
},
{
desc: "fibonacci delay with the most recent event that reset delay value",
reschedulePolicy: &ReschedulePolicy{
DelayFunction: "fibonacci",
Delay: 5 * time.Second,
MaxDelay: 50 * time.Second,
Unlimited: true,
},
alloc: &Allocation{
ClientStatus: AllocClientStatusFailed,
TaskStates: map[string]*TaskState{"foo": {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-5 * time.Second)}},
RescheduleTracker: &RescheduleTracker{
Events: []*RescheduleEvent{
{
RescheduleTime: now.Add(-2 * time.Hour).UTC().UnixNano(),
Delay: 5 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 5 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 10 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 15 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 25 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 40 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
Delay: 50 * time.Second,
},
{
RescheduleTime: now.Add(-1 * time.Minute).UTC().UnixNano(),
Delay: 5 * time.Second,
},
},
}},
expectedRescheduleTime: now.Add(-5 * time.Second).Add(5 * time.Second),
expectedRescheduleEligible: true,
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
require := require.New(t)
j := testJob()
j.TaskGroups[0].ReschedulePolicy = tc.reschedulePolicy
tc.alloc.Job = j
tc.alloc.TaskGroup = j.TaskGroups[0].Name
reschedTime, allowed := tc.alloc.NextRescheduleTime()
require.Equal(tc.expectedRescheduleEligible, allowed)
require.Equal(tc.expectedRescheduleTime, reschedTime)
})
}
}
func TestRescheduleTracker_Copy(t *testing.T) {
type testCase struct {
original *RescheduleTracker
@ -2830,9 +3497,15 @@ func TestRescheduleTracker_Copy(t *testing.T) {
cases := []testCase{
{nil, nil},
{&RescheduleTracker{Events: []*RescheduleEvent{
{2, "12", "12"},
{RescheduleTime: 2,
PrevAllocID: "12",
PrevNodeID: "12",
Delay: 30 * time.Second},
}}, &RescheduleTracker{Events: []*RescheduleEvent{
{2, "12", "12"},
{RescheduleTime: 2,
PrevAllocID: "12",
PrevNodeID: "12",
Delay: 30 * time.Second},
}}},
}

View File

@ -42,6 +42,10 @@ const (
// blockedEvalFailedPlacements is the description used for blocked evals
// that are a result of failing to place all allocations.
blockedEvalFailedPlacements = "created to place remaining allocations"
// maxPastRescheduleEvents is the maximum number of past reschedule event
// that we track when unlimited rescheduling is enabled
maxPastRescheduleEvents = 5
)
// SetStatusError is used to set the status of the evaluation to the given error
@ -72,8 +76,10 @@ type GenericScheduler struct {
ctx *EvalContext
stack *GenericStack
// Deprecated, was used in pre Nomad 0.7 rolling update stanza and in node draining prior to Nomad 0.8
followupEvalWait time.Duration
nextEval *structs.Evaluation
followUpEvals []*structs.Evaluation
deployment *structs.Deployment
@ -204,6 +210,7 @@ func (s *GenericScheduler) process() (bool, error) {
numTaskGroups = len(s.job.TaskGroups)
}
s.queuedAllocs = make(map[string]int, numTaskGroups)
s.followUpEvals = nil
// Create a plan
s.plan = s.eval.MakePlan(s.job)
@ -261,6 +268,19 @@ func (s *GenericScheduler) process() (bool, error) {
s.logger.Printf("[DEBUG] sched: %#v: rolling migration limit reached, next eval '%s' created", s.eval, s.nextEval.ID)
}
// Create follow up evals for any delayed reschedule eligible allocations
if len(s.followUpEvals) > 0 {
for _, eval := range s.followUpEvals {
eval.PreviousEval = s.eval.ID
// TODO(preetha) this should be batching evals before inserting them
if err := s.planner.CreateEval(eval); err != nil {
s.logger.Printf("[ERR] sched: %#v failed to make next eval for rescheduling: %v", s.eval, err)
return false, err
}
s.logger.Printf("[DEBUG] sched: %#v: found reschedulable allocs, next eval '%s' created", s.eval, eval.ID)
}
}
// Submit the plan and store the results.
result, newState, err := s.planner.SubmitPlan(s.plan)
s.planResult = result
@ -336,6 +356,12 @@ func (s *GenericScheduler) computeJobAllocs() error {
// follow up eval to handle node draining.
s.followupEvalWait = results.followupEvalWait
// Store all the follow up evaluations from rescheduled allocations
if len(results.desiredFollowupEvals) > 0 {
for _, evals := range results.desiredFollowupEvals {
s.followUpEvals = append(s.followUpEvals, evals...)
}
}
// Update the stored deployment
if results.deployment != nil {
s.deployment = results.deployment
@ -403,6 +429,9 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// Update the set of placement nodes
s.stack.SetNodes(nodes)
// Capture current time to use as the start time for any rescheduled allocations
now := time.Now()
// Have to handle destructive changes first as we need to discount their
// resources. To understand this imagine the resources were reduced and the
// count was scaled up.
@ -467,7 +496,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
if prevAllocation != nil {
alloc.PreviousAllocation = prevAllocation.ID
if missing.IsRescheduling() {
updateRescheduleTracker(alloc, prevAllocation)
updateRescheduleTracker(alloc, prevAllocation, now)
}
}
@ -524,14 +553,38 @@ func getSelectOptions(prevAllocation *structs.Allocation, preferredNode *structs
}
// updateRescheduleTracker carries over previous restart attempts and adds the most recent restart
func updateRescheduleTracker(alloc *structs.Allocation, prev *structs.Allocation) {
func updateRescheduleTracker(alloc *structs.Allocation, prev *structs.Allocation, now time.Time) {
reschedPolicy := prev.ReschedulePolicy()
var rescheduleEvents []*structs.RescheduleEvent
if prev.RescheduleTracker != nil {
for _, reschedEvent := range prev.RescheduleTracker.Events {
rescheduleEvents = append(rescheduleEvents, reschedEvent.Copy())
var interval time.Duration
if reschedPolicy != nil {
interval = reschedPolicy.Interval
}
// If attempts is set copy all events in the interval range
if reschedPolicy.Attempts > 0 {
for _, reschedEvent := range prev.RescheduleTracker.Events {
timeDiff := now.UnixNano() - reschedEvent.RescheduleTime
// Only copy over events that are within restart interval
// This keeps the list of events small in cases where there's a long chain of old restart events
if interval > 0 && timeDiff <= interval.Nanoseconds() {
rescheduleEvents = append(rescheduleEvents, reschedEvent.Copy())
}
}
} else {
// Only copy the last n if unlimited is set
start := 0
if len(prev.RescheduleTracker.Events) > maxPastRescheduleEvents {
start = len(prev.RescheduleTracker.Events) - maxPastRescheduleEvents
}
for i := start; i < len(prev.RescheduleTracker.Events); i++ {
reschedEvent := prev.RescheduleTracker.Events[i]
rescheduleEvents = append(rescheduleEvents, reschedEvent.Copy())
}
}
}
rescheduleEvent := structs.NewRescheduleEvent(time.Now().UTC().UnixNano(), prev.ID, prev.NodeID)
nextDelay := prev.NextDelay()
rescheduleEvent := structs.NewRescheduleEvent(now.UnixNano(), prev.ID, prev.NodeID, nextDelay)
rescheduleEvents = append(rescheduleEvents, rescheduleEvent)
alloc.RescheduleTracker = &structs.RescheduleTracker{Events: rescheduleEvents}
}

View File

@ -2715,7 +2715,7 @@ func TestServiceSched_RetryLimit(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusFailed)
}
func TestServiceSched_Reschedule_Once(t *testing.T) {
func TestServiceSched_Reschedule_OnceNow(t *testing.T) {
h := NewHarness(t)
// Create some nodes
@ -2730,9 +2730,15 @@ func TestServiceSched_Reschedule_Once(t *testing.T) {
job := mock.Job()
job.TaskGroups[0].Count = 2
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: 1,
Interval: 15 * time.Minute,
Attempts: 1,
Interval: 15 * time.Minute,
Delay: 5 * time.Second,
MaxDelay: 1 * time.Minute,
DelayFunction: "linear",
}
tgName := job.TaskGroups[0].Name
now := time.Now()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
var allocs []*structs.Allocation
@ -2746,6 +2752,9 @@ func TestServiceSched_Reschedule_Once(t *testing.T) {
}
// Mark one of the allocations as failed
allocs[1].ClientStatus = structs.AllocClientStatusFailed
allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "dead",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
failedAllocID := allocs[1].ID
successAllocID := allocs[0].ID
@ -2817,7 +2826,96 @@ func TestServiceSched_Reschedule_Once(t *testing.T) {
}
func TestServiceSched_Reschedule_Multiple(t *testing.T) {
// Tests that an alloc that is only reschedulable at a future time results in no
// immediate placement and instead creates a follow up eval whose WaitUntil is
// the failure time plus the configured delay.
func TestServiceSched_Reschedule_Later(t *testing.T) {
	h := NewHarness(t)
	require := require.New(t)
	// Create some nodes
	var nodes []*structs.Node
	for i := 0; i < 10; i++ {
		node := mock.Node()
		nodes = append(nodes, node)
		noErr(t, h.State.UpsertNode(h.NextIndex(), node))
	}

	// Generate a fake job with allocations and a reschedule policy with a delay.
	job := mock.Job()
	job.TaskGroups[0].Count = 2
	delayDuration := 15 * time.Second
	job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
		Attempts:      1,
		Interval:      15 * time.Minute,
		Delay:         delayDuration,
		MaxDelay:      1 * time.Minute,
		DelayFunction: "linear",
	}
	tgName := job.TaskGroups[0].Name
	now := time.Now()

	noErr(t, h.State.UpsertJob(h.NextIndex(), job))

	var allocs []*structs.Allocation
	for i := 0; i < 2; i++ {
		alloc := mock.Alloc()
		alloc.Job = job
		alloc.JobID = job.ID
		alloc.NodeID = nodes[i].ID
		alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
		allocs = append(allocs, alloc)
	}
	// Mark one of the allocations as failed just now, so its reschedule time
	// (FinishedAt + delayDuration) is still in the future.
	allocs[1].ClientStatus = structs.AllocClientStatusFailed
	allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "dead",
		StartedAt:  now.Add(-1 * time.Hour),
		FinishedAt: now}}
	failedAllocID := allocs[1].ID

	noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))

	// Create a mock evaluation
	eval := &structs.Evaluation{
		Namespace:   structs.DefaultNamespace,
		ID:          uuid.Generate(),
		Priority:    50,
		TriggeredBy: structs.EvalTriggerNodeUpdate,
		JobID:       job.ID,
		Status:      structs.EvalStatusPending,
	}
	noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))

	// Process the evaluation
	require.NoError(h.Process(NewServiceScheduler, eval))

	// Ensure a plan was created
	require.NotEmpty(h.Plans)

	// Lookup the allocations by JobID
	ws := memdb.NewWatchSet()
	out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
	noErr(t, err)

	// Verify no new allocs were created
	require.Equal(2, len(out))

	// Verify follow up eval was created for the failed alloc
	alloc, err := h.State.AllocByID(ws, failedAllocID)
	require.Nil(err)
	require.NotEmpty(alloc.FollowupEvalID)

	// Ensure there is exactly one pending follow up eval with the expected wait time.
	require.Len(h.CreateEvals, 1)
	followupEval := h.CreateEvals[0]
	require.Equal(structs.EvalStatusPending, followupEval.Status)
	require.Equal(now.Add(delayDuration), followupEval.WaitUntil)
}
func TestServiceSched_Reschedule_MultipleNow(t *testing.T) {
h := NewHarness(t)
// Create some nodes
@ -2833,9 +2931,14 @@ func TestServiceSched_Reschedule_Multiple(t *testing.T) {
job := mock.Job()
job.TaskGroups[0].Count = 2
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: maxRestartAttempts,
Interval: 30 * time.Minute,
Attempts: maxRestartAttempts,
Interval: 30 * time.Minute,
Delay: 5 * time.Second,
DelayFunction: "linear",
}
tgName := job.TaskGroups[0].Name
now := time.Now()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
var allocs []*structs.Allocation
@ -2850,6 +2953,9 @@ func TestServiceSched_Reschedule_Multiple(t *testing.T) {
}
// Mark one of the allocations as failed
allocs[1].ClientStatus = structs.AllocClientStatusFailed
allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "dead",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
@ -2915,6 +3021,9 @@ func TestServiceSched_Reschedule_Multiple(t *testing.T) {
// Mark this alloc as failed again
newAlloc.ClientStatus = structs.AllocClientStatusFailed
newAlloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "dead",
StartedAt: now.Add(-12 * time.Second),
FinishedAt: now.Add(-10 * time.Second)}}
failedAllocId = newAlloc.ID
failedNodeID = newAlloc.NodeID
@ -2946,6 +3055,136 @@ func TestServiceSched_Reschedule_Multiple(t *testing.T) {
assert.Equal(5, len(out)) // 2 original, plus 3 reschedule attempts
}
// Tests that old reschedule attempts are pruned: with unlimited rescheduling the
// replacement alloc carries only the last maxPastRescheduleEvents (5) past events
// plus the new event, and the new event's delay follows the exponential function.
func TestServiceSched_Reschedule_PruneEvents(t *testing.T) {
	h := NewHarness(t)
	// Create some nodes
	var nodes []*structs.Node
	for i := 0; i < 10; i++ {
		node := mock.Node()
		nodes = append(nodes, node)
		noErr(t, h.State.UpsertNode(h.NextIndex(), node))
	}

	// Generate a fake job with allocations and an unlimited exponential reschedule policy.
	job := mock.Job()
	job.TaskGroups[0].Count = 2
	job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
		DelayFunction: "exponential",
		MaxDelay:      1 * time.Hour,
		Delay:         5 * time.Second,
		Unlimited:     true,
	}
	noErr(t, h.State.UpsertJob(h.NextIndex(), job))

	var allocs []*structs.Allocation
	for i := 0; i < 2; i++ {
		alloc := mock.Alloc()
		alloc.Job = job
		alloc.JobID = job.ID
		alloc.NodeID = nodes[i].ID
		alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
		allocs = append(allocs, alloc)
	}
	now := time.Now()
	// Mark allocations as failed with restart info
	allocs[1].TaskStates = map[string]*structs.TaskState{job.TaskGroups[0].Name: {State: "dead",
		StartedAt:  now.Add(-1 * time.Hour),
		FinishedAt: now.Add(-15 * time.Minute)}}
	allocs[1].ClientStatus = structs.AllocClientStatusFailed

	// Six past events: with maxPastRescheduleEvents = 5 the oldest one (index 0)
	// should be dropped from the replacement alloc's tracker.
	allocs[1].RescheduleTracker = &structs.RescheduleTracker{
		Events: []*structs.RescheduleEvent{
			{RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
				PrevAllocID: uuid.Generate(),
				PrevNodeID:  uuid.Generate(),
				Delay:       5 * time.Second,
			},
			{RescheduleTime: now.Add(-40 * time.Minute).UTC().UnixNano(),
				PrevAllocID: allocs[0].ID,
				PrevNodeID:  uuid.Generate(),
				Delay:       10 * time.Second,
			},
			{RescheduleTime: now.Add(-30 * time.Minute).UTC().UnixNano(),
				PrevAllocID: allocs[0].ID,
				PrevNodeID:  uuid.Generate(),
				Delay:       20 * time.Second,
			},
			{RescheduleTime: now.Add(-20 * time.Minute).UTC().UnixNano(),
				PrevAllocID: allocs[0].ID,
				PrevNodeID:  uuid.Generate(),
				Delay:       40 * time.Second,
			},
			{RescheduleTime: now.Add(-10 * time.Minute).UTC().UnixNano(),
				PrevAllocID: allocs[0].ID,
				PrevNodeID:  uuid.Generate(),
				Delay:       80 * time.Second,
			},
			{RescheduleTime: now.Add(-3 * time.Minute).UTC().UnixNano(),
				PrevAllocID: allocs[0].ID,
				PrevNodeID:  uuid.Generate(),
				Delay:       160 * time.Second,
			},
		},
	}
	expectedFirstRescheduleEvent := allocs[1].RescheduleTracker.Events[1]
	expectedDelay := 320 * time.Second
	failedAllocID := allocs[1].ID
	successAllocID := allocs[0].ID

	noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))

	// Create a mock evaluation
	eval := &structs.Evaluation{
		Namespace:   structs.DefaultNamespace,
		ID:          uuid.Generate(),
		Priority:    50,
		TriggeredBy: structs.EvalTriggerNodeUpdate,
		JobID:       job.ID,
		Status:      structs.EvalStatusPending,
	}
	noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))

	// Process the evaluation
	err := h.Process(NewServiceScheduler, eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure multiple plans
	if len(h.Plans) == 0 {
		t.Fatalf("bad: %#v", h.Plans)
	}

	// Lookup the allocations by JobID
	ws := memdb.NewWatchSet()
	out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
	noErr(t, err)

	// Verify that one new allocation got created with its restart tracker info
	assert := assert.New(t)
	assert.Equal(3, len(out))
	var newAlloc *structs.Allocation
	for _, alloc := range out {
		if alloc.ID != successAllocID && alloc.ID != failedAllocID {
			newAlloc = alloc
		}
	}
	// Fail fast if the replacement alloc was not found; dereferencing a nil
	// newAlloc below would panic instead of producing a useful test failure.
	if newAlloc == nil {
		t.Fatalf("no new allocation found, allocs: %#v", out)
	}
	assert.Equal(failedAllocID, newAlloc.PreviousAllocation)

	// Verify that the new alloc copied the last 5 reschedule attempts
	assert.Equal(6, len(newAlloc.RescheduleTracker.Events))
	assert.Equal(expectedFirstRescheduleEvent, newAlloc.RescheduleTracker.Events[0])

	mostRecentRescheduleEvent := newAlloc.RescheduleTracker.Events[5]
	// Verify that the failed alloc ID is in the most recent reschedule event
	assert.Equal(failedAllocID, mostRecentRescheduleEvent.PrevAllocID)
	// Verify that the delay value was captured correctly
	assert.Equal(expectedDelay, mostRecentRescheduleEvent.Delay)
}
// Tests that deployments with failed allocs don't result in placements
func TestDeployment_FailedAllocs_NoReschedule(t *testing.T) {
h := NewHarness(t)
@ -3079,6 +3318,9 @@ func TestBatchSched_Run_FailedAlloc(t *testing.T) {
job.TaskGroups[0].Count = 1
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
tgName := job.TaskGroups[0].Name
now := time.Now()
// Create a failed alloc
alloc := mock.Alloc()
alloc.Job = job
@ -3086,6 +3328,9 @@ func TestBatchSched_Run_FailedAlloc(t *testing.T) {
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.ClientStatus = structs.AllocClientStatusFailed
alloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "dead",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to register the job
@ -3231,6 +3476,9 @@ func TestBatchSched_Run_FailedAllocQueuedAllocations(t *testing.T) {
job.TaskGroups[0].Count = 1
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
tgName := job.TaskGroups[0].Name
now := time.Now()
// Create a failed alloc
alloc := mock.Alloc()
alloc.Job = job
@ -3238,6 +3486,9 @@ func TestBatchSched_Run_FailedAllocQueuedAllocations(t *testing.T) {
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.ClientStatus = structs.AllocClientStatusFailed
alloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "dead",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to register the job
@ -3963,3 +4214,233 @@ func TestServiceSched_CancelDeployment_NewerJob(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
// Various table driven tests for carry forward of past reschedule events by
// updateRescheduleTracker: verifies interval-based pruning when attempts are
// limited, last-N pruning when unlimited, and the delay chosen for the new event.
func Test_updateRescheduleTracker(t *testing.T) {

	t1 := time.Now().UTC()
	alloc := mock.Alloc()
	prevAlloc := mock.Alloc()

	type testCase struct {
		desc                     string
		prevAllocEvents          []*structs.RescheduleEvent
		reschedPolicy            *structs.ReschedulePolicy
		expectedRescheduleEvents []*structs.RescheduleEvent
		reschedTime              time.Time
	}

	testCases := []testCase{
		{
			desc:            "No past events",
			prevAllocEvents: nil,
			reschedPolicy:   &structs.ReschedulePolicy{Unlimited: false, Interval: 24 * time.Hour, Attempts: 2, Delay: 5 * time.Second},
			reschedTime:     t1,
			// Keyed literal: unkeyed composite literals of structs.RescheduleEvent
			// are flagged by `go vet` and break silently if the field order changes.
			expectedRescheduleEvents: []*structs.RescheduleEvent{
				{
					RescheduleTime: t1.UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          5 * time.Second,
				},
			},
		},
		{
			desc: "one past event, linear delay",
			prevAllocEvents: []*structs.RescheduleEvent{
				{RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
					PrevAllocID: prevAlloc.ID,
					PrevNodeID:  prevAlloc.NodeID,
					Delay:       5 * time.Second}},
			reschedPolicy: &structs.ReschedulePolicy{Unlimited: false, Interval: 24 * time.Hour, Attempts: 2, Delay: 5 * time.Second},
			reschedTime:   t1,
			expectedRescheduleEvents: []*structs.RescheduleEvent{
				{
					RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          5 * time.Second,
				},
				{
					RescheduleTime: t1.UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          5 * time.Second,
				},
			},
		},
		{
			desc: "one past event, fibonacci delay",
			prevAllocEvents: []*structs.RescheduleEvent{
				{RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
					PrevAllocID: prevAlloc.ID,
					PrevNodeID:  prevAlloc.NodeID,
					Delay:       5 * time.Second}},
			reschedPolicy: &structs.ReschedulePolicy{Unlimited: false, Interval: 24 * time.Hour, Attempts: 2, Delay: 5 * time.Second, DelayFunction: "fibonacci", MaxDelay: 60 * time.Second},
			reschedTime:   t1,
			expectedRescheduleEvents: []*structs.RescheduleEvent{
				{
					RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          5 * time.Second,
				},
				{
					RescheduleTime: t1.UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          5 * time.Second,
				},
			},
		},
		{
			desc: "eight past events, fibonacci delay, unlimited",
			prevAllocEvents: []*structs.RescheduleEvent{
				{
					RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          5 * time.Second,
				},
				{
					RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          5 * time.Second,
				},
				{
					RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          10 * time.Second,
				},
				{
					RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          15 * time.Second,
				},
				{
					RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          25 * time.Second,
				},
				{
					RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          40 * time.Second,
				},
				{
					RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          65 * time.Second,
				},
				{
					RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          105 * time.Second,
				},
			},
			reschedPolicy: &structs.ReschedulePolicy{Unlimited: true, Delay: 5 * time.Second, DelayFunction: "fibonacci", MaxDelay: 240 * time.Second},
			reschedTime:   t1,
			// Only the last maxPastRescheduleEvents (5) are carried forward.
			expectedRescheduleEvents: []*structs.RescheduleEvent{
				{
					RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          15 * time.Second,
				},
				{
					RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          25 * time.Second,
				},
				{
					RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          40 * time.Second,
				},
				{
					RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          65 * time.Second,
				},
				{
					RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          105 * time.Second,
				},
				{
					RescheduleTime: t1.UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          170 * time.Second,
				},
			},
		},
		{
			desc: "old attempts past interval, exponential delay, limited",
			prevAllocEvents: []*structs.RescheduleEvent{
				{
					RescheduleTime: t1.Add(-2 * time.Hour).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          5 * time.Second,
				},
				{
					RescheduleTime: t1.Add(-70 * time.Minute).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          10 * time.Second,
				},
				{
					RescheduleTime: t1.Add(-30 * time.Minute).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          20 * time.Second,
				},
				{
					RescheduleTime: t1.Add(-10 * time.Minute).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          40 * time.Second,
				},
			},
			reschedPolicy: &structs.ReschedulePolicy{Unlimited: false, Interval: 1 * time.Hour, Attempts: 5, Delay: 5 * time.Second, DelayFunction: "exponential", MaxDelay: 240 * time.Second},
			reschedTime:   t1,
			// Events older than the 1 hour interval are pruned.
			expectedRescheduleEvents: []*structs.RescheduleEvent{
				{
					RescheduleTime: t1.Add(-30 * time.Minute).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          20 * time.Second,
				},
				{
					RescheduleTime: t1.Add(-10 * time.Minute).UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          40 * time.Second,
				},
				{
					RescheduleTime: t1.UnixNano(),
					PrevAllocID:    prevAlloc.ID,
					PrevNodeID:     prevAlloc.NodeID,
					Delay:          80 * time.Second,
				},
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			require := require.New(t)
			prevAlloc.RescheduleTracker = &structs.RescheduleTracker{Events: tc.prevAllocEvents}
			prevAlloc.Job.LookupTaskGroup(prevAlloc.TaskGroup).ReschedulePolicy = tc.reschedPolicy
			updateRescheduleTracker(alloc, prevAlloc, tc.reschedTime)
			require.Equal(tc.expectedRescheduleEvents, alloc.RescheduleTracker.Events)
		})
	}

}

View File

@ -5,10 +5,19 @@ import (
"log"
"time"
"sort"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// batchedFailedAllocWindowSize is the window size used
// to batch up failed allocations before creating an eval
batchedFailedAllocWindowSize = 5 * time.Second
)
// allocUpdateType takes an existing allocation and a new job definition and
// returns whether the allocation can ignore the change, requires a destructive
// update, or can be inplace updated. If it can be inplace updated, an updated
@ -92,7 +101,23 @@ type reconcileResults struct {
// followupEvalWait is set if there should be a followup eval run after the
// given duration
// Deprecated, the delay strategy that sets this is not available after nomad 0.7.0
followupEvalWait time.Duration
// desiredFollowupEvals is the map of follow up evaluations to create per task group
// This is used to create a delayed evaluation for rescheduling failed allocations.
desiredFollowupEvals map[string][]*structs.Evaluation
}
// delayedRescheduleInfo contains the allocation id and a time when it's eligible to be rescheduled.
// This is used to create follow up evaluations.
type delayedRescheduleInfo struct {

	// allocID is the ID of the allocation eligible to be rescheduled
	allocID string

	// rescheduleTime is the time to use in the delayed evaluation
	rescheduleTime time.Time
}
func (r *reconcileResults) GoString() string {
@ -136,7 +161,8 @@ func NewAllocReconciler(logger *log.Logger, allocUpdateFn allocUpdateType, batch
existingAllocs: existingAllocs,
taintedNodes: taintedNodes,
result: &reconcileResults{
desiredTGUpdates: make(map[string]*structs.DesiredUpdates),
desiredTGUpdates: make(map[string]*structs.DesiredUpdates),
desiredFollowupEvals: make(map[string][]*structs.Evaluation),
},
}
}
@ -318,11 +344,17 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
// Determine what set of terminal allocations need to be rescheduled
untainted, reschedule := untainted.filterByReschedulable(a.batch, tg.ReschedulePolicy)
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch)
// Create batched follow up evaluations for allocations that are reschedulable later
var rescheduleLaterAllocs map[string]*structs.Allocation
if len(rescheduleLater) > 0 {
rescheduleLaterAllocs = a.handleDelayedReschedules(rescheduleLater, all, tg.Name)
}
// Create a structure for choosing names. Seed with the taken names which is
// the union of untainted and migrating nodes (includes canaries)
nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate, reschedule))
nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate, rescheduleNow))
// Stop any unneeded allocations and update the untainted set to not
// included stopped allocations.
@ -341,7 +373,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// Do inplace upgrades where possible and capture the set of upgrades that
// need to be done destructively.
ignore, inplace, destructive := a.computeUpdates(tg, untainted)
ignore, inplace, destructive := a.computeUpdates(tg, untainted, rescheduleLaterAllocs)
desiredChanges.Ignore += uint64(len(ignore))
desiredChanges.InPlaceUpdate += uint64(len(inplace))
if !existingDeployment {
@ -379,7 +411,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// * The deployment is not paused or failed
// * Not placing any canaries
// * If there are any canaries that they have been promoted
place := a.computePlacements(tg, nameIndex, untainted, migrate, reschedule)
place := a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow)
if !existingDeployment {
dstate.DesiredTotal += len(place)
}
@ -774,7 +806,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
// 2. Those that can be upgraded in-place. These are added to the results
// automatically since the function contains the correct state to do so,
// 3. Those that require destructive updates
func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted allocSet) (ignore, inplace, destructive allocSet) {
func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted, rescheduleLaterAllocs allocSet) (ignore, inplace, destructive allocSet) {
// Determine the set of allocations that need to be updated
ignore = make(map[string]*structs.Allocation)
inplace = make(map[string]*structs.Allocation)
@ -782,7 +814,13 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all
for _, alloc := range untainted {
ignoreChange, destructiveChange, inplaceAlloc := a.allocUpdateFn(alloc, a.job, group)
if ignoreChange {
// Also check if the alloc is marked for later rescheduling.
// If so it should be in the inplace list
reschedLaterAlloc, isRescheduleLater := rescheduleLaterAllocs[alloc.ID]
if isRescheduleLater {
inplace[alloc.ID] = alloc
a.result.inplaceUpdate = append(a.result.inplaceUpdate, reschedLaterAlloc)
} else if ignoreChange {
ignore[alloc.ID] = alloc
} else if destructiveChange {
destructive[alloc.ID] = alloc
@ -796,3 +834,64 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all
return
}
// handleDelayedReschedules creates batched followup evaluations with the WaitUntil
// field set for allocations that are eligible to be rescheduled later. Allocations
// whose reschedule times fall within batchedFailedAllocWindowSize of the start of a
// batch share that batch's evaluation. It records the evals on the reconciler result
// and returns copies of the affected allocations with FollowupEvalID set, keyed by
// allocation ID. Callers must ensure rescheduleLater is non-empty.
func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) allocSet {
	// Sort by time so batches can be formed by scanning forward
	sort.Slice(rescheduleLater, func(i, j int) bool {
		return rescheduleLater[i].rescheduleTime.Before(rescheduleLater[j].rescheduleTime)
	})

	// newBatchEval builds a pending evaluation that the scheduler will not act
	// on until waitUntil. Extracted to avoid duplicating the construction for
	// the first batch and every subsequent batch.
	newBatchEval := func(waitUntil time.Time) *structs.Evaluation {
		return &structs.Evaluation{
			ID:             uuid.Generate(),
			Namespace:      a.job.Namespace,
			Priority:       a.job.Priority,
			Type:           a.job.Type,
			TriggeredBy:    structs.EvalTriggerRetryFailedAlloc,
			JobID:          a.job.ID,
			JobModifyIndex: a.job.ModifyIndex,
			Status:         structs.EvalStatusPending,
			WaitUntil:      waitUntil,
		}
	}

	var evals []*structs.Evaluation
	nextReschedTime := rescheduleLater[0].rescheduleTime
	allocIDToFollowupEvalID := make(map[string]string, len(rescheduleLater))

	// Create a new eval for the first batch
	eval := newBatchEval(nextReschedTime)
	evals = append(evals, eval)

	for _, allocReschedInfo := range rescheduleLater {
		if allocReschedInfo.rescheduleTime.Sub(nextReschedTime) < batchedFailedAllocWindowSize {
			// Close enough to the current batch's start time; reuse its eval
			allocIDToFollowupEvalID[allocReschedInfo.allocID] = eval.ID
		} else {
			// Start a new batch anchored at this alloc's reschedule time
			nextReschedTime = allocReschedInfo.rescheduleTime
			eval = newBatchEval(nextReschedTime)
			evals = append(evals, eval)
			// Set the evalID for the first alloc in this new batch
			allocIDToFollowupEvalID[allocReschedInfo.allocID] = eval.ID
		}
	}
	a.result.desiredFollowupEvals[tgName] = evals

	// Create in-place updates for every alloc ID that needs to be updated with
	// its follow up eval ID
	rescheduleLaterAllocs := make(map[string]*structs.Allocation)
	for allocID, evalID := range allocIDToFollowupEvalID {
		existingAlloc := all[allocID]
		updatedAlloc := existingAlloc.Copy()
		updatedAlloc.FollowupEvalID = evalID
		rescheduleLaterAllocs[allocID] = updatedAlloc
	}
	return rescheduleLaterAllocs
}

View File

@ -15,6 +15,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
)
/*
@ -38,9 +39,12 @@ Basic Tests:
Handle task group being removed
Handle job being stopped both as .Stopped and nil
Place more than one group
Handle rescheduling failed allocs for batch jobs
Handle rescheduling failed allocs for service jobs
Handle delayed rescheduling failed allocs for batch jobs
Handle delayed rescheduling failed allocs for service jobs
Handle eligible now rescheduling failed allocs for batch jobs
Handle eligible now rescheduling failed allocs for service jobs
Previously rescheduled allocs should not be rescheduled again
Aggregated evaluations for allocations that fail close together
Update stanza Tests:
Stopped job cancels any active deployment
@ -1203,15 +1207,17 @@ func TestReconciler_MultiTG(t *testing.T) {
assertNamesHaveIndexes(t, intRange(2, 9, 0, 9), placeResultsToNames(r.place))
}
// Tests rescheduling failed batch allocations
func TestReconciler_Reschedule_Batch(t *testing.T) {
// Tests delayed rescheduling of failed batch allocations
func TestReconciler_RescheduleLater_Batch(t *testing.T) {
require := require.New(t)
// Set desired 4
job := mock.Job()
job.TaskGroups[0].Count = 4
now := time.Now()
// Set up reschedule policy
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour}
delayDur := 15 * time.Second
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour, Delay: delayDur, DelayFunction: "linear"}
tgName := job.TaskGroups[0].Name
// Create 6 existing allocations - 2 running, 1 complete and 3 failed
var allocs []*structs.Allocation
for i := 0; i < 6; i++ {
@ -1235,6 +1241,9 @@ func TestReconciler_Reschedule_Batch(t *testing.T) {
}}
allocs[1].NextAllocation = allocs[2].ID
allocs[2].ClientStatus = structs.AllocClientStatusFailed
allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now}}
allocs[2].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{
{RescheduleTime: time.Now().Add(-2 * time.Hour).UTC().UnixNano(),
PrevAllocID: allocs[0].ID,
@ -1251,7 +1260,171 @@ func TestReconciler_Reschedule_Batch(t *testing.T) {
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil)
r := reconciler.Compute()
// Two reschedule attempts were made, one more can be made
// Two reschedule attempts were already made, one more can be made at a future time
// Verify that the follow up eval has the expected waitUntil time
evals := r.desiredFollowupEvals[tgName]
require.NotNil(evals)
require.Equal(1, len(evals))
require.Equal(now.Add(delayDur), evals[0].WaitUntil)
// Alloc 5 should not be replaced because it is terminal
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 0,
inplace: 1,
stop: 0,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: 0,
InPlaceUpdate: 1,
Ignore: 3,
},
},
})
assertNamesHaveIndexes(t, intRange(2, 2), allocsToNames(r.inplaceUpdate))
// verify that the followup evalID field is set correctly
r.inplaceUpdate[0].EvalID = evals[0].ID
}
// Tests delayed rescheduling of failed batch allocations and batching of allocs
// with fail times that are close together
func TestReconciler_RescheduleLaterWithBatchedEvals_Batch(t *testing.T) {
	require := require.New(t)
	// Desired count is 10
	job := mock.Job()
	job.TaskGroups[0].Count = 10
	now := time.Now()
	// Set up reschedule policy with a 15s linear delay
	delayDur := 15 * time.Second
	job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour, Delay: delayDur, DelayFunction: "linear"}
	tgName := job.TaskGroups[0].Name
	// Create 10 existing allocations, all initially running
	var allocs []*structs.Allocation
	for i := 0; i < 10; i++ {
		alloc := mock.Alloc()
		alloc.Job = job
		alloc.JobID = job.ID
		alloc.NodeID = uuid.Generate()
		alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
		allocs = append(allocs, alloc)
		alloc.ClientStatus = structs.AllocClientStatusRunning
	}
	// Mark 5 as failed with fail times very close together (within the
	// batching window), so they should share one followup eval
	for i := 0; i < 5; i++ {
		allocs[i].ClientStatus = structs.AllocClientStatusFailed
		allocs[i].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
			StartedAt:  now.Add(-1 * time.Hour),
			FinishedAt: now.Add(time.Duration(50*i) * time.Millisecond)}}
	}
	// Mark two more as failed several seconds later, which should land in a
	// second batch with its own followup eval
	for i := 5; i < 7; i++ {
		allocs[i].ClientStatus = structs.AllocClientStatusFailed
		allocs[i].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
			StartedAt:  now.Add(-1 * time.Hour),
			FinishedAt: now.Add(10 * time.Second)}}
	}
	reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil)
	r := reconciler.Compute()
	// Verify that two follow up evals were created
	evals := r.desiredFollowupEvals[tgName]
	require.NotNil(evals)
	require.Equal(2, len(evals))
	// Verify expected WaitUntil values for both batched evals
	require.Equal(now.Add(delayDur), evals[0].WaitUntil)
	secondBatchDuration := delayDur + 10*time.Second
	require.Equal(now.Add(secondBatchDuration), evals[1].WaitUntil)
	// The 7 failed allocs get in-place updates carrying their followup eval
	// IDs; the 3 still-running allocs are ignored
	assertResults(t, r, &resultExpectation{
		createDeployment:  nil,
		deploymentUpdates: nil,
		place:             0,
		inplace:           7,
		stop:              0,
		desiredTGUpdates: map[string]*structs.DesiredUpdates{
			job.TaskGroups[0].Name: {
				Place:         0,
				InPlaceUpdate: 7,
				Ignore:        3,
			},
		},
	})
	assertNamesHaveIndexes(t, intRange(0, 6), allocsToNames(r.inplaceUpdate))
	// verify that the followup evalID field is set correctly
	for _, alloc := range r.inplaceUpdate {
		if allocNameToIndex(alloc.Name) < 5 {
			require.Equal(evals[0].ID, alloc.FollowupEvalID)
		} else if allocNameToIndex(alloc.Name) < 7 {
			require.Equal(evals[1].ID, alloc.FollowupEvalID)
		} else {
			t.Fatalf("Unexpected alloc name in Inplace results %v", alloc.Name)
		}
	}
}
// Tests rescheduling failed batch allocations
func TestReconciler_RescheduleNow_Batch(t *testing.T) {
require := require.New(t)
// Set desired 4
job := mock.Job()
job.TaskGroups[0].Count = 4
now := time.Now()
// Set up reschedule policy
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour, Delay: 5 * time.Second, DelayFunction: "linear"}
tgName := job.TaskGroups[0].Name
// Create 6 existing allocations - 2 running, 1 complete and 3 failed
var allocs []*structs.Allocation
for i := 0; i < 6; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
allocs = append(allocs, alloc)
alloc.ClientStatus = structs.AllocClientStatusRunning
}
// Mark 3 as failed with restart tracking info
allocs[0].ClientStatus = structs.AllocClientStatusFailed
allocs[0].NextAllocation = allocs[1].ID
allocs[1].ClientStatus = structs.AllocClientStatusFailed
allocs[1].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{
{RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(),
PrevAllocID: allocs[0].ID,
PrevNodeID: uuid.Generate(),
},
}}
allocs[1].NextAllocation = allocs[2].ID
allocs[2].ClientStatus = structs.AllocClientStatusFailed
allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
allocs[2].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{
{RescheduleTime: time.Now().Add(-2 * time.Hour).UTC().UnixNano(),
PrevAllocID: allocs[0].ID,
PrevNodeID: uuid.Generate(),
},
{RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(),
PrevAllocID: allocs[1].ID,
PrevNodeID: uuid.Generate(),
},
}}
// Mark one as complete
allocs[5].ClientStatus = structs.AllocClientStatusComplete
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil)
r := reconciler.Compute()
// Verify that no follow up evals were created
evals := r.desiredFollowupEvals[tgName]
require.Nil(evals)
// Two reschedule attempts were made, one more can be made now
// Alloc 5 should not be replaced because it is terminal
assertResults(t, r, &resultExpectation{
createDeployment: nil,
@ -1266,19 +1439,24 @@ func TestReconciler_Reschedule_Batch(t *testing.T) {
},
},
})
assertNamesHaveIndexes(t, intRange(2, 2), placeResultsToNames(r.place))
assertPlaceResultsHavePreviousAllocs(t, 1, r.place)
assertPlacementsAreRescheduled(t, 1, r.place)
}
// Tests rescheduling failed service allocations with desired state stop
func TestReconciler_Reschedule_Service(t *testing.T) {
func TestReconciler_RescheduleLater_Service(t *testing.T) {
require := require.New(t)
// Set desired 5
job := mock.Job()
job.TaskGroups[0].Count = 5
tgName := job.TaskGroups[0].Name
now := time.Now()
// Set up reschedule policy
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 1, Interval: 24 * time.Hour}
delayDur := 15 * time.Second
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 1, Interval: 24 * time.Hour, Delay: delayDur, MaxDelay: 1 * time.Hour}
// Create 5 existing allocations
var allocs []*structs.Allocation
@ -1293,15 +1471,17 @@ func TestReconciler_Reschedule_Service(t *testing.T) {
}
// Mark two as failed
allocs[0].ClientStatus = structs.AllocClientStatusFailed
allocs[1].ClientStatus = structs.AllocClientStatusFailed
// Mark one of them as already rescheduled once
allocs[1].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{
allocs[0].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{
{RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(),
PrevAllocID: uuid.Generate(),
PrevNodeID: uuid.Generate(),
},
}}
allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now}}
allocs[1].ClientStatus = structs.AllocClientStatusFailed
// Mark one as desired state stop
allocs[4].DesiredStatus = structs.AllocDesiredStatusStop
@ -1309,7 +1489,81 @@ func TestReconciler_Reschedule_Service(t *testing.T) {
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil)
r := reconciler.Compute()
// Should place 2, one is rescheduled, one is past its reschedule limit and one is a new placement
// Should place a new placement and create a follow up eval for the delayed reschedule
// Verify that the follow up eval has the expected waitUntil time
evals := r.desiredFollowupEvals[tgName]
require.NotNil(evals)
require.Equal(1, len(evals))
require.Equal(now.Add(delayDur), evals[0].WaitUntil)
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 1,
inplace: 1,
stop: 0,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: 1,
InPlaceUpdate: 1,
Ignore: 3,
},
},
})
assertNamesHaveIndexes(t, intRange(4, 4), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(1, 1), allocsToNames(r.inplaceUpdate))
// verify that the followup evalID field is set correctly
r.inplaceUpdate[0].EvalID = evals[0].ID
}
// Tests rescheduling failed service allocations with desired state stop
func TestReconciler_RescheduleNow_Service(t *testing.T) {
require := require.New(t)
// Set desired 5
job := mock.Job()
job.TaskGroups[0].Count = 5
tgName := job.TaskGroups[0].Name
now := time.Now()
// Set up reschedule policy
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 1, Interval: 24 * time.Hour, Delay: 5 * time.Second, MaxDelay: 1 * time.Hour}
// Create 5 existing allocations
var allocs []*structs.Allocation
for i := 0; i < 5; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
allocs = append(allocs, alloc)
alloc.ClientStatus = structs.AllocClientStatusRunning
}
// Mark two as failed
allocs[0].ClientStatus = structs.AllocClientStatusFailed
// Mark one of them as already rescheduled once
allocs[0].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{
{RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(),
PrevAllocID: uuid.Generate(),
PrevNodeID: uuid.Generate(),
},
}}
allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
allocs[1].ClientStatus = structs.AllocClientStatusFailed
// Mark one as desired state stop
allocs[4].DesiredStatus = structs.AllocDesiredStatusStop
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil)
r := reconciler.Compute()
// Verify that no follow up evals were created
evals := r.desiredFollowupEvals[tgName]
require.Nil(evals)
// Verify that one rescheduled alloc and one replacement for terminal alloc were placed
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
@ -1324,8 +1578,8 @@ func TestReconciler_Reschedule_Service(t *testing.T) {
},
})
assertNamesHaveIndexes(t, intRange(0, 0, 4, 4), placeResultsToNames(r.place))
// 2 rescheduled allocs should have previous allocs
assertNamesHaveIndexes(t, intRange(1, 1, 4, 4), placeResultsToNames(r.place))
// Rescheduled allocs should have previous allocs
assertPlaceResultsHavePreviousAllocs(t, 1, r.place)
assertPlacementsAreRescheduled(t, 1, r.place)
}
@ -3374,6 +3628,8 @@ func TestReconciler_FailedDeployment_DontReschedule(t *testing.T) {
job := mock.Job()
job.TaskGroups[0].Update = noCanaryUpdate
tgName := job.TaskGroups[0].Name
now := time.Now()
// Create an existing failed deployment that has some placed allocs
d := structs.NewDeployment(job)
d.Status = structs.DeploymentStatusFailed
@ -3394,8 +3650,17 @@ func TestReconciler_FailedDeployment_DontReschedule(t *testing.T) {
alloc.TaskGroup = job.TaskGroups[0].Name
allocs = append(allocs, alloc)
}
//create some allocations that are reschedulable now
allocs[2].ClientStatus = structs.AllocClientStatusFailed
allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
allocs[3].ClientStatus = structs.AllocClientStatusFailed
allocs[3].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil)
r := reconciler.Compute()
@ -3417,6 +3682,8 @@ func TestReconciler_FailedDeployment_DontReschedule(t *testing.T) {
func TestReconciler_DeploymentWithFailedAllocs_DontReschedule(t *testing.T) {
job := mock.Job()
job.TaskGroups[0].Update = noCanaryUpdate
tgName := job.TaskGroups[0].Name
now := time.Now()
// Mock deployment with failed allocs, but deployment watcher hasn't marked it as failed yet
d := structs.NewDeployment(job)
@ -3439,8 +3706,17 @@ func TestReconciler_DeploymentWithFailedAllocs_DontReschedule(t *testing.T) {
alloc.DeploymentID = d.ID
allocs = append(allocs, alloc)
}
// Create allocs that are reschedulable now
allocs[2].ClientStatus = structs.AllocClientStatusFailed
allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
allocs[3].ClientStatus = structs.AllocClientStatusFailed
allocs[3].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil)
r := reconciler.Compute()

View File

@ -227,14 +227,17 @@ func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, mi
return
}
// filterByReschedulable filters the allocation set to return the set of allocations that are either
// terminal or running, and a set of allocations that must be rescheduled
func (a allocSet) filterByReschedulable(isBatch bool, reschedulePolicy *structs.ReschedulePolicy) (untainted, reschedule allocSet) {
// filterByRescheduleable filters the allocation set to return the set of allocations that are either
// terminal or running, and a set of allocations that must be rescheduled now. Allocations that can be rescheduled
// at a future time are also returned so that we can create follow up evaluations for them
func (a allocSet) filterByRescheduleable(isBatch bool) (untainted, rescheduleNow allocSet, rescheduleLater []*delayedRescheduleInfo) {
untainted = make(map[string]*structs.Allocation)
reschedule = make(map[string]*structs.Allocation)
rescheduleNow = make(map[string]*structs.Allocation)
now := time.Now()
for _, alloc := range a {
var isUntainted, eligibleNow, eligibleLater bool
var rescheduleTime time.Time
if isBatch {
// Allocs from batch jobs should be filtered when the desired status
// is terminal and the client did not finish or when the client
@ -249,26 +252,47 @@ func (a allocSet) filterByReschedulable(isBatch bool, reschedulePolicy *structs.
default:
}
if alloc.NextAllocation == "" {
if alloc.ShouldReschedule(reschedulePolicy, now) {
reschedule[alloc.ID] = alloc
} else {
untainted[alloc.ID] = alloc
}
// Ignore allocs that have already been rescheduled
isUntainted, eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, now, true)
}
} else {
//ignore allocs that have already been rescheduled
// Ignore allocs that have already been rescheduled
if alloc.NextAllocation == "" {
// ignore allocs whose desired state is stop/evict
// everything else is either reschedulable or untainted
if alloc.ShouldReschedule(reschedulePolicy, now) {
reschedule[alloc.ID] = alloc
} else if alloc.DesiredStatus != structs.AllocDesiredStatusStop && alloc.DesiredStatus != structs.AllocDesiredStatusEvict {
untainted[alloc.ID] = alloc
}
isUntainted, eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, now, false)
}
}
if isUntainted {
untainted[alloc.ID] = alloc
}
if eligibleNow {
rescheduleNow[alloc.ID] = alloc
} else if eligibleLater {
rescheduleLater = append(rescheduleLater, &delayedRescheduleInfo{alloc.ID, rescheduleTime})
}
}
return
}
// updateByReschedulable decides what to do with a failed allocation: leave it
// in the untainted set, reschedule it immediately, or flag it for a delayed
// reschedule at rescheduleTime via a follow up evaluation.
func updateByReschedulable(alloc *structs.Allocation, now time.Time, batch bool) (untainted, rescheduleNow, rescheduleLater bool, rescheduleTime time.Time) {
	// Service jobs skip allocs whose desired state is stop/evict; batch jobs
	// consider every alloc either rescheduleable or untainted.
	allowed := batch ||
		(alloc.DesiredStatus != structs.AllocDesiredStatusStop &&
			alloc.DesiredStatus != structs.AllocDesiredStatusEvict)

	var eligible bool
	rescheduleTime, eligible = alloc.NextRescheduleTime()

	switch {
	case eligible && now.After(rescheduleTime):
		// The reschedule time has already passed, so reschedule right away
		rescheduleNow = true
	case allowed:
		// Keep the alloc around; if it becomes eligible in the future and no
		// follow up eval has been created for it yet, mark it for delayed
		// rescheduling.
		untainted = true
		rescheduleLater = eligible && alloc.FollowupEvalID == ""
	}
	return
}