Merge pull request #1339 from hashicorp/b-wait-index-statestore

Worker waitForIndex uses StateStore index, not Raft Applied Index
This commit is contained in:
Alex Dadgar 2016-06-22 10:12:43 -07:00 committed by GitHub
commit 175f363de3
6 changed files with 61 additions and 34 deletions

View File

@ -44,8 +44,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@ -112,8 +111,7 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@ -173,8 +171,7 @@ func TestCoreScheduler_EvalGC_Batch_NoAllocs(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@ -235,8 +232,7 @@ func TestCoreScheduler_EvalGC_Batch_Allocs_WithJob(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@ -296,8 +292,7 @@ func TestCoreScheduler_EvalGC_Batch_Allocs_NoJob(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@ -344,7 +339,7 @@ func TestCoreScheduler_EvalGC_Force(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobForceGC)
gc := s1.coreJobEval(structs.CoreJobForceGC, 1001)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@ -394,8 +389,7 @@ func TestCoreScheduler_NodeGC(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobNodeGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@ -444,8 +438,7 @@ func TestCoreScheduler_NodeGC_TerminalAllocs(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobNodeGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@ -496,8 +489,7 @@ func TestCoreScheduler_NodeGC_RunningAllocs(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobNodeGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@ -535,7 +527,7 @@ func TestCoreScheduler_NodeGC_Force(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobForceGC)
gc := s1.coreJobEval(structs.CoreJobForceGC, 1000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
@ -621,8 +613,7 @@ func TestCoreScheduler_JobGC(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobJobGC)
gc.ModifyIndex = 2000
gc := s1.coreJobEval(structs.CoreJobJobGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)
@ -721,7 +712,7 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobForceGC)
gc := s1.coreJobEval(structs.CoreJobForceGC, 1002)
err = core.Process(gc)
if err != nil {
t.Fatalf("test(%s) err: %v", test.test, err)

View File

@ -251,14 +251,33 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
jobGC := time.NewTicker(s.config.JobGCInterval)
defer jobGC.Stop()
// getLatest grabs the latest index from the state store. It returns true if
// the index was retrieved successfully.
getLatest := func() (uint64, bool) {
snapshotIndex, err := s.fsm.State().LatestIndex()
if err != nil {
s.logger.Printf("[ERR] nomad: failed to determine state store's index: %v", err)
return 0, false
}
return snapshotIndex, true
}
for {
select {
case <-evalGC.C:
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC))
if index, ok := getLatest(); ok {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobEvalGC, index))
}
case <-nodeGC.C:
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC))
if index, ok := getLatest(); ok {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobNodeGC, index))
}
case <-jobGC.C:
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC))
if index, ok := getLatest(); ok {
s.evalBroker.Enqueue(s.coreJobEval(structs.CoreJobJobGC, index))
}
case <-stopCh:
return
}
@ -266,7 +285,7 @@ func (s *Server) schedulePeriodic(stopCh chan struct{}) {
}
// coreJobEval returns an evaluation for a core job
func (s *Server) coreJobEval(job string) *structs.Evaluation {
func (s *Server) coreJobEval(job string, modifyIndex uint64) *structs.Evaluation {
return &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: structs.CoreJobPriority,
@ -274,7 +293,7 @@ func (s *Server) coreJobEval(job string) *structs.Evaluation {
TriggeredBy: structs.EvalTriggerScheduled,
JobID: job,
Status: structs.EvalStatusPending,
ModifyIndex: s.raft.AppliedIndex(),
ModifyIndex: modifyIndex,
}
}

View File

@ -1,6 +1,8 @@
package nomad
import (
"fmt"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -16,6 +18,12 @@ func (s *System) GarbageCollect(args *structs.GenericRequest, reply *structs.Gen
return err
}
s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC))
// Get the states current index
snapshotIndex, err := s.srv.fsm.State().LatestIndex()
if err != nil {
return fmt.Errorf("failed to determine state store's index: %v", err)
}
s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC, snapshotIndex))
return nil
}

View File

@ -11,9 +11,6 @@ import (
)
func TestSystemEndpoint_GarbageCollect(t *testing.T) {
//s1 := testServer(t, func(c *Config) {
//c.NumSchedulers = 0 // Prevent automatic dequeue
//})
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
@ -23,7 +20,7 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) {
state := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
if err := state.UpsertJob(0, job); err != nil {
if err := state.UpsertJob(1000, job); err != nil {
t.Fatalf("UpsertAllocs() failed: %v", err)
}

View File

@ -212,12 +212,21 @@ func (w *Worker) sendAck(evalID, token string, ack bool) {
// state (attempt to allocate to a failed/dead node), we may need
// to sync our state again and do the planning with more recent data.
func (w *Worker) waitForIndex(index uint64, timeout time.Duration) error {
// XXX: Potential optimization is to set up a watch on the state stores
// index table and only unblock via a trigger rather than timing out and
// checking.
start := time.Now()
defer metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start)
CHECK:
// Get the states current index
snapshotIndex, err := w.srv.fsm.State().LatestIndex()
if err != nil {
return fmt.Errorf("failed to determine state store's index: %v", err)
}
// We only need the FSM state to be as recent as the given index
appliedIndex := w.srv.raft.AppliedIndex()
if index <= appliedIndex {
if index <= snapshotIndex {
w.backoffReset()
return nil
}

View File

@ -203,7 +203,10 @@ func TestWorker_waitForIndex(t *testing.T) {
// Cause an increment
go func() {
time.Sleep(10 * time.Millisecond)
s1.raft.Barrier(0)
n := mock.Node()
if err := s1.fsm.state.UpsertNode(index+1, n); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
}()
// Wait for a future index