Add code for plan normalization

This commit is contained in:
Arshneet Singh 2019-03-04 01:49:32 -08:00
parent dadf461c0f
commit b977748a4b
17 changed files with 353 additions and 152 deletions

View File

@ -86,6 +86,9 @@ type nomadFSM struct {
// new state store). Everything internal here is synchronized by the
// Raft side, so doesn't need to lock this.
stateLock sync.RWMutex
// Reference to the server the FSM is running on
server *Server
}
// nomadSnapshot is used to provide a snapshot of the current
@ -121,7 +124,7 @@ type FSMConfig struct {
}
// NewFSMPath is used to construct a new FSM with a blank state
func NewFSM(config *FSMConfig) (*nomadFSM, error) {
func NewFSM(config *FSMConfig, server *Server) (*nomadFSM, error) {
// Create a state store
sconfig := &state.StateStoreConfig{
Logger: config.Logger,
@ -142,6 +145,7 @@ func NewFSM(config *FSMConfig) (*nomadFSM, error) {
timetable: NewTimeTable(timeTableGranularity, timeTableLimit),
enterpriseAppliers: make(map[structs.MessageType]LogApplier, 8),
enterpriseRestorers: make(map[SnapshotType]SnapshotRestorer, 8),
server: server,
}
// Register all the log applier functions
@ -1423,7 +1427,8 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
}
snap.UpsertEvals(100, []*structs.Evaluation{eval})
// Create the scheduler and run it
sched, err := scheduler.NewScheduler(eval.Type, n.logger, snap, planner)
allowPlanOptimization := ServersMeetMinimumVersion(n.server.Members(), MinVersionPlanDenormalization, true)
sched, err := scheduler.NewScheduler(eval.Type, n.logger, snap, planner, allowPlanOptimization)
if err != nil {
return err
}

View File

@ -1216,7 +1216,8 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
}
// Create the scheduler and run it
sched, err := scheduler.NewScheduler(eval.Type, j.logger, snap, planner)
allowPlanOptimization := ServersMeetMinimumVersion(j.srv.Members(), MinVersionPlanDenormalization, true)
sched, err := scheduler.NewScheduler(eval.Type, j.logger, snap, planner, allowPlanOptimization)
if err != nil {
return err
}

View File

@ -1243,7 +1243,7 @@ func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig {
return config
}
if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion) {
if !ServersMeetMinimumVersion(s.Members(), minAutopilotVersion, false) {
s.logger.Named("autopilot").Warn("can't initialize until all servers are above minimum version", "min_version", minAutopilotVersion)
return nil
}
@ -1270,7 +1270,7 @@ func (s *Server) getOrCreateSchedulerConfig() *structs.SchedulerConfiguration {
if config != nil {
return config
}
if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion) {
if !ServersMeetMinimumVersion(s.Members(), minSchedulerConfigVersion, false) {
s.logger.Named("core").Warn("can't initialize scheduler config until all servers are above minimum version", "min_version", minSchedulerConfigVersion)
return nil
}

View File

@ -237,7 +237,7 @@ func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRe
}
// All servers should be at or above 0.8.0 to apply this operatation
if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion) {
if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion, false) {
return fmt.Errorf("All servers should be running version %v to update autopilot config", minAutopilotVersion)
}
@ -305,7 +305,7 @@ func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRe
}
// All servers should be at or above 0.9.0 to apply this operatation
if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion) {
if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion, false) {
return fmt.Errorf("All servers should be running version %v to update scheduler config", minSchedulerConfigVersion)
}
// Apply the update

View File

@ -15,7 +15,7 @@ import (
"github.com/hashicorp/raft"
)
// planner is used to mange the submitted allocation plans that are waiting
// planner is used to manage the submitted allocation plans that are waiting
// to be accessed by the leader
type planner struct {
*Server
@ -149,52 +149,90 @@ func (p *planner) planApply() {
// applyPlan is used to apply the plan result and to return the alloc index
func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
// Determine the minimum number of updates, could be more if there
// are multiple updates per node
minUpdates := len(result.NodeUpdate)
minUpdates += len(result.NodeAllocation)
// Setup the update request
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: plan.Job,
Alloc: make([]*structs.Allocation, 0, minUpdates),
Job: plan.Job,
},
Deployment: result.Deployment,
DeploymentUpdates: result.DeploymentUpdates,
EvalID: plan.EvalID,
NodePreemptions: make([]*structs.Allocation, 0, len(result.NodePreemptions)),
}
for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)
}
for _, allocList := range result.NodeAllocation {
req.Alloc = append(req.Alloc, allocList...)
}
for _, preemptions := range result.NodePreemptions {
req.NodePreemptions = append(req.NodePreemptions, preemptions...)
}
// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
now := time.Now().UTC().UnixNano()
for _, alloc := range req.Alloc {
if alloc.CreateTime == 0 {
alloc.CreateTime = now
}
alloc.ModifyTime = now
}
// Set modify time for preempted allocs if any
// Also gather jobids to create follow up evals
preemptedJobIDs := make(map[structs.NamespacedID]struct{})
for _, alloc := range req.NodePreemptions {
alloc.ModifyTime = now
id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID}
_, ok := preemptedJobIDs[id]
if !ok {
preemptedJobIDs[id] = struct{}{}
now := time.Now().UTC().UnixNano()
if ServersMeetMinimumVersion(p.Members(), MinVersionPlanDenormalization, true) {
// Initialize the allocs request using the new optimized log entry format.
// Determine the minimum number of updates, could be more if there
// are multiple updates per node
req.AllocsStopped = make([]*structs.Allocation, 0, len(result.NodeUpdate))
req.AllocsUpdated = make([]*structs.Allocation, 0, len(result.NodeAllocation))
for _, updateList := range result.NodeUpdate {
for _, stoppedAlloc := range updateList {
req.AllocsStopped = append(req.AllocsStopped, &structs.Allocation{
ID: stoppedAlloc.ID,
DesiredDescription: stoppedAlloc.DesiredDescription,
ClientStatus: stoppedAlloc.ClientStatus,
ModifyTime: now,
})
}
}
for _, allocList := range result.NodeAllocation {
req.AllocsUpdated = append(req.AllocsUpdated, allocList...)
}
// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
updateAllocTimestamps(req.AllocsUpdated, now)
for _, preemptions := range result.NodePreemptions {
for _, preemptedAlloc := range preemptions {
req.NodePreemptions = append(req.NodePreemptions, &structs.Allocation{
ID: preemptedAlloc.ID,
PreemptedByAllocation: preemptedAlloc.PreemptedByAllocation,
ModifyTime: now,
})
// Gather jobids to create follow up evals
appendNamespacedJobID(preemptedJobIDs, preemptedAlloc)
}
}
} else {
// Deprecated: This code path is deprecated and will only be used to support
// application of older log entries. Expected to be removed in a future version.
// Determine the minimum number of updates, could be more if there
// are multiple updates per node
minUpdates := len(result.NodeUpdate)
minUpdates += len(result.NodeAllocation)
// Initialize the allocs request using the older log entry format
req.Alloc = make([]*structs.Allocation, 0, minUpdates)
for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)
}
for _, allocList := range result.NodeAllocation {
req.Alloc = append(req.Alloc, allocList...)
}
for _, preemptions := range result.NodePreemptions {
req.NodePreemptions = append(req.NodePreemptions, preemptions...)
}
// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
updateAllocTimestamps(req.Alloc, now)
// Set modify time for preempted allocs if any
// Also gather jobids to create follow up evals
for _, alloc := range req.NodePreemptions {
alloc.ModifyTime = now
appendNamespacedJobID(preemptedJobIDs, alloc)
}
}
@ -232,6 +270,22 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
return future, nil
}
func appendNamespacedJobID(jobIDs map[structs.NamespacedID]struct{}, alloc *structs.Allocation) {
id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID}
if _, ok := jobIDs[id]; !ok {
jobIDs[id] = struct{}{}
}
}
func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
for _, alloc := range allocations {
if alloc.CreateTime == 0 {
alloc.CreateTime = timestamp
}
alloc.ModifyTime = timestamp
}
}
// asyncPlanWait is used to apply and respond to a plan async
func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
result *structs.PlanResult, pending *pendingPlan) {
@ -264,6 +318,15 @@ func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
func evaluatePlan(pool *EvaluatePool, snap *state.StateSnapshot, plan *structs.Plan, logger log.Logger) (*structs.PlanResult, error) {
defer metrics.MeasureSince([]string{"nomad", "plan", "evaluate"}, time.Now())
err := snap.DenormalizeAllocationsMap(plan.NodeUpdate, plan.Job)
if err != nil {
return nil, err
}
err = snap.DenormalizeAllocationsMap(plan.NodePreemptions, plan.Job)
if err != nil {
return nil, err
}
// Check if the plan exceeds quota
overQuota, err := evaluatePlanQuota(snap, plan)
if err != nil {
@ -521,15 +584,11 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
// Remove any preempted allocs
if preempted := plan.NodePreemptions[nodeID]; len(preempted) > 0 {
for _, allocs := range preempted {
remove = append(remove, allocs)
}
remove = append(remove, preempted...)
}
if updated := plan.NodeAllocation[nodeID]; len(updated) > 0 {
for _, alloc := range updated {
remove = append(remove, alloc)
}
remove = append(remove, updated...)
}
proposed := structs.RemoveAllocs(existingAlloc, remove)
proposed = append(proposed, plan.NodeAllocation[nodeID]...)

View File

@ -1078,7 +1078,7 @@ func (s *Server) setupRaft() error {
Region: s.Region(),
}
var err error
s.fsm, err = NewFSM(fsmConfig)
s.fsm, err = NewFSM(fsmConfig, s)
if err != nil {
return err
}
@ -1173,7 +1173,7 @@ func (s *Server) setupRaft() error {
if err != nil {
return fmt.Errorf("recovery failed to parse peers.json: %v", err)
}
tmpFsm, err := NewFSM(fsmConfig)
tmpFsm, err := NewFSM(fsmConfig, s)
if err != nil {
return fmt.Errorf("recovery failed to make temp FSM: %v", err)
}

View File

@ -170,6 +170,21 @@ RUN_QUERY:
// UpsertPlanResults is used to upsert the results of a plan.
func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanResultsRequest) error {
snapshot, err := s.Snapshot()
if err != nil {
return err
}
err = snapshot.DenormalizeAllocationsSlice(results.AllocsStopped, results.Job)
if err != nil {
return err
}
err = snapshot.DenormalizeAllocationsSlice(results.NodePreemptions, results.Job)
if err != nil {
return err
}
txn := s.db.Txn(true)
defer txn.Abort()
@ -185,34 +200,6 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
s.upsertDeploymentUpdates(index, results.DeploymentUpdates, txn)
}
// Attach the job to all the allocations. It is pulled out in the payload to
// avoid the redundancy of encoding, but should be denormalized prior to
// being inserted into MemDB.
structs.DenormalizeAllocationJobs(results.Job, results.Alloc)
// COMPAT(0.11): Remove in 0.11
// Calculate the total resources of allocations. It is pulled out in the
// payload to avoid encoding something that can be computed, but should be
// denormalized prior to being inserted into MemDB.
for _, alloc := range results.Alloc {
if alloc.Resources != nil {
continue
}
alloc.Resources = new(structs.Resources)
for _, task := range alloc.TaskResources {
alloc.Resources.Add(task)
}
// Add the shared resources
alloc.Resources.Add(alloc.SharedResources)
}
// Upsert the allocations
if err := s.upsertAllocsImpl(index, results.Alloc, txn); err != nil {
return err
}
// COMPAT: Nomad versions before 0.7.1 did not include the eval ID when
// applying the plan. Thus while we are upgrading, we ignore updating the
// modify index of evaluations from older plans.
@ -223,35 +210,33 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
}
}
// Prepare preempted allocs in the plan results for update
var preemptedAllocs []*structs.Allocation
for _, preemptedAlloc := range results.NodePreemptions {
// Look for existing alloc
existing, err := txn.First("allocs", "id", preemptedAlloc.ID)
if err != nil {
return fmt.Errorf("alloc lookup failed: %v", err)
}
// Nothing to do if this does not exist
if existing == nil {
continue
}
exist := existing.(*structs.Allocation)
// Copy everything from the existing allocation
copyAlloc := exist.Copy()
// Only update the fields set by the scheduler
copyAlloc.DesiredStatus = preemptedAlloc.DesiredStatus
copyAlloc.PreemptedByAllocation = preemptedAlloc.PreemptedByAllocation
copyAlloc.DesiredDescription = preemptedAlloc.DesiredDescription
copyAlloc.ModifyTime = preemptedAlloc.ModifyTime
preemptedAllocs = append(preemptedAllocs, copyAlloc)
noOfAllocs := len(results.NodePreemptions)
if len(results.Alloc) > 0 {
// COMPAT 0.11: This branch will be removed, when Alloc is removed
// Attach the job to all the allocations. It is pulled out in the payload to
// avoid the redundancy of encoding, but should be denormalized prior to
// being inserted into MemDB.
addComputedAllocAttrs(results.Alloc, results.Job)
noOfAllocs += len(results.Alloc)
} else {
// Attach the job to all the allocations. It is pulled out in the payload to
// avoid the redundancy of encoding, but should be denormalized prior to
// being inserted into MemDB.
addComputedAllocAttrs(results.AllocsUpdated, results.Job)
noOfAllocs += len(results.AllocsStopped) + len(results.AllocsUpdated)
}
// Upsert the preempted allocations
if err := s.upsertAllocsImpl(index, preemptedAllocs, txn); err != nil {
allocsToUpsert := make([]*structs.Allocation, 0, noOfAllocs)
// COMPAT 0.11: This append should be removed when Alloc is removed
allocsToUpsert = append(allocsToUpsert, results.Alloc...)
allocsToUpsert = append(allocsToUpsert, results.AllocsStopped...)
allocsToUpsert = append(allocsToUpsert, results.AllocsUpdated...)
allocsToUpsert = append(allocsToUpsert, results.NodePreemptions...)
if err := s.upsertAllocsImpl(index, allocsToUpsert, txn); err != nil {
return err
}
@ -266,6 +251,28 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
return nil
}
func addComputedAllocAttrs(allocs []*structs.Allocation, job *structs.Job) {
structs.DenormalizeAllocationJobs(job, allocs)
// COMPAT(0.11): Remove in 0.11
// Calculate the total resources of allocations. It is pulled out in the
// payload to avoid encoding something that can be computed, but should be
// denormalized prior to being inserted into MemDB.
for _, alloc := range allocs {
if alloc.Resources != nil {
continue
}
alloc.Resources = new(structs.Resources)
for _, task := range alloc.TaskResources {
alloc.Resources.Add(task)
}
// Add the shared resources
alloc.Resources.Add(alloc.SharedResources)
}
}
// upsertDeploymentUpdates updates the deployments given the passed status
// updates.
func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.DeploymentStatusUpdate, txn *memdb.Txn) error {
@ -4100,6 +4107,67 @@ type StateSnapshot struct {
StateStore
}
// DenormalizeAllocationsMap takes in a map of nodes to allocations, and queries the
// Allocation for each of the Allocation diffs and merges the updated attributes with
// the existing Allocation, and attaches the Job provided
func (s *StateSnapshot) DenormalizeAllocationsMap(nodeAllocations map[string][]*structs.Allocation, job *structs.Job) error {
for _, allocDiffs := range nodeAllocations {
if err := s.DenormalizeAllocationsSlice(allocDiffs, job); err != nil {
return err
}
}
return nil
}
// DenormalizeAllocationsSlice queries the Allocation for each of the Allocation diffs and merges
// the updated attributes with the existing Allocation, and attaches the Job provided
func (s *StateSnapshot) DenormalizeAllocationsSlice(allocDiffs []*structs.Allocation, job *structs.Job) error {
// Output index for denormalized Allocations
j := 0
for _, allocDiff := range allocDiffs {
alloc, err := s.AllocByID(nil, allocDiff.ID)
if err != nil {
return fmt.Errorf("alloc lookup failed: %v", err)
}
if alloc == nil {
continue
}
// Merge the updates to the Allocation
allocCopy := alloc.CopySkipJob()
allocCopy.Job = job
if allocDiff.PreemptedByAllocation != "" {
// If alloc is a preemption
allocCopy.PreemptedByAllocation = allocDiff.PreemptedByAllocation
allocCopy.DesiredDescription = getPreemptedAllocDesiredDescription(allocDiff.PreemptedByAllocation)
allocCopy.DesiredStatus = structs.AllocDesiredStatusEvict
} else {
// If alloc is a stopped alloc
allocCopy.DesiredDescription = allocDiff.DesiredDescription
allocCopy.DesiredStatus = structs.AllocDesiredStatusStop
if allocDiff.ClientStatus != "" {
allocCopy.ClientStatus = allocDiff.ClientStatus
}
}
if allocDiff.ModifyTime != 0 {
allocCopy.ModifyTime = allocDiff.ModifyTime
}
// Update the allocDiff in the slice to equal the denormalized alloc
allocDiffs[j] = allocCopy
j++
}
// Retain only the denormalized Allocations in the slice
allocDiffs = allocDiffs[:j]
return nil
}
func getPreemptedAllocDesiredDescription(PreemptedByAllocID string) string {
return fmt.Sprintf("Preempted by alloc ID %v", PreemptedByAllocID)
}
// StateRestore is used to optimize the performance when
// restoring state by only using a single large transaction
// instead of thousands of sub transactions

View File

@ -28,8 +28,8 @@ import (
"github.com/gorhill/cronexpr"
"github.com/hashicorp/consul/api"
hcodec "github.com/hashicorp/go-msgpack/codec"
multierror "github.com/hashicorp/go-multierror"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/args"
@ -660,9 +660,9 @@ type ApplyPlanResultsRequest struct {
// the evaluation itself being updated.
EvalID string
// NodePreemptions is a slice of allocations from other lower priority jobs
// NodePreemptions is a slice of allocation diffs from other lower priority jobs
// that are preempted. Preempted allocations are marked as evicted.
NodePreemptions []*Allocation
NodePreemptions []*AllocationDiff
// PreemptionEvals is a slice of follow up evals for jobs whose allocations
// have been preempted to place allocs in this plan
@ -674,8 +674,16 @@ type ApplyPlanResultsRequest struct {
// within a single transaction
type AllocUpdateRequest struct {
// Alloc is the list of new allocations to assign
// Deprecated: Replaced with two separate slices, one containing stopped allocations
// and another containing updated allocations
Alloc []*Allocation
// Allocations to stop. Contains only the diff, not the entire allocation
AllocsStopped []*AllocationDiff
// New or updated allocations
AllocsUpdated []*Allocation
// Evals is the list of new evaluations to create
// Evals are valid only when used in the Raft RPC
Evals []*Evaluation
@ -7168,6 +7176,9 @@ const (
// Allocation is used to allocate the placement of a task group to a node.
type Allocation struct {
// msgpack omit empty fields during serialization
_struct bool `codec:",omitempty"` // nolint: structcheck
// ID of the allocation (UUID)
ID string
@ -7282,6 +7293,10 @@ type Allocation struct {
ModifyTime int64
}
// AllocationDiff is a type alias for Allocation used to indicate that a diff is
// and not the entire allocation
type AllocationDiff = Allocation
// Index returns the index of the allocation. If the allocation is from a task
// group with count greater than 1, there will be multiple allocations for it.
func (a *Allocation) Index() uint {
@ -7296,11 +7311,12 @@ func (a *Allocation) Index() uint {
return uint(num)
}
// Copy provides a copy of the allocation and deep copies the job
func (a *Allocation) Copy() *Allocation {
return a.copyImpl(true)
}
// Copy provides a copy of the allocation but doesn't deep copy the job
// CopySkipJob provides a copy of the allocation but doesn't deep copy the job
func (a *Allocation) CopySkipJob() *Allocation {
return a.copyImpl(false)
}
@ -8037,6 +8053,9 @@ const (
// potentially taking action (allocation of work) or doing nothing if the state
// of the world does not require it.
type Evaluation struct {
// msgpack omit empty fields during serialization
_struct bool `codec:",omitempty"` // nolint: structcheck
// ID is a randomly generated UUID used for this evaluation. This
// is assigned upon the creation of the evaluation.
ID string
@ -8227,7 +8246,7 @@ func (e *Evaluation) ShouldBlock() bool {
// MakePlan is used to make a plan from the given evaluation
// for a given Job
func (e *Evaluation) MakePlan(j *Job) *Plan {
func (e *Evaluation) MakePlan(j *Job, allowPlanOptimization bool) *Plan {
p := &Plan{
EvalID: e.ID,
Priority: e.Priority,
@ -8235,6 +8254,7 @@ func (e *Evaluation) MakePlan(j *Job) *Plan {
NodeUpdate: make(map[string][]*Allocation),
NodeAllocation: make(map[string][]*Allocation),
NodePreemptions: make(map[string][]*Allocation),
NormalizeAllocs: allowPlanOptimization,
}
if j != nil {
p.AllAtOnce = j.AllAtOnce
@ -8304,6 +8324,9 @@ func (e *Evaluation) CreateFailedFollowUpEval(wait time.Duration) *Evaluation {
// are submitted to the leader which verifies that resources have
// not been overcommitted before admitting the plan.
type Plan struct {
// msgpack omit empty fields during serialization
_struct bool `codec:",omitempty"` // nolint: structcheck
// EvalID is the evaluation ID this plan is associated with
EvalID string
@ -8353,11 +8376,14 @@ type Plan struct {
// lower priority jobs that are preempted. Preempted allocations are marked
// as evicted.
NodePreemptions map[string][]*Allocation
// Indicates whether allocs are normalized in the Plan
NormalizeAllocs bool `codec:"-"`
}
// AppendUpdate marks the allocation for eviction. The clientStatus of the
// AppendStoppedAlloc marks an allocation to be stopped. The clientStatus of the
// allocation may be optionally set by passing in a non-empty value.
func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clientStatus string) {
func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus string) {
newAlloc := new(Allocation)
*newAlloc = *alloc
@ -8373,7 +8399,7 @@ func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clien
// Strip the resources as it can be rebuilt.
newAlloc.Resources = nil
newAlloc.DesiredStatus = desiredStatus
newAlloc.DesiredStatus = AllocDesiredStatusStop
newAlloc.DesiredDescription = desiredDesc
if clientStatus != "" {
@ -8387,12 +8413,12 @@ func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clien
// AppendPreemptedAlloc is used to append an allocation that's being preempted to the plan.
// To minimize the size of the plan, this only sets a minimal set of fields in the allocation
func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, desiredStatus, preemptingAllocID string) {
func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, preemptingAllocID string) {
newAlloc := &Allocation{}
newAlloc.ID = alloc.ID
newAlloc.JobID = alloc.JobID
newAlloc.Namespace = alloc.Namespace
newAlloc.DesiredStatus = desiredStatus
newAlloc.DesiredStatus = AllocDesiredStatusEvict
newAlloc.PreemptedByAllocation = preemptingAllocID
desiredDesc := fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocID)
@ -8445,6 +8471,29 @@ func (p *Plan) IsNoOp() bool {
len(p.DeploymentUpdates) == 0
}
func (p *Plan) NormalizeAllocations() {
if p.NormalizeAllocs {
for _, allocs := range p.NodeUpdate {
for i, alloc := range allocs {
allocs[i] = &Allocation{
ID: alloc.ID,
DesiredDescription: alloc.DesiredDescription,
ClientStatus: alloc.ClientStatus,
}
}
}
for _, allocs := range p.NodePreemptions {
for i, alloc := range allocs {
allocs[i] = &Allocation{
ID: alloc.ID,
PreemptedByAllocation: alloc.PreemptedByAllocation,
}
}
}
}
}
// PlanResult is the result of a plan submitted to the leader.
type PlanResult struct {
// NodeUpdate contains all the updates that were committed.

View File

@ -14,6 +14,11 @@ import (
"github.com/hashicorp/serf/serf"
)
// MinVersionPlanDenormalization is the minimum version to support the
// denormalization of Plan in SubmitPlan, and the raft log entry committed
// in ApplyPlanResultsRequest
var MinVersionPlanDenormalization = version.Must(version.NewVersion("0.9.1"))
// ensurePath is used to make sure a path exists
func ensurePath(path string, dir bool) error {
if !dir {
@ -145,9 +150,9 @@ func isNomadServer(m serf.Member) (bool, *serverParts) {
// ServersMeetMinimumVersion returns whether the given alive servers are at least on the
// given Nomad version
func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version) bool {
func ServersMeetMinimumVersion(members []serf.Member, minVersion *version.Version, checkFailedServers bool) bool {
for _, member := range members {
if valid, parts := isNomadServer(member); valid && parts.Status == serf.StatusAlive {
if valid, parts := isNomadServer(member); valid && (parts.Status == serf.StatusAlive || (checkFailedServers && parts.Status == serf.StatusFailed)) {
// Check if the versions match - version.LessThan will return true for
// 0.8.0-rc1 < 0.8.0, so we want to ignore the metadata
versionsMatch := slicesMatch(minVersion.Segments(), parts.Build.Segments())

View File

@ -162,7 +162,7 @@ func TestServersMeetMinimumVersion(t *testing.T) {
}
for _, tc := range cases {
result := ServersMeetMinimumVersion(tc.members, tc.ver)
result := ServersMeetMinimumVersion(tc.members, tc.ver, false)
if result != tc.expected {
t.Fatalf("bad: %v, %v, %v", result, tc.ver.String(), tc)
}

View File

@ -284,7 +284,8 @@ func (w *Worker) invokeScheduler(eval *structs.Evaluation, token string) error {
if eval.Type == structs.JobTypeCore {
sched = NewCoreScheduler(w.srv, snap)
} else {
sched, err = scheduler.NewScheduler(eval.Type, w.logger, snap, w)
allowPlanOptimization := ServersMeetMinimumVersion(w.srv.Members(), MinVersionPlanDenormalization, true)
sched, err = scheduler.NewScheduler(eval.Type, w.logger, snap, w, allowPlanOptimization)
if err != nil {
return fmt.Errorf("failed to instantiate scheduler: %v", err)
}
@ -310,6 +311,9 @@ func (w *Worker) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, scheduler.
// Add the evaluation token to the plan
plan.EvalToken = w.evalToken
// Normalize stopped and preempted allocs before RPC
plan.NormalizeAllocations()
// Setup the request
req := structs.PlanRequest{
Plan: plan,

View File

@ -36,7 +36,7 @@ func (n *NoopScheduler) Process(eval *structs.Evaluation) error {
}
func init() {
scheduler.BuiltinSchedulers["noop"] = func(logger log.Logger, s scheduler.State, p scheduler.Planner) scheduler.Scheduler {
scheduler.BuiltinSchedulers["noop"] = func(logger log.Logger, s scheduler.State, p scheduler.Planner, allowPlanOptimization bool) scheduler.Scheduler {
n := &NoopScheduler{
state: s,
planner: p,

View File

@ -77,6 +77,10 @@ type GenericScheduler struct {
planner Planner
batch bool
// Temporary flag introduced till the code for sending/committing full allocs in the Plan can
// be safely removed
allowPlanOptimization bool
eval *structs.Evaluation
job *structs.Job
plan *structs.Plan
@ -94,23 +98,25 @@ type GenericScheduler struct {
}
// NewServiceScheduler is a factory function to instantiate a new service scheduler
func NewServiceScheduler(logger log.Logger, state State, planner Planner) Scheduler {
func NewServiceScheduler(logger log.Logger, state State, planner Planner, allowPlanOptimization bool) Scheduler {
s := &GenericScheduler{
logger: logger.Named("service_sched"),
state: state,
planner: planner,
batch: false,
logger: logger.Named("service_sched"),
state: state,
planner: planner,
batch: false,
allowPlanOptimization: allowPlanOptimization,
}
return s
}
// NewBatchScheduler is a factory function to instantiate a new batch scheduler
func NewBatchScheduler(logger log.Logger, state State, planner Planner) Scheduler {
func NewBatchScheduler(logger log.Logger, state State, planner Planner, allowPlanOptimization bool) Scheduler {
s := &GenericScheduler{
logger: logger.Named("batch_sched"),
state: state,
planner: planner,
batch: true,
logger: logger.Named("batch_sched"),
state: state,
planner: planner,
batch: true,
allowPlanOptimization: allowPlanOptimization,
}
return s
}
@ -224,7 +230,7 @@ func (s *GenericScheduler) process() (bool, error) {
s.followUpEvals = nil
// Create a plan
s.plan = s.eval.MakePlan(s.job)
s.plan = s.eval.MakePlan(s.job, s.allowPlanOptimization)
if !s.batch {
// Get any existing deployment
@ -366,7 +372,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
// Handle the stop
for _, stop := range results.stop {
s.plan.AppendUpdate(stop.alloc, structs.AllocDesiredStatusStop, stop.statusDescription, stop.clientStatus)
s.plan.AppendStoppedAlloc(stop.alloc, stop.statusDescription, stop.clientStatus)
}
// Handle the in-place updates
@ -464,7 +470,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
stopPrevAlloc, stopPrevAllocDesc := missing.StopPreviousAlloc()
prevAllocation := missing.PreviousAllocation()
if stopPrevAlloc {
s.plan.AppendUpdate(prevAllocation, structs.AllocDesiredStatusStop, stopPrevAllocDesc, "")
s.plan.AppendStoppedAlloc(prevAllocation, stopPrevAllocDesc, "")
}
// Compute penalty nodes for rescheduled allocs

View File

@ -28,7 +28,7 @@ var BuiltinSchedulers = map[string]Factory{
// NewScheduler is used to instantiate and return a new scheduler
// given the scheduler name, initial state, and planner.
func NewScheduler(name string, logger log.Logger, state State, planner Planner) (Scheduler, error) {
func NewScheduler(name string, logger log.Logger, state State, planner Planner, allowPlanOptimization bool) (Scheduler, error) {
// Lookup the factory function
factory, ok := BuiltinSchedulers[name]
if !ok {
@ -36,12 +36,12 @@ func NewScheduler(name string, logger log.Logger, state State, planner Planner)
}
// Instantiate the scheduler
sched := factory(logger, state, planner)
sched := factory(logger, state, planner, allowPlanOptimization)
return sched, nil
}
// Factory is used to instantiate a new Scheduler
type Factory func(log.Logger, State, Planner) Scheduler
type Factory func(log.Logger, State, Planner, bool) Scheduler
// Scheduler is the top level instance for a scheduler. A scheduler is
// meant to only encapsulate business logic, pushing the various plumbing

View File

@ -23,6 +23,10 @@ type SystemScheduler struct {
state State
planner Planner
// Temporary flag introduced till the code for sending/committing full allocs in the Plan can
// be safely removed
allowPlanOptimization bool
eval *structs.Evaluation
job *structs.Job
plan *structs.Plan
@ -41,11 +45,12 @@ type SystemScheduler struct {
// NewSystemScheduler is a factory function to instantiate a new system
// scheduler.
func NewSystemScheduler(logger log.Logger, state State, planner Planner) Scheduler {
func NewSystemScheduler(logger log.Logger, state State, planner Planner, allowPlanOptimization bool) Scheduler {
return &SystemScheduler{
logger: logger.Named("system_sched"),
state: state,
planner: planner,
logger: logger.Named("system_sched"),
state: state,
planner: planner,
allowPlanOptimization: allowPlanOptimization,
}
}
@ -110,7 +115,7 @@ func (s *SystemScheduler) process() (bool, error) {
}
// Create a plan
s.plan = s.eval.MakePlan(s.job)
s.plan = s.eval.MakePlan(s.job, s.allowPlanOptimization)
// Reset the failed allocations
s.failedTGAllocs = nil
@ -210,18 +215,18 @@ func (s *SystemScheduler) computeJobAllocs() error {
// Add all the allocs to stop
for _, e := range diff.stop {
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded, "")
s.plan.AppendStoppedAlloc(e.Alloc, allocNotNeeded, "")
}
// Add all the allocs to migrate
for _, e := range diff.migrate {
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNodeTainted, "")
s.plan.AppendStoppedAlloc(e.Alloc, allocNodeTainted, "")
}
// Lost allocations should be transitioned to desired status stop and client
// status lost.
for _, e := range diff.lost {
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocLost, structs.AllocClientStatusLost)
s.plan.AppendStoppedAlloc(e.Alloc, allocLost, structs.AllocClientStatusLost)
}
// Attempt to do the upgrades in place
@ -351,7 +356,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
if option.PreemptedAllocs != nil {
var preemptedAllocIDs []string
for _, stop := range option.PreemptedAllocs {
s.plan.AppendPreemptedAlloc(stop, structs.AllocDesiredStatusEvict, alloc.ID)
s.plan.AppendPreemptedAlloc(stop, alloc.ID)
preemptedAllocIDs = append(preemptedAllocIDs, stop.ID)
if s.eval.AnnotatePlan && s.plan.Annotations != nil {

View File

@ -216,7 +216,7 @@ func (h *Harness) Snapshot() State {
// a snapshot of current state using the harness for planning.
func (h *Harness) Scheduler(factory Factory) Scheduler {
logger := testlog.HCLogger(h.t)
return factory(logger, h.Snapshot(), h)
return factory(logger, h.Snapshot(), h, false)
}
// Process is used to process an evaluation given a factory

View File

@ -507,8 +507,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
// the current allocation is discounted when checking for feasibility.
// Otherwise we would be trying to fit the tasks current resources and
// updated resources. After select is called we can remove the evict.
ctx.Plan().AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop,
allocInPlace, "")
ctx.Plan().AppendStoppedAlloc(update.Alloc, allocInPlace, "")
// Attempt to match the task group
option := stack.Select(update.TaskGroup, nil) // This select only looks at one node so we don't pass selectOptions
@ -573,7 +572,7 @@ func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc stri
n := len(allocs)
for i := 0; i < n && i < *limit; i++ {
a := allocs[i]
ctx.Plan().AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc, "")
ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "")
diff.place = append(diff.place, a)
}
if n <= *limit {
@ -734,7 +733,7 @@ func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*struc
if alloc.DesiredStatus == structs.AllocDesiredStatusStop &&
(alloc.ClientStatus == structs.AllocClientStatusRunning ||
alloc.ClientStatus == structs.AllocClientStatusPending) {
plan.AppendUpdate(alloc, structs.AllocDesiredStatusStop, allocLost, structs.AllocClientStatusLost)
plan.AppendStoppedAlloc(alloc, allocLost, structs.AllocClientStatusLost)
}
}
}
@ -784,7 +783,7 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy
// the current allocation is discounted when checking for feasibility.
// Otherwise we would be trying to fit the tasks current resources and
// updated resources. After select is called we can remove the evict.
ctx.Plan().AppendUpdate(existing, structs.AllocDesiredStatusStop, allocInPlace, "")
ctx.Plan().AppendStoppedAlloc(existing, allocInPlace, "")
// Attempt to match the task group
option := stack.Select(newTG, nil) // This select only looks at one node so we don't pass selectOptions