nomad: adding Eval endpoints for Ack and Nack
This commit is contained in:
parent
0a08c1ce49
commit
e820b3ae3c
|
@ -60,6 +60,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
|
|||
if done, err := e.srv.forward("Eval.GetEval", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "eval", "dequeue"}, time.Now())
|
||||
|
||||
// Ensure there is at least one scheduler
|
||||
if len(args.Schedulers) == 0 {
|
||||
|
@ -86,3 +87,33 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest,
|
|||
e.srv.setQueryMeta(&reply.QueryMeta)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Ack is used to acknowledge completion of a dequeued evaluation
|
||||
func (e *Eval) Ack(args *structs.EvalSpecificRequest,
|
||||
reply *structs.GenericResponse) error {
|
||||
if done, err := e.srv.forward("Eval.Ack", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "eval", "ack"}, time.Now())
|
||||
|
||||
// Ack the EvalID
|
||||
if err := e.srv.evalBroker.Ack(args.EvalID); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NAck is used to negative acknowledge completion of a dequeued evaluation
|
||||
func (e *Eval) Nack(args *structs.EvalSpecificRequest,
|
||||
reply *structs.GenericResponse) error {
|
||||
if done, err := e.srv.forward("Eval.Nack", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "eval", "nack"}, time.Now())
|
||||
|
||||
// Nack the EvalID
|
||||
if err := e.srv.evalBroker.Nack(args.EvalID); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package nomad
|
|||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
|
@ -78,3 +79,69 @@ func TestEvalEndpoint_Dequeue(t *testing.T) {
|
|||
t.Fatalf("should be outstanding")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvalEndpoint_Ack(t *testing.T) {
|
||||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create the register request
|
||||
eval1 := mockEval()
|
||||
s1.evalBroker.Enqueue(eval1)
|
||||
out, _ := s1.evalBroker.Dequeue(defaultSched, time.Second)
|
||||
if out == nil {
|
||||
t.Fatalf("missing eval")
|
||||
}
|
||||
|
||||
// Ack the eval
|
||||
get := &structs.EvalSpecificRequest{
|
||||
EvalID: out.ID,
|
||||
WriteRequest: structs.WriteRequest{Region: "region1"},
|
||||
}
|
||||
var resp structs.GenericResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Eval.Ack", get, &resp); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Ensure outstanding
|
||||
if s1.evalBroker.Outstanding(eval1.ID) {
|
||||
t.Fatalf("should not be outstanding")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvalEndpoint_Nack(t *testing.T) {
|
||||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create the register request
|
||||
eval1 := mockEval()
|
||||
s1.evalBroker.Enqueue(eval1)
|
||||
out, _ := s1.evalBroker.Dequeue(defaultSched, time.Second)
|
||||
if out == nil {
|
||||
t.Fatalf("missing eval")
|
||||
}
|
||||
|
||||
// Ack the eval
|
||||
get := &structs.EvalSpecificRequest{
|
||||
EvalID: out.ID,
|
||||
WriteRequest: structs.WriteRequest{Region: "region1"},
|
||||
}
|
||||
var resp structs.GenericResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Eval.Nack", get, &resp); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Ensure outstanding
|
||||
if s1.evalBroker.Outstanding(eval1.ID) {
|
||||
t.Fatalf("should not be outstanding")
|
||||
}
|
||||
|
||||
// Should get it back
|
||||
out2, _ := s1.evalBroker.Dequeue(defaultSched, time.Second)
|
||||
if out2 != out {
|
||||
t.Fatalf("nack failed")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue