diff --git a/nomad/fsm.go b/nomad/fsm.go
index 3c91b8f5c..fc3cebad1 100644
--- a/nomad/fsm.go
+++ b/nomad/fsm.go
@@ -86,6 +86,9 @@ type nomadFSM struct {
     // new state store). Everything internal here is synchronized by the
     // Raft side, so doesn't need to lock this.
     stateLock sync.RWMutex
+
+    // Reference to the server the FSM is running on
+    server *Server
 }
 
 // nomadSnapshot is used to provide a snapshot of the current
@@ -121,7 +124,7 @@ type FSMConfig struct {
 }
 
 // NewFSM is used to construct a new FSM with a blank state
-func NewFSM(config *FSMConfig) (*nomadFSM, error) {
+func NewFSM(config *FSMConfig, server *Server) (*nomadFSM, error) {
     // Create a state store
     sconfig := &state.StateStoreConfig{
         Logger: config.Logger,
@@ -142,6 +145,7 @@ func NewFSM(config *FSMConfig) (*nomadFSM, error) {
         timetable:           NewTimeTable(timeTableGranularity, timeTableLimit),
         enterpriseAppliers:  make(map[structs.MessageType]LogApplier, 8),
         enterpriseRestorers: make(map[SnapshotType]SnapshotRestorer, 8),
+        server:              server,
     }
 
     // Register all the log applier functions
@@ -1423,7 +1427,8 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
         }
         snap.UpsertEvals(100, []*structs.Evaluation{eval})
         // Create the scheduler and run it
-        sched, err := scheduler.NewScheduler(eval.Type, n.logger, snap, planner)
+        allowPlanOptimization := ServersMeetMinimumVersion(n.server.Members(), MinVersionPlanDenormalization, true)
+        sched, err := scheduler.NewScheduler(eval.Type, n.logger, snap, planner, allowPlanOptimization)
         if err != nil {
             return err
         }
diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go
index 1a5cdaf43..332c048a4 100644
--- a/nomad/job_endpoint.go
+++ b/nomad/job_endpoint.go
@@ -1216,7 +1216,8 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
     }
 
     // Create the scheduler and run it
-    sched, err := scheduler.NewScheduler(eval.Type, j.logger, snap, planner)
+    allowPlanOptimization := ServersMeetMinimumVersion(j.srv.Members(), MinVersionPlanDenormalization, true)
+    sched, err := scheduler.NewScheduler(eval.Type, j.logger, snap, planner, allowPlanOptimization)
     if err != nil {
         return err
     }
diff --git a/nomad/leader.go b/nomad/leader.go
index 8e26a56f5..17c819996 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -1243,7 +1243,7 @@ func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig {
         return config
     }
 
-    if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion) {
+    if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion, false) {
         s.logger.Named("autopilot").Warn("can't initialize until all servers are above minimum version", "min_version", minAutopilotVersion)
         return nil
     }
@@ -1270,7 +1270,7 @@ func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration {
     if config != nil {
         return config
     }
-    if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion) {
+    if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion, false) {
         s.logger.Named("core").Warn("can't initialize scheduler config until all servers are above minimum version", "min_version", minSchedulerConfigVersion)
         return nil
     }
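Note the pattern shared by the call sites above and below: ServersMeetMinimumVersion (nomad/util.go, further down) grows a third parameter. Callers that merely preserve today's behavior pass false; the plan-denormalization gates pass true, so a failed server that may rejoin with an older binary still holds the feature back. A hypothetical wrapper (the name is illustrative, not part of this change) makes the stricter policy explicit:

// Hypothetical helper, not part of this change, making the gating policy
// explicit. ServersMeetMinimumVersion and MinVersionPlanDenormalization are
// defined in nomad/util.go below.
func (s *Server) clusterSupportsPlanNormalization() bool {
    // checkFailedServers=true: a failed peer that may rejoin with an older
    // binary keeps the cluster on the backward-compatible plan format.
    return ServersMeetMinimumVersion(s.Members(), MinVersionPlanDenormalization, true)
}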
diff --git a/nomad/operator_endpoint.go b/nomad/operator_endpoint.go
index fc9edabdd..c44e14c31 100644
--- a/nomad/operator_endpoint.go
+++ b/nomad/operator_endpoint.go
@@ -237,7 +237,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
     }
 
     // All servers should be at or above 0.8.0 to apply this operation
-    if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion) {
+    if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion, false) {
         return fmt.Errorf("All servers should be running version %v to update autopilot config", minAutopilotVersion)
     }
 
@@ -305,7 +305,7 @@ func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRe
     }
 
     // All servers should be at or above 0.9.0 to apply this operation
-    if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion) {
+    if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion, false) {
         return fmt.Errorf("All servers should be running version %v to update scheduler config", minSchedulerConfigVersion)
     }
     // Apply the update
diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index f40690d00..dc9913232 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -15,7 +15,7 @@ import (
     "github.com/hashicorp/raft"
 )
 
-// planner is used to mange the submitted allocation plans that are waiting
+// planner is used to manage the submitted allocation plans that are waiting
 // to be accessed by the leader
 type planner struct {
     *Server
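The applyPlan hunk below is the core of the change: once every server understands the new format, stopped and preempted allocations are written to the Raft log as sparse diffs rather than full Allocation structs. As a sketch (the field set is taken directly from the hunk), the normalization of one stopped allocation looks like this:

// Sketch mirroring the stopped-alloc branch in the hunk below: only identity
// plus the fields the scheduler actually changed survive into the log entry;
// everything else is re-read from state at apply time.
func normalizeStoppedAlloc(stopped *structs.Allocation, now int64) *structs.Allocation {
    return &structs.Allocation{
        ID:                 stopped.ID,
        DesiredDescription: stopped.DesiredDescription,
        ClientStatus:       stopped.ClientStatus,
        ModifyTime:         now,
    }
}

Updated (placed) allocations still travel whole in AllocsUpdated, since the receiver has no prior copy to merge a diff into.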
@@ -149,52 +149,90 @@ func (p *planner) planApply() {
 
 // applyPlan is used to apply the plan result and to return the alloc index
 func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
-    // Determine the minimum number of updates, could be more if there
-    // are multiple updates per node
-    minUpdates := len(result.NodeUpdate)
-    minUpdates += len(result.NodeAllocation)
-
     // Setup the update request
     req := structs.ApplyPlanResultsRequest{
         AllocUpdateRequest: structs.AllocUpdateRequest{
-            Job:   plan.Job,
-            Alloc: make([]*structs.Allocation, 0, minUpdates),
+            Job: plan.Job,
         },
         Deployment:        result.Deployment,
         DeploymentUpdates: result.DeploymentUpdates,
         EvalID:            plan.EvalID,
         NodePreemptions:   make([]*structs.Allocation, 0, len(result.NodePreemptions)),
     }
-    for _, updateList := range result.NodeUpdate {
-        req.Alloc = append(req.Alloc, updateList...)
-    }
-    for _, allocList := range result.NodeAllocation {
-        req.Alloc = append(req.Alloc, allocList...)
-    }
-
-    for _, preemptions := range result.NodePreemptions {
-        req.NodePreemptions = append(req.NodePreemptions, preemptions...)
-    }
-
-    // Set the time the alloc was applied for the first time. This can be used
-    // to approximate the scheduling time.
-    now := time.Now().UTC().UnixNano()
-    for _, alloc := range req.Alloc {
-        if alloc.CreateTime == 0 {
-            alloc.CreateTime = now
-        }
-        alloc.ModifyTime = now
-    }
-
-    // Set modify time for preempted allocs if any
-    // Also gather jobids to create follow up evals
     preemptedJobIDs := make(map[structs.NamespacedID]struct{})
-    for _, alloc := range req.NodePreemptions {
-        alloc.ModifyTime = now
-        id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID}
-        _, ok := preemptedJobIDs[id]
-        if !ok {
-            preemptedJobIDs[id] = struct{}{}
+    now := time.Now().UTC().UnixNano()
+
+    if ServersMeetMinimumVersion(p.Members(), MinVersionPlanDenormalization, true) {
+        // Initialize the allocs request using the new optimized log entry format.
+        // Determine the minimum number of updates; there could be more if there
+        // are multiple updates per node.
+        req.AllocsStopped = make([]*structs.Allocation, 0, len(result.NodeUpdate))
+        req.AllocsUpdated = make([]*structs.Allocation, 0, len(result.NodeAllocation))
+
+        for _, updateList := range result.NodeUpdate {
+            for _, stoppedAlloc := range updateList {
+                req.AllocsStopped = append(req.AllocsStopped, &structs.Allocation{
+                    ID:                 stoppedAlloc.ID,
+                    DesiredDescription: stoppedAlloc.DesiredDescription,
+                    ClientStatus:       stoppedAlloc.ClientStatus,
+                    ModifyTime:         now,
+                })
+            }
+        }
+
+        for _, allocList := range result.NodeAllocation {
+            req.AllocsUpdated = append(req.AllocsUpdated, allocList...)
+        }
+
+        // Set the time the alloc was applied for the first time. This can be used
+        // to approximate the scheduling time.
+        updateAllocTimestamps(req.AllocsUpdated, now)
+
+        for _, preemptions := range result.NodePreemptions {
+            for _, preemptedAlloc := range preemptions {
+                req.NodePreemptions = append(req.NodePreemptions, &structs.Allocation{
+                    ID:                    preemptedAlloc.ID,
+                    PreemptedByAllocation: preemptedAlloc.PreemptedByAllocation,
+                    ModifyTime:            now,
+                })
+
+                // Gather job IDs to create follow-up evals
+                appendNamespacedJobID(preemptedJobIDs, preemptedAlloc)
+            }
+        }
+    } else {
+        // Deprecated: this code path only supports the application of older
+        // log entries and is expected to be removed in a future version.
+
+        // Determine the minimum number of updates; there could be more if there
+        // are multiple updates per node.
+        minUpdates := len(result.NodeUpdate)
+        minUpdates += len(result.NodeAllocation)
+
+        // Initialize the allocs request using the older log entry format
+        req.Alloc = make([]*structs.Allocation, 0, minUpdates)
+
+        for _, updateList := range result.NodeUpdate {
+            req.Alloc = append(req.Alloc, updateList...)
+        }
+        for _, allocList := range result.NodeAllocation {
+            req.Alloc = append(req.Alloc, allocList...)
+        }
+
+        for _, preemptions := range result.NodePreemptions {
+            req.NodePreemptions = append(req.NodePreemptions, preemptions...)
+        }
+
+        // Set the time the alloc was applied for the first time. This can be used
+        // to approximate the scheduling time.
+        updateAllocTimestamps(req.Alloc, now)
+
+        // Set the modify time for preempted allocs, if any, and gather job IDs
+        // to create follow-up evals
+        for _, alloc := range req.NodePreemptions {
+            alloc.ModifyTime = now
+            appendNamespacedJobID(preemptedJobIDs, alloc)
         }
     }
@@ -232,6 +270,22 @@
     return future, nil
 }
 
+func appendNamespacedJobID(jobIDs map[structs.NamespacedID]struct{}, alloc *structs.Allocation) {
+    id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID}
+    if _, ok := jobIDs[id]; !ok {
+        jobIDs[id] = struct{}{}
+    }
+}
+
+func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
+    for _, alloc := range allocations {
+        if alloc.CreateTime == 0 {
+            alloc.CreateTime = timestamp
+        }
+        alloc.ModifyTime = timestamp
+    }
+}
+
 // asyncPlanWait is used to apply and respond to a plan async
 func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
     result *structs.PlanResult, pending *pendingPlan) {
@@ -264,6 +318,15 @@ func evaluatePlan(pool *EvaluatePool, snap *state.StateSnapshot, plan *structs.Plan, logger log.Logger) (*structs.PlanResult, error) {
     defer metrics.MeasureSince([]string{"nomad", "plan", "evaluate"}, time.Now())
 
+    err := snap.DenormalizeAllocationsMap(plan.NodeUpdate, plan.Job)
+    if err != nil {
+        return nil, err
+    }
+    err = snap.DenormalizeAllocationsMap(plan.NodePreemptions, plan.Job)
+    if err != nil {
+        return nil, err
+    }
+
     // Check if the plan exceeds quota
     overQuota, err := evaluatePlanQuota(snap, plan)
     if err != nil {
@@ -521,15 +584,11 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
 
     // Remove any preempted allocs
     if preempted := plan.NodePreemptions[nodeID]; len(preempted) > 0 {
-        for _, allocs := range preempted {
-            remove = append(remove, allocs)
-        }
+        remove = append(remove, preempted...)
     }
 
     if updated := plan.NodeAllocation[nodeID]; len(updated) > 0 {
-        for _, alloc := range updated {
-            remove = append(remove, alloc)
-        }
+        remove = append(remove, updated...)
     }
 
     proposed := structs.RemoveAllocs(existingAlloc, remove)
     proposed = append(proposed, plan.NodeAllocation[nodeID]...)
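The two helpers extracted above are small but load-bearing: CreateTime is stamped only on first placement (approximating scheduling time) while ModifyTime advances on every apply, and follow-up evaluation targets are deduplicated per namespace/job pair. A self-contained sketch with stand-in types (not the real nomad/structs definitions) exercises both:

package main

import (
    "fmt"
    "time"
)

// Stand-ins for the nomad/structs types, just enough to exercise the helpers.
type NamespacedID struct{ Namespace, ID string }

type Allocation struct {
    Namespace, JobID       string
    CreateTime, ModifyTime int64
}

// Same shape as the helper above: record each namespace/job pair exactly once.
func appendNamespacedJobID(jobIDs map[NamespacedID]struct{}, alloc *Allocation) {
    id := NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID}
    if _, ok := jobIDs[id]; !ok {
        jobIDs[id] = struct{}{}
    }
}

// Same shape as the helper above: CreateTime only when unset, ModifyTime always.
func updateAllocTimestamps(allocations []*Allocation, timestamp int64) {
    for _, alloc := range allocations {
        if alloc.CreateTime == 0 {
            alloc.CreateTime = timestamp
        }
        alloc.ModifyTime = timestamp
    }
}

func main() {
    now := time.Now().UTC().UnixNano()
    allocs := []*Allocation{
        {Namespace: "default", JobID: "web"},                   // first placement
        {Namespace: "default", JobID: "web", CreateTime: 1234}, // re-applied
    }
    updateAllocTimestamps(allocs, now)
    fmt.Println(allocs[0].CreateTime == now, allocs[1].CreateTime == 1234) // true true

    jobIDs := make(map[NamespacedID]struct{})
    for _, a := range allocs {
        appendNamespacedJobID(jobIDs, a)
    }
    fmt.Println(len(jobIDs)) // 1: duplicate namespace/job pairs collapse
}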
diff --git a/nomad/server.go b/nomad/server.go
index c35124c51..f8c4d2e4c 100644
--- a/nomad/server.go
+++ b/nomad/server.go
@@ -1078,7 +1078,7 @@ func (s *Server) setupRaft() error {
         Region: s.Region(),
     }
     var err error
-    s.fsm, err = NewFSM(fsmConfig)
+    s.fsm, err = NewFSM(fsmConfig, s)
     if err != nil {
         return err
     }
@@ -1173,7 +1173,7 @@ func (s *Server) setupRaft() error {
             if err != nil {
                 return fmt.Errorf("recovery failed to parse peers.json: %v", err)
             }
-            tmpFsm, err := NewFSM(fsmConfig)
+            tmpFsm, err := NewFSM(fsmConfig, s)
             if err != nil {
                 return fmt.Errorf("recovery failed to make temp FSM: %v", err)
             }
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 84201cbaa..24b90a2f4 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -170,6 +170,21 @@ RUN_QUERY:
 
 // UpsertPlanResults is used to upsert the results of a plan.
 func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanResultsRequest) error {
+    snapshot, err := s.Snapshot()
+    if err != nil {
+        return err
+    }
+
+    err = snapshot.DenormalizeAllocationsSlice(results.AllocsStopped, results.Job)
+    if err != nil {
+        return err
+    }
+
+    err = snapshot.DenormalizeAllocationsSlice(results.NodePreemptions, results.Job)
+    if err != nil {
+        return err
+    }
+
     txn := s.db.Txn(true)
     defer txn.Abort()
 
@@ -185,34 +200,6 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
         s.upsertDeploymentUpdates(index, results.DeploymentUpdates, txn)
     }
 
-    // Attach the job to all the allocations. It is pulled out in the payload to
-    // avoid the redundancy of encoding, but should be denormalized prior to
-    // being inserted into MemDB.
-    structs.DenormalizeAllocationJobs(results.Job, results.Alloc)
-
-    // COMPAT(0.11): Remove in 0.11
-    // Calculate the total resources of allocations. It is pulled out in the
-    // payload to avoid encoding something that can be computed, but should be
-    // denormalized prior to being inserted into MemDB.
-    for _, alloc := range results.Alloc {
-        if alloc.Resources != nil {
-            continue
-        }
-
-        alloc.Resources = new(structs.Resources)
-        for _, task := range alloc.TaskResources {
-            alloc.Resources.Add(task)
-        }
-
-        // Add the shared resources
-        alloc.Resources.Add(alloc.SharedResources)
-    }
-
-    // Upsert the allocations
-    if err := s.upsertAllocsImpl(index, results.Alloc, txn); err != nil {
-        return err
-    }
-
     // COMPAT: Nomad versions before 0.7.1 did not include the eval ID when
     // applying the plan. Thus while we are upgrading, we ignore updating the
     // modify index of evaluations from older plans.
@@ -223,35 +210,33 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
         }
     }
 
-    // Prepare preempted allocs in the plan results for update
-    var preemptedAllocs []*structs.Allocation
-    for _, preemptedAlloc := range results.NodePreemptions {
-        // Look for existing alloc
-        existing, err := txn.First("allocs", "id", preemptedAlloc.ID)
-        if err != nil {
-            return fmt.Errorf("alloc lookup failed: %v", err)
-        }
-
-        // Nothing to do if this does not exist
-        if existing == nil {
-            continue
-        }
-        exist := existing.(*structs.Allocation)
-
-        // Copy everything from the existing allocation
-        copyAlloc := exist.Copy()
-
-        // Only update the fields set by the scheduler
-        copyAlloc.DesiredStatus = preemptedAlloc.DesiredStatus
-        copyAlloc.PreemptedByAllocation = preemptedAlloc.PreemptedByAllocation
-        copyAlloc.DesiredDescription = preemptedAlloc.DesiredDescription
-        copyAlloc.ModifyTime = preemptedAlloc.ModifyTime
-        preemptedAllocs = append(preemptedAllocs, copyAlloc)
+    noOfAllocs := len(results.NodePreemptions)
+    if len(results.Alloc) > 0 {
+        // COMPAT 0.11: This branch will be removed when Alloc is removed.
+        // Attach the job to all the allocations. It is pulled out in the payload to
+        // avoid the redundancy of encoding, but should be denormalized prior to
+        // being inserted into MemDB.
+        addComputedAllocAttrs(results.Alloc, results.Job)
+        noOfAllocs += len(results.Alloc)
+    } else {
+        // Attach the job to all the allocations. It is pulled out in the payload to
+        // avoid the redundancy of encoding, but should be denormalized prior to
+        // being inserted into MemDB.
+        addComputedAllocAttrs(results.AllocsUpdated, results.Job)
+        noOfAllocs += len(results.AllocsStopped) + len(results.AllocsUpdated)
     }
 
-    // Upsert the preempted allocations
-    if err := s.upsertAllocsImpl(index, preemptedAllocs, txn); err != nil {
+    allocsToUpsert := make([]*structs.Allocation, 0, noOfAllocs)
+
+    // COMPAT 0.11: This append should be removed when Alloc is removed
+    allocsToUpsert = append(allocsToUpsert, results.Alloc...)
+
+    allocsToUpsert = append(allocsToUpsert, results.AllocsStopped...)
+    allocsToUpsert = append(allocsToUpsert, results.AllocsUpdated...)
+    allocsToUpsert = append(allocsToUpsert, results.NodePreemptions...)
+
+    if err := s.upsertAllocsImpl(index, allocsToUpsert, txn); err != nil {
         return err
     }
 
@@ -266,6 +251,28 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
     return nil
 }
 
+func addComputedAllocAttrs(allocs []*structs.Allocation, job *structs.Job) {
+    structs.DenormalizeAllocationJobs(job, allocs)
+
+    // COMPAT(0.11): Remove in 0.11
+    // Calculate the total resources of allocations. It is pulled out in the
+    // payload to avoid encoding something that can be computed, but should be
+    // denormalized prior to being inserted into MemDB.
+    for _, alloc := range allocs {
+        if alloc.Resources != nil {
+            continue
+        }
+
+        alloc.Resources = new(structs.Resources)
+        for _, task := range alloc.TaskResources {
+            alloc.Resources.Add(task)
+        }
+
+        // Add the shared resources
+        alloc.Resources.Add(alloc.SharedResources)
+    }
+}
+
 // upsertDeploymentUpdates updates the deployments given the passed status
 // updates.
 func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.DeploymentStatusUpdate, txn *memdb.Txn) error {
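The snapshot helpers in the hunk below merge each sparse diff with the allocation already in state. One subtlety to be aware of: DenormalizeAllocationsSlice compacts its input with an output index and finishes with `allocDiffs = allocDiffs[:j]`, which reslices only the local variable. If an allocation was deleted between plan submission and apply (so AllocByID returns nil), the caller's slice keeps its original length, with stale, un-denormalized diffs in the tail. A shape that returns the compacted slice, sketched here with plain strings rather than allocations, sidesteps that:

package main

import (
    "fmt"
    "strings"
)

// compact demonstrates the output-index idiom used by
// DenormalizeAllocationsSlice below, but returns the compacted slice so the
// truncation is visible to the caller.
func compact(items []string, keep func(string) bool) []string {
    j := 0
    for _, it := range items {
        if keep(it) {
            items[j] = it
            j++
        }
    }
    return items[:j]
}

func main() {
    diffs := []string{"alloc-a", "deleted-b", "alloc-c"}
    live := compact(diffs, func(s string) bool { return !strings.HasPrefix(s, "deleted") })
    // [alloc-a alloc-c] 3 — the input's own length is unchanged, which is
    // exactly what a reslice inside the callee cannot fix for the caller.
    fmt.Println(live, len(diffs))
}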
@@ -4100,6 +4107,67 @@
 type StateSnapshot struct {
     StateStore
 }
 
+// DenormalizeAllocationsMap takes a map of nodes to allocation diffs, looks up
+// the existing Allocation for each diff, merges the updated attributes into a
+// copy of it, and attaches the provided Job
+func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*structs.Allocation, job *structs.Job) error {
+    for _, allocDiffs := range nodeAllocations {
+        if err := s.DenormalizeAllocationsSlice(allocDiffs, job); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+// DenormalizeAllocationsSlice looks up the existing Allocation for each
+// allocation diff in the slice, merges the updated attributes into a copy of
+// it, and attaches the provided Job
+func (s *StateSnapshot) DenormalizeAllocationsSlice(allocDiffs []*structs.Allocation, job *structs.Job) error {
+    // Output index for denormalized Allocations
+    j := 0
+
+    for _, allocDiff := range allocDiffs {
+        alloc, err := s.AllocByID(nil, allocDiff.ID)
+        if err != nil {
+            return fmt.Errorf("alloc lookup failed: %v", err)
+        }
+        if alloc == nil {
+            continue
+        }
+
+        // Merge the updates to the Allocation
+        allocCopy := alloc.CopySkipJob()
+        allocCopy.Job = job
+
+        if allocDiff.PreemptedByAllocation != "" {
+            // The alloc is a preemption
+            allocCopy.PreemptedByAllocation = allocDiff.PreemptedByAllocation
+            allocCopy.DesiredDescription = getPreemptedAllocDesiredDescription(allocDiff.PreemptedByAllocation)
+            allocCopy.DesiredStatus = structs.AllocDesiredStatusEvict
+        } else {
+            // The alloc is a stopped alloc
+            allocCopy.DesiredDescription = allocDiff.DesiredDescription
+            allocCopy.DesiredStatus = structs.AllocDesiredStatusStop
+            if allocDiff.ClientStatus != "" {
+                allocCopy.ClientStatus = allocDiff.ClientStatus
+            }
+        }
+        if allocDiff.ModifyTime != 0 {
+            allocCopy.ModifyTime = allocDiff.ModifyTime
+        }
+
+        // Update the allocDiff in the slice to equal the denormalized alloc
+        allocDiffs[j] = allocCopy
+        j++
+    }
+    // Retain only the denormalized Allocations in the slice
+    allocDiffs = allocDiffs[:j]
+    return nil
+}
+
+func getPreemptedAllocDesiredDescription(preemptedByAllocID string) string {
+    return fmt.Sprintf("Preempted by alloc ID %v", preemptedByAllocID)
+}
+
 // StateRestore is used to optimize the performance when
 // restoring state by only using a single large transaction
 // instead of thousands of sub transactions
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 48d7b57d9..2d7614dbd 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -28,8 +28,8 @@ import (
     "github.com/gorhill/cronexpr"
     "github.com/hashicorp/consul/api"
     hcodec "github.com/hashicorp/go-msgpack/codec"
-    multierror "github.com/hashicorp/go-multierror"
-    version "github.com/hashicorp/go-version"
+    "github.com/hashicorp/go-multierror"
+    "github.com/hashicorp/go-version"
     "github.com/hashicorp/nomad/acl"
     "github.com/hashicorp/nomad/helper"
     "github.com/hashicorp/nomad/helper/args"
@@ -660,9 +660,9 @@ type ApplyPlanResultsRequest struct {
     // the evaluation itself being updated.
     EvalID string
 
-    // NodePreemptions is a slice of allocations from other lower priority jobs
+    // NodePreemptions is a slice of allocation diffs from other lower-priority jobs
     // that are preempted. Preempted allocations are marked as evicted.
-    NodePreemptions []*Allocation
+    NodePreemptions []*AllocationDiff
 
     // PreemptionEvals is a slice of follow up evals for jobs whose allocations
     // have been preempted to place allocs in this plan
@@ -674,8 +674,16 @@ type ApplyPlanResultsRequest struct {
 // within a single transaction
 type AllocUpdateRequest struct {
     // Alloc is the list of new allocations to assign
+    // Deprecated: replaced with two separate slices, one containing stopped
+    // allocations and another containing updated allocations
     Alloc []*Allocation
 
+    // Allocations to stop. Contains only the diff, not the entire allocation
+    AllocsStopped []*AllocationDiff
+
+    // New or updated allocations
+    AllocsUpdated []*Allocation
+
     // Evals is the list of new evaluations to create
     // Evals are valid only when used in the Raft RPC
     Evals []*Evaluation
@@ -7168,6 +7176,9 @@ const (
 
 // Allocation is used to allocate the placement of a task group to a node.
 type Allocation struct {
+    // msgpack: omit empty fields during serialization
+    _struct bool `codec:",omitempty"` // nolint: structcheck
+
     // ID of the allocation (UUID)
     ID string
 
@@ -7282,6 +7293,10 @@ type Allocation struct {
     ModifyTime int64
 }
 
+// AllocationDiff is a type alias for Allocation, used to indicate that the
+// value carries only a diff of an allocation and not the entire allocation
+type AllocationDiff = Allocation
+
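AllocationDiff is declared as a type alias (=), not a new named type. An alias is the same type under a second name: methods, assignability, and the msgpack encoding all carry over, which is why NodePreemptions can change its declared element type without touching the wire format. The distinction in miniature:

package main

import "fmt"

type Allocation struct{ ID string }

func (a *Allocation) Terminated() bool { return false }

// Alias declaration: AllocationDiff IS Allocation under a second name.
// Methods, assignability, and encoding are identical.
type AllocationDiff = Allocation

// A named type, by contrast, is distinct: NamedAlloc has none of
// Allocation's methods and needs an explicit conversion.
type NamedAlloc Allocation

func main() {
    d := &AllocationDiff{ID: "a1"}
    var a *Allocation = d // legal with an alias, no conversion needed
    fmt.Println(a.Terminated())

    n := NamedAlloc{ID: "a2"}
    a2 := Allocation(n) // a named type requires an explicit conversion
    fmt.Println(a2.ID)
}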
 // Index returns the index of the allocation. If the allocation is from a task
 // group with count greater than 1, there will be multiple allocations for it.
 func (a *Allocation) Index() uint {
@@ -7296,11 +7311,12 @@
     return uint(num)
 }
 
+// Copy provides a copy of the allocation and deep copies the job
 func (a *Allocation) Copy() *Allocation {
     return a.copyImpl(true)
 }
 
-// Copy provides a copy of the allocation but doesn't deep copy the job
+// CopySkipJob provides a copy of the allocation but doesn't deep copy the job
 func (a *Allocation) CopySkipJob() *Allocation {
     return a.copyImpl(false)
 }
@@ -8037,6 +8053,9 @@ const (
 // potentially taking action (allocation of work) or doing nothing if the state
 // of the world does not require it.
 type Evaluation struct {
+    // msgpack: omit empty fields during serialization
+    _struct bool `codec:",omitempty"` // nolint: structcheck
+
     // ID is a randomly generated UUID used for this evaluation. This
     // is assigned upon the creation of the evaluation.
     ID string
@@ -8227,7 +8246,7 @@ func (e *Evaluation) ShouldBlock() bool {
 
 // MakePlan is used to make a plan from the given evaluation
 // for a given Job
-func (e *Evaluation) MakePlan(j *Job) *Plan {
+func (e *Evaluation) MakePlan(j *Job, allowPlanOptimization bool) *Plan {
     p := &Plan{
         EvalID:   e.ID,
         Priority: e.Priority,
@@ -8235,6 +8254,7 @@
         NodeUpdate:      make(map[string][]*Allocation),
         NodeAllocation:  make(map[string][]*Allocation),
         NodePreemptions: make(map[string][]*Allocation),
+        NormalizeAllocs: allowPlanOptimization,
     }
     if j != nil {
         p.AllAtOnce = j.AllAtOnce
@@ -8304,6 +8324,9 @@ func (e *Evaluation) CreateFailedFollowUpEval(wait time.Duration) *Evaluation {
 // are submitted to the leader which verifies that resources have
 // not been overcommitted before admitting the plan.
 type Plan struct {
+    // msgpack: omit empty fields during serialization
+    _struct bool `codec:",omitempty"` // nolint: structcheck
+
     // EvalID is the evaluation ID this plan is associated with
     EvalID string
 
@@ -8353,11 +8376,14 @@ type Plan struct {
     // lower priority jobs that are preempted. Preempted allocations are marked
     // as evicted.
     NodePreemptions map[string][]*Allocation
+
+    // NormalizeAllocs indicates whether allocs are normalized in the Plan
+    NormalizeAllocs bool `codec:"-"`
 }
 
-// AppendUpdate marks the allocation for eviction. The clientStatus of the
+// AppendStoppedAlloc marks an allocation to be stopped. The clientStatus of the
 // allocation may be optionally set by passing in a non-empty value.
-func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clientStatus string) {
+func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus string) {
     newAlloc := new(Allocation)
     *newAlloc = *alloc
 
@@ -8373,7 +8399,7 @@ func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus
     // Strip the resources as it can be rebuilt.
     newAlloc.Resources = nil
 
-    newAlloc.DesiredStatus = desiredStatus
+    newAlloc.DesiredStatus = AllocDesiredStatusStop
     newAlloc.DesiredDescription = desiredDesc
 
     if clientStatus != "" {
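The blank _struct fields added to Allocation, Evaluation, and Plan above are a go-msgpack (ugorji codec) convention: a codec tag on a field literally named _struct sets struct-wide options, here omitting zero-valued fields from the encoding. That is what makes the sparse diffs cheap on the wire. A minimal sketch, assuming the vendored codec honors struct-level omitempty the way this change relies on:

package main

import (
    "fmt"

    hcodec "github.com/hashicorp/go-msgpack/codec"
)

// Sparse mimics the pattern added above: the codec tag on the blank _struct
// field applies to the whole struct, dropping zero-valued fields.
type Sparse struct {
    _struct bool `codec:",omitempty"` // nolint: structcheck
    ID      string
    Desc    string
    Count   int
}

func encodedLen(v interface{}) int {
    var buf []byte
    if err := hcodec.NewEncoderBytes(&buf, &hcodec.MsgpackHandle{}).Encode(v); err != nil {
        panic(err)
    }
    return len(buf)
}

func main() {
    full := Sparse{ID: "c3b6", Desc: "node drain", Count: 7}
    diff := Sparse{ID: "c3b6"}
    // The diff encodes smaller because its empty fields are omitted entirely.
    fmt.Println(encodedLen(full), ">", encodedLen(diff))
}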
@@ -8387,12 +8413,12 @@
 
 // AppendPreemptedAlloc is used to append an allocation that's being preempted to the plan.
 // To minimize the size of the plan, this only sets a minimal set of fields in the allocation
-func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, desiredStatus, preemptingAllocID string) {
+func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, preemptingAllocID string) {
     newAlloc := &Allocation{}
     newAlloc.ID = alloc.ID
     newAlloc.JobID = alloc.JobID
     newAlloc.Namespace = alloc.Namespace
-    newAlloc.DesiredStatus = desiredStatus
+    newAlloc.DesiredStatus = AllocDesiredStatusEvict
     newAlloc.PreemptedByAllocation = preemptingAllocID
 
     desiredDesc := fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocID)
@@ -8445,6 +8471,29 @@ func (p *Plan) IsNoOp() bool {
         len(p.DeploymentUpdates) == 0
 }
 
+func (p *Plan) NormalizeAllocations() {
+    if p.NormalizeAllocs {
+        for _, allocs := range p.NodeUpdate {
+            for i, alloc := range allocs {
+                allocs[i] = &Allocation{
+                    ID:                 alloc.ID,
+                    DesiredDescription: alloc.DesiredDescription,
+                    ClientStatus:       alloc.ClientStatus,
+                }
+            }
+        }
+
+        for _, allocs := range p.NodePreemptions {
+            for i, alloc := range allocs {
+                allocs[i] = &Allocation{
+                    ID:                    alloc.ID,
+                    PreemptedByAllocation: alloc.PreemptedByAllocation,
+                }
+            }
+        }
+    }
+}
+
 // PlanResult is the result of a plan submitted to the leader.
 type PlanResult struct {
     // NodeUpdate contains all the updates that were committed.
diff --git a/nomad/util.go b/nomad/util.go
index 44b711924..8a6ff407f 100644
--- a/nomad/util.go
+++ b/nomad/util.go
@@ -14,6 +14,11 @@ import (
     "github.com/hashicorp/serf/serf"
 )
 
+// MinVersionPlanDenormalization is the minimum version to support the
+// denormalization of Plan in SubmitPlan, and the raft log entry committed
+// in ApplyPlanResultsRequest
+var MinVersionPlanDenormalization = version.Must(version.NewVersion("0.9.1"))
+
 // ensurePath is used to make sure a path exists
 func ensurePath(path string, dir bool) error {
     if !dir {
@@ -145,9 +150,9 @@ func isNomadServer(m serf.Member) (bool, *serverParts) {
 
 // ServersMeetMinimumVersion returns whether the given alive servers are at least on the
 // given Nomad version
-func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version) bool {
+func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version, checkFailedServers bool) bool {
     for _, member := range members {
-        if valid, parts := isNomadServer(member); valid && parts.Status == serf.StatusAlive {
+        if valid, parts := isNomadServer(member); valid && (parts.Status == serf.StatusAlive || (checkFailedServers && parts.Status == serf.StatusFailed)) {
             // Check if the versions match - version.LessThan will return true for
             // 0.8.0-rc1 < 0.8.0, so we want to ignore the metadata
             versionsMatch := slicesMatch(minVersion.Segments(), parts.Build.Segments())
diff --git a/nomad/util_test.go b/nomad/util_test.go
index 4fe670cdc..561406760 100644
--- a/nomad/util_test.go
+++ b/nomad/util_test.go
@@ -162,7 +162,7 @@ func TestServersMeetMinimumVersion(t *testing.T) {
     }
 
     for _, tc := range cases {
-        result := ServersMeetMinimumVersion(tc.members, tc.ver)
+        result := ServersMeetMinimumVersion(tc.members, tc.ver, false)
         if result != tc.expected {
             t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
         }
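The checkFailedServers flag errs on the side of caution for feature gating: a server in the failed serf state may yet rejoin, and if it rejoins with an older binary it must not be handed log entries it cannot apply. A compressed re-creation of the gate follows; the real implementation goes through isNomadServer and a segment-wise comparison that ignores prerelease metadata, whereas this sketch uses LessThan, and the fixtures are illustrative:

package main

import (
    "fmt"

    version "github.com/hashicorp/go-version"
    "github.com/hashicorp/serf/serf"
)

// Simplified re-creation of ServersMeetMinimumVersion for illustration only.
func meetMinimum(members []serf.Member, min *version.Version, checkFailed bool) bool {
    for _, m := range members {
        if m.Tags["role"] != "nomad" {
            continue
        }
        eligible := m.Status == serf.StatusAlive || (checkFailed && m.Status == serf.StatusFailed)
        if !eligible {
            continue
        }
        build, err := version.NewVersion(m.Tags["build"])
        if err != nil || build.LessThan(min) {
            return false
        }
    }
    return true
}

func main() {
    min := version.Must(version.NewVersion("0.9.1"))
    members := []serf.Member{
        {Status: serf.StatusAlive, Tags: map[string]string{"role": "nomad", "build": "0.9.1"}},
        {Status: serf.StatusFailed, Tags: map[string]string{"role": "nomad", "build": "0.8.7"}},
    }
    fmt.Println(meetMinimum(members, min, false)) // true: the failed 0.8.7 server is ignored
    fmt.Println(meetMinimum(members, min, true))  // false: the failed server is counted
}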
diff --git a/nomad/worker.go b/nomad/worker.go
index f0aefb62d..6cd8e0107 100644
--- a/nomad/worker.go
+++ b/nomad/worker.go
@@ -284,7 +284,8 @@ func (w *Worker) invokeScheduler(eval *structs.Evaluation, token string) error {
     if eval.Type == structs.JobTypeCore {
         sched = NewCoreScheduler(w.srv, snap)
     } else {
-        sched, err = scheduler.NewScheduler(eval.Type, w.logger, snap, w)
+        allowPlanOptimization := ServersMeetMinimumVersion(w.srv.Members(), MinVersionPlanDenormalization, true)
+        sched, err = scheduler.NewScheduler(eval.Type, w.logger, snap, w, allowPlanOptimization)
         if err != nil {
             return fmt.Errorf("failed to instantiate scheduler: %v", err)
         }
@@ -310,6 +311,9 @@ func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler.
     // Add the evaluation token to the plan
     plan.EvalToken = w.evalToken
 
+    // Normalize stopped and preempted allocs before RPC
+    plan.NormalizeAllocations()
+
     // Setup the request
     req := structs.PlanRequest{
         Plan: plan,
diff --git a/nomad/worker_test.go b/nomad/worker_test.go
index 2f3e31728..b7a9e526a 100644
--- a/nomad/worker_test.go
+++ b/nomad/worker_test.go
@@ -36,7 +36,7 @@ func (n *NoopScheduler) Process(eval *structs.Evaluation) error {
 }
 
 func init() {
-    scheduler.BuiltinSchedulers["noop"] = func(logger log.Logger, s scheduler.State, p scheduler.Planner) scheduler.Scheduler {
+    scheduler.BuiltinSchedulers["noop"] = func(logger log.Logger, s scheduler.State, p scheduler.Planner, allowPlanOptimization bool) scheduler.Scheduler {
         n := &NoopScheduler{
             state:   s,
             planner: p,
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index 6c518f528..b69b31a64 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -77,6 +77,10 @@ type GenericScheduler struct {
     planner Planner
     batch   bool
 
+    // allowPlanOptimization is a temporary flag that can be removed once the
+    // code for sending/committing full allocs in the Plan is removed
+    allowPlanOptimization bool
+
     eval  *structs.Evaluation
     job   *structs.Job
     plan  *structs.Plan
@@ -94,23 +98,25 @@ type GenericScheduler struct {
 }
 
 // NewServiceScheduler is a factory function to instantiate a new service scheduler
-func NewServiceScheduler(logger log.Logger, state State, planner Planner) Scheduler {
+func NewServiceScheduler(logger log.Logger, state State, planner Planner, allowPlanOptimization bool) Scheduler {
     s := &GenericScheduler{
-        logger:  logger.Named("service_sched"),
-        state:   state,
-        planner: planner,
-        batch:   false,
+        logger:                logger.Named("service_sched"),
+        state:                 state,
+        planner:               planner,
+        batch:                 false,
+        allowPlanOptimization: allowPlanOptimization,
     }
     return s
 }
 
 // NewBatchScheduler is a factory function to instantiate a new batch scheduler
-func NewBatchScheduler(logger log.Logger, state State, planner Planner) Scheduler {
+func NewBatchScheduler(logger log.Logger, state State, planner Planner, allowPlanOptimization bool) Scheduler {
     s := &GenericScheduler{
-        logger:  logger.Named("batch_sched"),
-        state:   state,
-        planner: planner,
-        batch:   true,
+        logger:                logger.Named("batch_sched"),
+        state:                 state,
+        planner:               planner,
+        batch:                 true,
+        allowPlanOptimization: allowPlanOptimization,
     }
     return s
 }
@@ -224,7 +230,7 @@ func (s *GenericScheduler) process() (bool, error) {
     s.followUpEvals = nil
 
     // Create a plan
-    s.plan = s.eval.MakePlan(s.job)
+    s.plan = s.eval.MakePlan(s.job, s.allowPlanOptimization)
 
     if !s.batch {
         // Get any existing deployment
@@ -366,7 +372,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
 
     // Handle the stop
     for _, stop := range results.stop {
-        s.plan.AppendUpdate(stop.alloc, structs.AllocDesiredStatusStop, stop.statusDescription, stop.clientStatus)
+        s.plan.AppendStoppedAlloc(stop.alloc, stop.statusDescription, stop.clientStatus)
     }
 
     // Handle the in-place updates
@@ -464,7 +470,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
         stopPrevAlloc, stopPrevAllocDesc := missing.StopPreviousAlloc()
         prevAllocation := missing.PreviousAllocation()
         if stopPrevAlloc {
-            s.plan.AppendUpdate(prevAllocation, structs.AllocDesiredStatusStop, stopPrevAllocDesc, "")
+            s.plan.AppendStoppedAlloc(prevAllocation, stopPrevAllocDesc, "")
         }
 
         // Compute penalty nodes for rescheduled allocs
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index 639b2b8cf..76a0a6940 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -28,7 +28,7 @@ var BuiltinSchedulers = map[string]Factory{
 
 // NewScheduler is used to instantiate and return a new scheduler
 // given the scheduler name, initial state, and planner.
-func NewScheduler(name string, logger log.Logger, state State, planner Planner) (Scheduler, error) {
+func NewScheduler(name string, logger log.Logger, state State, planner Planner, allowPlanOptimization bool) (Scheduler, error) {
     // Lookup the factory function
     factory, ok := BuiltinSchedulers[name]
     if !ok {
@@ -36,12 +36,12 @@ func NewScheduler(name string, logger log.Logger, state State, planner Planner)
     }
 
     // Instantiate the scheduler
-    sched := factory(logger, state, planner)
+    sched := factory(logger, state, planner, allowPlanOptimization)
     return sched, nil
 }
 
 // Factory is used to instantiate a new Scheduler
-type Factory func(log.Logger, State, Planner) Scheduler
+type Factory func(log.Logger, State, Planner, bool) Scheduler
 
 // Scheduler is the top level instance for a scheduler. A scheduler is
 // meant to only encapsulate business logic, pushing the various plumbing
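The widened Factory signature ripples to every registered scheduler, including test schedulers. A minimal conforming implementation, patterned on the NoopScheduler registration in nomad/worker_test.go above (the type here is illustrative):

package main

import (
    log "github.com/hashicorp/go-hclog"

    "github.com/hashicorp/nomad/nomad/structs"
    "github.com/hashicorp/nomad/scheduler"
)

// noopScheduler satisfies scheduler.Scheduler and does nothing; it exists
// only to show the shape a Factory must now produce.
type noopScheduler struct {
    state   scheduler.State
    planner scheduler.Planner
}

func (n *noopScheduler) Process(eval *structs.Evaluation) error { return nil }

func init() {
    // Registration under the widened Factory signature; implementations that
    // predate this change fail to compile until the trailing bool is added.
    scheduler.BuiltinSchedulers["noop"] = func(logger log.Logger, s scheduler.State, p scheduler.Planner, allowPlanOptimization bool) scheduler.Scheduler {
        return &noopScheduler{state: s, planner: p}
    }
}

func main() {}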
diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go
index bcfd6e5be..2cefcba34 100644
--- a/scheduler/system_sched.go
+++ b/scheduler/system_sched.go
@@ -23,6 +23,10 @@ type SystemScheduler struct {
     state   State
     planner Planner
 
+    // allowPlanOptimization is a temporary flag that can be removed once the
+    // code for sending/committing full allocs in the Plan is removed
+    allowPlanOptimization bool
+
     eval *structs.Evaluation
     job  *structs.Job
     plan *structs.Plan
@@ -41,11 +45,12 @@ type SystemScheduler struct {
 
 // NewSystemScheduler is a factory function to instantiate a new system
 // scheduler.
-func NewSystemScheduler(logger log.Logger, state State, planner Planner) Scheduler {
+func NewSystemScheduler(logger log.Logger, state State, planner Planner, allowPlanOptimization bool) Scheduler {
     return &SystemScheduler{
-        logger:  logger.Named("system_sched"),
-        state:   state,
-        planner: planner,
+        logger:                logger.Named("system_sched"),
+        state:                 state,
+        planner:               planner,
+        allowPlanOptimization: allowPlanOptimization,
     }
 }
 
@@ -110,7 +115,7 @@ func (s *SystemScheduler) process() (bool, error) {
     }
 
     // Create a plan
-    s.plan = s.eval.MakePlan(s.job)
+    s.plan = s.eval.MakePlan(s.job, s.allowPlanOptimization)
 
     // Reset the failed allocations
     s.failedTGAllocs = nil
@@ -210,18 +215,18 @@ func (s *SystemScheduler) computeJobAllocs() error {
 
     // Add all the allocs to stop
     for _, e := range diff.stop {
-        s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded, "")
+        s.plan.AppendStoppedAlloc(e.Alloc, allocNotNeeded, "")
     }
 
     // Add all the allocs to migrate
     for _, e := range diff.migrate {
-        s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNodeTainted, "")
+        s.plan.AppendStoppedAlloc(e.Alloc, allocNodeTainted, "")
     }
 
     // Lost allocations should be transitioned to desired status stop and client
     // status lost.
     for _, e := range diff.lost {
-        s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocLost, structs.AllocClientStatusLost)
+        s.plan.AppendStoppedAlloc(e.Alloc, allocLost, structs.AllocClientStatusLost)
     }
 
     // Attempt to do the upgrades in place
@@ -351,7 +356,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
         if option.PreemptedAllocs != nil {
             var preemptedAllocIDs []string
             for _, stop := range option.PreemptedAllocs {
-                s.plan.AppendPreemptedAlloc(stop, structs.AllocDesiredStatusEvict, alloc.ID)
+                s.plan.AppendPreemptedAlloc(stop, alloc.ID)
 
                 preemptedAllocIDs = append(preemptedAllocIDs, stop.ID)
                 if s.eval.AnnotatePlan && s.plan.Annotations != nil {
diff --git a/scheduler/testing.go b/scheduler/testing.go
index f05010102..61fa7f79c 100644
--- a/scheduler/testing.go
+++ b/scheduler/testing.go
@@ -216,7 +216,7 @@ func (h *Harness) Snapshot() State {
 // a snapshot of current state using the harness for planning.
 func (h *Harness) Scheduler(factory Factory) Scheduler {
     logger := testlog.HCLogger(h.t)
-    return factory(logger, h.Snapshot(), h)
+    return factory(logger, h.Snapshot(), h, false)
 }
 
 // Process is used to process an evaluation given a factory
diff --git a/scheduler/util.go b/scheduler/util.go
index 5f62d31b3..3b9b437de 100644
--- a/scheduler/util.go
+++ b/scheduler/util.go
@@ -507,8 +507,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
         // the current allocation is discounted when checking for feasibility.
         // Otherwise we would be trying to fit the tasks current resources and
         // updated resources. After select is called we can remove the evict.
-        ctx.Plan().AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop,
-            allocInPlace, "")
+        ctx.Plan().AppendStoppedAlloc(update.Alloc, allocInPlace, "")
 
         // Attempt to match the task group
         option := stack.Select(update.TaskGroup, nil) // This select only looks at one node so we don't pass selectOptions
@@ -573,7 +572,7 @@ func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc stri
     n := len(allocs)
     for i := 0; i < n && i < *limit; i++ {
         a := allocs[i]
-        ctx.Plan().AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc, "")
+        ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "")
         diff.place = append(diff.place, a)
     }
     if n <= *limit {
@@ -734,7 +733,7 @@ func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*struc
         if alloc.DesiredStatus == structs.AllocDesiredStatusStop &&
             (alloc.ClientStatus == structs.AllocClientStatusRunning ||
                 alloc.ClientStatus == structs.AllocClientStatusPending) {
-            plan.AppendUpdate(alloc, structs.AllocDesiredStatusStop, allocLost, structs.AllocClientStatusLost)
+            plan.AppendStoppedAlloc(alloc, allocLost, structs.AllocClientStatusLost)
         }
     }
 }
@@ -784,7 +783,7 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy
         // the current allocation is discounted when checking for feasibility.
         // Otherwise we would be trying to fit the tasks current resources and
         // updated resources. After select is called we can remove the evict.
-        ctx.Plan().AppendUpdate(existing, structs.AllocDesiredStatusStop, allocInPlace, "")
+        ctx.Plan().AppendStoppedAlloc(existing, allocInPlace, "")
 
         // Attempt to match the task group
         option := stack.Select(newTG, nil) // This select only looks at one node so we don't pass selectOptions