Merge pull request #839 from hashicorp/f-refreshindex

nomad: cleanup handling of refresh index
Armon Dadgar 2016-02-22 11:49:21 -08:00
commit e6ba72556a
2 changed files with 32 additions and 13 deletions

nomad/plan_apply.go

@@ -187,6 +187,14 @@ func (s *Server) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
 	// Respond to the plan
 	result.AllocIndex = future.Index()
 
+	// If this is a partial plan application, we need to ensure the scheduler
+	// at least has visibility into any placements it made to avoid double placement.
+	// The RefreshIndex computed by evaluatePlan may be stale due to evaluation
+	// against an optimistic copy of the state.
+	if result.RefreshIndex != 0 {
+		result.RefreshIndex = maxUint64(result.RefreshIndex, result.AllocIndex)
+	}
+
 	pending.respond(result, nil)
 }
 
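The clamp above leans on maxUint64, the usual pairwise max over uint64 values. A minimal sketch of such a helper, for context; Nomad keeps an equivalent somewhere in the nomad package, so treat this as illustrative rather than the actual definition:

```go
// maxUint64 returns the larger of two uint64 values. Sketch only; the
// real helper is defined elsewhere in the nomad package.
func maxUint64(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}
```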
@@ -215,6 +223,7 @@ func evaluatePlan(pool *EvaluatePool, snap *state.StateSnapshot, plan *structs.P
 	// Setup a multierror to handle potentially getting many
 	// errors since we are processing in parallel.
 	var mErr multierror.Error
+	partialCommit := false
 
 	// handleResult is used to process the result of evaluateNodePlan
 	handleResult := func(nodeID string, fit bool, err error) (cancel bool) {
@@ -224,19 +233,8 @@ func evaluatePlan(pool *EvaluatePool, snap *state.StateSnapshot, plan *structs.P
 			return true
 		}
 		if !fit {
-			// Scheduler must have stale data, RefreshIndex should force
-			// the latest view of allocations and nodes
-			allocIndex, err := snap.Index("allocs")
-			if err != nil {
-				mErr.Errors = append(mErr.Errors, err)
-				return true
-			}
-			nodeIndex, err := snap.Index("nodes")
-			if err != nil {
-				mErr.Errors = append(mErr.Errors, err)
-				return true
-			}
-			result.RefreshIndex = maxUint64(nodeIndex, allocIndex)
+			// Set that this is a partial commit
+			partialCommit = true
 
 			// If we require all-at-once scheduling, there is no point
 			// to continue the evaluation, as we've already failed.
@@ -294,6 +292,21 @@ func evaluatePlan(pool *EvaluatePool, snap *state.StateSnapshot, plan *structs.P
 		}
 		outstanding--
 	}
 
+	// If the plan resulted in a partial commit, we need to determine
+	// a minimum refresh index to force the scheduler to work on a more
+	// up-to-date state to avoid the failures.
+	if partialCommit {
+		allocIndex, err := snap.Index("allocs")
+		if err != nil {
+			mErr.Errors = append(mErr.Errors, err)
+		}
+		nodeIndex, err := snap.Index("nodes")
+		if err != nil {
+			mErr.Errors = append(mErr.Errors, err)
+		}
+		result.RefreshIndex = maxUint64(nodeIndex, allocIndex)
+	}
+
 	return result, mErr.ErrorOrNil()
 }
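Taken together, the two changes give callers a simple contract: a zero RefreshIndex means the plan fully committed, while a non-zero value is the minimum state index the scheduler must observe before retrying. A minimal sketch of a consumer under that contract; waitForStateAt and the trimmed PlanResult are hypothetical stand-ins, not Nomad's actual worker API:

```go
package main

import "fmt"

// PlanResult mirrors just the fields of structs.PlanResult used here.
type PlanResult struct {
	AllocIndex   uint64
	RefreshIndex uint64
}

// waitForStateAt is a hypothetical helper: block until the local state
// store has applied at least the given index, then return a snapshot.
func waitForStateAt(index uint64) (string, error) {
	return fmt.Sprintf("snapshot@%d", index), nil
}

// handlePlanResult shows how a plan submitter could act on RefreshIndex.
func handlePlanResult(result *PlanResult) error {
	if result.RefreshIndex == 0 {
		// Fully committed plan; the current snapshot is still usable.
		return nil
	}
	// Partial commit: refresh to at least RefreshIndex so the retry sees
	// the state that caused the rejections and, because RefreshIndex was
	// clamped to AllocIndex, the placements that did commit.
	snap, err := waitForStateAt(result.RefreshIndex)
	if err != nil {
		return err
	}
	fmt.Println("re-planning against", snap)
	return nil
}

func main() {
	_ = handlePlanResult(&PlanResult{AllocIndex: 1000, RefreshIndex: 1001})
}
```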

nomad/plan_apply_test.go

@@ -236,6 +236,9 @@ func TestPlanApply_EvalPlan_Partial(t *testing.T) {
 	if _, ok := result.NodeAllocation[node2.ID]; ok {
 		t.Fatalf("should not allow alloc2")
 	}
+	if result.RefreshIndex != 1001 {
+		t.Fatalf("bad: %d", result.RefreshIndex)
+	}
 }
 
 func TestPlanApply_EvalPlan_Partial_AllAtOnce(t *testing.T) {
@@ -271,6 +274,9 @@ func TestPlanApply_EvalPlan_Partial_AllAtOnce(t *testing.T) {
 	if len(result.NodeAllocation) != 0 {
 		t.Fatalf("should not alloc: %v", result.NodeAllocation)
 	}
+	if result.RefreshIndex != 1001 {
+		t.Fatalf("bad: %d", result.RefreshIndex)
+	}
 }
 
 func TestPlanApply_EvalNodePlan_Simple(t *testing.T) {
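Both partial-plan tests expect a RefreshIndex of 1001 because evaluatePlan now returns the max of the snapshot's "nodes" and "allocs" table indexes. The test seeding isn't shown in this excerpt; assuming nodes were inserted at index 1000 and allocations at index 1001, the arithmetic works out as in this toy sketch:

```go
package main

import "fmt"

// Same pairwise-max helper sketched earlier, repeated so this compiles
// on its own.
func maxUint64(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}

func main() {
	// Assumed table indexes at snapshot time; the actual values come
	// from how plan_apply_test.go seeds the state store.
	nodeIndex, allocIndex := uint64(1000), uint64(1001)
	fmt.Println(maxUint64(nodeIndex, allocIndex)) // 1001
}
```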