diff --git a/scheduler/service_sched.go b/scheduler/service_sched.go
index cfc3ceced..3ea38cff7 100644
--- a/scheduler/service_sched.go
+++ b/scheduler/service_sched.go
@@ -3,7 +3,9 @@ package scheduler
 import (
 	"fmt"
 	"log"
+	"math"

+	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 )

@@ -99,14 +101,152 @@ func (s *ServiceScheduler) handleJobRegister(eval *structs.Evaluation) error {
 	place = append(place, update...)

 	// Attempt to place all the allocations
-	planAllocations(job, plan, place, groups)
+	s.planAllocations(job, plan, place, groups)

 	// TODO
 	return nil
 }

-func planAllocations(job *structs.Job, plan *structs.Plan,
-	place []allocNameID, groups map[string]*structs.TaskGroup) {
+type IteratorStack struct {
+	Context             *EvalContext
+	BaseNodes           []*structs.Node
+	Source              *StaticIterator
+	JobConstraint       *ConstraintIterator
+	TaskGroupDrivers    *DriverIterator
+	TaskGroupConstraint *ConstraintIterator
+	RankSource          *FeasibleRankIterator
+	BinPack             *BinPackIterator
+	Limit               *LimitIterator
+	MaxScore            *MaxScoreIterator
+}
+
+func (s *ServiceScheduler) iterStack(job *structs.Job,
+	plan *structs.Plan) (*IteratorStack, error) {
+	// Create a new stack
+	stack := new(IteratorStack)
+
+	// Create an evaluation context
+	stack.Context = NewEvalContext(s.state, plan, s.logger)
+
+	// Get the base nodes
+	nodes, err := s.baseNodes(job)
+	if err != nil {
+		return nil, err
+	}
+	stack.BaseNodes = nodes
+
+	// Create the source iterator. We randomize the order we visit nodes
+	// to reduce collisions between schedulers and to do a basic load
+	// balancing across eligible nodes.
+	stack.Source = NewRandomIterator(stack.Context, stack.BaseNodes)
+
+	// Attach the job constraints.
+	stack.JobConstraint = NewConstraintIterator(stack.Context, stack.Source, job.Constraints)
+
+	// Create the task group filters; these are filled in later per task group
+	stack.TaskGroupDrivers = NewDriverIterator(stack.Context, stack.JobConstraint, nil)
+	stack.TaskGroupConstraint = NewConstraintIterator(stack.Context, stack.TaskGroupDrivers, nil)
+
+	// Upgrade from feasible to rank iterator
+	stack.RankSource = NewFeasibleRankIterator(stack.Context, stack.TaskGroupConstraint)
+
+	// Apply the bin packing; this depends on the resources needed by
+	// a particular task group.
+	// TODO: Support eviction in the future
+	stack.BinPack = NewBinPackIterator(stack.Context, stack.RankSource, nil, false, job.Priority)
+
+	// Apply a limit function. This is to avoid scanning *every* possible node.
+	// Instead we need to visit "enough". Using a log of the total number of
+	// nodes is a good restriction, with at least 2 as the floor.
+	limit := 2
+	if n := len(nodes); n > 0 {
+		logLimit := int(math.Ceil(math.Log2(float64(n))))
+		if logLimit > limit {
+			limit = logLimit
+		}
+	}
+	stack.Limit = NewLimitIterator(stack.Context, stack.BinPack, limit)
+
+	// Select the node with the maximum score for placement
+	stack.MaxScore = NewMaxScoreIterator(stack.Context, stack.Limit)
+
+	return stack, nil
+}
+
+// baseNodes returns all the ready nodes across the datacenters that this
+// job has specified as usable.
+func (s *ServiceScheduler) baseNodes(job *structs.Job) ([]*structs.Node, error) {
+	var out []*structs.Node
+	for _, dc := range job.Datacenters {
+		iter, err := s.state.NodesByDatacenterStatus(dc, structs.NodeStatusReady)
+		if err != nil {
+			return nil, err
+		}
+		for {
+			raw := iter.Next()
+			if raw == nil {
+				break
+			}
+			out = append(out, raw.(*structs.Node))
+		}
+	}
+	return out, nil
+}
+
+func (s *ServiceScheduler) planAllocations(job *structs.Job, plan *structs.Plan,
+	place []allocNameID, groups map[string]*structs.TaskGroup) error {
+	// Get the iteration stack
+	stack, err := s.iterStack(job, plan)
+	if err != nil {
+		return err
+	}
+
+	// Attempt to place each missing allocation
+	for _, missing := range place {
+		taskGroup := groups[missing.Name]
+
+		// Collect the constraints, drivers and resources required by each
+		// sub-task to aggregate the TaskGroup totals
+		constr := make([]*structs.Constraint, 0, len(taskGroup.Constraints))
+		drivers := make(map[string]struct{})
+		size := new(structs.Resources)
+		constr = append(constr, taskGroup.Constraints...)
+		for _, task := range taskGroup.Tasks {
+			drivers[task.Driver] = struct{}{}
+			constr = append(constr, task.Constraints...)
+			size.Add(task.Resources)
+		}
+
+		// Reset the iterator stack
+		stack.MaxScore.Reset()
+
+		// Update the parameters of the sub-iterators
+		stack.TaskGroupDrivers.SetDrivers(drivers)
+		stack.TaskGroupConstraint.SetConstraints(constr)
+		stack.BinPack.SetResources(size)
+
+		// Select the best fit
+		option := stack.MaxScore.Next()
+		if option == nil {
+			s.logger.Printf("[DEBUG] sched: failed to place alloc %s for job %s",
+				missing, job.ID)
+			continue
+		}
+
+		// Create an allocation for this placement
+		alloc := &structs.Allocation{
+			ID:        mock.GenerateUUID(),
+			Name:      missing.Name,
+			NodeID:    option.Node.ID,
+			JobID:     job.ID,
+			Job:       job,
+			Resources: size,
+			Metrics:   nil,
+			Status:    structs.AllocStatusPending,
+		}
+		plan.AppendAlloc(alloc)
+	}
+	return nil
 }

 // handleJobDeregister is used to handle a job being deregistered
@@ -135,9 +275,7 @@ START:

 	// Add each alloc to be evicted
 	for _, alloc := range allocs {
-		nodeEvict := plan.NodeEvict[alloc.NodeID]
-		nodeEvict = append(nodeEvict, alloc.ID)
-		plan.NodeEvict[alloc.NodeID] = nodeEvict
+		plan.AppendEvict(alloc)
 	}

 	// Submit the plan
@@ -250,11 +388,7 @@ func addEvictsToPlan(plan *structs.Plan,
 			if alloc.ID != evict.ID {
 				continue
 			}
-
-			// Add this eviction to the per-node list
-			nodeEvict := plan.NodeEvict[alloc.NodeID]
-			nodeEvict = append(nodeEvict, evict.ID)
-			plan.NodeEvict[alloc.NodeID] = nodeEvict
+			plan.AppendEvict(alloc)
 		}
 	}
 }
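Side note: the node-visit limit introduced in iterStack (examine roughly ceil(log2(n)) candidates, never fewer than 2) can be illustrated on its own. The sketch below is not part of the patch; visitLimit is a hypothetical helper name that simply mirrors the calculation above.

// limit_demo.go: standalone sketch of the limit heuristic used in iterStack.
// visitLimit is a hypothetical helper, not part of this change.
package main

import (
	"fmt"
	"math"
)

// visitLimit returns how many candidate nodes the limit iterator would let
// the scheduler examine: ceil(log2(n)), with a floor of 2.
func visitLimit(n int) int {
	limit := 2
	if n > 0 {
		if logLimit := int(math.Ceil(math.Log2(float64(n)))); logLimit > limit {
			limit = logLimit
		}
	}
	return limit
}

func main() {
	for _, n := range []int{0, 1, 4, 100, 10000} {
		fmt.Printf("%6d eligible nodes -> examine at most %d\n", n, visitLimit(n))
	}
}

Even with thousands of eligible nodes, only a handful of candidates are bin-packed per placement, which is the trade-off the comment in iterStack describes: visit "enough" nodes rather than every possible one.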