Refactor sendAck(3) into sendAck(2),sendNack(2),sendAcknowledgement(3) (#11506)

This commit is contained in:
Charlie Voiselle 2021-11-17 10:49:55 -05:00 committed by GitHub
parent 8f8d6c13cd
commit 176de1bfe6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 10 deletions

View File

@ -113,7 +113,7 @@ func (w *Worker) run() {
// Check for a shutdown
if w.srv.IsShutdown() {
w.logger.Error("nacking eval because the server is shutting down", "eval", log.Fmt("%#v", eval))
w.sendAck(eval.ID, token, false)
w.sendNack(eval.ID, token)
return
}
@ -121,19 +121,19 @@ func (w *Worker) run() {
snap, err := w.snapshotMinIndex(waitIndex, raftSyncLimit)
if err != nil {
w.logger.Error("error waiting for Raft index", "error", err, "index", waitIndex)
w.sendAck(eval.ID, token, false)
w.sendNack(eval.ID, token)
continue
}
// Invoke the scheduler to determine placements
if err := w.invokeScheduler(snap, eval, token); err != nil {
w.logger.Error("error invoking scheduler", "error", err)
w.sendAck(eval.ID, token, false)
w.sendNack(eval.ID, token)
continue
}
// Complete the evaluation
w.sendAck(eval.ID, token, true)
w.sendAck(eval.ID, token)
}
}
@ -193,9 +193,10 @@ REQ:
goto REQ
}
// sendAck makes a best effort to ack or nack the evaluation.
// sendAcknowledgement should not be called directly. Call `sendAck` or `sendNack` instead.
// This function implements `ack`ing or `nack`ing the evaluation generally.
// Any errors are logged but swallowed.
func (w *Worker) sendAck(evalID, token string, ack bool) {
func (w *Worker) sendAcknowledgement(evalID, token string, ack bool) {
defer metrics.MeasureSince([]string{"nomad", "worker", "send_ack"}, time.Now())
// Setup the request
req := structs.EvalAckRequest{
@ -224,6 +225,18 @@ func (w *Worker) sendAck(evalID, token string, ack bool) {
}
}
// sendNack makes a best effort to nack the evaluation.
// Any errors are logged but swallowed.
func (w *Worker) sendNack(evalID, token string) {
w.sendAcknowledgement(evalID, token, false)
}
// sendAck makes a best effort to ack the evaluation.
// Any errors are logged but swallowed.
func (w *Worker) sendAck(evalID, token string) {
w.sendAcknowledgement(evalID, token, true)
}
// snapshotMinIndex times calls to StateStore.SnapshotAfter which may block.
func (w *Worker) snapshotMinIndex(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) {
start := time.Now()

View File

@ -133,7 +133,7 @@ func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) {
}
// Send the Ack
w.sendAck(eval1.ID, token, true)
w.sendAck(eval1.ID, token)
// Attempt second dequeue
eval, token, waitIndex, shutdown = w.dequeueEvaluation(10 * time.Millisecond)
@ -258,7 +258,7 @@ func TestWorker_sendAck(t *testing.T) {
}
// Send the Nack
w.sendAck(eval.ID, token, false)
w.sendNack(eval.ID, token)
// Check the depth is 1, nothing unacked
stats = s1.evalBroker.Stats()
@ -270,7 +270,7 @@ func TestWorker_sendAck(t *testing.T) {
eval, token, _, _ = w.dequeueEvaluation(10 * time.Millisecond)
// Send the Ack
w.sendAck(eval.ID, token, true)
w.sendAck(eval.ID, token)
// Check the depth is 0
stats = s1.evalBroker.Stats()
@ -674,7 +674,7 @@ func TestWorker_ReblockEval(t *testing.T) {
}
// Ack the eval
w.sendAck(evalOut.ID, token, true)
w.sendAck(evalOut.ID, token)
// Check that it is blocked
bStats := s1.blockedEvals.Stats()