Merge pull request #1654 from hashicorp/f-sticky-spec

Server side changes for sticky volumes
Diptanu Choudhury 2016-08-29 14:00:32 -07:00 committed by GitHub
commit 1269900973
30 changed files with 715 additions and 97 deletions
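For orientation, here is a minimal sketch (not part of this diff) of how a job could request a sticky ephemeral disk through the Go api helpers added below; the package name and group name are illustrative.

package example

import "github.com/hashicorp/nomad/api"

// stickyGroup builds a task group that asks for a 150 MB ephemeral disk and
// marks it sticky, using the LocalDisk type and RequireDisk helper from this change.
func stickyGroup() *api.TaskGroup {
	return api.NewTaskGroup("cache", 1).
		RequireDisk(&api.LocalDisk{
			Sticky: true,
			DiskMB: 150,
		})
}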

View File

@ -82,6 +82,12 @@ type Service struct {
Checks []ServiceCheck
}
// LocalDisk is an ephemeral disk object
type LocalDisk struct {
Sticky bool
DiskMB int `mapstructure:"disk"`
}
// TaskGroup is the unit of scheduling.
type TaskGroup struct {
Name string
@ -89,6 +95,7 @@ type TaskGroup struct {
Constraints []*Constraint
Tasks []*Task
RestartPolicy *RestartPolicy
LocalDisk *LocalDisk
Meta map[string]string
}
@ -121,6 +128,12 @@ func (g *TaskGroup) AddTask(t *Task) *TaskGroup {
return g
}
// RequireDisk adds a local disk to the task group
func (g *TaskGroup) RequireDisk(disk *LocalDisk) *TaskGroup {
g.LocalDisk = disk
return g
}
// LogConfig provides configuration for log rotation
type LogConfig struct {
MaxFiles int

View File

@ -1,8 +1,6 @@
package api
import (
"testing"
)
import "testing"
func assertQueryMeta(t *testing.T, qm *QueryMeta) {
if qm.LastIndex == 0 {
@ -25,7 +23,6 @@ func testJob() *Job {
Require(&Resources{
CPU: 100,
MemoryMB: 256,
DiskMB: 25,
IOPS: 10,
}).
SetLogConfig(&LogConfig{
@ -34,7 +31,10 @@ func testJob() *Job {
})
group := NewTaskGroup("group1", 1).
AddTask(task)
AddTask(task).
RequireDisk(&LocalDisk{
DiskMB: 25,
})
job := NewBatchJob("job1", "redis", "region1", 1).
AddDatacenter("dc1").

View File

@ -85,7 +85,6 @@ job "job1" {
driver = "exec"
resources = {
cpu = 1000
disk = 150
memory = 512
}
}
@ -125,7 +124,6 @@ job "job1" {
driver = "exec"
resources = {
cpu = 1000
disk = 150
memory = 512
}
}

View File

@ -32,7 +32,6 @@ job "job1" {
driver = "exec"
resources = {
cpu = 1000
disk = 150
memory = 512
}
}
@ -121,7 +120,6 @@ job "job1" {
driver = "exec"
resources = {
cpu = 1000
disk = 150
memory = 512
}
}
@ -171,7 +169,6 @@ job "job1" {
driver = "exec"
resources = {
cpu = 1000
disk = 150
memory = 512
}
}

View File

@ -45,7 +45,6 @@ func testJob(jobID string) *api.Job {
SetConfig("exit_code", 0).
Require(&api.Resources{
MemoryMB: 256,
DiskMB: 20,
CPU: 100,
}).
SetLogConfig(&api.LogConfig{
@ -54,7 +53,10 @@ func testJob(jobID string) *api.Job {
})
group := api.NewTaskGroup("group1", 1).
AddTask(task)
AddTask(task).
RequireDisk(&api.LocalDisk{
DiskMB: 20,
})
job := api.NewBatchJob(jobID, jobID, "region1", 1).
AddDatacenter("dc1").

View File

@ -32,7 +32,6 @@ job "job1" {
driver = "exec"
resources = {
cpu = 1000
disk = 150
memory = 512
}
}
@ -126,7 +125,6 @@ job "job1" {
driver = "exec"
resources = {
cpu = 1000
disk = 150
memory = 512
}
}

View File

@ -190,9 +190,10 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
result.TaskGroups = make([]*structs.TaskGroup, len(tasks), len(tasks)*2)
for i, t := range tasks {
result.TaskGroups[i] = &structs.TaskGroup{
Name: t.Name,
Count: 1,
Tasks: []*structs.Task{t},
Name: t.Name,
Count: 1,
LocalDisk: structs.DefaultLocalDisk(),
Tasks: []*structs.Task{t},
}
}
}
@ -240,6 +241,7 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error {
"restart",
"meta",
"task",
"local_disk",
}
if err := checkHCLKeys(listVal, valid); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n))
@ -253,6 +255,7 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error {
delete(m, "meta")
delete(m, "task")
delete(m, "restart")
delete(m, "local_disk")
// Default count to 1 if not specified
if _, ok := m["count"]; !ok {
@ -280,6 +283,14 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error {
}
}
// Parse local disk
g.LocalDisk = structs.DefaultLocalDisk()
if o := listVal.Filter("local_disk"); len(o.Items) > 0 {
if err := parseLocalDisk(&g.LocalDisk, o); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s', local_disk ->", n))
}
}
// Parse out meta fields. These are in HCL as a list so we need
// to iterate over them and merge them.
if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 {
@ -417,6 +428,38 @@ func parseConstraints(result *[]*structs.Constraint, list *ast.ObjectList) error
return nil
}
func parseLocalDisk(result **structs.LocalDisk, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
return fmt.Errorf("only one 'local_disk' block allowed")
}
// Get our local_disk object
obj := list.Items[0]
// Check for invalid keys
valid := []string{
"sticky",
"disk",
}
if err := checkHCLKeys(obj.Val, valid); err != nil {
return err
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, obj.Val); err != nil {
return err
}
var localDisk structs.LocalDisk
if err := mapstructure.WeakDecode(m, &localDisk); err != nil {
return err
}
*result = &localDisk
return nil
}
// parseBool takes an interface value and tries to convert it to a boolean and
// returns an error if the type can't be converted.
func parseBool(value interface{}) (bool, error) {
@ -835,7 +878,6 @@ func parseResources(result *structs.Resources, list *ast.ObjectList) error {
// Check for invalid keys
valid := []string{
"cpu",
"disk",
"iops",
"memory",
"network",

View File

@ -49,8 +49,9 @@ func TestParse(t *testing.T) {
TaskGroups: []*structs.TaskGroup{
&structs.TaskGroup{
Name: "outside",
Count: 1,
Name: "outside",
Count: 1,
LocalDisk: structs.DefaultLocalDisk(),
Tasks: []*structs.Task{
&structs.Task{
Name: "outside",
@ -87,6 +88,10 @@ func TestParse(t *testing.T) {
Delay: 15 * time.Second,
Mode: "delay",
},
LocalDisk: &structs.LocalDisk{
Sticky: true,
DiskMB: 150,
},
Tasks: []*structs.Task{
&structs.Task{
Name: "binstore",
@ -123,7 +128,6 @@ func TestParse(t *testing.T) {
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 128,
DiskMB: 300,
IOPS: 0,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
@ -168,7 +172,6 @@ func TestParse(t *testing.T) {
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 128,
DiskMB: 300,
IOPS: 30,
},
Constraints: []*structs.Constraint{
@ -313,8 +316,9 @@ func TestParse(t *testing.T) {
TaskGroups: []*structs.TaskGroup{
&structs.TaskGroup{
Name: "bar",
Count: 1,
Name: "bar",
Count: 1,
LocalDisk: structs.DefaultLocalDisk(),
Tasks: []*structs.Task{
&structs.Task{
Name: "bar",
@ -356,8 +360,9 @@ func TestParse(t *testing.T) {
TaskGroups: []*structs.TaskGroup{
&structs.TaskGroup{
Name: "binsl",
Count: 1,
Name: "binsl",
Count: 1,
LocalDisk: structs.DefaultLocalDisk(),
Tasks: []*structs.Task{
&structs.Task{
Name: "binstore",
@ -365,7 +370,6 @@ func TestParse(t *testing.T) {
Resources: &structs.Resources{
CPU: 100,
MemoryMB: 10,
DiskMB: 300,
IOPS: 0,
},
LogConfig: &structs.LogConfig{
@ -406,8 +410,9 @@ func TestParse(t *testing.T) {
Region: "global",
TaskGroups: []*structs.TaskGroup{
&structs.TaskGroup{
Name: "group",
Count: 1,
Name: "group",
Count: 1,
LocalDisk: structs.DefaultLocalDisk(),
Tasks: []*structs.Task{
&structs.Task{
Name: "task",

View File

@ -42,6 +42,11 @@ job "binstore-storagelocker" {
mode = "delay"
}
local_disk {
sticky = true
disk = 150
}
task "binstore" {
driver = "docker"
user = "bob"

View File

@ -1,5 +1,11 @@
job "binstore-storagelocker" {
group "binsl" {
local_disk {
disk = 500
}
local_disk {
disk = 100
}
count = 5
task "binstore" {
driver = "docker"

View File

@ -390,6 +390,14 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} {
// denormalized prior to being inserted into MemDB.
for _, alloc := range req.Alloc {
if alloc.Resources != nil {
// COMPAT 0.4.1 -> 0.5
// Set the shared resources for allocations which don't have them
if alloc.SharedResources == nil {
alloc.SharedResources = &structs.Resources{
DiskMB: alloc.Resources.DiskMB,
}
}
continue
}
@ -397,6 +405,9 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} {
for _, task := range alloc.TaskResources {
alloc.Resources.Add(task)
}
// Add the shared resources
alloc.Resources.Add(alloc.SharedResources)
}
if err := n.state.UpsertAllocs(index, req.Alloc); err != nil {

View File

@ -644,6 +644,7 @@ func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) {
alloc.AllocModifyIndex = out.AllocModifyIndex
// Resources should be recomputed
resources.DiskMB = alloc.Job.TaskGroups[0].LocalDisk.DiskMB
alloc.Resources = resources
if !reflect.DeepEqual(alloc, out) {
t.Fatalf("bad: %#v %#v", alloc, out)
@ -933,6 +934,35 @@ func TestFSM_SnapshotRestore_Allocs(t *testing.T) {
}
}
func TestFSM_SnapshotRestore_Allocs_NoSharedResources(t *testing.T) {
// Add some state
fsm := testFSM(t)
state := fsm.State()
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
alloc1.SharedResources = nil
alloc2.SharedResources = nil
state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))
state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))
state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
state.UpsertAllocs(1001, []*structs.Allocation{alloc2})
// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
out1, _ := state2.AllocByID(alloc1.ID)
out2, _ := state2.AllocByID(alloc2.ID)
alloc1.SharedResources = &structs.Resources{DiskMB: 150}
alloc2.SharedResources = &structs.Resources{DiskMB: 150}
if !reflect.DeepEqual(alloc1, out1) {
t.Fatalf("bad: \n%#v\n%#v", out1, alloc1)
}
if !reflect.DeepEqual(alloc2, out2) {
t.Fatalf("bad: \n%#v\n%#v", out2, alloc2)
}
}
func TestFSM_SnapshotRestore_Indexes(t *testing.T) {
// Add some state
fsm := testFSM(t)

View File

@ -79,6 +79,9 @@ func Job() *structs.Job {
&structs.TaskGroup{
Name: "web",
Count: 10,
LocalDisk: &structs.LocalDisk{
DiskMB: 150,
},
RestartPolicy: &structs.RestartPolicy{
Attempts: 3,
Interval: 10 * time.Minute,
@ -120,7 +123,6 @@ func Job() *structs.Job {
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
DiskMB: 150,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
MBits: 50,
@ -178,6 +180,7 @@ func SystemJob() *structs.Job {
Delay: 1 * time.Minute,
Mode: structs.RestartPolicyModeDelay,
},
LocalDisk: structs.DefaultLocalDisk(),
Tasks: []*structs.Task{
&structs.Task{
Name: "web",
@ -255,7 +258,7 @@ func Alloc() *structs.Allocation {
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
DiskMB: 10,
DiskMB: 150,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
Device: "eth0",
@ -270,7 +273,6 @@ func Alloc() *structs.Allocation {
"web": &structs.Resources{
CPU: 500,
MemoryMB: 256,
DiskMB: 10,
Networks: []*structs.NetworkResource{
&structs.NetworkResource{
Device: "eth0",
@ -282,6 +284,9 @@ func Alloc() *structs.Allocation {
},
},
},
SharedResources: &structs.Resources{
DiskMB: 150,
},
Job: Job(),
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,

View File

@ -357,6 +357,24 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
return fmt.Errorf("unable to create job summary: %v", err)
}
// COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB
// from task resources
for _, tg := range job.TaskGroups {
if tg.LocalDisk != nil {
continue
}
var diskMB int
for _, task := range tg.Tasks {
if task.Resources != nil {
diskMB += task.Resources.DiskMB
task.Resources.DiskMB = 0
}
}
tg.LocalDisk = &structs.LocalDisk{
DiskMB: diskMB,
}
}
// Insert the job
if err := txn.Insert("jobs", job); err != nil {
return fmt.Errorf("job insert failed: %v", err)
@ -1690,6 +1708,25 @@ func (r *StateRestore) NodeRestore(node *structs.Node) error {
func (r *StateRestore) JobRestore(job *structs.Job) error {
r.items.Add(watch.Item{Table: "jobs"})
r.items.Add(watch.Item{Job: job.ID})
// COMPAT 0.4.1 -> 0.5 Create the LocalDisk if it's nil by adding up DiskMB
// from task resources
for _, tg := range job.TaskGroups {
if tg.LocalDisk != nil {
continue
}
var diskMB int
for _, task := range tg.Tasks {
if task.Resources != nil {
diskMB += task.Resources.DiskMB
task.Resources.DiskMB = 0
}
}
tg.LocalDisk = &structs.LocalDisk{
DiskMB: diskMB,
}
}
if err := r.txn.Insert("jobs", job); err != nil {
return fmt.Errorf("job insert failed: %v", err)
}
@ -1713,6 +1750,15 @@ func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error {
r.items.Add(watch.Item{AllocEval: alloc.EvalID})
r.items.Add(watch.Item{AllocJob: alloc.JobID})
r.items.Add(watch.Item{AllocNode: alloc.NodeID})
// COMPAT 0.4.1 -> 0.5
// Set the shared resources if it's not present
if alloc.SharedResources == nil {
alloc.SharedResources = &structs.Resources{
DiskMB: alloc.Resources.DiskMB,
}
}
if err := r.txn.Insert("allocs", alloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
}

View File

@ -433,6 +433,40 @@ func TestStateStore_UpdateUpsertJob_Job(t *testing.T) {
notify.verify(t)
}
// This test ensures that UpsertJob creates the LocalDisk if a job doesn't have
// one and clears out the task's disk resource asks
// COMPAT 0.4.1 -> 0.5
func TestStateStore_UpsertJob_NoLocalDisk(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
// Set the LocalDisk to nil and set the task's DiskMB to 150
job.TaskGroups[0].LocalDisk = nil
job.TaskGroups[0].Tasks[0].Resources.DiskMB = 150
err := state.UpsertJob(1000, job)
if err != nil {
t.Fatalf("err: %v", err)
}
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
// Expect the state store to create the LocalDisk and clear out the task's
// DiskMB
expected := job.Copy()
expected.TaskGroups[0].LocalDisk = &structs.LocalDisk{
DiskMB: 150,
}
expected.TaskGroups[0].Tasks[0].Resources.DiskMB = 0
if !reflect.DeepEqual(expected, out) {
t.Fatalf("bad: %#v %#v", expected, out)
}
}
func TestStateStore_DeleteJob_Job(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
@ -813,6 +847,51 @@ func TestStateStore_RestoreJob(t *testing.T) {
notify.verify(t)
}
// This test ensures that the state restore creates the LocalDisk for a job if
// it doesn't have one
// COMPAT 0.4.1 -> 0.5
func TestStateStore_Jobs_NoLocalDisk(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
// Set LocalDisk to nil and set the DiskMB to 150
job.TaskGroups[0].LocalDisk = nil
job.TaskGroups[0].Tasks[0].Resources.DiskMB = 150
notify := setupNotifyTest(
state,
watch.Item{Table: "jobs"},
watch.Item{Job: job.ID})
restore, err := state.Restore()
if err != nil {
t.Fatalf("err: %v", err)
}
err = restore.JobRestore(job)
if err != nil {
t.Fatalf("err: %v", err)
}
restore.Commit()
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
// Expect job to have local disk and clear out the task's disk resource ask
expected := job.Copy()
expected.TaskGroups[0].LocalDisk = &structs.LocalDisk{
DiskMB: 150,
}
expected.TaskGroups[0].Tasks[0].Resources.DiskMB = 0
if !reflect.DeepEqual(out, expected) {
t.Fatalf("Bad: %#v %#v", out, expected)
}
notify.verify(t)
}
func TestStateStore_UpsertPeriodicLaunch(t *testing.T) {
state := testStateStore(t)
job := mock.Job()

View File

@ -223,6 +223,12 @@ func (tg *TaskGroup) Diff(other *TaskGroup, contextual bool) (*TaskGroupDiff, er
diff.Objects = append(diff.Objects, rDiff)
}
// LocalDisk diff
diskDiff := primitiveObjectDiff(tg.LocalDisk, other.LocalDisk, nil, "LocalDisk", contextual)
if diskDiff != nil {
diff.Objects = append(diff.Objects, diskDiff)
}
// Tasks diff
tasks, err := taskDiffs(tg.Tasks, other.Tasks, contextual)
if err != nil {

View File

@ -1276,6 +1276,151 @@ func TestTaskGroupDiff(t *testing.T) {
},
},
},
{
// LocalDisk added
Old: &TaskGroup{},
New: &TaskGroup{
LocalDisk: &LocalDisk{
Sticky: true,
DiskMB: 100,
},
},
Expected: &TaskGroupDiff{
Type: DiffTypeEdited,
Objects: []*ObjectDiff{
{
Type: DiffTypeAdded,
Name: "LocalDisk",
Fields: []*FieldDiff{
{
Type: DiffTypeAdded,
Name: "DiskMB",
Old: "",
New: "100",
},
{
Type: DiffTypeAdded,
Name: "Sticky",
Old: "",
New: "true",
},
},
},
},
},
},
{
// LocalDisk deleted
Old: &TaskGroup{
LocalDisk: &LocalDisk{
Sticky: true,
DiskMB: 100,
},
},
New: &TaskGroup{},
Expected: &TaskGroupDiff{
Type: DiffTypeEdited,
Objects: []*ObjectDiff{
{
Type: DiffTypeDeleted,
Name: "LocalDisk",
Fields: []*FieldDiff{
{
Type: DiffTypeDeleted,
Name: "DiskMB",
Old: "100",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Sticky",
Old: "true",
New: "",
},
},
},
},
},
},
{
// LocalDisk edited
Old: &TaskGroup{
LocalDisk: &LocalDisk{
Sticky: true,
DiskMB: 150,
},
},
New: &TaskGroup{
LocalDisk: &LocalDisk{
Sticky: false,
DiskMB: 90,
},
},
Expected: &TaskGroupDiff{
Type: DiffTypeEdited,
Objects: []*ObjectDiff{
{
Type: DiffTypeEdited,
Name: "LocalDisk",
Fields: []*FieldDiff{
{
Type: DiffTypeEdited,
Name: "DiskMB",
Old: "150",
New: "90",
},
{
Type: DiffTypeEdited,
Name: "Sticky",
Old: "true",
New: "false",
},
},
},
},
},
},
{
// LocalDisk edited with context
Contextual: true,
Old: &TaskGroup{
LocalDisk: &LocalDisk{
Sticky: false,
DiskMB: 100,
},
},
New: &TaskGroup{
LocalDisk: &LocalDisk{
Sticky: true,
DiskMB: 90,
},
},
Expected: &TaskGroupDiff{
Type: DiffTypeEdited,
Objects: []*ObjectDiff{
{
Type: DiffTypeEdited,
Name: "LocalDisk",
Fields: []*FieldDiff{
{
Type: DiffTypeEdited,
Name: "DiskMB",
Old: "100",
New: "90",
},
{
Type: DiffTypeEdited,
Name: "Sticky",
Old: "false",
New: "true",
},
},
},
},
},
},
{
// Tasks edited
Old: &TaskGroup{

View File

@ -63,6 +63,12 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, st
return false, "", nil, err
}
} else if alloc.TaskResources != nil {
// Add the allocation's shared resource ask to the used
// resources
if err := used.Add(alloc.SharedResources); err != nil {
return false, "", nil, err
}
// Allocations within the plan have the combined resources stripped
// to save space, so sum up the individual task resources.
for _, taskResource := range alloc.TaskResources {

View File

@ -61,6 +61,14 @@ func TestAllocsFit_PortsOvercommitted(t *testing.T) {
}
a1 := &Allocation{
Job: &Job{
TaskGroups: []*TaskGroup{
{
Name: "web",
LocalDisk: DefaultLocalDisk(),
},
},
},
TaskResources: map[string]*Resources{
"web": &Resources{
Networks: []*NetworkResource{

View File

@ -772,7 +772,6 @@ func DefaultResources() *Resources {
return &Resources{
CPU: 100,
MemoryMB: 10,
DiskMB: 300,
IOPS: 0,
}
}
@ -823,9 +822,6 @@ func (r *Resources) MeetsMinResources() error {
if r.MemoryMB < 10 {
mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum MemoryMB value is 10; got %d", r.MemoryMB))
}
if r.DiskMB < 10 {
mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum DiskMB value is 10; got %d", r.DiskMB))
}
if r.IOPS < 0 {
mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum IOPS value is 0; got %d", r.IOPS))
}
@ -1541,6 +1537,9 @@ type TaskGroup struct {
// Tasks are the collection of tasks that this task group needs to run
Tasks []*Task
// LocalDisk is the disk resources that the task group requests
LocalDisk *LocalDisk
// Meta is used to associate arbitrary metadata with this
// task group. This is opaque to Nomad.
Meta map[string]string
@ -1565,6 +1564,10 @@ func (tg *TaskGroup) Copy() *TaskGroup {
}
ntg.Meta = CopyMapStringString(ntg.Meta)
if tg.LocalDisk != nil {
ntg.LocalDisk = tg.LocalDisk.Copy()
}
return ntg
}
@ -1613,6 +1616,14 @@ func (tg *TaskGroup) Validate() error {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a restart policy", tg.Name))
}
if tg.LocalDisk != nil {
if err := tg.LocalDisk.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
} else {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a local disk object", tg.Name))
}
// Check for duplicate tasks
tasks := make(map[string]int)
for idx, task := range tg.Tasks {
@ -1627,7 +1638,7 @@ func (tg *TaskGroup) Validate() error {
// Validate the tasks
for _, task := range tg.Tasks {
if err := task.Validate(); err != nil {
if err := task.Validate(tg.LocalDisk); err != nil {
outer := fmt.Errorf("Task %s validation failed: %s", task.Name, err)
mErr.Errors = append(mErr.Errors, outer)
}
@ -2025,7 +2036,7 @@ func (t *Task) FindHostAndPortFor(portLabel string) (string, int) {
}
// Validate is used to sanity check a task
func (t *Task) Validate() error {
func (t *Task) Validate(localDisk *LocalDisk) error {
var mErr multierror.Error
if t.Name == "" {
mErr.Errors = append(mErr.Errors, errors.New("Missing task name"))
@ -2049,6 +2060,13 @@ func (t *Task) Validate() error {
mErr.Errors = append(mErr.Errors, err)
}
// Ensure the task isn't asking for disk resources
if t.Resources != nil {
if t.Resources.DiskMB > 0 {
mErr.Errors = append(mErr.Errors, errors.New("Task can't ask for disk resources, they have to be specified at the task group level."))
}
}
// Validate the log config
if t.LogConfig == nil {
mErr.Errors = append(mErr.Errors, errors.New("Missing Log Config"))
@ -2068,12 +2086,12 @@ func (t *Task) Validate() error {
mErr.Errors = append(mErr.Errors, err)
}
if t.LogConfig != nil && t.Resources != nil {
if t.LogConfig != nil && localDisk != nil {
logUsage := (t.LogConfig.MaxFiles * t.LogConfig.MaxFileSizeMB)
if t.Resources.DiskMB <= logUsage {
if localDisk.DiskMB <= logUsage {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("log storage (%d MB) must be less than requested disk capacity (%d MB)",
logUsage, t.Resources.DiskMB))
logUsage, localDisk.DiskMB))
}
}
@ -2552,6 +2570,37 @@ func (c *Constraint) Validate() error {
return mErr.ErrorOrNil()
}
// LocalDisk is an ephemeral disk object
type LocalDisk struct {
// Sticky indicates whether the allocation is sticky to a node
Sticky bool
// DiskMB is the size of the local disk
DiskMB int `mapstructure:"disk"`
}
// DefaultLocalDisk returns a LocalDisk with default configurations
func DefaultLocalDisk() *LocalDisk {
return &LocalDisk{
DiskMB: 300,
}
}
// Validate validates LocalDisk
func (d *LocalDisk) Validate() error {
if d.DiskMB < 10 {
return fmt.Errorf("minimum DiskMB value is 10; got %d", d.DiskMB)
}
return nil
}
// Copy copies the LocalDisk struct and returns a new one
func (d *LocalDisk) Copy() *LocalDisk {
ld := new(LocalDisk)
*ld = *d
return ld
}
// Vault stores the set of permissions a task needs access to from Vault.
type Vault struct {
// Policies is the set of policies that the task needs access to
@ -2623,6 +2672,10 @@ type Allocation struct {
// of this allocation of the task group.
Resources *Resources
// SharedResources are the resources that are shared by all the tasks in an
// allocation
SharedResources *Resources
// TaskResources is the set of resources allocated to each
// task. These should sum to the total Resources.
TaskResources map[string]*Resources
@ -2670,6 +2723,7 @@ func (a *Allocation) Copy() *Allocation {
na.Job = na.Job.Copy()
na.Resources = na.Resources.Copy()
na.SharedResources = na.SharedResources.Copy()
if a.TaskResources != nil {
tr := make(map[string]*Resources, len(na.TaskResources))

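To illustrate the new group-level disk semantics introduced in structs above, here is a hedged sketch (the example function is hypothetical; the values follow from DefaultLocalDisk and LocalDisk.Validate in the diff): groups default to a 300 MB ephemeral disk, and anything under the 10 MB floor fails validation.

package structs_test

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

// ExampleLocalDisk shows the default disk ask and the minimum-size check.
func ExampleLocalDisk() {
	d := structs.DefaultLocalDisk()
	fmt.Println(d.DiskMB)

	small := &structs.LocalDisk{DiskMB: 5}
	fmt.Println(small.Validate() != nil)

	// Output:
	// 300
	// true
}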
View File

@ -115,8 +115,9 @@ func testJob() *Job {
},
TaskGroups: []*TaskGroup{
&TaskGroup{
Name: "web",
Count: 10,
Name: "web",
Count: 10,
LocalDisk: DefaultLocalDisk(),
RestartPolicy: &RestartPolicy{
Mode: RestartPolicyModeFail,
Attempts: 3,
@ -147,7 +148,6 @@ func testJob() *Job {
Resources: &Resources{
CPU: 500,
MemoryMB: 256,
DiskMB: 20,
Networks: []*NetworkResource{
&NetworkResource{
MBits: 50,
@ -345,20 +345,24 @@ func TestTaskGroup_Validate(t *testing.T) {
err = tg.Validate()
mErr = err.(*multierror.Error)
if !strings.Contains(mErr.Errors[0].Error(), "2 redefines 'web' from task 1") {
if !strings.Contains(mErr.Errors[0].Error(), "should have a local disk object") {
t.Fatalf("err: %s", err)
}
if !strings.Contains(mErr.Errors[1].Error(), "Task 3 missing name") {
if !strings.Contains(mErr.Errors[1].Error(), "2 redefines 'web' from task 1") {
t.Fatalf("err: %s", err)
}
if !strings.Contains(mErr.Errors[2].Error(), "Task web validation failed") {
if !strings.Contains(mErr.Errors[2].Error(), "Task 3 missing name") {
t.Fatalf("err: %s", err)
}
if !strings.Contains(mErr.Errors[3].Error(), "Task web validation failed") {
t.Fatalf("err: %s", err)
}
}
func TestTask_Validate(t *testing.T) {
task := &Task{}
err := task.Validate()
localDisk := DefaultLocalDisk()
err := task.Validate(localDisk)
mErr := err.(*multierror.Error)
if !strings.Contains(mErr.Errors[0].Error(), "task name") {
t.Fatalf("err: %s", err)
@ -371,7 +375,7 @@ func TestTask_Validate(t *testing.T) {
}
task = &Task{Name: "web/foo"}
err = task.Validate()
err = task.Validate(localDisk)
mErr = err.(*multierror.Error)
if !strings.Contains(mErr.Errors[0].Error(), "slashes") {
t.Fatalf("err: %s", err)
@ -382,13 +386,13 @@ func TestTask_Validate(t *testing.T) {
Driver: "docker",
Resources: &Resources{
CPU: 100,
DiskMB: 200,
MemoryMB: 100,
IOPS: 10,
},
LogConfig: DefaultLogConfig(),
}
err = task.Validate()
localDisk.DiskMB = 200
err = task.Validate(localDisk)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -416,18 +420,20 @@ func TestTask_Validate_Services(t *testing.T) {
Name: "service-name",
}
localDisk := DefaultLocalDisk()
task := &Task{
Name: "web",
Driver: "docker",
Resources: &Resources{
CPU: 100,
DiskMB: 200,
MemoryMB: 100,
IOPS: 10,
},
Services: []*Service{s1, s2},
}
err := task.Validate()
localDisk.DiskMB = 200
err := task.Validate(localDisk)
if err == nil {
t.Fatal("expected an error")
}
@ -494,12 +500,12 @@ func TestTask_Validate_Service_Check(t *testing.T) {
func TestTask_Validate_LogConfig(t *testing.T) {
task := &Task{
LogConfig: DefaultLogConfig(),
Resources: &Resources{
DiskMB: 1,
},
}
localDisk := &LocalDisk{
DiskMB: 1,
}
err := task.Validate()
err := task.Validate(localDisk)
mErr := err.(*multierror.Error)
if !strings.Contains(mErr.Errors[3].Error(), "log storage") {
t.Fatalf("err: %s", err)

View File

@ -455,6 +455,10 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error {
TaskResources: option.TaskResources,
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
SharedResources: &structs.Resources{
DiskMB: missing.TaskGroup.LocalDisk.DiskMB,
},
}
// If the new allocation is replacing an older allocation then we

View File

@ -50,8 +50,8 @@ func TestServiceSched_JobRegister(t *testing.T) {
}
// Ensure the eval has no spawned blocked eval
if len(h.Evals) != 1 {
t.Fatalf("bad: %#v", h.Evals)
}
if len(h.CreateEvals) != 0 {
t.Fatalf("bad: %#v", h.CreateEvals)
}
if h.Evals[0].BlockedEval != "" {
t.Fatalf("bad: %#v", h.Evals[0])
}
@ -91,6 +91,71 @@ func TestServiceSched_JobRegister(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_JobRegister_DiskConstraints(t *testing.T) {
h := NewHarness(t)
// Create a node
node := mock.Node()
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
// Create a job with count 2 and an 88GB disk ask so that only one allocation
// can fit
job := mock.Job()
job.TaskGroups[0].Count = 2
job.TaskGroups[0].LocalDisk.DiskMB = 88 * 1024
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
}
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure a single plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]
// Ensure the plan doesn't have annotations.
if plan.Annotations != nil {
t.Fatalf("expected no annotations")
}
// Ensure the eval has a blocked eval
if len(h.CreateEvals) != 1 {
t.Fatalf("bad: %#v", h.CreateEvals)
}
// Ensure the plan allocated only one allocation
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
if len(planned) != 1 {
t.Fatalf("bad: %#v", plan)
}
// Lookup the allocations by JobID
out, err := h.State.AllocsByJob(job.ID)
noErr(t, err)
// Ensure only one allocation was placed
if len(out) != 1 {
t.Fatalf("bad: %#v", out)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_JobRegister_Annotate(t *testing.T) {
h := NewHarness(t)

View File

@ -131,11 +131,11 @@ func (iter *StaticRankIterator) Reset() {
// BinPackIterator is a RankIterator that scores potential options
// based on a bin-packing algorithm.
type BinPackIterator struct {
ctx Context
source RankIterator
evict bool
priority int
tasks []*structs.Task
ctx Context
source RankIterator
evict bool
priority int
taskGroup *structs.TaskGroup
}
// NewBinPackIterator returns a BinPackIterator which tries to fit tasks
@ -154,8 +154,8 @@ func (iter *BinPackIterator) SetPriority(p int) {
iter.priority = p
}
func (iter *BinPackIterator) SetTasks(tasks []*structs.Task) {
iter.tasks = tasks
func (iter *BinPackIterator) SetTaskGroup(taskGroup *structs.TaskGroup) {
iter.taskGroup = taskGroup
}
func (iter *BinPackIterator) Next() *RankedNode {
@ -182,8 +182,10 @@ OUTER:
netIdx.AddAllocs(proposed)
// Assign the resources for each task
total := new(structs.Resources)
for _, task := range iter.tasks {
total := &structs.Resources{
DiskMB: iter.taskGroup.LocalDisk.DiskMB,
}
for _, task := range iter.taskGroup.Tasks {
taskResources := task.Resources.Copy()
// Check if we need a network resource

View File

@ -68,16 +68,20 @@ func TestBinPackIterator_NoExistingAlloc(t *testing.T) {
}
static := NewStaticRankIterator(ctx, nodes)
task := &structs.Task{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
taskGroup := &structs.TaskGroup{
LocalDisk: &structs.LocalDisk{},
Tasks: []*structs.Task{
{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
},
},
},
}
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTasks([]*structs.Task{task})
binp.SetTaskGroup(taskGroup)
out := collectRanked(binp)
if len(out) != 2 {
@ -142,16 +146,21 @@ func TestBinPackIterator_PlannedAlloc(t *testing.T) {
},
}
task := &structs.Task{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
taskGroup := &structs.TaskGroup{
LocalDisk: &structs.LocalDisk{},
Tasks: []*structs.Task{
{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
},
},
},
}
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTasks([]*structs.Task{task})
binp.SetTaskGroup(taskGroup)
out := collectRanked(binp)
if len(out) != 1 {
@ -223,16 +232,20 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) {
noErr(t, state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)))
noErr(t, state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))
task := &structs.Task{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
taskGroup := &structs.TaskGroup{
LocalDisk: &structs.LocalDisk{},
Tasks: []*structs.Task{
{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
},
},
},
}
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTasks([]*structs.Task{task})
binp.SetTaskGroup(taskGroup)
out := collectRanked(binp)
if len(out) != 1 {
@ -307,16 +320,21 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
plan := ctx.Plan()
plan.NodeUpdate[nodes[0].Node.ID] = []*structs.Allocation{alloc1}
task := &structs.Task{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
taskGroup := &structs.TaskGroup{
LocalDisk: &structs.LocalDisk{},
Tasks: []*structs.Task{
{
Name: "web",
Resources: &structs.Resources{
CPU: 1024,
MemoryMB: 1024,
},
},
},
}
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTasks([]*structs.Task{task})
binp.SetTaskGroup(taskGroup)
out := collectRanked(binp)
if len(out) != 2 {

View File

@ -154,7 +154,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso
s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
s.proposedAllocConstraint.SetTaskGroup(tg)
s.wrappedChecks.SetTaskGroup(tg.Name)
s.binPack.SetTasks(tg.Tasks)
s.binPack.SetTaskGroup(tg)
// Find the node with the max score
option := s.maxScore.Next()
@ -242,7 +242,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resou
// Update the parameters of iterators
s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
s.binPack.SetTasks(tg.Tasks)
s.binPack.SetTaskGroup(tg)
s.wrappedChecks.SetTaskGroup(tg.Name)
// Get the next option that satisfies the constraints.

View File

@ -319,6 +319,10 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
TaskResources: option.TaskResources,
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
SharedResources: &structs.Resources{
DiskMB: missing.TaskGroup.LocalDisk.DiskMB,
},
}
// If the new allocation is replacing an older allocation then we

View File

@ -80,6 +80,68 @@ func TestSystemSched_JobRegister(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestSystemSched_JobRegister_LocalDiskConstraint(t *testing.T) {
h := NewHarness(t)
// Create a node
node := mock.Node()
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
// Create a job
job := mock.SystemJob()
job.TaskGroups[0].LocalDisk.DiskMB = 60 * 1024
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
// Create another job with a large disk resource ask so that it doesn't fit on
// the node
job1 := mock.SystemJob()
job1.TaskGroups[0].LocalDisk.DiskMB = 60 * 1024
noErr(t, h.State.UpsertJob(h.NextIndex(), job1))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
}
// Process the evaluation
if err := h.Process(NewSystemScheduler, eval); err != nil {
t.Fatalf("err: %v", err)
}
// Lookup the allocations by JobID
out, err := h.State.AllocsByJob(job.ID)
noErr(t, err)
// Ensure all allocations placed
if len(out) != 1 {
t.Fatalf("bad: %#v", out)
}
// Create a new harness to test the scheduling result for the second job
h1 := NewHarnessWithState(t, h.State)
// Create a mock evaluation to register the job
eval1 := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job1.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job1.ID,
}
// Process the evaluation
if err := h1.Process(NewSystemScheduler, eval1); err != nil {
t.Fatalf("err: %v", err)
}
out, err = h1.State.AllocsByJob(job1.ID)
noErr(t, err)
if len(out) != 0 {
t.Fatalf("bad: %#v", out)
}
}
func TestSystemSched_ExhaustResources(t *testing.T) {
h := NewHarness(t)

View File

@ -540,7 +540,7 @@ func taskGroupConstraints(tg *structs.TaskGroup) tgConstrainTuple {
c := tgConstrainTuple{
constraints: make([]*structs.Constraint, 0, len(tg.Constraints)),
drivers: make(map[string]struct{}),
size: new(structs.Resources),
size: &structs.Resources{DiskMB: tg.LocalDisk.DiskMB},
}
c.constraints = append(c.constraints, tg.Constraints...)

View File

@ -846,6 +846,7 @@ func TestTaskGroupConstraints(t *testing.T) {
Name: "web",
Count: 10,
Constraints: []*structs.Constraint{constr},
LocalDisk: &structs.LocalDisk{},
Tasks: []*structs.Task{
&structs.Task{
Driver: "exec",