Merge pull request #4720 from hashicorp/b-jet-fixes
Series of scheduler fixes / debugging enhancements
Commit 9b793531d6
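The central behavioral fix in this series is in the deployment watcher: instead of caching a latestEval index on the watcher (setLatestEval/getLatestEval), it now queries the job's evaluation status on each allocation update and skips creating an evaluation when a blocked eval already exists. Below is a minimal, self-contained sketch of that decision rule, condensed from the handleAllocUpdate/jobEvalStatus changes in the diff; the types here are simplified stand-ins for illustration, not the real nomad/structs types.

	package main

	import "fmt"

	// evalStatus is a simplified stand-in for what jobEvalStatus returns:
	// the highest SnapshotIndex/CreateIndex across the job's evals and
	// whether any eval for the job is currently blocked.
	type evalStatus struct {
		latestIndex uint64
		blocked     bool
	}

	// allocView is a simplified stand-in for the allocation fields the
	// watcher reads: DeploymentStatus.IsHealthy() and DeploymentStatus.ModifyIndex.
	type allocView struct {
		healthy     bool
		modifyIndex uint64
	}

	// shouldCreateEval mirrors the new rule in handleAllocUpdate: only create
	// an evaluation when no blocked eval exists and the allocation became
	// healthy after the last evaluation was created.
	func shouldCreateEval(st evalStatus, a allocView) bool {
		return !st.blocked && a.healthy && a.modifyIndex > st.latestIndex
	}

	func main() {
		st := evalStatus{latestIndex: 100}
		fmt.Println(shouldCreateEval(st, allocView{healthy: true, modifyIndex: 120})) // true
		st.blocked = true
		fmt.Println(shouldCreateEval(st, allocView{healthy: true, modifyIndex: 120})) // false
	}

Dropping the cached index in favor of an on-demand check avoids creating duplicate evaluations while one is already blocked waiting on capacity.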
@@ -226,7 +226,6 @@ func (w *deploymentWatcher) SetAllocHealth(
 	if j != nil {
 		resp.RevertedJobVersion = helper.Uint64ToPtr(j.Version)
 	}
-	w.setLatestEval(index)
 	return nil
 }
 
@@ -265,7 +264,6 @@ func (w *deploymentWatcher) PromoteDeployment(
 	resp.EvalCreateIndex = index
 	resp.DeploymentModifyIndex = index
 	resp.Index = index
-	w.setLatestEval(index)
 	return nil
 }
 
@@ -297,7 +295,6 @@ func (w *deploymentWatcher) PauseDeployment(
 	}
 	resp.DeploymentModifyIndex = i
 	resp.Index = i
-	w.setLatestEval(i)
 	return nil
 }
 
@@ -347,7 +344,6 @@ func (w *deploymentWatcher) FailDeployment(
 	if rollbackJob != nil {
 		resp.RevertedJobVersion = helper.Uint64ToPtr(rollbackJob.Version)
 	}
-	w.setLatestEval(i)
 	return nil
 }
 
@@ -490,10 +486,8 @@ FAIL:
 	// Update the status of the deployment to failed and create an evaluation.
 	e := w.getEval()
 	u := w.getDeploymentStatusUpdate(structs.DeploymentStatusFailed, desc)
-	if index, err := w.upsertDeploymentStatusUpdate(u, e, j); err != nil {
+	if _, err := w.upsertDeploymentStatusUpdate(u, e, j); err != nil {
 		w.logger.Error("failed to update deployment status", "error", err)
-	} else {
-		w.setLatestEval(index)
 	}
 }
 
@@ -512,7 +506,7 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) (
 	var res allocUpdateResult
 
 	// Get the latest evaluation index
-	latestEval, err := w.latestEvalIndex()
+	latestEval, blocked, err := w.jobEvalStatus()
 	if err != nil {
 		if err == context.Canceled || w.ctx.Err() == context.Canceled {
 			return res, err
@@ -528,19 +522,20 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) (
 			continue
 		}
 
-		// Nothing to do for this allocation
-		if alloc.DeploymentStatus == nil || alloc.DeploymentStatus.ModifyIndex <= latestEval {
-			continue
-		}
-
 		// Determine if the update stanza for this group is progress based
 		progressBased := dstate.ProgressDeadline != 0
 
-		// We need to create an eval so the job can progress.
-		if alloc.DeploymentStatus.IsHealthy() {
-			res.createEval = true
-		} else if progressBased && alloc.DeploymentStatus.IsUnhealthy() && deployment.Active() && !alloc.DesiredTransition.ShouldReschedule() {
+		// Check if the allocation has failed and we need to mark it for allow
+		// replacements
+		if progressBased && alloc.DeploymentStatus.IsUnhealthy() &&
+			deployment.Active() && !alloc.DesiredTransition.ShouldReschedule() {
 			res.allowReplacements = append(res.allowReplacements, alloc.ID)
+			continue
+		}
+
+		// We need to create an eval so the job can progress.
+		if !blocked && alloc.DeploymentStatus.IsHealthy() && alloc.DeploymentStatus.ModifyIndex > latestEval {
+			res.createEval = true
 		}
 
 		// If the group is using a progress deadline, we don't have to do anything.
@@ -685,10 +680,8 @@ func (w *deploymentWatcher) createBatchedUpdate(allowReplacements []string, forI
 		w.l.Unlock()
 
 		// Create the eval
-		if index, err := w.createUpdate(replacements, w.getEval()); err != nil {
+		if _, err := w.createUpdate(replacements, w.getEval()); err != nil {
 			w.logger.Error("failed to create evaluation for deployment", "deployment_id", w.deploymentID, "error", err)
-		} else {
-			w.setLatestEval(index)
 		}
 	})
 }
@@ -764,71 +757,68 @@ func (w *deploymentWatcher) getAllocsImpl(ws memdb.WatchSet, state *state.StateS
 		return nil, 0, err
 	}
 
+	maxIndex := uint64(0)
 	stubs := make([]*structs.AllocListStub, 0, len(allocs))
 	for _, alloc := range allocs {
 		stubs = append(stubs, alloc.Stub())
+
+		if maxIndex < alloc.ModifyIndex {
+			maxIndex = alloc.ModifyIndex
+		}
 	}
 
-	// Use the last index that affected the jobs table
-	index, err := state.Index("allocs")
-	if err != nil {
-		return nil, index, err
-	}
-	return stubs, index, nil
-}
-
-// latestEvalIndex returns the index of the last evaluation created for
-// the job. The index is used to determine if an allocation update requires an
-// evaluation to be triggered.
-func (w *deploymentWatcher) latestEvalIndex() (uint64, error) {
+	// Use the last index that affected the allocs table
+	if len(stubs) == 0 {
+		index, err := state.Index("allocs")
+		if err != nil {
+			return nil, index, err
+		}
+		maxIndex = index
+	}
+
+	return stubs, maxIndex, nil
+}
+
+// jobEvalStatus returns the eval status for a job. It returns the index of the
+// last evaluation created for the job, as well as whether there exists a
+// blocked evaluation for the job. The index is used to determine if an
+// allocation update requires an evaluation to be triggered. If there already is
+// a blocked evaluations, no eval should be created.
+func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, blocked bool, err error) {
 	if err := w.queryLimiter.Wait(w.ctx); err != nil {
-		return 0, err
+		return 0, false, err
 	}
 
 	snap, err := w.state.Snapshot()
 	if err != nil {
-		return 0, err
+		return 0, false, err
 	}
 
 	evals, err := snap.EvalsByJob(nil, w.j.Namespace, w.j.ID)
 	if err != nil {
-		return 0, err
+		return 0, false, err
 	}
 
 	if len(evals) == 0 {
-		idx, err := snap.Index("evals")
-		if err != nil {
-			w.setLatestEval(idx)
-		}
-
-		return idx, err
+		index, err := snap.Index("evals")
+		return index, false, err
 	}
 
-	// Prefer using the snapshot index. Otherwise use the create index
-	e := evals[0]
-	if e.SnapshotIndex != 0 {
-		w.setLatestEval(e.SnapshotIndex)
-		return e.SnapshotIndex, nil
-	}
-
-	w.setLatestEval(e.CreateIndex)
-	return e.CreateIndex, nil
-}
-
-// setLatestEval sets the given index as the latest eval unless the currently
-// stored index is higher.
-func (w *deploymentWatcher) setLatestEval(index uint64) {
-	w.l.Lock()
-	defer w.l.Unlock()
-	if index > w.latestEval {
-		w.latestEval = index
-	}
-}
-
-// getLatestEval returns the latest eval index.
-func (w *deploymentWatcher) getLatestEval() uint64 {
-	w.l.Lock()
-	defer w.l.Unlock()
-	return w.latestEval
+	var max uint64
+	for _, eval := range evals {
+		// If we have a blocked eval, then we do not care what the index is
+		// since we will not need to make a new eval.
+		if eval.ShouldBlock() {
+			return 0, true, nil
+		}
+
+		// Prefer using the snapshot index. Otherwise use the create index
+		if eval.SnapshotIndex != 0 && max < eval.SnapshotIndex {
+			max = eval.SnapshotIndex
+		} else if max < eval.CreateIndex {
+			max = eval.CreateIndex
+		}
+	}
+
+	return max, false, nil
 }
 
@@ -447,6 +447,7 @@ func (w *drainingJobWatcher) getJobAllocsImpl(ws memdb.WatchSet, state *state.St
 	}
 
 	// Capture the allocs for each draining job.
+	var maxIndex uint64 = 0
 	resp := make(map[structs.NamespacedID][]*structs.Allocation, l)
 	for jns := range draining {
 		allocs, err := state.AllocsByJob(ws, jns.Namespace, jns.ID, false)
@@ -455,6 +456,17 @@ func (w *drainingJobWatcher) getJobAllocsImpl(ws memdb.WatchSet, state *state.St
 		}
 
 		resp[jns] = allocs
+		for _, alloc := range allocs {
+			if maxIndex < alloc.ModifyIndex {
+				maxIndex = alloc.ModifyIndex
+			}
+		}
+	}
+
+	// Prefer using the actual max index of affected allocs since it means less
+	// unblocking
+	if maxIndex != 0 {
+		index = maxIndex
 	}
 
 	return resp, index, nil
@@ -235,6 +235,7 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateSto
 		return nil, 0, err
 	}
 
+	var maxIndex uint64 = 0
 	resp := make(map[string]*structs.Node, 64)
 	for {
 		raw := iter.Next()
@@ -244,6 +245,15 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateSto
 
 		node := raw.(*structs.Node)
 		resp[node.ID] = node
+		if maxIndex < node.ModifyIndex {
+			maxIndex = node.ModifyIndex
+		}
+	}
+
+	// Prefer using the actual max index of affected nodes since it means less
+	// unblocking
+	if maxIndex != 0 {
+		index = maxIndex
 	}
 
 	return resp, index, nil
@@ -2825,6 +2825,7 @@ func (s *StateStore) UpdateDeploymentAllocHealth(index uint64, req *structs.Appl
 		copy.DeploymentStatus.Healthy = helper.BoolToPtr(healthy)
 		copy.DeploymentStatus.Timestamp = ts
 		copy.DeploymentStatus.ModifyIndex = index
+		copy.ModifyIndex = index
 
 		if err := s.updateDeploymentWithAlloc(index, copy, old, txn); err != nil {
 			return fmt.Errorf("error updating deployment: %v", err)
@@ -111,6 +111,11 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, st
 
 	// For each alloc, add the resources
 	for _, alloc := range allocs {
+		// Do not consider the resource impact of terminal allocations
+		if alloc.TerminalStatus() {
+			continue
+		}
+
 		if alloc.Resources != nil {
 			if err := used.Add(alloc.Resources); err != nil {
 				return false, "", nil, err
@@ -216,6 +216,91 @@ func TestAllocsFit(t *testing.T) {
 
 }
 
+func TestAllocsFit_TerminalAlloc(t *testing.T) {
+	n := &Node{
+		Resources: &Resources{
+			CPU:      2000,
+			MemoryMB: 2048,
+			DiskMB:   10000,
+			IOPS:     100,
+			Networks: []*NetworkResource{
+				{
+					Device: "eth0",
+					CIDR:   "10.0.0.0/8",
+					MBits:  100,
+				},
+			},
+		},
+		Reserved: &Resources{
+			CPU:      1000,
+			MemoryMB: 1024,
+			DiskMB:   5000,
+			IOPS:     50,
+			Networks: []*NetworkResource{
+				{
+					Device:        "eth0",
+					IP:            "10.0.0.1",
+					MBits:         50,
+					ReservedPorts: []Port{{"main", 80}},
+				},
+			},
+		},
+	}
+
+	a1 := &Allocation{
+		Resources: &Resources{
+			CPU:      1000,
+			MemoryMB: 1024,
+			DiskMB:   5000,
+			IOPS:     50,
+			Networks: []*NetworkResource{
+				{
+					Device:        "eth0",
+					IP:            "10.0.0.1",
+					MBits:         50,
+					ReservedPorts: []Port{{"main", 8000}},
+				},
+			},
+		},
+	}
+
+	// Should fit one allocation
+	fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if !fit {
+		t.Fatalf("Bad")
+	}
+
+	// Sanity check the used resources
+	if used.CPU != 2000 {
+		t.Fatalf("bad: %#v", used)
+	}
+	if used.MemoryMB != 2048 {
+		t.Fatalf("bad: %#v", used)
+	}
+
+	// Should fit second allocation since it is terminal
+	a2 := a1.Copy()
+	a2.DesiredStatus = AllocDesiredStatusStop
+	fit, _, used, err = AllocsFit(n, []*Allocation{a1, a2}, nil)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if !fit {
+		t.Fatalf("Bad")
+	}
+
+	// Sanity check the used resources
+	if used.CPU != 2000 {
+		t.Fatalf("bad: %#v", used)
+	}
+	if used.MemoryMB != 2048 {
+		t.Fatalf("bad: %#v", used)
+	}
+}
+
 func TestScoreFit(t *testing.T) {
 	node := &Node{}
 	node.Resources = &Resources{
@@ -6751,6 +6751,7 @@ const (
 	EvalTriggerFailedFollowUp   = "failed-follow-up"
 	EvalTriggerMaxPlans         = "max-plan-attempts"
 	EvalTriggerRetryFailedAlloc = "alloc-failure"
+	EvalTriggerQueuedAllocs     = "queued-allocs"
 )
 
 const (
@@ -6882,8 +6883,11 @@ type Evaluation struct {
 	LeaderACL string
 
 	// SnapshotIndex is the Raft index of the snapshot used to process the
-	// evaluation. As such it will only be set once it has gone through the
-	// scheduler.
+	// evaluation. The index will either be set when it has gone through the
+	// scheduler or if a blocked evaluation is being created. The index is set
+	// in this case so we can determine if an early unblocking is required since
+	// capacity has changed since the evaluation was created. This can result in
+	// the SnapshotIndex being less than the CreateIndex.
 	SnapshotIndex uint64
 
 	// Raft Indexes
@@ -7013,7 +7017,7 @@ func (e *Evaluation) CreateBlockedEval(classEligibility map[string]bool,
 		Namespace:      e.Namespace,
 		Priority:       e.Priority,
 		Type:           e.Type,
-		TriggeredBy:    e.TriggeredBy,
+		TriggeredBy:    EvalTriggerQueuedAllocs,
 		JobID:          e.JobID,
 		JobModifyIndex: e.JobModifyIndex,
 		Status:         EvalStatusBlocked,
@@ -7138,6 +7142,10 @@ func (p *Plan) PopUpdate(alloc *Allocation) {
 func (p *Plan) AppendAlloc(alloc *Allocation) {
 	node := alloc.NodeID
 	existing := p.NodeAllocation[node]
+
+	// Normalize the job
+	alloc.Job = nil
+
 	p.NodeAllocation[node] = append(existing, alloc)
 }
 
@@ -114,18 +114,21 @@ func (w *Worker) run() {
 
 		// Check for a shutdown
 		if w.srv.IsShutdown() {
+			w.logger.Error("nacking eval because the server is shutting down", "eval", log.Fmt("%#v", eval))
 			w.sendAck(eval.ID, token, false)
 			return
 		}
 
 		// Wait for the raft log to catchup to the evaluation
 		if err := w.waitForIndex(waitIndex, raftSyncLimit); err != nil {
+			w.logger.Error("error waiting for Raft index", "error", err, "index", waitIndex)
 			w.sendAck(eval.ID, token, false)
 			continue
 		}
 
 		// Invoke the scheduler to determine placements
 		if err := w.invokeScheduler(eval, token); err != nil {
+			w.logger.Error("error invoking scheduler", "error", err)
 			w.sendAck(eval.ID, token, false)
 			continue
 		}
@@ -326,7 +329,7 @@ SUBMIT:
 		}
 		return nil, nil, err
 	} else {
-		w.logger.Debug("submitted plan for evaluation", "plan_resp_index", resp.Index, "eval_id", plan.EvalID)
+		w.logger.Debug("submitted plan for evaluation", "eval_id", plan.EvalID)
 		w.backoffReset()
 	}
 
@@ -128,9 +128,10 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
 	switch eval.TriggeredBy {
 	case structs.EvalTriggerJobRegister, structs.EvalTriggerJobDeregister,
 		structs.EvalTriggerNodeDrain, structs.EvalTriggerNodeUpdate,
-		structs.EvalTriggerRollingUpdate,
+		structs.EvalTriggerRollingUpdate, structs.EvalTriggerQueuedAllocs,
 		structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans,
-		structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc:
+		structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc,
+		structs.EvalTriggerFailedFollowUp:
 	default:
 		desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
 			eval.TriggeredBy)
@@ -241,6 +241,10 @@ func TestServiceSched_JobRegister_DiskConstraints(t *testing.T) {
 		t.Fatalf("bad: %#v", h.CreateEvals)
 	}
 
+	if h.CreateEvals[0].TriggeredBy != structs.EvalTriggerQueuedAllocs {
+		t.Fatalf("bad: %#v", h.CreateEvals[0])
+	}
+
 	// Ensure the plan allocated only one allocation
 	var planned []*structs.Allocation
 	for _, allocList := range plan.NodeAllocation {
@@ -60,7 +60,7 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
 
 	// Verify the evaluation trigger reason is understood
 	switch eval.TriggeredBy {
-	case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate,
+	case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate, structs.EvalTriggerFailedFollowUp,
 		structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate,
 		structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain:
 	default: