scheduler: working on node selection

Armon Dadgar 2015-08-13 14:03:03 -07:00
parent 33f4e3f0dc
commit faad7db9f7
1 changed file with 145 additions and 11 deletions


@@ -3,7 +3,9 @@ package scheduler
import (
	"fmt"
	"log"
	"math"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
)
@@ -99,14 +101,152 @@ func (s *ServiceScheduler) handleJobRegister(eval *structs.Evaluation) error {
	place = append(place, update...)

	// Attempt to place all the allocations
	planAllocations(job, plan, place, groups)
	s.planAllocations(job, plan, place, groups)

	// TODO
	return nil
}
func planAllocations(job *structs.Job, plan *structs.Plan,
	place []allocNameID, groups map[string]*structs.TaskGroup) {
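// IteratorStack chains the feasibility and ranking iterators used to select
// a node for a task group. The task-group specific iterators are configured
// per placement by planAllocations below.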
type IteratorStack struct {
	Context             *EvalContext
	BaseNodes           []*structs.Node
	Source              *StaticIterator
	JobConstraint       *ConstraintIterator
	TaskGroupDrivers    *DriverIterator
	TaskGroupConstraint *ConstraintIterator
	RankSource          *FeasibleRankIterator
	BinPack             *BinPackIterator
	Limit               *LimitIterator
	MaxScore            *MaxScoreIterator
}
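// iterStack builds the IteratorStack for a job: a randomized node source,
// feasibility filters for the job and task group constraints and drivers,
// then a bin-packing ranker capped by a limit before max-score selection.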
func (s *ServiceScheduler) iterStack(job *structs.Job,
	plan *structs.Plan) (*IteratorStack, error) {
	// Create a new stack
	stack := new(IteratorStack)

	// Create an evaluation context
	stack.Context = NewEvalContext(s.state, plan, s.logger)

	// Get the base nodes
	nodes, err := s.baseNodes(job)
	if err != nil {
		return nil, err
	}
	stack.BaseNodes = nodes

	// Create the source iterator. We randomize the order we visit nodes
	// to reduce collisions between schedulers and to do basic load
	// balancing across eligible nodes.
	stack.Source = NewRandomIterator(stack.Context, stack.BaseNodes)
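	// (NewRandomIterator presumably shuffles the node list and returns a
	// *StaticIterator over that order, which is why Source is typed as one.)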
	// Attach the job constraints.
	stack.JobConstraint = NewConstraintIterator(stack.Context, stack.Source, job.Constraints)

	// Create the task group filters; these are filled in later per task group
	stack.TaskGroupDrivers = NewDriverIterator(stack.Context, stack.JobConstraint, nil)
	stack.TaskGroupConstraint = NewConstraintIterator(stack.Context, stack.TaskGroupDrivers, nil)

	// Upgrade from feasible to rank iterator
	stack.RankSource = NewFeasibleRankIterator(stack.Context, stack.TaskGroupConstraint)

	// Apply the bin packing; this depends on the resources needed by
	// a particular task group.
	// TODO: Support eviction in the future
	stack.BinPack = NewBinPackIterator(stack.Context, stack.RankSource, nil, false, job.Priority)

	// Apply a limit function. This is to avoid scanning *every* possible node.
	// Instead we only need to visit "enough" of them. The log of the total
	// number of nodes is a good restriction, with a floor of at least 2.
	limit := 2
	if n := len(nodes); n > 0 {
		logLimit := int(math.Ceil(math.Log2(float64(n))))
		if logLimit > limit {
			limit = logLimit
		}
	}
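	// For example, with 100 eligible nodes this yields ceil(log2(100)) = 7,
	// so at most 7 feasible options are scored before a node is chosen.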
	stack.Limit = NewLimitIterator(stack.Context, stack.BinPack, limit)

	// Select the node with the maximum score for placement
	stack.MaxScore = NewMaxScoreIterator(stack.Context, stack.Limit)
	return stack, nil
}
// baseNodes returns all the ready nodes in the datacenters that this
// job has specified as usable.
func (s *ServiceScheduler) baseNodes(job *structs.Job) ([]*structs.Node, error) {
	var out []*structs.Node
	for _, dc := range job.Datacenters {
		iter, err := s.state.NodesByDatacenterStatus(dc, structs.NodeStatusReady)
		if err != nil {
			return nil, err
		}
		for {
			raw := iter.Next()
			if raw == nil {
				break
			}
			out = append(out, raw.(*structs.Node))
		}
	}
	return out, nil
}
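// planAllocations attempts to place each of the missing allocations using
// the iterator stack, appending any successful placements to the plan.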
func (s *ServiceScheduler) planAllocations(job *structs.Job, plan *structs.Plan,
	place []allocNameID, groups map[string]*structs.TaskGroup) error {
	// Get the iteration stack
	stack, err := s.iterStack(job, plan)
	if err != nil {
		return err
	}

	// Attempt to place each missing allocation
	for _, missing := range place {
		taskGroup := groups[missing.Name]

		// Collect the constraints, drivers and resources required by each
		// sub-task to aggregate the TaskGroup totals
		constr := make([]*structs.Constraint, 0, len(taskGroup.Constraints))
		drivers := make(map[string]struct{})
		size := new(structs.Resources)
		constr = append(constr, taskGroup.Constraints...)
		for _, task := range taskGroup.Tasks {
			drivers[task.Driver] = struct{}{}
			constr = append(constr, task.Constraints...)
			size.Add(task.Resources)
		}
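		// size now holds the combined resource ask of every task in the
		// group, assuming Resources.Add sums each field (CPU, memory, etc).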
		// Reset the iterator stack
		// stack.MaxScore.Reset()

		// Update the parameters of the sub-iterators
		stack.TaskGroupDrivers.SetDrivers(drivers)
		stack.TaskGroupConstraint.SetConstraints(constr)
		stack.BinPack.SetResources(size)

		// Select the best fit
		option := stack.MaxScore.Next()
		if option == nil {
			s.logger.Printf("[DEBUG] sched: failed to place alloc %s for job %s",
				missing, job.ID)
			continue
		}
		// Create an allocation for this
		alloc := &structs.Allocation{
			ID:        mock.GenerateUUID(),
			Name:      missing.Name,
			NodeID:    option.Node.ID,
			JobID:     job.ID,
			Job:       job,
			Resources: size,
			Metrics:   nil,
			Status:    structs.AllocStatusPending,
		}
		plan.AppendAlloc(alloc)
	}
	return nil
}
// handleJobDeregister is used to handle a job being deregistered
@@ -135,9 +275,7 @@ START:
	// Add each alloc to be evicted
	for _, alloc := range allocs {
		nodeEvict := plan.NodeEvict[alloc.NodeID]
		nodeEvict = append(nodeEvict, alloc.ID)
		plan.NodeEvict[alloc.NodeID] = nodeEvict
		plan.AppendEvict(alloc)
	}

	// Submit the plan
@@ -250,11 +388,7 @@ func addEvictsToPlan(plan *structs.Plan,
			if alloc.ID != evict.ID {
				continue
			}

			// Add this eviction to the per-node list
			nodeEvict := plan.NodeEvict[alloc.NodeID]
			nodeEvict = append(nodeEvict, evict.ID)
			plan.NodeEvict[alloc.NodeID] = nodeEvict
			plan.AppendEvict(alloc)
		}
	}
}