Respond to review comments

This commit is contained in:
Alex Dadgar 2017-04-19 10:54:03 -07:00
parent 1769fe468a
commit 5a2449d236
5 changed files with 21 additions and 29 deletions

View file

@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"io" "io"
"log" "log"
"sort"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
@ -469,7 +468,7 @@ func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memd
} }
if _, err = txn.DeleteAll("job_versions", "id", job.ID, job.Version); err != nil { if _, err = txn.DeleteAll("job_versions", "id", job.ID, job.Version); err != nil {
return fmt.Errorf("deleing job versions failed: %v", err) return fmt.Errorf("deleting job versions failed: %v", err)
} }
} }
@ -595,10 +594,10 @@ func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, id strin
all = append(all, j) all = append(all, j)
} }
// Sort with highest versions first // Reverse so that highest versions first
sort.Slice(all, func(i, j int) bool { for i, j := 0, len(all)-1; i < j; i, j = i+1, j-1 {
return all[i].Version >= all[j].Version all[i], all[j] = all[j], all[i]
}) }
return all, nil return all, nil
} }

View file

@ -1388,6 +1388,11 @@ func (j *Job) CombinedTaskMeta(groupName, taskName string) map[string]string {
return meta return meta
} }
// Stopped returns if a job is stopped.
func (j *Job) Stopped() bool {
return j == nil || j.Stop
}
// Stub is used to return a summary of the job // Stub is used to return a summary of the job
func (j *Job) Stub(summary *JobSummary) *JobListStub { func (j *Job) Stub(summary *JobSummary) *JobListStub {
return &JobListStub{ return &JobListStub{

View file

@ -179,12 +179,6 @@ func (s *GenericScheduler) createBlockedEval(planFailure bool) error {
return s.planner.CreateEval(s.blocked) return s.planner.CreateEval(s.blocked)
} }
// isStoppedJob returns if the scheduling is for a stopped job and the scheduler
// should stop all its allocations.
func (s *GenericScheduler) isStoppedJob() bool {
return s.job == nil || s.job.Stop
}
// process is wrapped in retryMax to iteratively run the handler until we have no // process is wrapped in retryMax to iteratively run the handler until we have no
// further work or we've made the maximum number of attempts. // further work or we've made the maximum number of attempts.
func (s *GenericScheduler) process() (bool, error) { func (s *GenericScheduler) process() (bool, error) {
@ -197,7 +191,7 @@ func (s *GenericScheduler) process() (bool, error) {
s.eval.JobID, err) s.eval.JobID, err)
} }
numTaskGroups := 0 numTaskGroups := 0
if !s.isStoppedJob() { if !s.job.Stopped() {
numTaskGroups = len(s.job.TaskGroups) numTaskGroups = len(s.job.TaskGroups)
} }
s.queuedAllocs = make(map[string]int, numTaskGroups) s.queuedAllocs = make(map[string]int, numTaskGroups)
@ -213,7 +207,7 @@ func (s *GenericScheduler) process() (bool, error) {
// Construct the placement stack // Construct the placement stack
s.stack = NewGenericStack(s.batch, s.ctx) s.stack = NewGenericStack(s.batch, s.ctx)
if !s.isStoppedJob() { if !s.job.Stopped() {
s.stack.SetJob(s.job) s.stack.SetJob(s.job)
} }
@ -357,7 +351,7 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) ([
func (s *GenericScheduler) computeJobAllocs() error { func (s *GenericScheduler) computeJobAllocs() error {
// Materialize all the task groups, job could be missing if deregistered // Materialize all the task groups, job could be missing if deregistered
var groups map[string]*structs.TaskGroup var groups map[string]*structs.TaskGroup
if !s.isStoppedJob() { if !s.job.Stopped() {
groups = materializeTaskGroups(s.job) groups = materializeTaskGroups(s.job)
} }
@ -404,7 +398,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
// Check if a rolling upgrade strategy is being used // Check if a rolling upgrade strategy is being used
limit := len(diff.update) + len(diff.migrate) + len(diff.lost) limit := len(diff.update) + len(diff.migrate) + len(diff.lost)
if !s.isStoppedJob() && s.job.Update.Rolling() { if !s.job.Stopped() && s.job.Update.Rolling() {
limit = s.job.Update.MaxParallel limit = s.job.Update.MaxParallel
} }
@ -420,7 +414,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
// Nothing remaining to do if placement is not required // Nothing remaining to do if placement is not required
if len(diff.place) == 0 { if len(diff.place) == 0 {
if !s.isStoppedJob() { if !s.job.Stopped() {
for _, tg := range s.job.TaskGroups { for _, tg := range s.job.TaskGroups {
s.queuedAllocs[tg.Name] = 0 s.queuedAllocs[tg.Name] = 0
} }

View file

@ -83,12 +83,6 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
s.queuedAllocs) s.queuedAllocs)
} }
// isStoppedJob returns if the scheduling is for a stopped job and the scheduler
// should stop all its allocations.
func (s *SystemScheduler) isStoppedJob() bool {
return s.job == nil || s.job.Stop
}
// process is wrapped in retryMax to iteratively run the handler until we have no // process is wrapped in retryMax to iteratively run the handler until we have no
// further work or we've made the maximum number of attempts. // further work or we've made the maximum number of attempts.
func (s *SystemScheduler) process() (bool, error) { func (s *SystemScheduler) process() (bool, error) {
@ -101,13 +95,13 @@ func (s *SystemScheduler) process() (bool, error) {
s.eval.JobID, err) s.eval.JobID, err)
} }
numTaskGroups := 0 numTaskGroups := 0
if !s.isStoppedJob() { if !s.job.Stopped() {
numTaskGroups = len(s.job.TaskGroups) numTaskGroups = len(s.job.TaskGroups)
} }
s.queuedAllocs = make(map[string]int, numTaskGroups) s.queuedAllocs = make(map[string]int, numTaskGroups)
// Get the ready nodes in the required datacenters // Get the ready nodes in the required datacenters
if !s.isStoppedJob() { if !s.job.Stopped() {
s.nodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters) s.nodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters)
if err != nil { if err != nil {
return false, fmt.Errorf("failed to get ready nodes: %v", err) return false, fmt.Errorf("failed to get ready nodes: %v", err)
@ -125,7 +119,7 @@ func (s *SystemScheduler) process() (bool, error) {
// Construct the placement stack // Construct the placement stack
s.stack = NewSystemStack(s.ctx) s.stack = NewSystemStack(s.ctx)
if !s.isStoppedJob() { if !s.job.Stopped() {
s.stack.SetJob(s.job) s.stack.SetJob(s.job)
} }
@ -234,7 +228,7 @@ func (s *SystemScheduler) computeJobAllocs() error {
// Check if a rolling upgrade strategy is being used // Check if a rolling upgrade strategy is being used
limit := len(diff.update) limit := len(diff.update)
if !s.isStoppedJob() && s.job.Update.Rolling() { if !s.job.Stopped() && s.job.Update.Rolling() {
limit = s.job.Update.MaxParallel limit = s.job.Update.MaxParallel
} }
@ -243,7 +237,7 @@ func (s *SystemScheduler) computeJobAllocs() error {
// Nothing remaining to do if placement is not required // Nothing remaining to do if placement is not required
if len(diff.place) == 0 { if len(diff.place) == 0 {
if !s.isStoppedJob() { if !s.job.Stopped() {
for _, tg := range s.job.TaskGroups { for _, tg := range s.job.TaskGroups {
s.queuedAllocs[tg.Name] = 0 s.queuedAllocs[tg.Name] = 0
} }

View file

@ -21,7 +21,7 @@ type allocTuple struct {
// a job requires. This is used to do the count expansion. // a job requires. This is used to do the count expansion.
func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup { func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {
out := make(map[string]*structs.TaskGroup) out := make(map[string]*structs.TaskGroup)
if job == nil || job.Stop { if job.Stopped() {
return out return out
} }