Remove old

This commit is contained in:
Alex Dadgar 2017-05-23 16:39:15 -07:00
parent e782c4efbe
commit 4c123500ee

View file

@ -223,7 +223,7 @@ func (s *GenericScheduler) process() (bool, error) {
}
// Compute the target job allocations
if err := s.computeJobAllocs2(); err != nil {
if err := s.computeJobAllocs(); err != nil {
s.logger.Printf("[ERR] sched: %#v: %v", s.eval, err)
return false, err
}
@ -360,7 +360,7 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) ([
// computeJobAllocs is used to reconcile differences between the job,
// existing allocations and node status to update the allocations.
func (s *GenericScheduler) computeJobAllocs2() error {
func (s *GenericScheduler) computeJobAllocs() error {
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
allocs, err := s.state.AllocsByJob(ws, s.eval.JobID, true)
@ -427,96 +427,11 @@ func (s *GenericScheduler) computeJobAllocs2() error {
}
// Compute the placements
return s.computePlacements2(results.place)
}
// computeJobAllocs is used to reconcile differences between the job,
// existing allocations and node status to update the allocations.
func (s *GenericScheduler) computeJobAllocs() error {
// Materialize all the task groups, job could be missing if deregistered
var groups map[string]*structs.TaskGroup
if !s.job.Stopped() {
groups = materializeTaskGroups(s.job)
}
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
allocs, err := s.state.AllocsByJob(ws, s.eval.JobID, true)
if err != nil {
return fmt.Errorf("failed to get allocs for job '%s': %v",
s.eval.JobID, err)
}
// Determine the tainted nodes containing job allocs
tainted, err := taintedNodes(s.state, allocs)
if err != nil {
return fmt.Errorf("failed to get tainted nodes for job '%s': %v",
s.eval.JobID, err)
}
// Update the allocations which are in pending/running state on tainted
// nodes to lost
updateNonTerminalAllocsToLost(s.plan, tainted, allocs)
// Filter out the allocations in a terminal state
allocs, terminalAllocs := s.filterCompleteAllocs(allocs)
// Diff the required and existing allocations
diff := diffAllocs(s.job, tainted, groups, allocs, terminalAllocs)
s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff)
// Add all the allocs to stop
for _, e := range diff.stop {
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded, "")
}
// Attempt to do the upgrades in place
destructiveUpdates, inplaceUpdates := inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update)
diff.update = destructiveUpdates
if s.eval.AnnotatePlan {
s.plan.Annotations = &structs.PlanAnnotations{
DesiredTGUpdates: desiredUpdates(diff, inplaceUpdates, destructiveUpdates),
}
}
// Check if a rolling upgrade strategy is being used
limit := len(diff.update) + len(diff.migrate) + len(diff.lost)
if !s.job.Stopped() && s.job.Update.Rolling() {
limit = s.job.Update.MaxParallel
}
// Treat migrations as an eviction and a new placement.
s.limitReached = evictAndPlace(s.ctx, diff, diff.migrate, allocMigrating, &limit)
// Treat non in-place updates as an eviction and new placement.
s.limitReached = s.limitReached || evictAndPlace(s.ctx, diff, diff.update, allocUpdating, &limit)
// Lost allocations should be transistioned to desired status stop and client
// status lost and a new placement should be made
s.limitReached = s.limitReached || markLostAndPlace(s.ctx, diff, diff.lost, allocLost, &limit)
// Nothing remaining to do if placement is not required
if len(diff.place) == 0 {
if !s.job.Stopped() {
for _, tg := range s.job.TaskGroups {
s.queuedAllocs[tg.Name] = 0
}
}
return nil
}
// Record the number of allocations that needs to be placed per Task Group
for _, allocTuple := range diff.place {
s.queuedAllocs[allocTuple.TaskGroup.Name] += 1
}
// Compute the placements
return s.computePlacements(diff.place)
return s.computePlacements(results.place)
}
// computePlacements computes placements for allocations
func (s *GenericScheduler) computePlacements2(place []allocPlaceResult) error {
func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error {
// Get the base nodes
nodes, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters)
if err != nil {
@ -539,7 +454,7 @@ func (s *GenericScheduler) computePlacements2(place []allocPlaceResult) error {
}
// Find the preferred node
preferredNode, err := s.findPreferredNode2(&missing)
preferredNode, err := s.findPreferredNode(&missing)
if err != nil {
return err
}
@ -597,83 +512,8 @@ func (s *GenericScheduler) computePlacements2(place []allocPlaceResult) error {
return nil
}
// computePlacements computes placements for allocations
func (s *GenericScheduler) computePlacements(place []allocTuple) error {
// Get the base nodes
nodes, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters)
if err != nil {
return err
}
// Update the set of placement ndoes
s.stack.SetNodes(nodes)
for _, missing := range place {
// Check if this task group has already failed
if metric, ok := s.failedTGAllocs[missing.TaskGroup.Name]; ok {
metric.CoalescedFailures += 1
continue
}
// Find the preferred node
preferredNode, err := s.findPreferredNode(&missing)
if err != nil {
return err
}
// Attempt to match the task group
var option *RankedNode
if preferredNode != nil {
option, _ = s.stack.SelectPreferringNodes(missing.TaskGroup, []*structs.Node{preferredNode})
} else {
option, _ = s.stack.Select(missing.TaskGroup)
}
// Store the available nodes by datacenter
s.ctx.Metrics().NodesAvailable = byDC
// Set fields based on if we found an allocation option
if option != nil {
// Create an allocation for this
alloc := &structs.Allocation{
ID: structs.GenerateUUID(),
EvalID: s.eval.ID,
Name: missing.Name,
JobID: s.job.ID,
TaskGroup: missing.TaskGroup.Name,
Metrics: s.ctx.Metrics(),
NodeID: option.Node.ID,
TaskResources: option.TaskResources,
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
SharedResources: &structs.Resources{
DiskMB: missing.TaskGroup.EphemeralDisk.SizeMB,
},
}
// If the new allocation is replacing an older allocation then we
// set the record the older allocation id so that they are chained
if missing.Alloc != nil {
alloc.PreviousAllocation = missing.Alloc.ID
}
s.plan.AppendAlloc(alloc)
} else {
// Lazy initialize the failed map
if s.failedTGAllocs == nil {
s.failedTGAllocs = make(map[string]*structs.AllocMetric)
}
s.failedTGAllocs[missing.TaskGroup.Name] = s.ctx.Metrics()
}
}
return nil
}
// findPreferredNode finds the preferred node for an allocation
func (s *GenericScheduler) findPreferredNode2(place *allocPlaceResult) (node *structs.Node, err error) {
func (s *GenericScheduler) findPreferredNode(place *allocPlaceResult) (node *structs.Node, err error) {
if place.previousAlloc != nil {
if place.taskGroup.EphemeralDisk.Sticky == true {
var preferredNode *structs.Node
@ -686,23 +526,3 @@ func (s *GenericScheduler) findPreferredNode2(place *allocPlaceResult) (node *st
}
return
}
// findPreferredNode finds the preferred node for an allocation
func (s *GenericScheduler) findPreferredNode(allocTuple *allocTuple) (node *structs.Node, err error) {
if allocTuple.Alloc != nil {
taskGroup := allocTuple.Alloc.Job.LookupTaskGroup(allocTuple.Alloc.TaskGroup)
if taskGroup == nil {
err = fmt.Errorf("can't find task group of existing allocation %q", allocTuple.Alloc.ID)
return
}
if taskGroup.EphemeralDisk.Sticky == true {
var preferredNode *structs.Node
ws := memdb.NewWatchSet()
preferredNode, err = s.state.NodeByID(ws, allocTuple.Alloc.NodeID)
if preferredNode.Ready() {
node = preferredNode
}
}
}
return
}