diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 4e2f48c93..09c645872 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -242,7 +242,7 @@ func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*State
 	// Get the states current index
 	snapshotIndex, err := s.LatestIndex()
 	if err != nil {
-		return nil, fmt.Errorf("failed to determine state store's index: %v", err)
+		return nil, fmt.Errorf("failed to determine state store's index: %w", err)
 	}
 
 	// We only need the FSM state to be as recent as the given index
diff --git a/nomad/worker.go b/nomad/worker.go
index cc73ca9fe..8a15e0fdd 100644
--- a/nomad/worker.go
+++ b/nomad/worker.go
@@ -3,6 +3,7 @@ package nomad
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"strings"
 	"sync"
@@ -148,7 +149,7 @@ func (w *Worker) ID() string {
 // to see if it paused using IsStarted()
 func (w *Worker) Start() {
 	w.setStatus(WorkerStarting)
-	go w.run()
+	go w.run(raftSyncLimit)
 }
 
 // Pause transitions a worker to the pausing state. Check
@@ -383,7 +384,7 @@ func (w *Worker) workerShuttingDown() bool {
 // ----------------------------------
 
 // run is the long-lived goroutine which is used to run the worker
-func (w *Worker) run() {
+func (w *Worker) run(raftSyncLimit time.Duration) {
 	defer func() {
 		w.markStopped()
 	}()
@@ -401,11 +402,12 @@ func (w *Worker) run() {
 			return
 		}
 
-		// since dequeue takes time, we could have shutdown the server after getting an eval that
-		// needs to be nacked before we exit. Explicitly checking the server to allow this eval
-		// to be processed on worker shutdown.
+		// since dequeue takes time, the server could have shut down after we
+		// dequeued an eval that now needs to be nacked before we exit.
+		// Explicitly check for server shutdown so the eval is nacked, not processed.
 		if w.srv.IsShutdown() {
-			w.logger.Error("nacking eval because the server is shutting down", "eval", log.Fmt("%#v", eval))
+			w.logger.Warn("nacking eval because the server is shutting down",
+				"eval", log.Fmt("%#v", eval))
 			w.sendNack(eval, token)
 			return
 		}
@@ -414,8 +416,34 @@ func (w *Worker) run() {
 		w.setWorkloadStatus(WorkloadWaitingForRaft)
 		snap, err := w.snapshotMinIndex(waitIndex, raftSyncLimit)
 		if err != nil {
-			w.logger.Error("error waiting for Raft index", "error", err, "index", waitIndex)
-			w.sendNack(eval, token)
+			var timeoutErr ErrMinIndexDeadlineExceeded
+			if errors.As(err, &timeoutErr) {
+				w.logger.Warn("timeout waiting for Raft index required by eval",
+					"eval", eval.ID, "index", waitIndex, "timeout", raftSyncLimit)
+				w.sendNack(eval, token)
+
+				// Timing out above means this server is woefully behind the
+				// leader's index. This can happen when a new server is added to
+				// a cluster and must initially sync the cluster state.
+				// Back off dequeuing another eval until there's some indication
+				// this server would be up to date enough to process it.
+				slowServerSyncLimit := 10 * raftSyncLimit
+				if _, err := w.snapshotMinIndex(waitIndex, slowServerSyncLimit); err != nil {
+					w.logger.Warn("server is unable to catch up to last eval's index", "error", err)
+				}
+
+			} else if errors.Is(err, context.Canceled) {
+				// If the server has shut down while we're waiting, we'll get the
+				// Canceled error from the worker's context. We need to nack any
+				// dequeued evals before we exit.
+				w.logger.Warn("nacking eval because the server is shutting down", "eval", eval.ID)
+				w.sendNack(eval, token)
+				return
+			} else {
+				w.logger.Error("error waiting for Raft index", "error", err, "index", waitIndex)
+				w.sendNack(eval, token)
+			}
+
 			continue
 		}
 
@@ -533,17 +561,35 @@ func (w *Worker) sendAck(eval *structs.Evaluation, token string) {
 	w.sendAcknowledgement(eval, token, true)
 }
 
+type ErrMinIndexDeadlineExceeded struct {
+	waitIndex uint64
+	timeout   time.Duration
+}
+
+// Unwrapping an ErrMinIndexDeadlineExceeded always returns
+// context.DeadlineExceeded.
+func (ErrMinIndexDeadlineExceeded) Unwrap() error {
+	return context.DeadlineExceeded
+}
+
+func (e ErrMinIndexDeadlineExceeded) Error() string {
+	return fmt.Sprintf("timed out after %s waiting for index=%d", e.timeout, e.waitIndex)
+}
+
 // snapshotMinIndex times calls to StateStore.SnapshotAfter which may block.
 func (w *Worker) snapshotMinIndex(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) {
-	start := time.Now()
+	defer metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, time.Now())
+
 	ctx, cancel := context.WithTimeout(w.ctx, timeout)
 	snap, err := w.srv.fsm.State().SnapshotMinIndex(ctx, waitIndex)
 	cancel()
-	metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start)
 
-	// Wrap error to ensure callers don't disregard timeouts.
-	if err == context.DeadlineExceeded {
-		err = fmt.Errorf("timed out after %s waiting for index=%d", timeout, waitIndex)
+	// Wrap error to ensure callers can detect timeouts.
+	if errors.Is(err, context.DeadlineExceeded) {
+		return nil, ErrMinIndexDeadlineExceeded{
+			waitIndex: waitIndex,
+			timeout:   timeout,
+		}
 	}
 
 	return snap, err
diff --git a/nomad/worker_test.go b/nomad/worker_test.go
index 9d4131e2d..ee9c3ae27 100644
--- a/nomad/worker_test.go
+++ b/nomad/worker_test.go
@@ -2,6 +2,7 @@ package nomad
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"reflect"
 	"sync"
@@ -11,6 +12,7 @@ import (
 	log "github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/ci"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/shoenig/test/must"
 	"github.com/stretchr/testify/require"
 
@@ -55,9 +57,10 @@ func init() {
 // NewTestWorker returns the worker without calling it's run method.
 func NewTestWorker(shutdownCtx context.Context, srv *Server) *Worker {
 	w := &Worker{
-		srv:   srv,
-		start: time.Now(),
-		id:    uuid.Generate(),
+		srv:               srv,
+		start:             time.Now(),
+		id:                uuid.Generate(),
+		enabledSchedulers: srv.config.EnabledSchedulers,
 	}
 	w.logger = srv.logger.ResetNamed("worker").With("worker_id", w.id)
 	w.pauseCond = sync.NewCond(&w.pauseLock)
@@ -338,6 +341,54 @@ func TestWorker_sendAck(t *testing.T) {
 	}
 }
 
+func TestWorker_runBackoff(t *testing.T) {
+	ci.Parallel(t)
+
+	srv, cleanupSrv := TestServer(t, func(c *Config) {
+		c.NumSchedulers = 0
+		c.EnabledSchedulers = []string{structs.JobTypeService}
+	})
+	defer cleanupSrv()
+	testutil.WaitForLeader(t, srv.RPC)
+
+	eval1 := mock.Eval()
+	eval1.ModifyIndex = 1000
+	srv.evalBroker.Enqueue(eval1)
+	must.Eq(t, 1, srv.evalBroker.Stats().TotalReady)
+
+	// make a new context here so we can still check the broker's state after
+	// we've shut down the worker
+	workerCtx, workerCancel := context.WithCancel(srv.shutdownCtx)
+	defer workerCancel()
+
+	w := NewTestWorker(workerCtx, srv)
+	doneCh := make(chan struct{})
+
+	go func() {
+		w.run(time.Millisecond)
+		doneCh <- struct{}{}
+	}()
+
+	// We expect to be paused for 10ms + 1ms but otherwise can't be all that
+	// precise here because of concurrency. Checking coverage for this test
+	// confirms the backoff logic is exercised.
+	t1, cancelT1 := helper.NewSafeTimer(100 * time.Millisecond)
+	defer cancelT1()
+	select {
+	case <-doneCh:
+		t.Fatal("returned early")
+	case <-t1.C:
+	}
+
+	workerCancel()
+	<-doneCh
+
+	must.Eq(t, 1, srv.evalBroker.Stats().TotalWaiting)
+	must.Eq(t, 0, srv.evalBroker.Stats().TotalReady)
+	must.Eq(t, 0, srv.evalBroker.Stats().TotalPending)
+	must.Eq(t, 0, srv.evalBroker.Stats().TotalUnacked)
+}
+
 func TestWorker_waitForIndex(t *testing.T) {
 	ci.Parallel(t)
 
@@ -376,6 +427,7 @@ func TestWorker_waitForIndex(t *testing.T) {
 	require.Nil(t, snap)
 	require.EqualError(t, err,
 		fmt.Sprintf("timed out after %s waiting for index=%d", timeout, waitIndex))
+	require.True(t, errors.Is(err, context.DeadlineExceeded), "expect error to wrap DeadlineExceeded")
 }
 
 func TestWorker_invokeScheduler(t *testing.T) {
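A note on how the new error type is meant to be consumed: because ErrMinIndexDeadlineExceeded unwraps to context.DeadlineExceeded, callers can branch on a Raft-sync timeout with errors.As or errors.Is instead of matching the error string. The standalone sketch below is not part of the patch; it simply mirrors the type added to nomad/worker.go in this diff and shows both checks.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// ErrMinIndexDeadlineExceeded mirrors the type added in nomad/worker.go: it
// records the index and timeout that were being waited on and unwraps to
// context.DeadlineExceeded so callers can detect timeouts generically.
type ErrMinIndexDeadlineExceeded struct {
	waitIndex uint64
	timeout   time.Duration
}

func (ErrMinIndexDeadlineExceeded) Unwrap() error { return context.DeadlineExceeded }

func (e ErrMinIndexDeadlineExceeded) Error() string {
	return fmt.Sprintf("timed out after %s waiting for index=%d", e.timeout, e.waitIndex)
}

func main() {
	var err error = ErrMinIndexDeadlineExceeded{waitIndex: 1000, timeout: time.Millisecond}

	// Branch on the concrete type, as Worker.run now does with errors.As, to
	// get at the index and timeout for logging.
	var timeoutErr ErrMinIndexDeadlineExceeded
	if errors.As(err, &timeoutErr) {
		fmt.Printf("timed out waiting for index %d after %s\n",
			timeoutErr.waitIndex, timeoutErr.timeout)
	}

	// Or test for the sentinel it unwraps to, which is what the new
	// require.True assertion in TestWorker_waitForIndex relies on.
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}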