From 176de1bfe621df5bd8de9d4b976f1796041a9715 Mon Sep 17 00:00:00 2001 From: Charlie Voiselle <464492+angrycub@users.noreply.github.com> Date: Wed, 17 Nov 2021 10:49:55 -0500 Subject: [PATCH] Refactor sendAck(3) into sendAck(2),sendNack(2),sendAcknowledgement(3) (#11506) --- nomad/worker.go | 25 +++++++++++++++++++------ nomad/worker_test.go | 8 ++++---- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/nomad/worker.go b/nomad/worker.go index 2e0d36b25..d5b9699fd 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -113,7 +113,7 @@ func (w *Worker) run() { // Check for a shutdown if w.srv.IsShutdown() { w.logger.Error("nacking eval because the server is shutting down", "eval", log.Fmt("%#v", eval)) - w.sendAck(eval.ID, token, false) + w.sendNack(eval.ID, token) return } @@ -121,19 +121,19 @@ func (w *Worker) run() { snap, err := w.snapshotMinIndex(waitIndex, raftSyncLimit) if err != nil { w.logger.Error("error waiting for Raft index", "error", err, "index", waitIndex) - w.sendAck(eval.ID, token, false) + w.sendNack(eval.ID, token) continue } // Invoke the scheduler to determine placements if err := w.invokeScheduler(snap, eval, token); err != nil { w.logger.Error("error invoking scheduler", "error", err) - w.sendAck(eval.ID, token, false) + w.sendNack(eval.ID, token) continue } // Complete the evaluation - w.sendAck(eval.ID, token, true) + w.sendAck(eval.ID, token) } } @@ -193,9 +193,10 @@ REQ: goto REQ } -// sendAck makes a best effort to ack or nack the evaluation. +// sendAcknowledgement should not be called directly. Call `sendAck` or `sendNack` instead. +// This function implements `ack`ing or `nack`ing the evaluation generally. // Any errors are logged but swallowed. -func (w *Worker) sendAck(evalID, token string, ack bool) { +func (w *Worker) sendAcknowledgement(evalID, token string, ack bool) { defer metrics.MeasureSince([]string{"nomad", "worker", "send_ack"}, time.Now()) // Setup the request req := structs.EvalAckRequest{ @@ -224,6 +225,18 @@ func (w *Worker) sendAck(evalID, token string, ack bool) { } } +// sendNack makes a best effort to nack the evaluation. +// Any errors are logged but swallowed. +func (w *Worker) sendNack(evalID, token string) { + w.sendAcknowledgement(evalID, token, false) +} + +// sendAck makes a best effort to ack the evaluation. +// Any errors are logged but swallowed. +func (w *Worker) sendAck(evalID, token string) { + w.sendAcknowledgement(evalID, token, true) +} + // snapshotMinIndex times calls to StateStore.SnapshotAfter which may block. func (w *Worker) snapshotMinIndex(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) { start := time.Now() diff --git a/nomad/worker_test.go b/nomad/worker_test.go index cde8d4886..34327e61f 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -133,7 +133,7 @@ func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) { } // Send the Ack - w.sendAck(eval1.ID, token, true) + w.sendAck(eval1.ID, token) // Attempt second dequeue eval, token, waitIndex, shutdown = w.dequeueEvaluation(10 * time.Millisecond) @@ -258,7 +258,7 @@ func TestWorker_sendAck(t *testing.T) { } // Send the Nack - w.sendAck(eval.ID, token, false) + w.sendNack(eval.ID, token) // Check the depth is 1, nothing unacked stats = s1.evalBroker.Stats() @@ -270,7 +270,7 @@ func TestWorker_sendAck(t *testing.T) { eval, token, _, _ = w.dequeueEvaluation(10 * time.Millisecond) // Send the Ack - w.sendAck(eval.ID, token, true) + w.sendAck(eval.ID, token) // Check the depth is 0 stats = s1.evalBroker.Stats() @@ -674,7 +674,7 @@ func TestWorker_ReblockEval(t *testing.T) { } // Ack the eval - w.sendAck(evalOut.ID, token, true) + w.sendAck(evalOut.ID, token) // Check that it is blocked bStats := s1.blockedEvals.Stats()