open-nomad/nomad/eval_endpoint_test.go

1011 lines
27 KiB
Go
Raw Normal View History

package nomad
import (
2017-04-16 23:54:02 +00:00
"fmt"
"reflect"
2016-10-26 21:52:48 +00:00
"strings"
"testing"
"time"
2017-02-08 05:22:48 +00:00
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/net-rpc-msgpackrpc"
2017-10-02 22:49:20 +00:00
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
2016-10-26 21:52:48 +00:00
"github.com/hashicorp/nomad/scheduler"
"github.com/hashicorp/nomad/testutil"
2017-10-02 22:49:20 +00:00
"github.com/stretchr/testify/assert"
)
func TestEvalEndpoint_GetEval(t *testing.T) {
2017-07-23 22:04:38 +00:00
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
eval1 := mock.Eval()
s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1})
// Lookup the eval
get := &structs.EvalSpecificRequest{
EvalID: eval1.ID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
var resp structs.SingleEvalResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.GetEval", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index != 1000 {
t.Fatalf("Bad index: %d %d", resp.Index, 1000)
}
if !reflect.DeepEqual(eval1, resp.Eval) {
t.Fatalf("bad: %#v %#v", eval1, resp.Eval)
}
// Lookup non-existing node
get.EvalID = uuid.Generate()
if err := msgpackrpc.CallWithCodec(codec, "Eval.GetEval", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index != 1000 {
t.Fatalf("Bad index: %d %d", resp.Index, 1000)
}
if resp.Eval != nil {
t.Fatalf("unexpected eval")
}
}
2015-07-24 04:58:51 +00:00
2017-10-02 22:49:20 +00:00
func TestEvalEndpoint_GetEval_ACL(t *testing.T) {
t.Parallel()
s1, root := testACLServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
assert := assert.New(t)
// Create the register request
eval1 := mock.Eval()
state := s1.fsm.State()
state.UpsertEvals(1000, []*structs.Evaluation{eval1})
// Create ACL tokens
validToken := mock.CreatePolicyAndToken(t, state, 1003, "test-valid",
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))
invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid",
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityListJobs}))
2017-10-02 22:49:20 +00:00
get := &structs.EvalSpecificRequest{
EvalID: eval1.ID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
// Try with no token and expect permission denied
{
var resp structs.SingleEvalResponse
err := msgpackrpc.CallWithCodec(codec, "Eval.GetEval", get, &resp)
assert.NotNil(err)
assert.Contains(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try with an invalid token and expect permission denied
{
2017-10-12 22:16:33 +00:00
get.AuthToken = invalidToken.SecretID
2017-10-02 22:49:20 +00:00
var resp structs.SingleEvalResponse
err := msgpackrpc.CallWithCodec(codec, "Eval.GetEval", get, &resp)
assert.NotNil(err)
assert.Contains(err.Error(), structs.ErrPermissionDenied.Error())
}
// Lookup the eval using a valid token
{
2017-10-12 22:16:33 +00:00
get.AuthToken = validToken.SecretID
2017-10-02 22:49:20 +00:00
var resp structs.SingleEvalResponse
assert.Nil(msgpackrpc.CallWithCodec(codec, "Eval.GetEval", get, &resp))
assert.Equal(uint64(1000), resp.Index, "Bad index: %d %d", resp.Index, 1000)
assert.Equal(eval1, resp.Eval)
}
// Lookup the eval using a root token
{
2017-10-12 22:16:33 +00:00
get.AuthToken = root.SecretID
2017-10-02 22:49:20 +00:00
var resp structs.SingleEvalResponse
assert.Nil(msgpackrpc.CallWithCodec(codec, "Eval.GetEval", get, &resp))
assert.Equal(uint64(1000), resp.Index, "Bad index: %d %d", resp.Index, 1000)
assert.Equal(eval1, resp.Eval)
}
}
2015-10-30 02:00:02 +00:00
func TestEvalEndpoint_GetEval_Blocking(t *testing.T) {
2017-07-23 22:04:38 +00:00
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the evals
eval1 := mock.Eval()
eval2 := mock.Eval()
// First create an unrelated eval
time.AfterFunc(100*time.Millisecond, func() {
2015-10-30 02:00:02 +00:00
err := state.UpsertEvals(100, []*structs.Evaluation{eval1})
if err != nil {
t.Fatalf("err: %v", err)
}
})
// Upsert the eval we are watching later
time.AfterFunc(200*time.Millisecond, func() {
2015-10-30 02:00:02 +00:00
err := state.UpsertEvals(200, []*structs.Evaluation{eval2})
if err != nil {
t.Fatalf("err: %v", err)
}
})
// Lookup the eval
2015-10-30 02:00:02 +00:00
req := &structs.EvalSpecificRequest{
EvalID: eval2.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
2017-02-08 06:10:33 +00:00
MinQueryIndex: 150,
},
}
var resp structs.SingleEvalResponse
start := time.Now()
2015-10-30 02:00:02 +00:00
if err := msgpackrpc.CallWithCodec(codec, "Eval.GetEval", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
2015-10-30 15:27:47 +00:00
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
2015-10-30 02:00:02 +00:00
if resp.Index != 200 {
t.Fatalf("Bad index: %d %d", resp.Index, 200)
}
if resp.Eval == nil || resp.Eval.ID != eval2.ID {
t.Fatalf("bad: %#v", resp.Eval)
}
2015-10-30 02:00:02 +00:00
// Eval delete triggers watches
time.AfterFunc(100*time.Millisecond, func() {
err := state.DeleteEval(300, []string{eval2.ID}, []string{})
if err != nil {
t.Fatalf("err: %v", err)
}
})
req.QueryOptions.MinQueryIndex = 250
var resp2 structs.SingleEvalResponse
start = time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Eval.GetEval", req, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
2015-10-30 15:27:47 +00:00
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
2015-10-30 02:00:02 +00:00
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 300 {
t.Fatalf("Bad index: %d %d", resp2.Index, 300)
}
if resp2.Eval != nil {
t.Fatalf("bad: %#v", resp2.Eval)
}
}
2015-07-24 04:58:51 +00:00
func TestEvalEndpoint_Dequeue(t *testing.T) {
2017-07-23 22:04:38 +00:00
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
2015-07-24 04:58:51 +00:00
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)
2015-07-24 04:58:51 +00:00
// Dequeue the eval
get := &structs.EvalDequeueRequest{
2016-10-26 21:52:48 +00:00
Schedulers: defaultSched,
SchedulerVersion: scheduler.SchedulerVersion,
WriteRequest: structs.WriteRequest{Region: "global"},
2015-07-24 04:58:51 +00:00
}
var resp structs.EvalDequeueResponse
2015-07-24 04:58:51 +00:00
if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(eval1, resp.Eval) {
2015-08-15 23:08:12 +00:00
t.Fatalf("bad: %v %v", eval1, resp.Eval)
2015-07-24 04:58:51 +00:00
}
// Ensure outstanding
token, ok := s1.evalBroker.Outstanding(eval1.ID)
if !ok {
2015-07-24 04:58:51 +00:00
t.Fatalf("should be outstanding")
}
if token != resp.Token {
t.Fatalf("bad token: %#v %#v", token, resp.Token)
}
if resp.WaitIndex != eval1.ModifyIndex {
t.Fatalf("bad wait index; got %d; want %d", resp.WaitIndex, eval1.ModifyIndex)
}
}
func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
eval1 := mock.Eval()
eval2 := mock.Eval()
eval2.JobID = eval1.JobID
s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1})
s1.evalBroker.Enqueue(eval1)
s1.fsm.State().UpsertEvals(1001, []*structs.Evaluation{eval2})
// Dequeue the eval
get := &structs.EvalDequeueRequest{
Schedulers: defaultSched,
SchedulerVersion: scheduler.SchedulerVersion,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.EvalDequeueResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if !reflect.DeepEqual(eval1, resp.Eval) {
t.Fatalf("bad: %v %v", eval1, resp.Eval)
}
// Ensure outstanding
token, ok := s1.evalBroker.Outstanding(eval1.ID)
if !ok {
t.Fatalf("should be outstanding")
}
if token != resp.Token {
t.Fatalf("bad token: %#v %#v", token, resp.Token)
}
if resp.WaitIndex != 1001 {
t.Fatalf("bad wait index; got %d; want %d", resp.WaitIndex, 1001)
}
2015-07-24 04:58:51 +00:00
}
2016-10-26 21:52:48 +00:00
func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) {
2017-07-23 22:04:38 +00:00
t.Parallel()
2016-10-26 21:52:48 +00:00
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)
// Dequeue the eval
get := &structs.EvalDequeueRequest{
Schedulers: defaultSched,
SchedulerVersion: 0,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.EvalDequeueResponse
err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp)
if err == nil || !strings.Contains(err.Error(), "scheduler version is 0") {
t.Fatalf("err: %v", err)
}
}
func TestEvalEndpoint_Ack(t *testing.T) {
2017-07-23 22:04:38 +00:00
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
2015-08-05 23:46:07 +00:00
testutil.WaitForResult(func() (bool, error) {
return s1.evalBroker.Enabled(), nil
}, func(err error) {
t.Fatalf("should enable eval broker")
})
// Create the register request
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)
out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
2015-08-05 00:13:32 +00:00
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("missing eval")
}
// Ack the eval
get := &structs.EvalAckRequest{
EvalID: out.ID,
Token: token,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.Ack", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure outstanding
if _, ok := s1.evalBroker.Outstanding(eval1.ID); ok {
t.Fatalf("should not be outstanding")
}
}
func TestEvalEndpoint_Nack(t *testing.T) {
2017-07-23 22:04:38 +00:00
t.Parallel()
2015-09-25 23:07:27 +00:00
s1 := testServer(t, func(c *Config) {
// Disable all of the schedulers so we can manually dequeue
// evals and check the queue status
c.NumSchedulers = 0
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
2015-08-05 23:46:07 +00:00
testutil.WaitForResult(func() (bool, error) {
return s1.evalBroker.Enabled(), nil
}, func(err error) {
t.Fatalf("should enable eval broker")
})
// Create the register request
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)
out, token, _ := s1.evalBroker.Dequeue(defaultSched, time.Second)
if out == nil {
t.Fatalf("missing eval")
}
// Nack the eval
get := &structs.EvalAckRequest{
EvalID: out.ID,
Token: token,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.Nack", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure outstanding
if _, ok := s1.evalBroker.Outstanding(eval1.ID); ok {
t.Fatalf("should not be outstanding")
}
// Should get it back
2017-04-16 23:54:02 +00:00
testutil.WaitForResult(func() (bool, error) {
out2, _, _ := s1.evalBroker.Dequeue(defaultSched, time.Second)
if out2 != out {
return false, fmt.Errorf("nack failed")
}
return true, nil
}, func(err error) {
t.Fatal(err)
})
}
func TestEvalEndpoint_Update(t *testing.T) {
2017-07-23 22:04:38 +00:00
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForResult(func() (bool, error) {
return s1.evalBroker.Enabled(), nil
}, func(err error) {
t.Fatalf("should enable eval broker")
})
// Create the register request
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)
out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("missing eval")
}
// Update the eval
eval2 := eval1.Copy()
eval2.Status = structs.EvalStatusComplete
get := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval2},
2015-08-15 21:22:21 +00:00
EvalToken: token,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.Update", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure updated
2017-02-08 05:22:48 +00:00
ws := memdb.NewWatchSet()
outE, err := s1.fsm.State().EvalByID(ws, eval2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outE.Status != structs.EvalStatusComplete {
t.Fatalf("Bad: %#v", out)
}
}
2015-08-15 22:42:44 +00:00
2015-09-07 21:17:11 +00:00
func TestEvalEndpoint_Create(t *testing.T) {
2017-07-23 22:04:38 +00:00
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
2015-09-07 21:17:11 +00:00
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForResult(func() (bool, error) {
return s1.evalBroker.Enabled(), nil
}, func(err error) {
t.Fatalf("should enable eval broker")
})
// Create the register request
prev := mock.Eval()
s1.evalBroker.Enqueue(prev)
out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("missing eval")
}
2015-09-07 21:17:11 +00:00
// Create the register request
eval1 := mock.Eval()
eval1.PreviousEval = prev.ID
2015-09-07 21:17:11 +00:00
get := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval1},
EvalToken: token,
WriteRequest: structs.WriteRequest{Region: "global"},
2015-09-07 21:17:11 +00:00
}
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.Create", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure created
2017-02-08 05:22:48 +00:00
ws := memdb.NewWatchSet()
outE, err := s1.fsm.State().EvalByID(ws, eval1.ID)
2015-09-07 21:17:11 +00:00
if err != nil {
t.Fatalf("err: %v", err)
}
eval1.CreateIndex = resp.Index
eval1.ModifyIndex = resp.Index
if !reflect.DeepEqual(eval1, outE) {
t.Fatalf("Bad: %#v %#v", outE, eval1)
}
}
2015-08-15 22:42:44 +00:00
func TestEvalEndpoint_Reap(t *testing.T) {
2017-07-23 22:04:38 +00:00
t.Parallel()
2015-08-15 22:42:44 +00:00
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
eval1 := mock.Eval()
s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1})
// Reap the eval
get := &structs.EvalDeleteRequest{
Evals: []string{eval1.ID},
WriteRequest: structs.WriteRequest{Region: "global"},
2015-08-15 22:42:44 +00:00
}
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.Reap", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("Bad index: %d", resp.Index)
}
// Ensure deleted
2017-02-08 05:22:48 +00:00
ws := memdb.NewWatchSet()
outE, err := s1.fsm.State().EvalByID(ws, eval1.ID)
2015-08-15 22:42:44 +00:00
if err != nil {
t.Fatalf("err: %v", err)
}
if outE != nil {
t.Fatalf("Bad: %#v", outE)
}
}
2015-09-06 23:01:16 +00:00
func TestEvalEndpoint_List(t *testing.T) {
2017-07-23 22:04:38 +00:00
t.Parallel()
2015-09-06 23:01:16 +00:00
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
eval1 := mock.Eval()
eval1.ID = "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"
2015-09-06 23:01:16 +00:00
eval2 := mock.Eval()
eval2.ID = "aaaabbbb-3350-4b4b-d185-0e1992ed43e9"
2015-09-06 23:01:16 +00:00
s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1, eval2})
// Lookup the eval
get := &structs.EvalListRequest{
2017-09-07 23:56:15 +00:00
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
},
2015-09-06 23:01:16 +00:00
}
var resp structs.EvalListResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index != 1000 {
t.Fatalf("Bad index: %d %d", resp.Index, 1000)
}
if len(resp.Evaluations) != 2 {
t.Fatalf("bad: %#v", resp.Evaluations)
}
// Lookup the eval by prefix
get = &structs.EvalListRequest{
2017-09-07 23:56:15 +00:00
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
Prefix: "aaaabb",
},
}
var resp2 structs.EvalListResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.Index != 1000 {
t.Fatalf("Bad index: %d %d", resp2.Index, 1000)
}
if len(resp2.Evaluations) != 1 {
t.Fatalf("bad: %#v", resp2.Evaluations)
}
2015-09-06 23:01:16 +00:00
}
2017-10-02 23:53:50 +00:00
func TestEvalEndpoint_List_ACL(t *testing.T) {
t.Parallel()
s1, root := testACLServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
assert := assert.New(t)
// Create the register request
eval1 := mock.Eval()
eval1.ID = "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"
eval2 := mock.Eval()
eval2.ID = "aaaabbbb-3350-4b4b-d185-0e1992ed43e9"
state := s1.fsm.State()
assert.Nil(state.UpsertEvals(1000, []*structs.Evaluation{eval1, eval2}))
// Create ACL tokens
validToken := mock.CreatePolicyAndToken(t, state, 1003, "test-valid",
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))
invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid",
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityListJobs}))
2017-10-02 23:53:50 +00:00
get := &structs.EvalListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
// Try without a token and expect permission denied
{
var resp structs.EvalListResponse
err := msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp)
assert.NotNil(err)
assert.Contains(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try with an invalid token and expect permission denied
{
2017-10-12 22:16:33 +00:00
get.AuthToken = invalidToken.SecretID
2017-10-02 23:53:50 +00:00
var resp structs.EvalListResponse
err := msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp)
assert.NotNil(err)
assert.Contains(err.Error(), structs.ErrPermissionDenied.Error())
}
// List evals with a valid token
{
2017-10-12 22:16:33 +00:00
get.AuthToken = validToken.SecretID
2017-10-02 23:53:50 +00:00
var resp structs.EvalListResponse
assert.Nil(msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp))
assert.Equal(uint64(1000), resp.Index, "Bad index: %d %d", resp.Index, 1000)
assert.Lenf(resp.Evaluations, 2, "bad: %#v", resp.Evaluations)
}
// List evals with a root token
{
2017-10-12 22:16:33 +00:00
get.AuthToken = root.SecretID
2017-10-02 23:53:50 +00:00
var resp structs.EvalListResponse
assert.Nil(msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp))
assert.Equal(uint64(1000), resp.Index, "Bad index: %d %d", resp.Index, 1000)
assert.Lenf(resp.Evaluations, 2, "bad: %#v", resp.Evaluations)
}
}
2015-10-30 02:00:02 +00:00
func TestEvalEndpoint_List_Blocking(t *testing.T) {
2017-07-23 22:04:38 +00:00
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the ieval
eval := mock.Eval()
// Upsert eval triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertEvals(2, []*structs.Evaluation{eval}); err != nil {
t.Fatalf("err: %v", err)
}
})
req := &structs.EvalListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
2017-09-07 23:56:15 +00:00
Namespace: structs.DefaultNamespace,
MinQueryIndex: 1,
},
}
start := time.Now()
var resp structs.EvalListResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.List", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
2015-10-30 15:27:47 +00:00
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 2 {
t.Fatalf("Bad index: %d %d", resp.Index, 2)
}
if len(resp.Evaluations) != 1 || resp.Evaluations[0].ID != eval.ID {
t.Fatalf("bad: %#v", resp.Evaluations)
}
// Eval deletion triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.DeleteEval(3, []string{eval.ID}, nil); err != nil {
t.Fatalf("err: %v", err)
}
})
req.MinQueryIndex = 2
start = time.Now()
var resp2 structs.EvalListResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.List", req, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
2015-10-30 15:27:47 +00:00
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 3 {
t.Fatalf("Bad index: %d %d", resp2.Index, 3)
}
if len(resp2.Evaluations) != 0 {
t.Fatalf("bad: %#v", resp2.Evaluations)
}
}
func TestEvalEndpoint_Allocations(t *testing.T) {
2017-07-23 22:04:38 +00:00
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
alloc2.EvalID = alloc1.EvalID
state := s1.fsm.State()
2016-07-25 21:11:32 +00:00
state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))
state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))
2015-09-07 03:47:42 +00:00
err := state.UpsertAllocs(1000,
[]*structs.Allocation{alloc1, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Lookup the eval
get := &structs.EvalSpecificRequest{
EvalID: alloc1.EvalID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
var resp structs.EvalAllocationsResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.Allocations", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index != 1000 {
t.Fatalf("Bad index: %d %d", resp.Index, 1000)
}
if len(resp.Allocations) != 2 {
t.Fatalf("bad: %#v", resp.Allocations)
}
}
2017-10-03 00:21:40 +00:00
func TestEvalEndpoint_Allocations_ACL(t *testing.T) {
t.Parallel()
s1, root := testACLServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
assert := assert.New(t)
// Create the register request
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
alloc2.EvalID = alloc1.EvalID
state := s1.fsm.State()
assert.Nil(state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)))
assert.Nil(state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)))
assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))
// Create ACL tokens
validToken := mock.CreatePolicyAndToken(t, state, 1003, "test-valid",
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))
invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid",
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityListJobs}))
2017-10-03 00:21:40 +00:00
get := &structs.EvalSpecificRequest{
EvalID: alloc1.EvalID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
// Try with no token and expect permission denied
{
var resp structs.EvalAllocationsResponse
err := msgpackrpc.CallWithCodec(codec, "Eval.Allocations", get, &resp)
assert.NotNil(err)
assert.Contains(err.Error(), structs.ErrPermissionDenied.Error())
}
// Try with an invalid token and expect permission denied
{
2017-10-12 22:16:33 +00:00
get.AuthToken = invalidToken.SecretID
2017-10-03 00:21:40 +00:00
var resp structs.EvalAllocationsResponse
err := msgpackrpc.CallWithCodec(codec, "Eval.Allocations", get, &resp)
assert.NotNil(err)
assert.Contains(err.Error(), structs.ErrPermissionDenied.Error())
}
// Lookup the eval with a valid token
{
2017-10-12 22:16:33 +00:00
get.AuthToken = validToken.SecretID
2017-10-03 00:21:40 +00:00
var resp structs.EvalAllocationsResponse
assert.Nil(msgpackrpc.CallWithCodec(codec, "Eval.Allocations", get, &resp))
assert.Equal(uint64(1000), resp.Index, "Bad index: %d %d", resp.Index, 1000)
assert.Lenf(resp.Allocations, 2, "bad: %#v", resp.Allocations)
}
// Lookup the eval with a root token
{
2017-10-12 22:16:33 +00:00
get.AuthToken = root.SecretID
2017-10-03 00:21:40 +00:00
var resp structs.EvalAllocationsResponse
assert.Nil(msgpackrpc.CallWithCodec(codec, "Eval.Allocations", get, &resp))
assert.Equal(uint64(1000), resp.Index, "Bad index: %d %d", resp.Index, 1000)
assert.Lenf(resp.Allocations, 2, "bad: %#v", resp.Allocations)
}
}
2015-10-30 02:00:02 +00:00
func TestEvalEndpoint_Allocations_Blocking(t *testing.T) {
2017-07-23 22:04:38 +00:00
t.Parallel()
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the allocs
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
// Upsert an unrelated alloc first
time.AfterFunc(100*time.Millisecond, func() {
2016-07-25 21:11:32 +00:00
state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID))
2015-10-30 02:00:02 +00:00
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
})
// Upsert an alloc which will trigger the watch later
time.AfterFunc(200*time.Millisecond, func() {
2016-07-25 21:11:32 +00:00
state.UpsertJobSummary(199, mock.JobSummary(alloc2.JobID))
2015-10-30 02:00:02 +00:00
err := state.UpsertAllocs(200, []*structs.Allocation{alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
})
// Lookup the eval
get := &structs.EvalSpecificRequest{
EvalID: alloc2.EvalID,
QueryOptions: structs.QueryOptions{
Region: "global",
2017-02-08 06:10:33 +00:00
MinQueryIndex: 150,
},
}
var resp structs.EvalAllocationsResponse
start := time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Eval.Allocations", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
2015-10-30 15:27:47 +00:00
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
2015-10-30 02:00:02 +00:00
if resp.Index != 200 {
t.Fatalf("Bad index: %d %d", resp.Index, 200)
}
if len(resp.Allocations) != 1 || resp.Allocations[0].ID != alloc2.ID {
t.Fatalf("bad: %#v", resp.Allocations)
}
}
func TestEvalEndpoint_Reblock_NonExistent(t *testing.T) {
2017-07-23 22:04:38 +00:00
t.Parallel()
2016-06-07 16:51:20 +00:00
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForResult(func() (bool, error) {
return s1.evalBroker.Enabled(), nil
}, func(err error) {
t.Fatalf("should enable eval broker")
})
// Create the register request
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)
out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("missing eval")
}
get := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval1},
EvalToken: token,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.Reblock", get, &resp); err == nil {
t.Fatalf("expect error since eval does not exist")
}
}
func TestEvalEndpoint_Reblock_NonBlocked(t *testing.T) {
2017-07-23 22:04:38 +00:00
t.Parallel()
2016-06-07 16:51:20 +00:00
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForResult(func() (bool, error) {
return s1.evalBroker.Enabled(), nil
}, func(err error) {
t.Fatalf("should enable eval broker")
})
// Create the eval
eval1 := mock.Eval()
s1.evalBroker.Enqueue(eval1)
// Insert it into the state store
if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}); err != nil {
t.Fatal(err)
}
2016-06-03 18:36:50 +00:00
out, token, err := s1.evalBroker.Dequeue(defaultSched, 2*time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("missing eval")
}
get := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval1},
EvalToken: token,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.Reblock", get, &resp); err == nil {
t.Fatalf("should error since eval was not in blocked state: %v", err)
}
}
func TestEvalEndpoint_Reblock(t *testing.T) {
2017-07-23 22:04:38 +00:00
t.Parallel()
2016-06-07 16:51:20 +00:00
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForResult(func() (bool, error) {
return s1.evalBroker.Enabled(), nil
}, func(err error) {
t.Fatalf("should enable eval broker")
})
// Create the eval
eval1 := mock.Eval()
eval1.Status = structs.EvalStatusBlocked
s1.evalBroker.Enqueue(eval1)
// Insert it into the state store
if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}); err != nil {
t.Fatal(err)
}
2016-06-07 16:51:20 +00:00
out, token, err := s1.evalBroker.Dequeue(defaultSched, 7*time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}
get := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval1},
EvalToken: token,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Eval.Reblock", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}
// Check that it is blocked
bStats := s1.blockedEvals.Stats()
if bStats.TotalBlocked+bStats.TotalEscaped == 0 {
t.Fatalf("ReblockEval didn't insert eval into the blocked eval tracker")
}
}