Merge pull request #369 from hashicorp/f-client-side-restarts

Client Side Restarts - Part 1
Diptanu Choudhury 2015-11-02 20:02:52 -08:00
commit dcc4bcb7a9
10 changed files with 231 additions and 30 deletions

View file

@ -69,6 +69,7 @@ func TestCompose(t *testing.T) {
Operand: "=",
},
},
RestartPolicy: NewRestartPolicy(),
Tasks: []*Task{
&Task{
Name: "task1",

View file

@ -1,19 +1,42 @@
package api
import (
"time"
)
// RestartPolicy defines how the Nomad client restarts
// tasks in a TaskGroup when they fail
type RestartPolicy struct {
Interval time.Duration
Attempts int
Delay time.Duration
}
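// NewRestartPolicy returns a RestartPolicy populated with the package defaults:
// up to 10 restart attempts within a 3 minute interval, with a 5 second delay
// between restarts.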
func NewRestartPolicy() *RestartPolicy {
return &RestartPolicy{
Attempts: 10,
Interval: 3 * time.Minute,
Delay: 5 * time.Second,
}
}
// TaskGroup is the unit of scheduling.
type TaskGroup struct {
Name string
Count int
Constraints []*Constraint
Tasks []*Task
Meta map[string]string
Name string
Count int
Constraints []*Constraint
Tasks []*Task
RestartPolicy *RestartPolicy
Meta map[string]string
}
// NewTaskGroup creates a new TaskGroup.
func NewTaskGroup(name string, count int) *TaskGroup {
restartPolicy := NewRestartPolicy()
return &TaskGroup{
Name: name,
Count: count,
Name: name,
Count: count,
RestartPolicy: restartPolicy,
}
}
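As a rough sketch (not part of the PR), a caller could override those defaults through the api package like this; the group name, count, and values are purely illustrative:

// Sketch: build a task group and tune its restart policy.
grp := NewTaskGroup("cache", 3) // starts with the NewRestartPolicy() defaults
grp.RestartPolicy.Attempts = 5
grp.RestartPolicy.Interval = 10 * time.Minute
grp.RestartPolicy.Delay = 30 * time.Second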

View file

@ -8,8 +8,9 @@ import (
func TestTaskGroup_NewTaskGroup(t *testing.T) {
grp := NewTaskGroup("grp1", 2)
expect := &TaskGroup{
Name: "grp1",
Count: 2,
Name: "grp1",
Count: 2,
RestartPolicy: NewRestartPolicy(),
}
if !reflect.DeepEqual(grp, expect) {
t.Fatalf("expect: %#v, got: %#v", expect, grp)

View file

@ -104,6 +104,17 @@ job "example" {
# Defaults to 1
# count = 1
# Restart Policy - This block defines the restart policy for TaskGroups.
# The attempts value is the number of restarts Nomad will perform for Tasks
# in this TaskGroup that fail within a rolling window of the interval duration.
# The delay value makes Nomad wait for that duration before restarting a Task
# after it fails or crashes.
restart {
interval = "5m"
attempts = 10
delay = "25s"
}
# Define a task to run
task "redis" {
# Use Docker to run the task.
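A quick sketch of what the restart block above is expected to decode into on the server side (see parseRestartPolicy in the parsing changes below); this simply restates the three values, it is not test output:

rp := &structs.RestartPolicy{
	Attempts: 10,
	Interval: 5 * time.Minute,
	Delay:    25 * time.Second,
}

Note that 10 attempts with a 25 second delay add up to 250 seconds, which fits inside the 5 minute interval, so this policy passes the Validate check added in the structs package below.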

View file

@ -124,7 +124,7 @@ func parseJob(result *structs.Job, obj *hclobj.Object) error {
}
}
// If we have tasks outside, do those
// If we have tasks outside, create TaskGroups for them
if o := obj.Get("task", false); o != nil {
var tasks []*structs.Task
if err := parseTasks(&tasks, o); err != nil {
@ -134,9 +134,10 @@ func parseJob(result *structs.Job, obj *hclobj.Object) error {
result.TaskGroups = make([]*structs.TaskGroup, len(tasks), len(tasks)*2)
for i, t := range tasks {
result.TaskGroups[i] = &structs.TaskGroup{
Name: t.Name,
Count: 1,
Tasks: []*structs.Task{t},
Name: t.Name,
Count: 1,
Tasks: []*structs.Task{t},
RestartPolicy: structs.NewRestartPolicy(result.Type),
}
}
}
@ -180,6 +181,7 @@ func parseGroups(result *structs.Job, obj *hclobj.Object) error {
delete(m, "constraint")
delete(m, "meta")
delete(m, "task")
delete(m, "restart")
// Default count to 1 if not specified
if _, ok := m["count"]; !ok {
@ -199,6 +201,11 @@ func parseGroups(result *structs.Job, obj *hclobj.Object) error {
return err
}
}
g.RestartPolicy = structs.NewRestartPolicy(result.Type)
if err := parseRestartPolicy(g.RestartPolicy, o); err != nil {
return err
}
// Parse out meta fields. These are in HCL as a list so we need
// to iterate over them and merge them.
@ -228,6 +235,42 @@ func parseGroups(result *structs.Job, obj *hclobj.Object) error {
return nil
}
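// parseRestartPolicy overrides the group's default restart policy with any
// delay, interval, and attempts values found in a restart block; it is a
// no-op when the group has no restart block.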
func parseRestartPolicy(result *structs.RestartPolicy, obj *hclobj.Object) error {
var restartHclObj *hclobj.Object
var m map[string]interface{}
if restartHclObj = obj.Get("restart", false); restartHclObj == nil {
return nil
}
if err := hcl.DecodeObject(&m, restartHclObj); err != nil {
return err
}
if delay, ok := m["delay"]; ok {
d, err := toDuration(delay)
if err != nil {
return fmt.Errorf("Invalid Delay time in restart policy: %v", err)
}
result.Delay = d
}
if interval, ok := m["interval"]; ok {
i, err := toDuration(interval)
if err != nil {
return fmt.Errorf("Invalid Interval time in restart policy: %v", err)
}
result.Interval = i
}
if attempts, ok := m["attempts"]; ok {
a, err := toInteger(attempts)
if err != nil {
return fmt.Errorf("Invalid value in attempts: %v", err)
}
result.Attempts = a
}
return nil
}
func parseConstraints(result *[]*structs.Constraint, obj *hclobj.Object) error {
for _, o := range obj.Elem(false) {
var m map[string]interface{}
@ -455,19 +498,11 @@ func parseUpdate(result *structs.UpdateStrategy, obj *hclobj.Object) error {
}
for _, key := range []string{"stagger", "Stagger"} {
if raw, ok := m[key]; ok {
switch v := raw.(type) {
case string:
dur, err := time.ParseDuration(v)
if err != nil {
return fmt.Errorf("invalid stagger time '%s'", raw)
}
m[key] = dur
case int:
m[key] = time.Duration(v) * time.Second
default:
return fmt.Errorf("invalid type for stagger time '%s'",
raw)
staggerTime, err := toDuration(raw)
if err != nil {
return fmt.Errorf("Invalid stagger time: %v", err)
}
m[key] = staggerTime
}
}
@ -477,3 +512,35 @@ func parseUpdate(result *structs.UpdateStrategy, obj *hclobj.Object) error {
}
return nil
}
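// toDuration converts an HCL value to a time.Duration: strings are parsed
// with time.ParseDuration and bare integers are treated as seconds.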
func toDuration(value interface{}) (time.Duration, error) {
var dur time.Duration
var err error
switch v := value.(type) {
case string:
dur, err = time.ParseDuration(v)
case int:
dur = time.Duration(v) * time.Second
default:
err = fmt.Errorf("Invalid time %s", value)
}
return dur, err
}
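// toInteger converts an HCL value to an int, accepting either a decimal
// string or a native integer.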
func toInteger(value interface{}) (int, error) {
var integer int
var err error
switch v := value.(type) {
case string:
var i int64
i, err = strconv.ParseInt(v, 10, 32)
integer = int(i)
case int:
integer = v
default:
err = fmt.Errorf("Value: %v can't be parsed into int", value)
}
return integer, err
}
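A small sketch of the conversions these helpers perform (illustrative inputs, not from the PR's tests):

d1, _ := toDuration("25s") // 25 * time.Second
d2, _ := toDuration(30)    // 30 * time.Second: bare integers are read as seconds
n1, _ := toInteger("5")    // 5
n2, _ := toInteger(7)      // 7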

View file

@ -48,6 +48,11 @@ func TestParse(t *testing.T) {
&structs.TaskGroup{
Name: "outside",
Count: 1,
RestartPolicy: &structs.RestartPolicy{
Attempts: 2,
Interval: 1 * time.Minute,
Delay: 15 * time.Second,
},
Tasks: []*structs.Task{
&structs.Task{
Name: "outside",
@ -77,6 +82,11 @@ func TestParse(t *testing.T) {
"elb_interval": "10",
"elb_checks": "3",
},
RestartPolicy: &structs.RestartPolicy{
Interval: 10 * time.Minute,
Attempts: 5,
Delay: 15 * time.Second,
},
Tasks: []*structs.Task{
&structs.Task{
Name: "binstore",

View file

@ -31,6 +31,11 @@ job "binstore-storagelocker" {
group "binsl" {
count = 5
restart {
attempts = 5
interval = "10m"
delay = "15s"
}
task "binstore" {
driver = "docker"
config {

View file

@ -1,6 +1,9 @@
package mock
import "github.com/hashicorp/nomad/nomad/structs"
import (
"github.com/hashicorp/nomad/nomad/structs"
"time"
)
func Node() *structs.Node {
node := &structs.Node{
@ -71,6 +74,11 @@ func Job() *structs.Job {
&structs.TaskGroup{
Name: "web",
Count: 10,
RestartPolicy: &structs.RestartPolicy{
Attempts: 3,
Interval: 10 * time.Minute,
Delay: 1 * time.Minute,
},
Tasks: []*structs.Task{
&structs.Task{
Name: "web",
@ -131,6 +139,11 @@ func SystemJob() *structs.Job {
&structs.TaskGroup{
Name: "web",
Count: 1,
RestartPolicy: &structs.RestartPolicy{
Attempts: 3,
Interval: 10 * time.Minute,
Delay: 1 * time.Minute,
},
Tasks: []*structs.Task{
&structs.Task{
Name: "web",

View file

@ -16,6 +16,15 @@ import (
var (
ErrNoLeader = fmt.Errorf("No cluster leader")
ErrNoRegionPath = fmt.Errorf("No path to region")
defaultServiceJobRestartPolicy = RestartPolicy{
Delay: 15 * time.Second,
Attempts: 2,
Interval: 1 * time.Minute,
}
defaultBatchJobRestartPolicy = RestartPolicy{
Delay: 15 * time.Second,
Attempts: 15,
}
)
type MessageType uint8
@ -898,6 +907,33 @@ func (u *UpdateStrategy) Rolling() bool {
return u.Stagger > 0 && u.MaxParallel > 0
}
// RestartPolicy influences how Nomad restarts Tasks when they
// crash or fail.
type RestartPolicy struct {
Attempts int
Interval time.Duration
Delay time.Duration
}
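// Validate checks that the configured number of Attempts, each separated by
// Delay, can actually fit within the Interval window.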
func (r *RestartPolicy) Validate() error {
if time.Duration(r.Attempts)*r.Delay > r.Interval {
return fmt.Errorf("Nomad can't restart the TaskGroup %v times in an interval of %v with a delay of %v", r.Attempts, r.Interval, r.Delay)
}
return nil
}
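// NewRestartPolicy returns a copy of the default restart policy for the given
// job type; job types without a default return nil.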
func NewRestartPolicy(jobType string) *RestartPolicy {
switch jobType {
case JobTypeService:
rp := defaultServiceJobRestartPolicy
return &rp
case JobTypeBatch:
rp := defaultBatchJobRestartPolicy
return &rp
}
return nil
}
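To make the Validate rule concrete, a sketch with illustrative values (not taken from the PR):

// 3 attempts * 1 minute delay = 3 minutes, which cannot fit in a 2 minute
// interval, so Validate returns an error.
bad := &RestartPolicy{Attempts: 3, Delay: time.Minute, Interval: 2 * time.Minute}
err := bad.Validate() // non-nil

// The service default (2 attempts, 15 second delay, 1 minute interval)
// needs only 30 seconds, so it validates cleanly.
good := NewRestartPolicy(JobTypeService)
err = good.Validate() // nil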
// TaskGroup is an atomic unit of placement. Each task group belongs to
// a job and may contain any number of tasks. A task group supports running
// many replicas using the same configuration.
@ -913,6 +949,9 @@ type TaskGroup struct {
// all the tasks contained.
Constraints []*Constraint
// RestartPolicy of a TaskGroup
RestartPolicy *RestartPolicy
// Tasks are the collection of tasks that this task group needs to run
Tasks []*Task
@ -940,6 +979,10 @@ func (tg *TaskGroup) Validate() error {
}
}
if err := tg.RestartPolicy.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
// Check for duplicate tasks
tasks := make(map[string]int)
for idx, task := range tg.Tasks {

View file

@ -1,11 +1,11 @@
package structs
import (
"github.com/hashicorp/go-multierror"
"reflect"
"strings"
"testing"
"github.com/hashicorp/go-multierror"
"time"
)
func TestJob_Validate(t *testing.T) {
@ -44,11 +44,27 @@ func TestJob_Validate(t *testing.T) {
TaskGroups: []*TaskGroup{
&TaskGroup{
Name: "web",
RestartPolicy: &RestartPolicy{
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
Attempts: 10,
},
},
&TaskGroup{
Name: "web",
RestartPolicy: &RestartPolicy{
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
Attempts: 10,
},
},
&TaskGroup{
RestartPolicy: &RestartPolicy{
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
Attempts: 10,
},
},
&TaskGroup{},
},
}
err = j.Validate()
@ -65,7 +81,13 @@ func TestJob_Validate(t *testing.T) {
}
func TestTaskGroup_Validate(t *testing.T) {
tg := &TaskGroup{}
tg := &TaskGroup{
RestartPolicy: &RestartPolicy{
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
Attempts: 10,
},
}
err := tg.Validate()
mErr := err.(*multierror.Error)
if !strings.Contains(mErr.Errors[0].Error(), "group name") {
@ -86,6 +108,11 @@ func TestTaskGroup_Validate(t *testing.T) {
&Task{Name: "web"},
&Task{},
},
RestartPolicy: &RestartPolicy{
Interval: 5 * time.Minute,
Delay: 10 * time.Second,
Attempts: 10,
},
}
err = tg.Validate()
mErr = err.(*multierror.Error)