nomad: evaluate plans after previous plan index

The previous commit prevented evaluating plans against a state snapshot
which is older than the snapshot at which the plan was created.  This is
correct and prevents failures trying to retrieve referenced objects that
may not exist until the plan's snapshot. However, this is insufficient
to guarantee consistency if the following events occur:

1. P1, P2, and P3 are enqueued with snapshot @ 100
2. Leader evaluates and applies Plan P1 with snapshot @ 100
3. Leader evaluates Plan P2 with snapshot+P1 @ 100
4. P1 commits @ 101
4. Leader evaluates applies Plan P3 with snapshot+P2 @ 100

Since only the previous plan is optimistically applied to the state
store, the snapshot used to evaluate a plan may not contain the N-2
plan!

To ensure plans are evaluated and applied serially we must consider all
previous plan's committed indexes when evaluating further plans.

Therefore combined with the last PR, the minimum index at which to
evaluate a plan is:

    min(previousPlanResultIndex, plan.SnapshotIndex)
This commit is contained in:
Michael Schurter 2019-06-24 11:48:02 -07:00
parent e10fea1d7a
commit 0f8164b2f1
1 changed files with 78 additions and 28 deletions

View File

@ -69,11 +69,22 @@ func newPlanner(s *Server) (*planner, error) {
// but there are many of those and only a single plan verifier.
//
func (p *planner) planApply() {
// waitCh is used to track an outstanding application while snap
// holds an optimistic state which includes that plan application.
var waitCh chan struct{}
// planIndexCh is used to track an outstanding application and receive
// its committed index while snap holds an optimistic state which
// includes that plan application.
var planIndexCh chan uint64
var snap *state.StateSnapshot
// prevPlanResultIndex is the index when the last PlanResult was
// committed. Since only the last plan is optimistically applied to the
// snapshot, it's possible the current snapshot's and plan's indexes
// are less than the index the previous plan result was committed at.
// prevPlanResultIndex also guards against the previous plan committing
// during Dequeue, thus causing the snapshot containing the optimistic
// commit to be discarded and potentially evaluating the current plan
// against an index older than the previous plan was committed at.
var prevPlanResultIndex uint64
// Setup a worker pool with half the cores, with at least 1
poolSize := runtime.NumCPU() / 2
if poolSize == 0 {
@ -89,27 +100,29 @@ func (p *planner) planApply() {
return
}
// Check if out last plan has completed
// If last plan has completed get a new snapshot
select {
case <-waitCh:
waitCh = nil
case idx := <-planIndexCh:
// Previous plan committed. Discard snapshot and ensure
// future snapshots include this plan.
prevPlanResultIndex = max(prevPlanResultIndex, idx)
planIndexCh = nil
snap = nil
default:
}
if snap != nil {
// If snapshot isn't new enough, discard it
minIndex := max(prevPlanResultIndex, pending.plan.SnapshotIndex)
if idx, err := snap.LatestIndex(); err != nil || idx < minIndex {
snap = nil
}
}
// Snapshot the state so that we have a consistent view of the world
// if no snapshot is available
if waitCh == nil || snap == nil {
const timeout = 5 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
snap, err = p.fsm.State().SnapshotAfter(ctx, pending.plan.SnapshotIndex)
cancel()
if err == context.DeadlineExceeded {
p.logger.Error("timed out synchronizing to planner's index",
"timeout", timeout, "plan_index", pending.plan.SnapshotIndex)
err = fmt.Errorf("timed out after %s waiting for index=%d",
timeout, pending.plan.SnapshotIndex)
}
if planIndexCh == nil || snap == nil {
snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex)
if err != nil {
p.logger.Error("failed to snapshot state", "error", err)
pending.respond(nil, err)
@ -133,11 +146,12 @@ func (p *planner) planApply() {
// Ensure any parallel apply is complete before starting the next one.
// This also limits how out of date our snapshot can be.
if waitCh != nil {
<-waitCh
snap, err = p.fsm.State().Snapshot()
if planIndexCh != nil {
idx := <-planIndexCh
prevPlanResultIndex = max(prevPlanResultIndex, idx)
snap, err = p.snapshotMinIndex(prevPlanResultIndex, pending.plan.SnapshotIndex)
if err != nil {
p.logger.Error("failed to snapshot state", "error", err)
p.logger.Error("failed to update snapshot state", "error", err)
pending.respond(nil, err)
continue
}
@ -151,12 +165,35 @@ func (p *planner) planApply() {
continue
}
// Respond to the plan in async
waitCh = make(chan struct{})
go p.asyncPlanWait(waitCh, future, result, pending)
// Respond to the plan in async; receive plan's committed index via chan
planIndexCh = make(chan uint64, 1)
go p.asyncPlanWait(planIndexCh, future, result, pending)
}
}
// snapshotMinIndex wraps SnapshotAfter with a 5s timeout and converts timeout
// errors to a more descriptive error message. The snapshot is guaranteed to
// include both the previous plan and all objects referenced by the plan or
// return an error.
func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64) (*state.StateSnapshot, error) {
defer metrics.MeasureSince([]string{"nomad", "plan", "wait_for_index"}, time.Now())
// Minimum index the snapshot must include is the max of the previous
// plan result's and current plan's snapshot index.
minIndex := max(prevPlanResultIndex, planSnapshotIndex)
const timeout = 5 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
snap, err := p.fsm.State().SnapshotAfter(ctx, minIndex)
cancel()
if err == context.DeadlineExceeded {
return nil, fmt.Errorf("timed out after %s waiting for index=%d (previous plan result index=%d; plan snapshot index=%d)",
timeout, minIndex, prevPlanResultIndex, planSnapshotIndex)
}
return snap, err
}
// applyPlan is used to apply the plan result and to return the alloc index
func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
// Setup the update request
@ -316,21 +353,26 @@ func updateAllocTimestamps(allocations []*structs.Allocation, timestamp int64) {
}
}
// asyncPlanWait is used to apply and respond to a plan async
func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
// asyncPlanWait is used to apply and respond to a plan async. On successful
// commit the plan's index will be sent on the chan. On error the chan will be
// closed.
func (p *planner) asyncPlanWait(indexCh chan<- uint64, future raft.ApplyFuture,
result *structs.PlanResult, pending *pendingPlan) {
defer metrics.MeasureSince([]string{"nomad", "plan", "apply"}, time.Now())
defer close(waitCh)
// Wait for the plan to apply
if err := future.Error(); err != nil {
p.logger.Error("failed to apply plan", "error", err)
pending.respond(nil, err)
// Close indexCh on error
close(indexCh)
return
}
// Respond to the plan
result.AllocIndex = future.Index()
index := future.Index()
result.AllocIndex = index
// If this is a partial plan application, we need to ensure the scheduler
// at least has visibility into any placements it made to avoid double placement.
@ -340,6 +382,7 @@ func (p *planner) asyncPlanWait(waitCh chan struct{}, future raft.ApplyFuture,
result.RefreshIndex = maxUint64(result.RefreshIndex, result.AllocIndex)
}
pending.respond(result, nil)
indexCh <- index
}
// evaluatePlan is used to determine what portions of a plan
@ -629,3 +672,10 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
fit, reason, _, err := structs.AllocsFit(node, proposed, nil, true)
return fit, reason, err
}
func max(a, b uint64) uint64 {
if a > b {
return a
}
return b
}