scheduler: support in-place allocation updates

Armon Dadgar 2015-09-07 12:27:12 -07:00
parent c2eff48412
commit c8bcb292a0
2 changed files with 189 additions and 22 deletions


@@ -25,6 +25,9 @@ const (
// allocUpdating is the status used when a job requires an update
allocUpdating = "alloc is being updated due to job update"
// allocInPlace is the status used when speculating on an in-place update
allocInPlace = "alloc updating in-place"
)
// SetStatusError is used to set the status of the evaluation to the given error
@@ -49,6 +52,7 @@ type GenericScheduler struct {
batch bool
eval *structs.Evaluation
job *structs.Job
plan *structs.Plan
ctx *EvalContext
stack *GenericStack
@@ -78,7 +82,7 @@ func NewBatchScheduler(logger *log.Logger, state State, planner Planner) Schedul
// setStatus is used to update the status of the evaluation
func (s *GenericScheduler) setStatus(status, desc string) error {
s.logger.Printf("[DEBUG] sched: %#v: setting status to %s (%s)", s.eval, status, desc)
s.logger.Printf("[DEBUG] sched: %#v: setting status to %s", s.eval, status)
newEval := s.eval.Copy()
newEval.Status = status
newEval.StatusDescription = desc
@@ -120,23 +124,27 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
// further work or we've made the maximum number of attempts.
func (s *GenericScheduler) process() (bool, error) {
// Lookup the Job by ID
job, err := s.state.JobByID(s.eval.JobID)
var err error
s.job, err = s.state.JobByID(s.eval.JobID)
if err != nil {
return false, fmt.Errorf("failed to get job '%s': %v",
s.eval.JobID, err)
}
// Create a plan
s.plan = s.eval.MakePlan(job)
s.plan = s.eval.MakePlan(s.job)
// Create an evaluation context
s.ctx = NewEvalContext(s.state, s.plan, s.logger)
// Construct the placement stack
s.stack = NewGenericStack(s.batch, s.ctx, nil)
if s.job != nil {
s.stack.SetJob(s.job)
}
// Compute the target job allocations
if err := s.computeJobAllocs(job); err != nil {
if err := s.computeJobAllocs(); err != nil {
s.logger.Printf("[ERR] sched: %#v: %v", s.eval, err)
return false, err
}
@@ -173,11 +181,11 @@ func (s *GenericScheduler) process() (bool, error) {
// computeJobAllocs is used to reconcile differences between the job,
// existing allocations and node status to update the allocations.
func (s *GenericScheduler) computeJobAllocs(job *structs.Job) error {
func (s *GenericScheduler) computeJobAllocs() error {
// Materialize all the task groups, job could be missing if deregistered
var groups map[string]*structs.TaskGroup
if job != nil {
groups = materializeTaskGroups(job)
if s.job != nil {
groups = materializeTaskGroups(s.job)
}
// Lookup the allocations by JobID
@@ -195,7 +203,7 @@ func (s *GenericScheduler) computeJobAllocs(job *structs.Job) error {
}
// Diff the required and existing allocations
diff := diffAllocs(job, tainted, groups, allocs)
diff := diffAllocs(s.job, tainted, groups, allocs)
s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff)
// Add all the allocs to stop
@@ -203,6 +211,16 @@ func (s *GenericScheduler) computeJobAllocs(job *structs.Job) error {
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded)
}
// Attempt to do the upgrades in place
diff.update = s.inplaceUpdate(diff.update)
// Any updates that cannot be done in-place are treated as an evict + place.
// XXX: This should be done with rolling in-place updates instead.
for _, e := range diff.update {
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocUpdating)
}
diff.place = append(diff.place, diff.update...)
// For simplicity, we treat all migrates as an evict + place.
// XXX: This could probably be done more intelligently?
for _, e := range diff.migrate {
@@ -210,28 +228,88 @@ func (s *GenericScheduler) computeJobAllocs(job *structs.Job) error {
}
diff.place = append(diff.place, diff.migrate...)
// For simplicity, we treat all updates as an evict + place.
// XXX: This should be done with rolling in-place updates instead.
for _, e := range diff.update {
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocUpdating)
}
diff.place = append(diff.place, diff.update...)
// Nothing remaining to do if placement is not required
if len(diff.place) == 0 {
return nil
}
// Compute the placements
return s.computePlacements(job, diff.place)
return s.computePlacements(diff.place)
}
func (s *GenericScheduler) computePlacements(job *structs.Job, place []allocTuple) error {
// Create an evaluation context
ctx := NewEvalContext(s.state, s.plan, s.logger)
// inplaceUpdate attempts to update allocations in-place where possible.
func (s *GenericScheduler) inplaceUpdate(updates []allocTuple) []allocTuple {
n := len(updates)
inplace := 0
for i := 0; i < n; i++ {
// Get the update
update := updates[i]
// Check if the task drivers or config has changed, requires
// a rolling upgrade since that cannot be done in-place.
existing := update.Alloc.Job.LookupTaskGroup(update.TaskGroup.Name)
if tasksUpdated(update.TaskGroup, existing) {
continue
}
// Get the existing node
node, err := s.state.NodeByID(update.Alloc.NodeID)
if err != nil {
s.logger.Printf("[ERR] sched: %#v failed to get node '%s': %v",
update.Alloc.NodeID, err)
continue
}
if node == nil {
continue
}
// Set the existing node as the base set
s.stack.SetNodes([]*structs.Node{node})
// Stage an eviction of the current allocation
s.plan.AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop,
allocInPlace)
// Attempt to match the task group
option, size := s.stack.Select(update.TaskGroup)
// Pop the allocation
s.plan.PopUpdate(update.Alloc)
// Skip if we could not do an in-place update
if option == nil {
continue
}
// Create a shallow copy
newAlloc := new(structs.Allocation)
*newAlloc = *update.Alloc
// Update the allocation
newAlloc.EvalID = s.eval.ID
newAlloc.Job = s.job
newAlloc.Resources = size
newAlloc.Metrics = s.ctx.Metrics()
newAlloc.DesiredStatus = structs.AllocDesiredStatusRun
newAlloc.ClientStatus = structs.AllocClientStatusPending
s.plan.AppendAlloc(newAlloc)
// Remove this allocation from the slice
updates[i] = updates[n-1]
i--
n--
inplace++
}
if len(updates) > 0 {
s.logger.Printf("[DEBUG] sched: %#v: %d in-place updates of %d", s.eval, inplace, len(updates))
}
return updates[:n]
}
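
The loop above removes each successfully converted entry from the updates slice by overwriting it with the last element still in the window and shrinking n, so no second slice is allocated. A minimal standalone sketch of that pattern, using hypothetical names that are not part of this commit:

package main

import "fmt"

// compactInPlace drops the items for which remove returns true by
// swapping them with the last in-window element and truncating at the
// end, mirroring how inplaceUpdate trims its updates slice.
func compactInPlace(items []string, remove func(string) bool) []string {
	n := len(items)
	for i := 0; i < n; i++ {
		if !remove(items[i]) {
			continue
		}
		// Overwrite the removed entry with the last in-window element
		// and re-examine index i on the next pass.
		items[i] = items[n-1]
		i--
		n--
	}
	return items[:n]
}

func main() {
	updates := []string{"web[0]", "web[1]", "web[2]", "web[3]"}
	remaining := compactInPlace(updates, func(name string) bool {
		// Pretend the even-numbered allocations were updated in-place.
		return name == "web[0]" || name == "web[2]"
	})
	// Order is not preserved; only membership matters, since every
	// remaining entry gets the same evict + place treatment.
	fmt.Println(remaining)
}
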
// computePlacements computes placements for allocations
func (s *GenericScheduler) computePlacements(place []allocTuple) error {
// Get the base nodes
nodes, err := readyNodesInDCs(s.state, job.Datacenters)
nodes, err := readyNodesInDCs(s.state, s.job.Datacenters)
if err != nil {
return err
}
@@ -271,8 +349,8 @@ func (s *GenericScheduler) computePlacements(job *structs.Job, place []allocTupl
EvalID: s.eval.ID,
Name: missing.Name,
NodeID: nodeID,
JobID: job.ID,
Job: job,
JobID: s.job.ID,
Job: s.job,
TaskGroup: missing.TaskGroup.Name,
Resources: size,
Metrics: s.ctx.Metrics(),

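The in-place path above hinges on tasksUpdated, which this diff calls but does not include. As a rough illustration only, with simplified stand-in types rather than Nomad's structs or its real helper, the kind of comparison that call implies could look like this: any change in task count, driver, or config forces the destructive evict + place path.

package main

import (
	"fmt"
	"reflect"
)

// Simplified stand-ins carrying only the fields this sketch compares.
type Task struct {
	Name   string
	Driver string
	Config map[string]string
}

type TaskGroup struct {
	Name  string
	Tasks []*Task
}

// tasksChanged reports whether the two groups differ in a way that
// cannot be reconciled in-place (illustrative, not the actual check).
func tasksChanged(a, b *TaskGroup) bool {
	if a == nil || b == nil || len(a.Tasks) != len(b.Tasks) {
		return true
	}
	for i, at := range a.Tasks {
		bt := b.Tasks[i]
		if at.Driver != bt.Driver || !reflect.DeepEqual(at.Config, bt.Config) {
			return true
		}
	}
	return false
}

func main() {
	current := &TaskGroup{Name: "web", Tasks: []*Task{
		{Name: "frontend", Driver: "exec", Config: map[string]string{"command": "/bin/date"}},
	}}
	updated := &TaskGroup{Name: "web", Tasks: []*Task{
		{Name: "frontend", Driver: "exec", Config: map[string]string{"command": "/bin/other"}},
	}}
	// The command changed, so this update cannot be done in-place.
	fmt.Println(tasksChanged(current, updated))
}
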

@@ -133,6 +133,7 @@ func TestServiceSched_JobModify(t *testing.T) {
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = nodes[i].ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
allocs = append(allocs, alloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
@@ -140,6 +141,9 @@ func TestServiceSched_JobModify(t *testing.T) {
// Update the job
job2 := mock.Job()
job2.ID = job.ID
// Update the task config so that the change cannot be done in-place
job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
noErr(t, h.State.UpsertJob(h.NextIndex(), job2))
// Create a mock evaluation to deal with the job update
@@ -193,6 +197,91 @@ func TestServiceSched_JobModify(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_JobModify_InPlace(t *testing.T) {
h := NewHarness(t)
// Create some nodes
var nodes []*structs.Node
for i := 0; i < 10; i++ {
node := mock.Node()
nodes = append(nodes, node)
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
}
// Generate a fake job with allocations
job := mock.Job()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = nodes[i].ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
allocs = append(allocs, alloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
// Update the job
job2 := mock.Job()
job2.ID = job.ID
noErr(t, h.State.UpsertJob(h.NextIndex(), job2))
// Create a mock evaluation to deal with the job update
eval := &structs.Evaluation{
ID: mock.GenerateUUID(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
}
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure a single plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]
// Ensure the plan did not evict any allocs
var update []*structs.Allocation
for _, updateList := range plan.NodeUpdate {
update = append(update, updateList...)
}
if len(update) != 0 {
t.Fatalf("bad: %#v", plan)
}
// Ensure the plan updated the existing allocs
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
if len(planned) != 10 {
t.Fatalf("bad: %#v", plan)
}
for _, p := range planned {
if p.Job != job2 {
t.Fatalf("should update job")
}
}
// Lookup the allocations by JobID
out, err := h.State.AllocsByJob(job.ID)
noErr(t, err)
// Ensure all allocations placed
if len(out) != 10 {
t.Fatalf("bad: %#v", out)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
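
Assuming the scheduler code and its tests live in a scheduler package at the repository root (the file paths are not shown in this diff), the new case can be exercised on its own with something like:

go test ./scheduler -run TestServiceSched_JobModify_InPlace -v
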
func TestServiceSched_JobDeregister(t *testing.T) {
h := NewHarness(t)