Merge pull request #594 from hashicorp/f-restart-policy
More restart policy options and consolidate batch/service restart tracker
This commit is contained in:
commit
3bdd372413
|
@ -69,7 +69,6 @@ func TestCompose(t *testing.T) {
|
|||
Operand: "=",
|
||||
},
|
||||
},
|
||||
RestartPolicy: NewRestartPolicy(),
|
||||
Tasks: []*Task{
|
||||
&Task{
|
||||
Name: "task1",
|
||||
|
|
22
api/tasks.go
22
api/tasks.go
|
@ -7,17 +7,11 @@ import (
|
|||
// RestartPolicy defines how the Nomad client restarts
|
||||
// tasks in a taskgroup when they fail
|
||||
type RestartPolicy struct {
|
||||
Interval time.Duration
|
||||
Attempts int
|
||||
Delay time.Duration
|
||||
}
|
||||
|
||||
func NewRestartPolicy() *RestartPolicy {
|
||||
return &RestartPolicy{
|
||||
Attempts: 10,
|
||||
Interval: 3 * time.Minute,
|
||||
Delay: 5 * time.Second,
|
||||
}
|
||||
Interval time.Duration
|
||||
Attempts int
|
||||
Delay time.Duration
|
||||
RestartOnSuccess bool
|
||||
Mode string
|
||||
}
|
||||
|
||||
// The ServiceCheck data model represents the consul health check that
|
||||
|
@ -54,11 +48,9 @@ type TaskGroup struct {
|
|||
|
||||
// NewTaskGroup creates a new TaskGroup.
|
||||
func NewTaskGroup(name string, count int) *TaskGroup {
|
||||
restartPolicy := NewRestartPolicy()
|
||||
return &TaskGroup{
|
||||
Name: name,
|
||||
Count: count,
|
||||
RestartPolicy: restartPolicy,
|
||||
Name: name,
|
||||
Count: count,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -8,9 +8,8 @@ import (
|
|||
func TestTaskGroup_NewTaskGroup(t *testing.T) {
|
||||
grp := NewTaskGroup("grp1", 2)
|
||||
expect := &TaskGroup{
|
||||
Name: "grp1",
|
||||
Count: 2,
|
||||
RestartPolicy: NewRestartPolicy(),
|
||||
Name: "grp1",
|
||||
Count: 2,
|
||||
}
|
||||
if !reflect.DeepEqual(grp, expect) {
|
||||
t.Fatalf("expect: %#v, got: %#v", expect, grp)
|
||||
|
|
|
@ -110,7 +110,7 @@ func (r *AllocRunner) RestoreState() error {
|
|||
r.restored[name] = struct{}{}
|
||||
|
||||
task := &structs.Task{Name: name}
|
||||
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
|
||||
restartTracker := newRestartTracker(r.RestartPolicy)
|
||||
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
|
||||
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
|
||||
r.consulService)
|
||||
|
@ -322,7 +322,7 @@ func (r *AllocRunner) Run() {
|
|||
|
||||
// Merge in the task resources
|
||||
task.Resources = alloc.TaskResources[task.Name]
|
||||
restartTracker := newRestartTracker(r.alloc.Job.Type, r.RestartPolicy)
|
||||
restartTracker := newRestartTracker(r.RestartPolicy)
|
||||
tr := NewTaskRunner(r.logger, r.config, r.setTaskState, r.ctx,
|
||||
r.alloc, task, r.alloc.TaskStates[task.Name], restartTracker,
|
||||
r.consulService)
|
||||
|
|
|
@ -1,89 +1,71 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
// The errorCounter keeps track of the number of times a process has exited
|
||||
// It returns the duration after which a task is restarted
|
||||
// For Batch jobs, the interval is set to zero value since the takss
|
||||
// will be restarted only upto maxAttempts times
|
||||
type restartTracker interface {
|
||||
nextRestart(exitCode int) (bool, time.Duration)
|
||||
}
|
||||
// jitter is the percent of jitter added to restart delays.
|
||||
const jitter = 0.25
|
||||
|
||||
func newRestartTracker(jobType string, restartPolicy *structs.RestartPolicy) restartTracker {
|
||||
switch jobType {
|
||||
case structs.JobTypeService:
|
||||
return &serviceRestartTracker{
|
||||
maxAttempts: restartPolicy.Attempts,
|
||||
startTime: time.Now(),
|
||||
interval: restartPolicy.Interval,
|
||||
delay: restartPolicy.Delay,
|
||||
}
|
||||
default:
|
||||
return &batchRestartTracker{
|
||||
maxAttempts: restartPolicy.Attempts,
|
||||
delay: restartPolicy.Delay,
|
||||
}
|
||||
func newRestartTracker(policy *structs.RestartPolicy) *RestartTracker {
|
||||
return &RestartTracker{
|
||||
startTime: time.Now(),
|
||||
policy: policy,
|
||||
rand: rand.New(rand.NewSource(time.Now().Unix())),
|
||||
}
|
||||
}
|
||||
|
||||
// noRestartsTracker returns a RestartTracker that never restarts.
|
||||
func noRestartsTracker() restartTracker {
|
||||
return &batchRestartTracker{maxAttempts: 0}
|
||||
type RestartTracker struct {
|
||||
count int // Current number of attempts.
|
||||
startTime time.Time // When the interval began
|
||||
policy *structs.RestartPolicy
|
||||
rand *rand.Rand
|
||||
}
|
||||
|
||||
type batchRestartTracker struct {
|
||||
maxAttempts int
|
||||
delay time.Duration
|
||||
|
||||
count int
|
||||
}
|
||||
|
||||
func (b *batchRestartTracker) increment() {
|
||||
b.count += 1
|
||||
}
|
||||
|
||||
func (b *batchRestartTracker) nextRestart(exitCode int) (bool, time.Duration) {
|
||||
if b.count < b.maxAttempts && exitCode > 0 {
|
||||
b.increment()
|
||||
return true, b.delay
|
||||
}
|
||||
return false, 0
|
||||
}
|
||||
|
||||
type serviceRestartTracker struct {
|
||||
maxAttempts int
|
||||
delay time.Duration
|
||||
interval time.Duration
|
||||
|
||||
count int
|
||||
startTime time.Time
|
||||
}
|
||||
|
||||
func (s *serviceRestartTracker) increment() {
|
||||
s.count += 1
|
||||
}
|
||||
|
||||
func (s *serviceRestartTracker) nextRestart(exitCode int) (bool, time.Duration) {
|
||||
defer s.increment()
|
||||
windowEndTime := s.startTime.Add(s.interval)
|
||||
func (r *RestartTracker) NextRestart(exitCode int) (bool, time.Duration) {
|
||||
// Check if we have entered a new interval.
|
||||
end := r.startTime.Add(r.policy.Interval)
|
||||
now := time.Now()
|
||||
// If the window of restart is over we wait until the delay duration
|
||||
if now.After(windowEndTime) {
|
||||
s.count = 0
|
||||
s.startTime = time.Now()
|
||||
return true, s.delay
|
||||
if now.After(end) {
|
||||
r.count = 0
|
||||
r.startTime = now
|
||||
return true, r.jitter()
|
||||
}
|
||||
|
||||
// If we are within the delay duration and didn't exhaust all retries
|
||||
if s.count < s.maxAttempts {
|
||||
return true, s.delay
|
||||
r.count++
|
||||
|
||||
// If we are under the attempts, restart with delay.
|
||||
if r.count <= r.policy.Attempts {
|
||||
return r.shouldRestart(exitCode), r.jitter()
|
||||
}
|
||||
|
||||
// If we exhausted all the retries and are withing the time window
|
||||
return true, windowEndTime.Sub(now)
|
||||
// Don't restart since mode is "fail"
|
||||
if r.policy.Mode == structs.RestartPolicyModeFail {
|
||||
return false, 0
|
||||
}
|
||||
|
||||
// Apply an artifical wait to enter the next interval
|
||||
return r.shouldRestart(exitCode), end.Sub(now)
|
||||
}
|
||||
|
||||
// shouldRestart returns whether a restart should occur based on the exit code
|
||||
// and the RestartOnSuccess configuration.
|
||||
func (r *RestartTracker) shouldRestart(exitCode int) bool {
|
||||
return exitCode != 0 || r.policy.RestartOnSuccess
|
||||
}
|
||||
|
||||
// jitter returns the delay time plus a jitter.
|
||||
func (r *RestartTracker) jitter() time.Duration {
|
||||
d := r.policy.Delay.Nanoseconds()
|
||||
j := float64(r.rand.Int63n(d)) * jitter
|
||||
return time.Duration(d + int64(j))
|
||||
}
|
||||
|
||||
// Returns a tracker that never restarts.
|
||||
func noRestartsTracker() *RestartTracker {
|
||||
policy := &structs.RestartPolicy{Attempts: 0, Mode: structs.RestartPolicyModeFail}
|
||||
return newRestartTracker(policy)
|
||||
}
|
||||
|
|
|
@ -1,78 +1,81 @@
|
|||
package client
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
func TestTaskRunner_ServiceRestartCounter(t *testing.T) {
|
||||
interval := 2 * time.Minute
|
||||
delay := 1 * time.Second
|
||||
attempts := 3
|
||||
rt := newRestartTracker(structs.JobTypeService, &structs.RestartPolicy{Attempts: attempts, Interval: interval, Delay: delay})
|
||||
func testPolicy(success bool, mode string) *structs.RestartPolicy {
|
||||
return &structs.RestartPolicy{
|
||||
Interval: 2 * time.Minute,
|
||||
Delay: 1 * time.Second,
|
||||
Attempts: 3,
|
||||
Mode: mode,
|
||||
RestartOnSuccess: success,
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < attempts; i++ {
|
||||
actual, when := rt.nextRestart(127)
|
||||
// withinJitter is a helper that returns whether the returned delay is within
|
||||
// the jitter.
|
||||
func withinJitter(expected, actual time.Duration) bool {
|
||||
return float64((actual.Nanoseconds()-expected.Nanoseconds())/
|
||||
expected.Nanoseconds()) <= jitter
|
||||
}
|
||||
|
||||
func TestClient_RestartTracker_ModeDelay(t *testing.T) {
|
||||
t.Parallel()
|
||||
p := testPolicy(true, structs.RestartPolicyModeDelay)
|
||||
rt := newRestartTracker(p)
|
||||
for i := 0; i < p.Attempts; i++ {
|
||||
actual, when := rt.NextRestart(127)
|
||||
if !actual {
|
||||
t.Fatalf("should restart returned %v, actual %v", actual, true)
|
||||
t.Fatalf("NextRestart() returned %v, want %v", actual, true)
|
||||
}
|
||||
if when != delay {
|
||||
t.Fatalf("nextRestart() returned %v; want %v", when, delay)
|
||||
if !withinJitter(p.Delay, when) {
|
||||
t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay)
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
// Follow up restarts should cause delay.
|
||||
for i := 0; i < 3; i++ {
|
||||
actual, when := rt.nextRestart(127)
|
||||
actual, when := rt.NextRestart(127)
|
||||
if !actual {
|
||||
t.Fail()
|
||||
}
|
||||
if !(when > delay && when < interval) {
|
||||
t.Fatalf("nextRestart() returned %v; want less than %v and more than %v", when, interval, delay)
|
||||
if !(when > p.Delay && when < p.Interval) {
|
||||
t.Fatalf("NextRestart() returned %v; want less than %v and more than %v", when, p.Interval, p.Delay)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestTaskRunner_BatchRestartCounter(t *testing.T) {
|
||||
attempts := 2
|
||||
interval := 1 * time.Second
|
||||
delay := 1 * time.Second
|
||||
rt := newRestartTracker(structs.JobTypeBatch,
|
||||
&structs.RestartPolicy{Attempts: attempts,
|
||||
Interval: interval,
|
||||
Delay: delay,
|
||||
},
|
||||
)
|
||||
for i := 0; i < attempts; i++ {
|
||||
shouldRestart, when := rt.nextRestart(127)
|
||||
if !shouldRestart {
|
||||
t.Fatalf("should restart returned %v, actual %v", shouldRestart, true)
|
||||
}
|
||||
if when != delay {
|
||||
t.Fatalf("Delay should be %v, actual: %v", delay, when)
|
||||
}
|
||||
}
|
||||
actual, _ := rt.nextRestart(1)
|
||||
if actual {
|
||||
t.Fatalf("Expect %v, Actual: %v", false, actual)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTaskRunner_BatchRestartOnSuccess(t *testing.T) {
|
||||
attempts := 2
|
||||
interval := 1 * time.Second
|
||||
delay := 1 * time.Second
|
||||
rt := newRestartTracker(structs.JobTypeBatch,
|
||||
&structs.RestartPolicy{Attempts: attempts,
|
||||
Interval: interval,
|
||||
Delay: delay,
|
||||
},
|
||||
)
|
||||
shouldRestart, _ := rt.nextRestart(0)
|
||||
if shouldRestart {
|
||||
t.Fatalf("should restart returned %v, expected: %v", shouldRestart, false)
|
||||
func TestClient_RestartTracker_ModeFail(t *testing.T) {
|
||||
t.Parallel()
|
||||
p := testPolicy(true, structs.RestartPolicyModeFail)
|
||||
rt := newRestartTracker(p)
|
||||
for i := 0; i < p.Attempts; i++ {
|
||||
actual, when := rt.NextRestart(127)
|
||||
if !actual {
|
||||
t.Fatalf("NextRestart() returned %v, want %v", actual, true)
|
||||
}
|
||||
if !withinJitter(p.Delay, when) {
|
||||
t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay)
|
||||
}
|
||||
}
|
||||
|
||||
// Next restart should cause fail
|
||||
if actual, _ := rt.NextRestart(127); actual {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func TestClient_RestartTracker_NoRestartOnSuccess(t *testing.T) {
|
||||
t.Parallel()
|
||||
p := testPolicy(false, structs.RestartPolicyModeDelay)
|
||||
rt := newRestartTracker(p)
|
||||
if shouldRestart, _ := rt.NextRestart(0); shouldRestart {
|
||||
t.Fatalf("NextRestart() returned %v, expected: %v", shouldRestart, false)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ type TaskRunner struct {
|
|||
logger *log.Logger
|
||||
ctx *driver.ExecContext
|
||||
alloc *structs.Allocation
|
||||
restartTracker restartTracker
|
||||
restartTracker *RestartTracker
|
||||
consulService *ConsulService
|
||||
|
||||
task *structs.Task
|
||||
|
@ -53,7 +53,7 @@ type TaskStateUpdater func(taskName string)
|
|||
func NewTaskRunner(logger *log.Logger, config *config.Config,
|
||||
updater TaskStateUpdater, ctx *driver.ExecContext,
|
||||
alloc *structs.Allocation, task *structs.Task, state *structs.TaskState,
|
||||
restartTracker restartTracker, consulService *ConsulService) *TaskRunner {
|
||||
restartTracker *RestartTracker, consulService *ConsulService) *TaskRunner {
|
||||
|
||||
tc := &TaskRunner{
|
||||
config: config,
|
||||
|
@ -280,7 +280,7 @@ func (r *TaskRunner) run() {
|
|||
}
|
||||
|
||||
// Check if we should restart. If not mark task as dead and exit.
|
||||
shouldRestart, when := r.restartTracker.nextRestart(waitRes.ExitCode)
|
||||
shouldRestart, when := r.restartTracker.NextRestart(waitRes.ExitCode)
|
||||
waitEvent := r.waitErrorToEvent(waitRes)
|
||||
if !shouldRestart {
|
||||
r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.alloc.ID)
|
||||
|
|
|
@ -42,7 +42,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) {
|
|||
|
||||
ctx := driver.NewExecContext(allocDir, alloc.ID)
|
||||
rp := structs.NewRestartPolicy(structs.JobTypeService)
|
||||
restartTracker := newRestartTracker(structs.JobTypeService, rp)
|
||||
restartTracker := newRestartTracker(rp)
|
||||
if !restarts {
|
||||
restartTracker = noRestartsTracker()
|
||||
}
|
||||
|
|
|
@ -104,15 +104,24 @@ job "example" {
|
|||
# Defaults to 1
|
||||
# count = 1
|
||||
|
||||
# Restart Policy - This block defines the restart policy for TaskGroups,
|
||||
# the attempts value defines the number of restarts Nomad will do if Tasks
|
||||
# in this TaskGroup fails in a rolling window of interval duration
|
||||
# The delay value makes Nomad wait for that duration to restart after a Task
|
||||
# fails or crashes.
|
||||
# Configure the restart policy for the task group. If not provided, a
|
||||
# default is used based on the job type.
|
||||
restart {
|
||||
interval = "5m"
|
||||
# The number of attempts to run the job within the specified interval.
|
||||
attempts = 10
|
||||
interval = "5m"
|
||||
|
||||
# A delay between a task failing and a restart occuring.
|
||||
delay = "25s"
|
||||
|
||||
# Whether the tasks should be restarted if the exit successfully.
|
||||
on_success = true
|
||||
|
||||
# Mode controls what happens when a task has restarted "attempts"
|
||||
# times within the interval. "delay" mode delays the next restart
|
||||
# till the next interval. "fail" mode does not restart the task if
|
||||
# "attempts" has been hit within the interval.
|
||||
mode = "delay"
|
||||
}
|
||||
|
||||
# Define a task to run
|
||||
|
|
|
@ -80,6 +80,9 @@ func (c *RunCommand) Run(args []string) int {
|
|||
return 1
|
||||
}
|
||||
|
||||
// Initialize any fields that need to be.
|
||||
job.InitFields()
|
||||
|
||||
// Check that the job is valid
|
||||
if err := job.Validate(); err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error validating job: %s", err))
|
||||
|
|
|
@ -77,6 +77,7 @@ func TestRunCommand_Fails(t *testing.T) {
|
|||
defer os.Remove(fh3.Name())
|
||||
_, err = fh3.WriteString(`
|
||||
job "job1" {
|
||||
type = "service"
|
||||
datacenters = [ "dc1" ]
|
||||
group "group1" {
|
||||
count = 1
|
||||
|
|
|
@ -48,6 +48,9 @@ func (c *ValidateCommand) Run(args []string) int {
|
|||
return 1
|
||||
}
|
||||
|
||||
// Initialize any fields that need to be.
|
||||
job.InitFields()
|
||||
|
||||
// Check that the job is valid
|
||||
if err := job.Validate(); err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error validating job: %s", err))
|
||||
|
|
|
@ -24,6 +24,7 @@ func TestValidateCommand(t *testing.T) {
|
|||
defer os.Remove(fh.Name())
|
||||
_, err = fh.WriteString(`
|
||||
job "job1" {
|
||||
type = "service"
|
||||
datacenters = [ "dc1" ]
|
||||
group "group1" {
|
||||
count = 1
|
||||
|
|
|
@ -159,10 +159,9 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
|
|||
result.TaskGroups = make([]*structs.TaskGroup, len(tasks), len(tasks)*2)
|
||||
for i, t := range tasks {
|
||||
result.TaskGroups[i] = &structs.TaskGroup{
|
||||
Name: t.Name,
|
||||
Count: 1,
|
||||
Tasks: []*structs.Task{t},
|
||||
RestartPolicy: structs.NewRestartPolicy(result.Type),
|
||||
Name: t.Name,
|
||||
Count: 1,
|
||||
Tasks: []*structs.Task{t},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -230,11 +229,10 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error {
|
|||
return err
|
||||
}
|
||||
}
|
||||
g.RestartPolicy = structs.NewRestartPolicy(result.Type)
|
||||
|
||||
// Parse restart policy
|
||||
if o := listVal.Filter("restart"); len(o.Items) > 0 {
|
||||
if err := parseRestartPolicy(g.RestartPolicy, o); err != nil {
|
||||
if err := parseRestartPolicy(&g.RestartPolicy, o); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -267,12 +265,9 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func parseRestartPolicy(final *structs.RestartPolicy, list *ast.ObjectList) error {
|
||||
func parseRestartPolicy(final **structs.RestartPolicy, list *ast.ObjectList) error {
|
||||
list = list.Elem()
|
||||
if len(list.Items) == 0 {
|
||||
return nil
|
||||
}
|
||||
if len(list.Items) != 1 {
|
||||
if len(list.Items) > 1 {
|
||||
return fmt.Errorf("only one 'restart' block allowed")
|
||||
}
|
||||
|
||||
|
@ -297,7 +292,7 @@ func parseRestartPolicy(final *structs.RestartPolicy, list *ast.ObjectList) erro
|
|||
return err
|
||||
}
|
||||
|
||||
*final = result
|
||||
*final = &result
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -48,11 +48,6 @@ func TestParse(t *testing.T) {
|
|||
&structs.TaskGroup{
|
||||
Name: "outside",
|
||||
Count: 1,
|
||||
RestartPolicy: &structs.RestartPolicy{
|
||||
Attempts: 2,
|
||||
Interval: 1 * time.Minute,
|
||||
Delay: 15 * time.Second,
|
||||
},
|
||||
Tasks: []*structs.Task{
|
||||
&structs.Task{
|
||||
Name: "outside",
|
||||
|
@ -83,9 +78,11 @@ func TestParse(t *testing.T) {
|
|||
"elb_checks": "3",
|
||||
},
|
||||
RestartPolicy: &structs.RestartPolicy{
|
||||
Interval: 10 * time.Minute,
|
||||
Attempts: 5,
|
||||
Delay: 15 * time.Second,
|
||||
Interval: 10 * time.Minute,
|
||||
Attempts: 5,
|
||||
Delay: 15 * time.Second,
|
||||
RestartOnSuccess: true,
|
||||
Mode: "delay",
|
||||
},
|
||||
Tasks: []*structs.Task{
|
||||
&structs.Task{
|
||||
|
@ -271,11 +268,6 @@ func TestParse(t *testing.T) {
|
|||
&structs.TaskGroup{
|
||||
Name: "bar",
|
||||
Count: 1,
|
||||
RestartPolicy: &structs.RestartPolicy{
|
||||
Attempts: 2,
|
||||
Interval: 1 * time.Minute,
|
||||
Delay: 15 * time.Second,
|
||||
},
|
||||
Tasks: []*structs.Task{
|
||||
&structs.Task{
|
||||
Name: "bar",
|
||||
|
|
|
@ -35,6 +35,8 @@ job "binstore-storagelocker" {
|
|||
attempts = 5
|
||||
interval = "10m"
|
||||
delay = "15s"
|
||||
on_success = true
|
||||
mode = "delay"
|
||||
}
|
||||
task "binstore" {
|
||||
driver = "docker"
|
||||
|
|
|
@ -192,7 +192,7 @@ func TestCoreScheduler_JobGC(t *testing.T) {
|
|||
// Should still exist
|
||||
out, err := state.JobByID(job.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("test(%s) err: %v", err)
|
||||
t.Fatalf("test(%s) err: %v", test.test, err)
|
||||
}
|
||||
if (test.shouldExist && out == nil) || (!test.shouldExist && out != nil) {
|
||||
t.Fatalf("test(%s) bad: %v", test.test, out)
|
||||
|
@ -200,7 +200,7 @@ func TestCoreScheduler_JobGC(t *testing.T) {
|
|||
|
||||
outE, err := state.EvalByID(eval.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("test(%s) err: %v", err)
|
||||
t.Fatalf("test(%s) err: %v", test.test, err)
|
||||
}
|
||||
if (test.shouldExist && outE == nil) || (!test.shouldExist && outE != nil) {
|
||||
t.Fatalf("test(%s) bad: %v", test.test, out)
|
||||
|
@ -208,7 +208,7 @@ func TestCoreScheduler_JobGC(t *testing.T) {
|
|||
|
||||
outA, err := state.AllocByID(alloc.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("test(%s) err: %v", err)
|
||||
t.Fatalf("test(%s) err: %v", test.test, err)
|
||||
}
|
||||
if (test.shouldExist && outA == nil) || (!test.shouldExist && outA != nil) {
|
||||
t.Fatalf("test(%s) bad: %v", test.test, outA)
|
||||
|
|
|
@ -76,9 +76,11 @@ func Job() *structs.Job {
|
|||
Name: "web",
|
||||
Count: 10,
|
||||
RestartPolicy: &structs.RestartPolicy{
|
||||
Attempts: 3,
|
||||
Interval: 10 * time.Minute,
|
||||
Delay: 1 * time.Minute,
|
||||
Attempts: 3,
|
||||
Interval: 10 * time.Minute,
|
||||
Delay: 1 * time.Minute,
|
||||
RestartOnSuccess: true,
|
||||
Mode: structs.RestartPolicyModeDelay,
|
||||
},
|
||||
Tasks: []*structs.Task{
|
||||
&structs.Task{
|
||||
|
@ -126,7 +128,7 @@ func Job() *structs.Job {
|
|||
CreateIndex: 42,
|
||||
ModifyIndex: 99,
|
||||
}
|
||||
job.InitAllServiceFields()
|
||||
job.InitFields()
|
||||
return job
|
||||
}
|
||||
|
||||
|
@ -151,9 +153,11 @@ func SystemJob() *structs.Job {
|
|||
Name: "web",
|
||||
Count: 1,
|
||||
RestartPolicy: &structs.RestartPolicy{
|
||||
Attempts: 3,
|
||||
Interval: 10 * time.Minute,
|
||||
Delay: 1 * time.Minute,
|
||||
Attempts: 3,
|
||||
Interval: 10 * time.Minute,
|
||||
Delay: 1 * time.Minute,
|
||||
RestartOnSuccess: true,
|
||||
Mode: structs.RestartPolicyModeDelay,
|
||||
},
|
||||
Tasks: []*structs.Task{
|
||||
&structs.Task{
|
||||
|
|
|
@ -19,17 +19,8 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
ErrNoLeader = fmt.Errorf("No cluster leader")
|
||||
ErrNoRegionPath = fmt.Errorf("No path to region")
|
||||
defaultServiceJobRestartPolicy = RestartPolicy{
|
||||
Delay: 15 * time.Second,
|
||||
Attempts: 2,
|
||||
Interval: 1 * time.Minute,
|
||||
}
|
||||
defaultBatchJobRestartPolicy = RestartPolicy{
|
||||
Delay: 15 * time.Second,
|
||||
Attempts: 15,
|
||||
}
|
||||
ErrNoLeader = fmt.Errorf("No cluster leader")
|
||||
ErrNoRegionPath = fmt.Errorf("No path to region")
|
||||
)
|
||||
|
||||
type MessageType uint8
|
||||
|
@ -786,8 +777,9 @@ type Job struct {
|
|||
// InitFields is used to initialize fields in the Job. This should be called
|
||||
// when registering a Job.
|
||||
func (j *Job) InitFields() {
|
||||
// Initialize the service block.
|
||||
j.InitAllServiceFields()
|
||||
for _, tg := range j.TaskGroups {
|
||||
tg.InitFields(j)
|
||||
}
|
||||
|
||||
// If the job is batch then make it GC.
|
||||
if j.Type == JobTypeBatch {
|
||||
|
@ -795,16 +787,6 @@ func (j *Job) InitFields() {
|
|||
}
|
||||
}
|
||||
|
||||
// InitAllServiceFields traverses all Task Groups and makes them
|
||||
// interpolate Job, Task group and Task names in all Service names.
|
||||
// It also generates the check names if they are not set. This method also
|
||||
// generates Check and Service IDs
|
||||
func (j *Job) InitAllServiceFields() {
|
||||
for _, tg := range j.TaskGroups {
|
||||
tg.InitAllServiceFields(j.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// Validate is used to sanity check a job input
|
||||
func (j *Job) Validate() error {
|
||||
var mErr multierror.Error
|
||||
|
@ -984,15 +966,61 @@ func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
|
|||
return time.Time{}
|
||||
}
|
||||
|
||||
// RestartPolicy influences how Nomad restarts Tasks when they
|
||||
// crash or fail.
|
||||
var (
|
||||
defaultServiceJobRestartPolicy = RestartPolicy{
|
||||
Delay: 15 * time.Second,
|
||||
Attempts: 2,
|
||||
Interval: 1 * time.Minute,
|
||||
RestartOnSuccess: true,
|
||||
Mode: RestartPolicyModeDelay,
|
||||
}
|
||||
defaultBatchJobRestartPolicy = RestartPolicy{
|
||||
Delay: 15 * time.Second,
|
||||
Attempts: 15,
|
||||
Interval: 7 * 24 * time.Hour,
|
||||
RestartOnSuccess: false,
|
||||
Mode: RestartPolicyModeDelay,
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
// RestartPolicyModeDelay causes an artificial delay till the next interval is
|
||||
// reached when the specified attempts have been reached in the interval.
|
||||
RestartPolicyModeDelay = "delay"
|
||||
|
||||
// RestartPolicyModeFail causes a job to fail if the specified number of
|
||||
// attempts are reached within an interval.
|
||||
RestartPolicyModeFail = "fail"
|
||||
)
|
||||
|
||||
// RestartPolicy configures how Tasks are restarted when they crash or fail.
|
||||
type RestartPolicy struct {
|
||||
// Attempts is the number of restart that will occur in an interval.
|
||||
Attempts int
|
||||
|
||||
// Interval is a duration in which we can limit the number of restarts
|
||||
// within.
|
||||
Interval time.Duration
|
||||
Delay time.Duration
|
||||
|
||||
// Delay is the time between a failure and a restart.
|
||||
Delay time.Duration
|
||||
|
||||
// RestartOnSuccess determines whether a task should be restarted if it
|
||||
// exited successfully.
|
||||
RestartOnSuccess bool `mapstructure:"on_success"`
|
||||
|
||||
// Mode controls what happens when the task restarts more than attempt times
|
||||
// in an interval.
|
||||
Mode string
|
||||
}
|
||||
|
||||
func (r *RestartPolicy) Validate() error {
|
||||
switch r.Mode {
|
||||
case RestartPolicyModeDelay, RestartPolicyModeFail:
|
||||
default:
|
||||
return fmt.Errorf("Unsupported restart mode: %q", r.Mode)
|
||||
}
|
||||
|
||||
if r.Interval == 0 {
|
||||
return nil
|
||||
}
|
||||
|
@ -1040,12 +1068,15 @@ type TaskGroup struct {
|
|||
Meta map[string]string
|
||||
}
|
||||
|
||||
// InitAllServiceFields traverses over all Tasks and makes them to interpolate
|
||||
// values of Job, Task Group and Task names in all Service Names.
|
||||
// It also generates service ids, check ids and check names
|
||||
func (tg *TaskGroup) InitAllServiceFields(job string) {
|
||||
// InitFields is used to initialize fields in the TaskGroup.
|
||||
func (tg *TaskGroup) InitFields(job *Job) {
|
||||
// Set the default restart policy.
|
||||
if tg.RestartPolicy == nil {
|
||||
tg.RestartPolicy = NewRestartPolicy(job.Type)
|
||||
}
|
||||
|
||||
for _, task := range tg.Tasks {
|
||||
task.InitAllServiceFields(job, tg.Name)
|
||||
task.InitFields(job, tg)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1240,10 +1271,15 @@ type Task struct {
|
|||
Meta map[string]string
|
||||
}
|
||||
|
||||
// InitAllServiceFields interpolates values of Job, Task Group
|
||||
// InitFields initializes fields in the task.
|
||||
func (t *Task) InitFields(job *Job, tg *TaskGroup) {
|
||||
t.InitServiceFields(job.Name, tg.Name)
|
||||
}
|
||||
|
||||
// InitServiceFields interpolates values of Job, Task Group
|
||||
// and Tasks in all the service Names of a Task. This also generates the service
|
||||
// id, check id and check names.
|
||||
func (t *Task) InitAllServiceFields(job string, taskGroup string) {
|
||||
func (t *Task) InitServiceFields(job string, taskGroup string) {
|
||||
for _, service := range t.Services {
|
||||
service.InitFields(job, taskGroup, t.Name)
|
||||
}
|
||||
|
|
|
@ -115,9 +115,11 @@ func TestJob_IsPeriodic(t *testing.T) {
|
|||
func TestTaskGroup_Validate(t *testing.T) {
|
||||
tg := &TaskGroup{
|
||||
RestartPolicy: &RestartPolicy{
|
||||
Interval: 5 * time.Minute,
|
||||
Delay: 10 * time.Second,
|
||||
Attempts: 10,
|
||||
Interval: 5 * time.Minute,
|
||||
Delay: 10 * time.Second,
|
||||
Attempts: 10,
|
||||
RestartOnSuccess: true,
|
||||
Mode: RestartPolicyModeDelay,
|
||||
},
|
||||
}
|
||||
err := tg.Validate()
|
||||
|
@ -141,9 +143,11 @@ func TestTaskGroup_Validate(t *testing.T) {
|
|||
&Task{},
|
||||
},
|
||||
RestartPolicy: &RestartPolicy{
|
||||
Interval: 5 * time.Minute,
|
||||
Delay: 10 * time.Second,
|
||||
Attempts: 10,
|
||||
Interval: 5 * time.Minute,
|
||||
Delay: 10 * time.Second,
|
||||
Attempts: 10,
|
||||
RestartOnSuccess: true,
|
||||
Mode: RestartPolicyModeDelay,
|
||||
},
|
||||
}
|
||||
err = tg.Validate()
|
||||
|
@ -505,7 +509,7 @@ func TestJob_ExpandServiceNames(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
j.InitAllServiceFields()
|
||||
j.InitFields()
|
||||
|
||||
service1Name := j.TaskGroups[0].Tasks[0].Services[0].Name
|
||||
if service1Name != "my-job-web-frontend-default" {
|
||||
|
|
|
@ -236,19 +236,28 @@ The `network` object supports the following keys:
|
|||
|
||||
The `restart` object supports the following keys:
|
||||
|
||||
* `attempts` - For `batch` jobs, `attempts` is the maximum number of restarts
|
||||
allowed before the task is failed. For non-batch jobs, the `attempts` is the
|
||||
number of restarts allowed in an `interval` before a restart delay is added.
|
||||
* `attempts` - `attempts` is the number of restarts allowed in an `interval`.
|
||||
|
||||
* `interval` - `interval` is only valid on non-batch jobs and is a time duration
|
||||
that can be specified using the `s`, `m`, and `h` suffixes, such as `30s`.
|
||||
The `interval` begins when the first task starts and ensures that only
|
||||
`attempts` number of restarts happens within it. If more than `attempts`
|
||||
number of failures happen, the restart is delayed till after the `interval`,
|
||||
which is then reset.
|
||||
* `interval` - `interval` is a time duration that can be specified using the
|
||||
`s`, `m`, and `h` suffixes, such as `30s`. The `interval` begins when the
|
||||
first task starts and ensures that only `attempts` number of restarts happens
|
||||
within it. If more than `attempts` number of failures happen, behavior is
|
||||
controlled by `mode`.
|
||||
|
||||
* `delay` - A duration to wait before restarting a task. It is specified as a
|
||||
time duration using the `s`, `m`, and `h` suffixes, such as `30s`.
|
||||
time duration using the `s`, `m`, and `h` suffixes, such as `30s`. A random
|
||||
jitter of up to 25% is added to the the delay.
|
||||
|
||||
* `on_success` - `on_success` controls whether a task is restarted when the
|
||||
task exits successfully.
|
||||
|
||||
* `mode` - Controls the behavior when the task fails more than `attempts`
|
||||
times in an interval. Possible values are listed below:
|
||||
|
||||
* `delay` - `delay` will delay the next restart until the next `interval` is
|
||||
reached.
|
||||
|
||||
* `fail` - `fail` will not restart the task again.
|
||||
|
||||
The default `batch` restart policy is:
|
||||
|
||||
|
@ -256,6 +265,9 @@ The default `batch` restart policy is:
|
|||
restart {
|
||||
attempts = 15
|
||||
delay = "15s"
|
||||
interval = "168h" # 7 days
|
||||
on_success = false
|
||||
mode = "delay"
|
||||
}
|
||||
```
|
||||
|
||||
|
@ -266,6 +278,8 @@ restart {
|
|||
interval = "1m"
|
||||
attempts = 2
|
||||
delay = "15s"
|
||||
on_success = true
|
||||
mode = "delay"
|
||||
}
|
||||
```
|
||||
|
||||
|
|
Loading…
Reference in New Issue