Merge pull request #1168 from hashicorp/f-plan-endpoint

Job.Plan endpoint
This commit is contained in:
Alex Dadgar 2016-05-16 13:15:40 -07:00
commit a5ab96d40e
18 changed files with 1561 additions and 72 deletions

View File

@ -1,6 +1,7 @@
package api
import (
"fmt"
"sort"
"time"
)
@ -116,6 +117,24 @@ func (j *Jobs) PeriodicForce(jobID string, q *WriteOptions) (string, *WriteMeta,
return resp.EvalID, wm, nil
}
func (j *Jobs) Plan(job *Job, diff bool, q *WriteOptions) (*JobPlanResponse, *WriteMeta, error) {
if job == nil {
return nil, nil, fmt.Errorf("must pass non-nil job")
}
var resp JobPlanResponse
req := &JobPlanRequest{
Job: job,
Diff: diff,
}
wm, err := j.client.write("/v1/job/"+job.ID+"/plan", req, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, wm, nil
}
// periodicForceResponse is used to deserialize a force response
type periodicForceResponse struct {
EvalID string
@ -256,3 +275,67 @@ type registerJobResponse struct {
type deregisterJobResponse struct {
EvalID string
}
type JobPlanRequest struct {
Job *Job
Diff bool
}
type JobPlanResponse struct {
JobModifyIndex uint64
CreatedEvals []*Evaluation
Diff *JobDiff
Annotations *PlanAnnotations
}
type JobDiff struct {
Type string
ID string
Fields []*FieldDiff
Objects []*ObjectDiff
TaskGroups []*TaskGroupDiff
}
type TaskGroupDiff struct {
Type string
Name string
Fields []*FieldDiff
Objects []*ObjectDiff
Tasks []*TaskDiff
Updates map[string]uint64
}
type TaskDiff struct {
Type string
Name string
Fields []*FieldDiff
Objects []*ObjectDiff
Annotations []string
}
type FieldDiff struct {
Type string
Name string
Old, New string
Annotations []string
}
type ObjectDiff struct {
Type string
Name string
Fields []*FieldDiff
Objects []*ObjectDiff
}
type PlanAnnotations struct {
DesiredTGUpdates map[string]*DesiredUpdates
}
type DesiredUpdates struct {
Ignore uint64
Place uint64
Migrate uint64
Stop uint64
InPlaceUpdate uint64
DestructiveUpdate uint64
}

View File

@ -350,6 +350,76 @@ func TestJobs_PeriodicForce(t *testing.T) {
t.Fatalf("evaluation %q missing", evalID)
}
func TestJobs_Plan(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
jobs := c.Jobs()
// Create a job and attempt to register it
job := testJob()
eval, wm, err := jobs.Register(job, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if eval == "" {
t.Fatalf("missing eval id")
}
assertWriteMeta(t, wm)
// Check that passing a nil job fails
if _, _, err := jobs.Plan(nil, true, nil); err == nil {
t.Fatalf("expect an error when job isn't provided")
}
// Make a plan request
planResp, wm, err := jobs.Plan(job, true, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if planResp == nil {
t.Fatalf("nil response")
}
if planResp.JobModifyIndex == 0 {
t.Fatalf("bad JobModifyIndex value: %#v", planResp)
}
if planResp.Diff == nil {
t.Fatalf("got nil diff: %#v", planResp)
}
if planResp.Annotations == nil {
t.Fatalf("got nil annotations: %#v", planResp)
}
// Can make this assertion because there are no clients.
if len(planResp.CreatedEvals) == 0 {
t.Fatalf("got no CreatedEvals: %#v", planResp)
}
// Make a plan request w/o the diff
planResp, wm, err = jobs.Plan(job, false, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
assertWriteMeta(t, wm)
if planResp == nil {
t.Fatalf("nil response")
}
if planResp.JobModifyIndex == 0 {
t.Fatalf("bad JobModifyIndex value: %d", planResp.JobModifyIndex)
}
if planResp.Diff != nil {
t.Fatalf("got non-nil diff: %#v", planResp)
}
if planResp.Annotations == nil {
t.Fatalf("got nil annotations: %#v", planResp)
}
// Can make this assertion because there are no clients.
if len(planResp.CreatedEvals) == 0 {
t.Fatalf("got no CreatedEvals: %#v", planResp)
}
}
func TestJobs_NewBatchJob(t *testing.T) {
job := NewBatchJob("job1", "myjob", "region1", 5)
expect := &Job{

View File

@ -51,6 +51,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ
case strings.HasSuffix(path, "/periodic/force"):
jobName := strings.TrimSuffix(path, "/periodic/force")
return s.periodicForceRequest(resp, req, jobName)
case strings.HasSuffix(path, "/plan"):
jobName := strings.TrimSuffix(path, "/plan")
return s.jobPlan(resp, req, jobName)
default:
return s.jobCRUD(resp, req, path)
}
@ -74,6 +77,32 @@ func (s *HTTPServer) jobForceEvaluate(resp http.ResponseWriter, req *http.Reques
return out, nil
}
func (s *HTTPServer) jobPlan(resp http.ResponseWriter, req *http.Request,
jobName string) (interface{}, error) {
if req.Method != "PUT" && req.Method != "POST" {
return nil, CodedError(405, ErrInvalidMethod)
}
var args structs.JobPlanRequest
if err := decodeBody(req, &args); err != nil {
return nil, CodedError(400, err.Error())
}
if args.Job == nil {
return nil, CodedError(400, "Job must be specified")
}
if jobName != "" && args.Job.ID != jobName {
return nil, CodedError(400, "Job ID does not match")
}
s.parseRegion(req, &args.Region)
var out structs.JobPlanResponse
if err := s.agent.RPC("Job.Plan", &args, &out); err != nil {
return nil, err
}
setIndex(resp, out.Index)
return out, nil
}
func (s *HTTPServer) periodicForceRequest(resp http.ResponseWriter, req *http.Request,
jobName string) (interface{}, error) {
if req.Method != "PUT" && req.Method != "POST" {

View File

@ -483,3 +483,39 @@ func TestHTTP_PeriodicForce(t *testing.T) {
}
})
}
func TestHTTP_JobPlan(t *testing.T) {
httpTest(t, nil, func(s *TestServer) {
// Create the job
job := mock.Job()
args := structs.JobPlanRequest{
Job: job,
Diff: true,
WriteRequest: structs.WriteRequest{Region: "global"},
}
buf := encodeReq(args)
// Make the HTTP request
req, err := http.NewRequest("PUT", "/v1/job/"+job.ID+"/plan", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
respW := httptest.NewRecorder()
// Make the request
obj, err := s.Server.JobSpecificRequest(respW, req)
if err != nil {
t.Fatalf("err: %v", err)
}
// Check the response
plan := obj.(structs.JobPlanResponse)
if plan.Annotations == nil {
t.Fatalf("bad: %v", plan)
}
if plan.Diff == nil {
t.Fatalf("bad: %v", plan)
}
})
}

View File

@ -10,6 +10,7 @@ import (
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
"github.com/hashicorp/nomad/scheduler"
)
// Job endpoint is used for job interactions
@ -32,39 +33,11 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
// Initialize the job fields (sets defaults and any necessary init work).
args.Job.InitFields()
if err := args.Job.Validate(); err != nil {
// Validate the job.
if err := validateJob(args.Job); err != nil {
return err
}
// Validate the driver configurations.
var driverErrors multierror.Error
for _, tg := range args.Job.TaskGroups {
for _, task := range tg.Tasks {
d, err := driver.NewDriver(
task.Driver,
driver.NewEmptyDriverContext(),
)
if err != nil {
msg := "failed to create driver for task %q in group %q for validation: %v"
driverErrors.Errors = append(driverErrors.Errors, fmt.Errorf(msg, tg.Name, task.Name, err))
continue
}
if err := d.Validate(task.Config); err != nil {
formatted := fmt.Errorf("group %q -> task %q -> config: %v", tg.Name, task.Name, err)
driverErrors.Errors = append(driverErrors.Errors, formatted)
}
}
}
if len(driverErrors.Errors) != 0 {
return driverErrors.ErrorOrNil()
}
if args.Job.Type == structs.JobTypeCore {
return fmt.Errorf("job type cannot be core")
}
// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err != nil {
@ -414,3 +387,132 @@ func (j *Job) Evaluations(args *structs.JobSpecificRequest,
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}
// Plan is used to cause a dry-run evaluation of the Job and return the results
// with a potential diff containing annotations.
func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse) error {
if done, err := j.srv.forward("Job.Plan", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "plan"}, time.Now())
// Validate the arguments
if args.Job == nil {
return fmt.Errorf("Job required for plan")
}
// Initialize the job fields (sets defaults and any necessary init work).
args.Job.InitFields()
// Validate the job.
if err := validateJob(args.Job); err != nil {
return err
}
// Acquire a snapshot of the state
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
// Get the original job
oldJob, err := snap.JobByID(args.Job.ID)
if err != nil {
return err
}
var index uint64
if oldJob != nil {
index = oldJob.JobModifyIndex + 1
}
// Insert the updated Job into the snapshot
snap.UpsertJob(index, args.Job)
// Create an eval and mark it as requiring annotations and insert that as well
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: args.Job.Priority,
Type: args.Job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: args.Job.ID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
AnnotatePlan: true,
}
// Create an in-memory Planner that returns no errors and stores the
// submitted plan and created evals.
planner := &scheduler.Harness{
State: &snap.StateStore,
}
// Create the scheduler and run it
sched, err := scheduler.NewScheduler(eval.Type, j.srv.logger, snap, planner)
if err != nil {
return err
}
if err := sched.Process(eval); err != nil {
return err
}
// Annotate and store the diff
if plans := len(planner.Plans); plans != 1 {
return fmt.Errorf("scheduler resulted in an unexpected number of plans: %d", plans)
}
annotations := planner.Plans[0].Annotations
if args.Diff {
jobDiff, err := oldJob.Diff(args.Job, true)
if err != nil {
return fmt.Errorf("failed to create job diff: %v", err)
}
if err := scheduler.Annotate(jobDiff, annotations); err != nil {
return fmt.Errorf("failed to annotate job diff: %v", err)
}
reply.Diff = jobDiff
}
reply.JobModifyIndex = index
reply.Annotations = annotations
reply.CreatedEvals = planner.CreateEvals
reply.Index = index
return nil
}
// validateJob validates a Job and task drivers and returns an error if there is
// a validation problem or if the Job is of a type a user is not allowed to
// submit.
func validateJob(job *structs.Job) error {
validationErrors := new(multierror.Error)
if err := job.Validate(); err != nil {
multierror.Append(validationErrors, err)
}
// Validate the driver configurations.
for _, tg := range job.TaskGroups {
for _, task := range tg.Tasks {
d, err := driver.NewDriver(
task.Driver,
driver.NewEmptyDriverContext(),
)
if err != nil {
msg := "failed to create driver for task %q in group %q for validation: %v"
multierror.Append(validationErrors, fmt.Errorf(msg, tg.Name, task.Name, err))
continue
}
if err := d.Validate(task.Config); err != nil {
formatted := fmt.Errorf("group %q -> task %q -> config: %v", tg.Name, task.Name, err)
multierror.Append(validationErrors, formatted)
}
}
}
if job.Type == structs.JobTypeCore {
multierror.Append(validationErrors, fmt.Errorf("job type cannot be core"))
}
return validationErrors.ErrorOrNil()
}

View File

@ -914,3 +914,101 @@ func TestJobEndpoint_Evaluations(t *testing.T) {
t.Fatalf("bad: %#v", resp2.Evaluations)
}
}
func TestJobEndpoint_Plan_WithDiff(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.Job()
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Create a plan request
planReq := &structs.JobPlanRequest{
Job: job,
Diff: true,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var planResp structs.JobPlanResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Plan", planReq, &planResp); err != nil {
t.Fatalf("err: %v", err)
}
// Check the response
if planResp.JobModifyIndex == 0 {
t.Fatalf("bad cas: %d", planResp.JobModifyIndex)
}
if planResp.Annotations == nil {
t.Fatalf("no annotations")
}
if planResp.Diff == nil {
t.Fatalf("no diff")
}
}
func TestJobEndpoint_Plan_NoDiff(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.Job()
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Create a plan request
planReq := &structs.JobPlanRequest{
Job: job,
Diff: false,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var planResp structs.JobPlanResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Plan", planReq, &planResp); err != nil {
t.Fatalf("err: %v", err)
}
// Check the response
if planResp.JobModifyIndex == 0 {
t.Fatalf("bad cas: %d", planResp.JobModifyIndex)
}
if planResp.Annotations == nil {
t.Fatalf("no annotations")
}
if planResp.Diff != nil {
t.Fatalf("got diff")
}
}

View File

@ -10,26 +10,6 @@ import (
"github.com/mitchellh/hashstructure"
)
const (
// AnnotationForcesDestructiveUpdate marks a diff as causing a destructive
// update.
AnnotationForcesDestructiveUpdate = "forces create/destroy update"
// AnnotationForcesInplaceUpdate marks a diff as causing an in-place
// update.
AnnotationForcesInplaceUpdate = "forces in-place update"
)
// UpdateTypes denote the type of update to occur against the task group.
const (
UpdateTypeIgnore = "ignore"
UpdateTypeCreate = "create"
UpdateTypeDestroy = "destroy"
UpdateTypeMigrate = "migrate"
UpdateTypeInplaceUpdate = "in-place update"
UpdateTypeDestructiveUpdate = "create/destroy update"
)
// DiffType denotes the type of a diff object.
type DiffType string
@ -81,15 +61,20 @@ func (j *Job) Diff(other *Job, contextual bool) (*JobDiff, error) {
var oldPrimitiveFlat, newPrimitiveFlat map[string]string
filter := []string{"ID", "Status", "StatusDescription", "CreateIndex", "ModifyIndex", "JobModifyIndex"}
// Have to treat this special since it is a struct literal, not a pointer
var jUpdate, otherUpdate *UpdateStrategy
if j == nil && other == nil {
return diff, nil
} else if j == nil {
j = &Job{}
otherUpdate = &other.Update
diff.Type = DiffTypeAdded
newPrimitiveFlat = flatmap.Flatten(other, filter, true)
diff.ID = other.ID
} else if other == nil {
other = &Job{}
jUpdate = &j.Update
diff.Type = DiffTypeDeleted
oldPrimitiveFlat = flatmap.Flatten(j, filter, true)
diff.ID = j.ID
@ -102,6 +87,8 @@ func (j *Job) Diff(other *Job, contextual bool) (*JobDiff, error) {
return nil, fmt.Errorf("can not diff jobs with different IDs: %q and %q", j.ID, other.ID)
}
jUpdate = &j.Update
otherUpdate = &other.Update
oldPrimitiveFlat = flatmap.Flatten(j, filter, true)
newPrimitiveFlat = flatmap.Flatten(other, filter, true)
diff.ID = other.ID
@ -134,7 +121,7 @@ func (j *Job) Diff(other *Job, contextual bool) (*JobDiff, error) {
diff.TaskGroups = tgs
// Update diff
if uDiff := primitiveObjectDiff(j.Update, other.Update, nil, "Update", contextual); uDiff != nil {
if uDiff := primitiveObjectDiff(jUpdate, otherUpdate, nil, "Update", contextual); uDiff != nil {
diff.Objects = append(diff.Objects, uDiff)
}
@ -171,7 +158,7 @@ type TaskGroupDiff struct {
Fields []*FieldDiff
Objects []*ObjectDiff
Tasks []*TaskDiff
Updates map[string]int
Updates map[string]uint64
}
// Diff returns a diff of two task groups. If contextual diff is enabled,
@ -847,9 +834,10 @@ func (o ObjectDiffs) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o ObjectDiffs) Less(i, j int) bool { return o[i].Less(o[j]) }
type FieldDiff struct {
Type DiffType
Name string
Old, New string
Type DiffType
Name string
Old, New string
Annotations []string
}
// fieldDiff returns a FieldDiff if old and new are different otherwise, it
@ -880,7 +868,12 @@ func fieldDiff(old, new, name string, contextual bool) *FieldDiff {
}
func (f *FieldDiff) GoString() string {
return fmt.Sprintf("%q (%s): %q => %q", f.Name, f.Type, f.Old, f.New)
out := fmt.Sprintf("%q (%s): %q => %q", f.Name, f.Type, f.Old, f.New)
if len(f.Annotations) != 0 {
out += fmt.Sprintf(" (%s)", strings.Join(f.Annotations, ", "))
}
return out
}
func (f *FieldDiff) Less(other *FieldDiff) bool {

View File

@ -181,6 +181,103 @@ func TestJobDiff(t *testing.T) {
New: "",
},
},
Objects: []*ObjectDiff{
{
Type: DiffTypeDeleted,
Name: "Update",
Fields: []*FieldDiff{
{
Type: DiffTypeDeleted,
Name: "MaxParallel",
Old: "0",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Stagger",
Old: "0",
New: "",
},
},
},
},
},
},
{
// Primitive only added job
Old: nil,
New: &Job{
Region: "foo",
ID: "foo",
Name: "foo",
Type: "batch",
Priority: 10,
AllAtOnce: true,
Meta: map[string]string{
"foo": "bar",
},
},
Expected: &JobDiff{
Type: DiffTypeAdded,
ID: "foo",
Fields: []*FieldDiff{
{
Type: DiffTypeAdded,
Name: "AllAtOnce",
Old: "",
New: "true",
},
{
Type: DiffTypeAdded,
Name: "Meta[foo]",
Old: "",
New: "bar",
},
{
Type: DiffTypeAdded,
Name: "Name",
Old: "",
New: "foo",
},
{
Type: DiffTypeAdded,
Name: "Priority",
Old: "",
New: "10",
},
{
Type: DiffTypeAdded,
Name: "Region",
Old: "",
New: "foo",
},
{
Type: DiffTypeAdded,
Name: "Type",
Old: "",
New: "batch",
},
},
Objects: []*ObjectDiff{
{
Type: DiffTypeAdded,
Name: "Update",
Fields: []*FieldDiff{
{
Type: DiffTypeAdded,
Name: "MaxParallel",
Old: "",
New: "0",
},
{
Type: DiffTypeAdded,
Name: "Stagger",
Old: "",
New: "0",
},
},
},
},
},
},
{

View File

@ -209,6 +209,14 @@ type JobListRequest struct {
QueryOptions
}
// JobPlanRequest is used for the Job.Plan endpoint to trigger a dry-run
// evaluation of the Job.
type JobPlanRequest struct {
Job *Job
Diff bool // Toggles an annotated diff
WriteRequest
}
// NodeListRequest is used to parameterize a list request
type NodeListRequest struct {
QueryOptions
@ -390,6 +398,27 @@ type JobListResponse struct {
QueryMeta
}
// JobPlanResponse is used to respond to a job plan request
type JobPlanResponse struct {
// Annotations stores annotations explaining decisions the scheduler made.
Annotations *PlanAnnotations
// JobModifyIndex is the modification index of the job. The value can be
// used when running `nomad run` to ensure that the Job wasnt modified
// since the last plan. If the job is being created, the value is zero.
JobModifyIndex uint64
// CreatedEvals is the set of evaluations created by the scheduler. The
// reasons for this can be rolling-updates or blocked evals.
CreatedEvals []*Evaluation
// Diff contains the diff of the job and annotations on whether the change
// causes an in-place update or create/destroy
Diff *JobDiff
WriteMeta
}
// SingleAllocResponse is used to return a single allocation
type SingleAllocResponse struct {
Alloc *Allocation
@ -2596,6 +2625,10 @@ type Evaluation struct {
// captured by computed node classes.
EscapedComputedClass bool
// AnnotatePlan triggers the scheduler to provide additional annotations
// during the evaluation. This should not be set during normal operations.
AnnotatePlan bool
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
@ -2740,6 +2773,10 @@ type Plan struct {
// but are persisted so that the user can use the feedback
// to determine the cause.
FailedAllocs []*Allocation
// Annotations contains annotations by the scheduler to be used by operators
// to understand the decisions made by the scheduler.
Annotations *PlanAnnotations
}
func (p *Plan) AppendUpdate(alloc *Allocation, status, desc string) {
@ -2836,6 +2873,24 @@ func (p *PlanResult) FullCommit(plan *Plan) (bool, int, int) {
return actual == expected, expected, actual
}
// PlanAnnotations holds annotations made by the scheduler to give further debug
// information to operators.
type PlanAnnotations struct {
// DesiredTGUpdates is the set of desired updates per task group.
DesiredTGUpdates map[string]*DesiredUpdates
}
// DesiredUpdates is the set of changes the scheduler would like to make given
// sufficient resources and cluster capacity.
type DesiredUpdates struct {
Ignore uint64
Place uint64
Migrate uint64
Stop uint64
InPlaceUpdate uint64
DestructiveUpdate uint64
}
// msgpackHandle is a shared handle for encoding/decoding of structs
var MsgpackHandle = func() *codec.MsgpackHandle {
h := &codec.MsgpackHandle{RawToString: true}

185
scheduler/annotate.go Normal file
View File

@ -0,0 +1,185 @@
package scheduler
import (
"strconv"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
AnnotationForcesCreate = "forces create"
AnnotationForcesDestroy = "forces destroy"
AnnotationForcesInplaceUpdate = "forces in-place update"
AnnotationForcesDestructiveUpdate = "forces create/destroy update"
)
// UpdateTypes denote the type of update to occur against the task group.
const (
UpdateTypeIgnore = "ignore"
UpdateTypeCreate = "create"
UpdateTypeDestroy = "destroy"
UpdateTypeMigrate = "migrate"
UpdateTypeInplaceUpdate = "in-place update"
UpdateTypeDestructiveUpdate = "create/destroy update"
)
// Annotate takes the diff between the old and new version of a Job, the
// scheduler's plan annotations and will add annotations to the diff to aide
// human understanding of the plan.
//
// Currently the things that are annotated are:
// * Task group changes will be annotated with:
// * Count up and count down changes
// * Update counts (creates, destroys, migrates, etc)
// * Task changes will be annotated with:
// * forces create/destroy update
// * forces in-place update
func Annotate(diff *structs.JobDiff, annotations *structs.PlanAnnotations) error {
tgDiffs := diff.TaskGroups
if len(tgDiffs) == 0 {
return nil
}
for _, tgDiff := range tgDiffs {
if err := annotateTaskGroup(tgDiff, annotations); err != nil {
return err
}
}
return nil
}
// annotateTaskGroup takes a task group diff and annotates it.
func annotateTaskGroup(diff *structs.TaskGroupDiff, annotations *structs.PlanAnnotations) error {
// Annotate the updates
if annotations != nil {
tg, ok := annotations.DesiredTGUpdates[diff.Name]
if ok {
if diff.Updates == nil {
diff.Updates = make(map[string]uint64, 6)
}
if tg.Ignore != 0 {
diff.Updates[UpdateTypeIgnore] = tg.Ignore
}
if tg.Place != 0 {
diff.Updates[UpdateTypeCreate] = tg.Place
}
if tg.Migrate != 0 {
diff.Updates[UpdateTypeMigrate] = tg.Migrate
}
if tg.Stop != 0 {
diff.Updates[UpdateTypeDestroy] = tg.Stop
}
if tg.InPlaceUpdate != 0 {
diff.Updates[UpdateTypeInplaceUpdate] = tg.InPlaceUpdate
}
if tg.DestructiveUpdate != 0 {
diff.Updates[UpdateTypeDestructiveUpdate] = tg.DestructiveUpdate
}
}
}
// Annotate the count
if err := annotateCountChange(diff); err != nil {
return err
}
// Annotate the tasks.
taskDiffs := diff.Tasks
if len(taskDiffs) == 0 {
return nil
}
for _, taskDiff := range taskDiffs {
annotateTask(taskDiff, diff)
}
return nil
}
// annotateCountChange takes a task group diff and annotates the count
// parameter.
func annotateCountChange(diff *structs.TaskGroupDiff) error {
var countDiff *structs.FieldDiff
for _, diff := range diff.Fields {
if diff.Name == "Count" {
countDiff = diff
break
}
}
// Didn't find
if countDiff == nil {
return nil
}
var oldV, newV int
var err error
if countDiff.Old == "" {
oldV = 0
} else {
oldV, err = strconv.Atoi(countDiff.Old)
if err != nil {
return err
}
}
if countDiff.New == "" {
newV = 0
} else {
newV, err = strconv.Atoi(countDiff.New)
if err != nil {
return err
}
}
if oldV < newV {
countDiff.Annotations = append(countDiff.Annotations, AnnotationForcesCreate)
} else if newV < oldV {
countDiff.Annotations = append(countDiff.Annotations, AnnotationForcesDestroy)
}
return nil
}
// annotateCountChange takes a task diff and annotates it.
func annotateTask(diff *structs.TaskDiff, parent *structs.TaskGroupDiff) {
if diff.Type == structs.DiffTypeNone {
return
}
// The whole task group is changing
if parent.Type == structs.DiffTypeAdded || parent.Type == structs.DiffTypeDeleted {
if diff.Type == structs.DiffTypeAdded {
diff.Annotations = append(diff.Annotations, AnnotationForcesCreate)
return
} else if diff.Type == structs.DiffTypeDeleted {
diff.Annotations = append(diff.Annotations, AnnotationForcesDestroy)
return
}
}
// All changes to primitive fields result in a destructive update.
destructive := false
if len(diff.Fields) != 0 {
destructive = true
}
// Changes that can be done in-place are log configs, services and
// constraints.
for _, oDiff := range diff.Objects {
switch oDiff.Name {
case "LogConfig", "Service", "Constraint":
continue
default:
destructive = true
break
}
}
if destructive {
diff.Annotations = append(diff.Annotations, AnnotationForcesDestructiveUpdate)
} else {
diff.Annotations = append(diff.Annotations, AnnotationForcesInplaceUpdate)
}
}

422
scheduler/annotate_test.go Normal file
View File

@ -0,0 +1,422 @@
package scheduler
import (
"reflect"
"testing"
"github.com/hashicorp/nomad/nomad/structs"
)
func TestAnnotateTaskGroup_Updates(t *testing.T) {
annotations := &structs.PlanAnnotations{
DesiredTGUpdates: map[string]*structs.DesiredUpdates{
"foo": &structs.DesiredUpdates{
Ignore: 1,
Place: 2,
Migrate: 3,
Stop: 4,
InPlaceUpdate: 5,
DestructiveUpdate: 6,
},
},
}
tgDiff := &structs.TaskGroupDiff{
Type: structs.DiffTypeEdited,
Name: "foo",
}
expected := &structs.TaskGroupDiff{
Type: structs.DiffTypeEdited,
Name: "foo",
Updates: map[string]uint64{
UpdateTypeIgnore: 1,
UpdateTypeCreate: 2,
UpdateTypeMigrate: 3,
UpdateTypeDestroy: 4,
UpdateTypeInplaceUpdate: 5,
UpdateTypeDestructiveUpdate: 6,
},
}
if err := annotateTaskGroup(tgDiff, annotations); err != nil {
t.Fatalf("annotateTaskGroup(%#v, %#v) failed: %#v", tgDiff, annotations, err)
}
if !reflect.DeepEqual(tgDiff, expected) {
t.Fatalf("got %#v, want %#v", tgDiff, expected)
}
}
func TestAnnotateCountChange_NonEdited(t *testing.T) {
tg := &structs.TaskGroupDiff{}
tgOrig := &structs.TaskGroupDiff{}
annotateCountChange(tg)
if !reflect.DeepEqual(tgOrig, tg) {
t.Fatalf("annotateCountChange(%#v) should not have caused any annotation: %#v", tgOrig, tg)
}
}
func TestAnnotateCountChange(t *testing.T) {
up := &structs.FieldDiff{
Type: structs.DiffTypeEdited,
Name: "Count",
Old: "1",
New: "3",
}
down := &structs.FieldDiff{
Type: structs.DiffTypeEdited,
Name: "Count",
Old: "3",
New: "1",
}
tgUp := &structs.TaskGroupDiff{
Type: structs.DiffTypeEdited,
Fields: []*structs.FieldDiff{up},
}
tgDown := &structs.TaskGroupDiff{
Type: structs.DiffTypeEdited,
Fields: []*structs.FieldDiff{down},
}
// Test the up case
if err := annotateCountChange(tgUp); err != nil {
t.Fatalf("annotateCountChange(%#v) failed: %v", tgUp, err)
}
countDiff := tgUp.Fields[0]
if len(countDiff.Annotations) != 1 || countDiff.Annotations[0] != AnnotationForcesCreate {
t.Fatalf("incorrect annotation: %#v", tgUp)
}
// Test the down case
if err := annotateCountChange(tgDown); err != nil {
t.Fatalf("annotateCountChange(%#v) failed: %v", tgDown, err)
}
countDiff = tgDown.Fields[0]
if len(countDiff.Annotations) != 1 || countDiff.Annotations[0] != AnnotationForcesDestroy {
t.Fatalf("incorrect annotation: %#v", tgDown)
}
}
func TestAnnotateTask_NonEdited(t *testing.T) {
tgd := &structs.TaskGroupDiff{Type: structs.DiffTypeNone}
td := &structs.TaskDiff{Type: structs.DiffTypeNone}
tdOrig := &structs.TaskDiff{Type: structs.DiffTypeNone}
annotateTask(td, tgd)
if !reflect.DeepEqual(tdOrig, td) {
t.Fatalf("annotateTask(%#v) should not have caused any annotation: %#v", tdOrig, td)
}
}
func TestAnnotateTask(t *testing.T) {
cases := []struct {
Diff *structs.TaskDiff
Parent *structs.TaskGroupDiff
Desired string
}{
{
Diff: &structs.TaskDiff{
Type: structs.DiffTypeEdited,
Fields: []*structs.FieldDiff{
{
Type: structs.DiffTypeEdited,
Name: "Driver",
Old: "docker",
New: "exec",
},
},
},
Parent: &structs.TaskGroupDiff{Type: structs.DiffTypeEdited},
Desired: AnnotationForcesDestructiveUpdate,
},
{
Diff: &structs.TaskDiff{
Type: structs.DiffTypeEdited,
Fields: []*structs.FieldDiff{
{
Type: structs.DiffTypeEdited,
Name: "User",
Old: "alice",
New: "bob",
},
},
},
Parent: &structs.TaskGroupDiff{Type: structs.DiffTypeEdited},
Desired: AnnotationForcesDestructiveUpdate,
},
{
Diff: &structs.TaskDiff{
Type: structs.DiffTypeEdited,
Fields: []*structs.FieldDiff{
{
Type: structs.DiffTypeAdded,
Name: "Env[foo]",
Old: "foo",
New: "bar",
},
},
},
Parent: &structs.TaskGroupDiff{Type: structs.DiffTypeEdited},
Desired: AnnotationForcesDestructiveUpdate,
},
{
Diff: &structs.TaskDiff{
Type: structs.DiffTypeEdited,
Fields: []*structs.FieldDiff{
{
Type: structs.DiffTypeAdded,
Name: "Meta[foo]",
Old: "foo",
New: "bar",
},
},
},
Parent: &structs.TaskGroupDiff{Type: structs.DiffTypeEdited},
Desired: AnnotationForcesDestructiveUpdate,
},
{
Diff: &structs.TaskDiff{
Type: structs.DiffTypeEdited,
Objects: []*structs.ObjectDiff{
{
Type: structs.DiffTypeAdded,
Name: "Artifact",
Fields: []*structs.FieldDiff{
{
Type: structs.DiffTypeAdded,
Name: "GetterOptions[bam]",
Old: "",
New: "baz",
},
{
Type: structs.DiffTypeAdded,
Name: "GetterSource",
Old: "",
New: "bam",
},
{
Type: structs.DiffTypeAdded,
Name: "RelativeDest",
Old: "",
New: "bam",
},
},
},
},
},
Parent: &structs.TaskGroupDiff{Type: structs.DiffTypeEdited},
Desired: AnnotationForcesDestructiveUpdate,
},
{
Diff: &structs.TaskDiff{
Type: structs.DiffTypeEdited,
Objects: []*structs.ObjectDiff{
{
Type: structs.DiffTypeEdited,
Name: "Resources",
Fields: []*structs.FieldDiff{
{
Type: structs.DiffTypeEdited,
Name: "CPU",
Old: "100",
New: "200",
},
{
Type: structs.DiffTypeEdited,
Name: "DiskMB",
Old: "100",
New: "200",
},
{
Type: structs.DiffTypeEdited,
Name: "IOPS",
Old: "100",
New: "200",
},
{
Type: structs.DiffTypeEdited,
Name: "MemoryMB",
Old: "100",
New: "200",
},
},
},
},
},
Parent: &structs.TaskGroupDiff{Type: structs.DiffTypeEdited},
Desired: AnnotationForcesDestructiveUpdate,
},
{
Diff: &structs.TaskDiff{
Type: structs.DiffTypeEdited,
Objects: []*structs.ObjectDiff{
{
Type: structs.DiffTypeEdited,
Name: "Config",
Fields: []*structs.FieldDiff{
{
Type: structs.DiffTypeEdited,
Name: "bam[1]",
Old: "b",
New: "c",
},
},
},
},
},
Parent: &structs.TaskGroupDiff{Type: structs.DiffTypeEdited},
Desired: AnnotationForcesDestructiveUpdate,
},
{
Diff: &structs.TaskDiff{
Type: structs.DiffTypeEdited,
Objects: []*structs.ObjectDiff{
{
Type: structs.DiffTypeAdded,
Name: "Constraint",
Fields: []*structs.FieldDiff{
{
Type: structs.DiffTypeAdded,
Name: "LTarget",
Old: "",
New: "baz",
},
{
Type: structs.DiffTypeAdded,
Name: "Operand",
Old: "",
New: "baz",
},
{
Type: structs.DiffTypeAdded,
Name: "RTarget",
Old: "",
New: "baz",
},
},
},
},
},
Parent: &structs.TaskGroupDiff{Type: structs.DiffTypeEdited},
Desired: AnnotationForcesInplaceUpdate,
},
{
Diff: &structs.TaskDiff{
Type: structs.DiffTypeEdited,
Objects: []*structs.ObjectDiff{
{
Type: structs.DiffTypeAdded,
Name: "LogConfig",
Fields: []*structs.FieldDiff{
{
Type: structs.DiffTypeAdded,
Name: "MaxFileSizeMB",
Old: "",
New: "10",
},
{
Type: structs.DiffTypeAdded,
Name: "MaxFiles",
Old: "",
New: "1",
},
},
},
},
},
Parent: &structs.TaskGroupDiff{Type: structs.DiffTypeEdited},
Desired: AnnotationForcesInplaceUpdate,
},
{
Diff: &structs.TaskDiff{
Type: structs.DiffTypeEdited,
Objects: []*structs.ObjectDiff{
{
Type: structs.DiffTypeEdited,
Name: "Service",
Fields: []*structs.FieldDiff{
{
Type: structs.DiffTypeEdited,
Name: "PortLabel",
Old: "baz",
New: "baz2",
},
},
},
},
},
Parent: &structs.TaskGroupDiff{Type: structs.DiffTypeEdited},
Desired: AnnotationForcesInplaceUpdate,
},
// Task deleted new parent
{
Diff: &structs.TaskDiff{
Type: structs.DiffTypeDeleted,
Fields: []*structs.FieldDiff{
{
Type: structs.DiffTypeAdded,
Name: "Driver",
Old: "",
New: "exec",
},
},
},
Parent: &structs.TaskGroupDiff{Type: structs.DiffTypeAdded},
Desired: AnnotationForcesDestroy,
},
// Task Added new parent
{
Diff: &structs.TaskDiff{
Type: structs.DiffTypeAdded,
Fields: []*structs.FieldDiff{
{
Type: structs.DiffTypeAdded,
Name: "Driver",
Old: "",
New: "exec",
},
},
},
Parent: &structs.TaskGroupDiff{Type: structs.DiffTypeAdded},
Desired: AnnotationForcesCreate,
},
// Task deleted existing parent
{
Diff: &structs.TaskDiff{
Type: structs.DiffTypeDeleted,
Fields: []*structs.FieldDiff{
{
Type: structs.DiffTypeAdded,
Name: "Driver",
Old: "",
New: "exec",
},
},
},
Parent: &structs.TaskGroupDiff{Type: structs.DiffTypeEdited},
Desired: AnnotationForcesDestructiveUpdate,
},
// Task Added existing parent
{
Diff: &structs.TaskDiff{
Type: structs.DiffTypeAdded,
Fields: []*structs.FieldDiff{
{
Type: structs.DiffTypeAdded,
Name: "Driver",
Old: "",
New: "exec",
},
},
},
Parent: &structs.TaskGroupDiff{Type: structs.DiffTypeEdited},
Desired: AnnotationForcesDestructiveUpdate,
},
}
for i, c := range cases {
annotateTask(c.Diff, c.Parent)
if len(c.Diff.Annotations) != 1 || c.Diff.Annotations[0] != c.Desired {
t.Fatalf("case %d not properly annotated; got %s, want %s", i+1, c.Diff.Annotations[0], c.Desired)
}
}
}

View File

@ -173,8 +173,9 @@ func (s *GenericScheduler) process() (bool, error) {
return false, err
}
// If the plan is a no-op, we can bail
if s.plan.IsNoOp() {
// If the plan is a no-op, we can bail. If AnnotatePlan is set submit the plan
// anyways to get the annotations.
if s.plan.IsNoOp() && !s.eval.AnnotatePlan {
return true, nil
}
@ -323,7 +324,15 @@ func (s *GenericScheduler) computeJobAllocs() error {
}
// Attempt to do the upgrades in place
diff.update = inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update)
destructiveUpdates := inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update)
inplaceUpdates := diff.update[len(destructiveUpdates):]
diff.update = destructiveUpdates
if s.eval.AnnotatePlan {
s.plan.Annotations = &structs.PlanAnnotations{
DesiredTGUpdates: desiredUpdates(diff, inplaceUpdates, destructiveUpdates),
}
}
// Check if a rolling upgrade strategy is being used
limit := len(diff.update) + len(diff.migrate)

View File

@ -2,6 +2,7 @@ package scheduler
import (
"fmt"
"reflect"
"testing"
"time"
@ -42,6 +43,11 @@ func TestServiceSched_JobRegister(t *testing.T) {
}
plan := h.Plans[0]
// Ensure the plan doesn't have annotations.
if plan.Annotations != nil {
t.Fatalf("expected no annotations")
}
// Ensure the plan allocated
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
@ -76,6 +82,81 @@ func TestServiceSched_JobRegister(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_JobRegister_Annotate(t *testing.T) {
h := NewHarness(t)
// Create some nodes
for i := 0; i < 10; i++ {
node := mock.Node()
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
}
// Create a job
job := mock.Job()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
AnnotatePlan: true,
}
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure a single plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]
// Ensure the plan allocated
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
if len(planned) != 10 {
t.Fatalf("bad: %#v", plan)
}
// Lookup the allocations by JobID
out, err := h.State.AllocsByJob(job.ID)
noErr(t, err)
// Ensure all allocations placed
if len(out) != 10 {
t.Fatalf("bad: %#v", out)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
// Ensure the plan had annotations.
if plan.Annotations == nil {
t.Fatalf("expected annotations")
}
desiredTGs := plan.Annotations.DesiredTGUpdates
if l := len(desiredTGs); l != 1 {
t.Fatalf("incorrect number of task groups; got %v; want %v", l, 1)
}
desiredChanges, ok := desiredTGs["web"]
if !ok {
t.Fatalf("expected task group web to have desired changes")
}
expected := &structs.DesiredUpdates{Place: 10}
if !reflect.DeepEqual(desiredChanges, expected) {
t.Fatalf("Unexpected desired updates; got %#v; want %#v", desiredChanges, expected)
}
}
func TestServiceSched_JobRegister_CountZero(t *testing.T) {
h := NewHarness(t)

View File

@ -113,8 +113,9 @@ func (s *SystemScheduler) process() (bool, error) {
return false, err
}
// If the plan is a no-op, we can bail
if s.plan.IsNoOp() {
// If the plan is a no-op, we can bail. If AnnotatePlan is set submit the plan
// anyways to get the annotations.
if s.plan.IsNoOp() && !s.eval.AnnotatePlan {
return true, nil
}
@ -185,7 +186,15 @@ func (s *SystemScheduler) computeJobAllocs() error {
}
// Attempt to do the upgrades in place
diff.update = inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update)
destructiveUpdates := inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update)
inplaceUpdates := diff.update[len(destructiveUpdates):]
diff.update = destructiveUpdates
if s.eval.AnnotatePlan {
s.plan.Annotations = &structs.PlanAnnotations{
DesiredTGUpdates: desiredUpdates(diff, inplaceUpdates, destructiveUpdates),
}
}
// Check if a rolling upgrade strategy is being used
limit := len(diff.update)

View File

@ -1,6 +1,7 @@
package scheduler
import (
"reflect"
"testing"
"time"
@ -41,6 +42,11 @@ func TestSystemSched_JobRegister(t *testing.T) {
}
plan := h.Plans[0]
// Ensure the plan doesn't have annotations.
if plan.Annotations != nil {
t.Fatalf("expected no annotations")
}
// Ensure the plan allocated
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
@ -67,6 +73,86 @@ func TestSystemSched_JobRegister(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestSystemSched_JobRegister_Annotate(t *testing.T) {
h := NewHarness(t)
// Create some nodes
for i := 0; i < 10; i++ {
node := mock.Node()
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
}
// Create a job
job := mock.SystemJob()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
// Create a mock evaluation to deregister the job
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
AnnotatePlan: true,
}
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure a single plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]
// Ensure the plan allocated
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
if len(planned) != 10 {
t.Fatalf("bad: %#v", plan)
}
// Lookup the allocations by JobID
out, err := h.State.AllocsByJob(job.ID)
noErr(t, err)
// Ensure all allocations placed
if len(out) != 10 {
t.Fatalf("bad: %#v", out)
}
// Check the available nodes
if count, ok := out[0].Metrics.NodesAvailable["dc1"]; !ok || count != 10 {
t.Fatalf("bad: %#v", out[0].Metrics)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
// Ensure the plan had annotations.
if plan.Annotations == nil {
t.Fatalf("expected annotations")
}
desiredTGs := plan.Annotations.DesiredTGUpdates
if l := len(desiredTGs); l != 1 {
t.Fatalf("incorrect number of task groups; got %v; want %v", l, 1)
}
desiredChanges, ok := desiredTGs["web"]
if !ok {
t.Fatalf("expected task group web to have desired changes")
}
expected := &structs.DesiredUpdates{Place: 10}
if !reflect.DeepEqual(desiredChanges, expected) {
t.Fatalf("Unexpected desired updates; got %#v; want %#v", desiredChanges, expected)
}
}
func TestSystemSched_JobRegister_AddNode(t *testing.T) {
h := NewHarness(t)

View File

@ -29,9 +29,9 @@ func (r *RejectPlan) CreateEval(*structs.Evaluation) error {
return nil
}
// Harness is a lightweight testing harness for schedulers.
// It manages a state store copy and provides the planner
// interface. It can be extended for various testing uses.
// Harness is a lightweight testing harness for schedulers. It manages a state
// store copy and provides the planner interface. It can be extended for various
// testing uses or for invoking the scheduler without side effects.
type Harness struct {
State *state.StateStore
@ -178,10 +178,3 @@ func (h *Harness) AssertEvalStatus(t *testing.T, state string) {
t.Fatalf("bad: %#v", update)
}
}
// noErr is used to assert there are no errors
func noErr(t *testing.T, err error) {
if err != nil {
t.Fatalf("err: %v", err)
}
}

View File

@ -500,3 +500,79 @@ func taskGroupConstraints(tg *structs.TaskGroup) tgConstrainTuple {
return c
}
// desiredUpdates takes the diffResult as well as the set of inplace and
// destructive updates and returns a map of task groups to their set of desired
// updates.
func desiredUpdates(diff *diffResult, inplaceUpdates,
destructiveUpdates []allocTuple) map[string]*structs.DesiredUpdates {
desiredTgs := make(map[string]*structs.DesiredUpdates)
for _, tuple := range diff.place {
name := tuple.TaskGroup.Name
des, ok := desiredTgs[name]
if !ok {
des = &structs.DesiredUpdates{}
desiredTgs[name] = des
}
des.Place++
}
for _, tuple := range diff.stop {
name := tuple.Alloc.TaskGroup
des, ok := desiredTgs[name]
if !ok {
des = &structs.DesiredUpdates{}
desiredTgs[name] = des
}
des.Stop++
}
for _, tuple := range diff.ignore {
name := tuple.TaskGroup.Name
des, ok := desiredTgs[name]
if !ok {
des = &structs.DesiredUpdates{}
desiredTgs[name] = des
}
des.Ignore++
}
for _, tuple := range diff.migrate {
name := tuple.TaskGroup.Name
des, ok := desiredTgs[name]
if !ok {
des = &structs.DesiredUpdates{}
desiredTgs[name] = des
}
des.Migrate++
}
for _, tuple := range inplaceUpdates {
name := tuple.TaskGroup.Name
des, ok := desiredTgs[name]
if !ok {
des = &structs.DesiredUpdates{}
desiredTgs[name] = des
}
des.InPlaceUpdate++
}
for _, tuple := range destructiveUpdates {
name := tuple.TaskGroup.Name
des, ok := desiredTgs[name]
if !ok {
des = &structs.DesiredUpdates{}
desiredTgs[name] = des
}
des.DestructiveUpdate++
}
return desiredTgs
}

View File

@ -12,6 +12,13 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
// noErr is used to assert there are no errors
func noErr(t *testing.T, err error) {
if err != nil {
t.Fatalf("err: %v", err)
}
}
func TestMaterializeTaskGroups(t *testing.T) {
job := mock.Job()
index := materializeTaskGroups(job)
@ -779,3 +786,61 @@ func TestProgressMade(t *testing.T) {
t.Fatal("bad")
}
}
func TestDesiredUpdates(t *testing.T) {
tg1 := &structs.TaskGroup{Name: "foo"}
tg2 := &structs.TaskGroup{Name: "bar"}
a2 := &structs.Allocation{TaskGroup: "bar"}
place := []allocTuple{
allocTuple{TaskGroup: tg1},
allocTuple{TaskGroup: tg1},
allocTuple{TaskGroup: tg1},
allocTuple{TaskGroup: tg2},
}
stop := []allocTuple{
allocTuple{TaskGroup: tg2, Alloc: a2},
allocTuple{TaskGroup: tg2, Alloc: a2},
}
ignore := []allocTuple{
allocTuple{TaskGroup: tg1},
}
migrate := []allocTuple{
allocTuple{TaskGroup: tg2},
}
inplace := []allocTuple{
allocTuple{TaskGroup: tg1},
allocTuple{TaskGroup: tg1},
}
destructive := []allocTuple{
allocTuple{TaskGroup: tg1},
allocTuple{TaskGroup: tg2},
allocTuple{TaskGroup: tg2},
}
diff := &diffResult{
place: place,
stop: stop,
ignore: ignore,
migrate: migrate,
}
expected := map[string]*structs.DesiredUpdates{
"foo": {
Place: 3,
Ignore: 1,
InPlaceUpdate: 2,
DestructiveUpdate: 1,
},
"bar": {
Place: 1,
Stop: 2,
Migrate: 1,
DestructiveUpdate: 2,
},
}
desired := desiredUpdates(diff, inplace, destructive)
if !reflect.DeepEqual(desired, expected) {
t.Fatalf("desiredUpdates() returned %#v; want %#v", desired, expected)
}
}