15b991e039
HTTP header values must be ASCII. Also constant time compare tokens and test the generate and compare helper functions.
1034 lines
28 KiB
Go
1034 lines
28 KiB
Go
package nomad
|
|
|
|
import (
|
|
"encoding/base64"
|
|
"fmt"
|
|
"reflect"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
memdb "github.com/hashicorp/go-memdb"
|
|
"github.com/hashicorp/net-rpc-msgpackrpc"
|
|
"github.com/hashicorp/nomad/acl"
|
|
"github.com/hashicorp/nomad/helper/uuid"
|
|
"github.com/hashicorp/nomad/nomad/mock"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/hashicorp/nomad/scheduler"
|
|
"github.com/hashicorp/nomad/testutil"
|
|
"github.com/stretchr/testify/assert"
|
|
)
|
|
|
|
func TestEvalEndpoint_GetEval(t *testing.T) {
|
|
t.Parallel()
|
|
s1 := testServer(t, nil)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create the register request
|
|
eval1 := mock.Eval()
|
|
s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1})
|
|
|
|
// Lookup the eval
|
|
get := &structs.EvalSpecificRequest{
|
|
EvalID: eval1.ID,
|
|
QueryOptions: structs.QueryOptions{Region: "global"},
|
|
}
|
|
var resp structs.SingleEvalResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.GetEval", get, &resp); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if resp.Index != 1000 {
|
|
t.Fatalf("Bad index: %d %d", resp.Index, 1000)
|
|
}
|
|
|
|
if !reflect.DeepEqual(eval1, resp.Eval) {
|
|
t.Fatalf("bad: %#v %#v", eval1, resp.Eval)
|
|
}
|
|
|
|
// Lookup non-existing node
|
|
get.EvalID = uuid.Generate()
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.GetEval", get, &resp); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if resp.Index != 1000 {
|
|
t.Fatalf("Bad index: %d %d", resp.Index, 1000)
|
|
}
|
|
if resp.Eval != nil {
|
|
t.Fatalf("unexpected eval")
|
|
}
|
|
}
|
|
|
|
func TestEvalEndpoint_GetEval_ACL(t *testing.T) {
|
|
t.Parallel()
|
|
s1, root := testACLServer(t, nil)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
assert := assert.New(t)
|
|
|
|
// Create the register request
|
|
eval1 := mock.Eval()
|
|
state := s1.fsm.State()
|
|
state.UpsertEvals(1000, []*structs.Evaluation{eval1})
|
|
|
|
// Create ACL tokens
|
|
validToken := mock.CreatePolicyAndToken(t, state, 1003, "test-valid",
|
|
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))
|
|
invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid",
|
|
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityListJobs}))
|
|
|
|
get := &structs.EvalSpecificRequest{
|
|
EvalID: eval1.ID,
|
|
QueryOptions: structs.QueryOptions{Region: "global"},
|
|
}
|
|
|
|
// Try with no token and expect permission denied
|
|
{
|
|
var resp structs.SingleEvalResponse
|
|
err := msgpackrpc.CallWithCodec(codec, "Eval.GetEval", get, &resp)
|
|
assert.NotNil(err)
|
|
assert.Contains(err.Error(), structs.ErrPermissionDenied.Error())
|
|
}
|
|
|
|
// Try with an invalid token and expect permission denied
|
|
{
|
|
get.AuthToken = invalidToken.SecretID
|
|
var resp structs.SingleEvalResponse
|
|
err := msgpackrpc.CallWithCodec(codec, "Eval.GetEval", get, &resp)
|
|
assert.NotNil(err)
|
|
assert.Contains(err.Error(), structs.ErrPermissionDenied.Error())
|
|
}
|
|
|
|
// Lookup the eval using a valid token
|
|
{
|
|
get.AuthToken = validToken.SecretID
|
|
var resp structs.SingleEvalResponse
|
|
assert.Nil(msgpackrpc.CallWithCodec(codec, "Eval.GetEval", get, &resp))
|
|
assert.Equal(uint64(1000), resp.Index, "Bad index: %d %d", resp.Index, 1000)
|
|
assert.Equal(eval1, resp.Eval)
|
|
}
|
|
|
|
// Lookup the eval using a root token
|
|
{
|
|
get.AuthToken = root.SecretID
|
|
var resp structs.SingleEvalResponse
|
|
assert.Nil(msgpackrpc.CallWithCodec(codec, "Eval.GetEval", get, &resp))
|
|
assert.Equal(uint64(1000), resp.Index, "Bad index: %d %d", resp.Index, 1000)
|
|
assert.Equal(eval1, resp.Eval)
|
|
}
|
|
}
|
|
|
|
func TestEvalEndpoint_GetEval_Blocking(t *testing.T) {
|
|
t.Parallel()
|
|
s1 := testServer(t, nil)
|
|
defer s1.Shutdown()
|
|
state := s1.fsm.State()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create the evals
|
|
eval1 := mock.Eval()
|
|
eval2 := mock.Eval()
|
|
|
|
// First create an unrelated eval
|
|
time.AfterFunc(100*time.Millisecond, func() {
|
|
err := state.UpsertEvals(100, []*structs.Evaluation{eval1})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
})
|
|
|
|
// Upsert the eval we are watching later
|
|
time.AfterFunc(200*time.Millisecond, func() {
|
|
err := state.UpsertEvals(200, []*structs.Evaluation{eval2})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
})
|
|
|
|
// Lookup the eval
|
|
req := &structs.EvalSpecificRequest{
|
|
EvalID: eval2.ID,
|
|
QueryOptions: structs.QueryOptions{
|
|
Region: "global",
|
|
MinQueryIndex: 150,
|
|
},
|
|
}
|
|
var resp structs.SingleEvalResponse
|
|
start := time.Now()
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.GetEval", req, &resp); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
|
|
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
|
|
}
|
|
if resp.Index != 200 {
|
|
t.Fatalf("Bad index: %d %d", resp.Index, 200)
|
|
}
|
|
if resp.Eval == nil || resp.Eval.ID != eval2.ID {
|
|
t.Fatalf("bad: %#v", resp.Eval)
|
|
}
|
|
|
|
// Eval delete triggers watches
|
|
time.AfterFunc(100*time.Millisecond, func() {
|
|
err := state.DeleteEval(300, []string{eval2.ID}, []string{})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
})
|
|
|
|
req.QueryOptions.MinQueryIndex = 250
|
|
var resp2 structs.SingleEvalResponse
|
|
start = time.Now()
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.GetEval", req, &resp2); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
|
|
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
|
|
}
|
|
if resp2.Index != 300 {
|
|
t.Fatalf("Bad index: %d %d", resp2.Index, 300)
|
|
}
|
|
if resp2.Eval != nil {
|
|
t.Fatalf("bad: %#v", resp2.Eval)
|
|
}
|
|
}
|
|
|
|
func TestEvalEndpoint_Dequeue(t *testing.T) {
|
|
t.Parallel()
|
|
s1 := testServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0 // Prevent automatic dequeue
|
|
})
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create the register request
|
|
eval1 := mock.Eval()
|
|
s1.evalBroker.Enqueue(eval1)
|
|
|
|
// Dequeue the eval
|
|
get := &structs.EvalDequeueRequest{
|
|
Schedulers: defaultSched,
|
|
SchedulerVersion: scheduler.SchedulerVersion,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var resp structs.EvalDequeueResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if !reflect.DeepEqual(eval1, resp.Eval) {
|
|
t.Fatalf("bad: %v %v", eval1, resp.Eval)
|
|
}
|
|
|
|
// Ensure outstanding
|
|
token, ok := s1.evalBroker.Outstanding(eval1.ID)
|
|
if !ok {
|
|
t.Fatalf("should be outstanding")
|
|
}
|
|
if token != resp.Token {
|
|
t.Fatalf("bad token: %#v %#v", token, resp.Token)
|
|
}
|
|
|
|
if resp.WaitIndex != eval1.ModifyIndex {
|
|
t.Fatalf("bad wait index; got %d; want %d", resp.WaitIndex, eval1.ModifyIndex)
|
|
}
|
|
}
|
|
|
|
func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) {
|
|
t.Parallel()
|
|
s1 := testServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0 // Prevent automatic dequeue
|
|
})
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create the register request
|
|
eval1 := mock.Eval()
|
|
eval2 := mock.Eval()
|
|
eval2.JobID = eval1.JobID
|
|
s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1})
|
|
s1.evalBroker.Enqueue(eval1)
|
|
s1.fsm.State().UpsertEvals(1001, []*structs.Evaluation{eval2})
|
|
|
|
// Dequeue the eval
|
|
get := &structs.EvalDequeueRequest{
|
|
Schedulers: defaultSched,
|
|
SchedulerVersion: scheduler.SchedulerVersion,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var resp structs.EvalDequeueResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if !reflect.DeepEqual(eval1, resp.Eval) {
|
|
t.Fatalf("bad: %v %v", eval1, resp.Eval)
|
|
}
|
|
|
|
// Ensure outstanding
|
|
token, ok := s1.evalBroker.Outstanding(eval1.ID)
|
|
if !ok {
|
|
t.Fatalf("should be outstanding")
|
|
}
|
|
if token != resp.Token {
|
|
t.Fatalf("bad token: %#v %#v", token, resp.Token)
|
|
}
|
|
|
|
if resp.WaitIndex != 1001 {
|
|
t.Fatalf("bad wait index; got %d; want %d", resp.WaitIndex, 1001)
|
|
}
|
|
}
|
|
|
|
func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) {
|
|
t.Parallel()
|
|
s1 := testServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0 // Prevent automatic dequeue
|
|
})
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create the register request
|
|
eval1 := mock.Eval()
|
|
s1.evalBroker.Enqueue(eval1)
|
|
|
|
// Dequeue the eval
|
|
get := &structs.EvalDequeueRequest{
|
|
Schedulers: defaultSched,
|
|
SchedulerVersion: 0,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var resp structs.EvalDequeueResponse
|
|
err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp)
|
|
if err == nil || !strings.Contains(err.Error(), "scheduler version is 0") {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestEvalEndpoint_Ack(t *testing.T) {
|
|
t.Parallel()
|
|
s1 := testServer(t, nil)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
return s1.evalBroker.Enabled(), nil
|
|
}, func(err error) {
|
|
t.Fatalf("should enable eval broker")
|
|
})
|
|
|
|
// Create the register request
|
|
eval1 := mock.Eval()
|
|
s1.evalBroker.Enqueue(eval1)
|
|
out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out == nil {
|
|
t.Fatalf("missing eval")
|
|
}
|
|
|
|
// Ack the eval
|
|
get := &structs.EvalAckRequest{
|
|
EvalID: out.ID,
|
|
Token: token,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var resp structs.GenericResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.Ack", get, &resp); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Ensure outstanding
|
|
if _, ok := s1.evalBroker.Outstanding(eval1.ID); ok {
|
|
t.Fatalf("should not be outstanding")
|
|
}
|
|
}
|
|
|
|
func TestEvalEndpoint_Nack(t *testing.T) {
|
|
t.Parallel()
|
|
s1 := testServer(t, func(c *Config) {
|
|
// Disable all of the schedulers so we can manually dequeue
|
|
// evals and check the queue status
|
|
c.NumSchedulers = 0
|
|
})
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
return s1.evalBroker.Enabled(), nil
|
|
}, func(err error) {
|
|
t.Fatalf("should enable eval broker")
|
|
})
|
|
|
|
// Create the register request
|
|
eval1 := mock.Eval()
|
|
s1.evalBroker.Enqueue(eval1)
|
|
out, token, _ := s1.evalBroker.Dequeue(defaultSched, time.Second)
|
|
if out == nil {
|
|
t.Fatalf("missing eval")
|
|
}
|
|
|
|
// Nack the eval
|
|
get := &structs.EvalAckRequest{
|
|
EvalID: out.ID,
|
|
Token: token,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var resp structs.GenericResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.Nack", get, &resp); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Ensure outstanding
|
|
if _, ok := s1.evalBroker.Outstanding(eval1.ID); ok {
|
|
t.Fatalf("should not be outstanding")
|
|
}
|
|
|
|
// Should get it back
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
out2, _, _ := s1.evalBroker.Dequeue(defaultSched, time.Second)
|
|
if out2 != out {
|
|
return false, fmt.Errorf("nack failed")
|
|
}
|
|
|
|
return true, nil
|
|
}, func(err error) {
|
|
t.Fatal(err)
|
|
})
|
|
}
|
|
|
|
func TestEvalEndpoint_Update(t *testing.T) {
|
|
t.Parallel()
|
|
s1 := testServer(t, nil)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
return s1.evalBroker.Enabled(), nil
|
|
}, func(err error) {
|
|
t.Fatalf("should enable eval broker")
|
|
})
|
|
|
|
// Create the register request
|
|
eval1 := mock.Eval()
|
|
s1.evalBroker.Enqueue(eval1)
|
|
out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out == nil {
|
|
t.Fatalf("missing eval")
|
|
}
|
|
|
|
// Update the eval
|
|
eval2 := eval1.Copy()
|
|
eval2.Status = structs.EvalStatusComplete
|
|
|
|
get := &structs.EvalUpdateRequest{
|
|
Evals: []*structs.Evaluation{eval2},
|
|
EvalToken: token,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var resp structs.GenericResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.Update", get, &resp); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Ensure updated
|
|
ws := memdb.NewWatchSet()
|
|
outE, err := s1.fsm.State().EvalByID(ws, eval2.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outE.Status != structs.EvalStatusComplete {
|
|
t.Fatalf("Bad: %#v", out)
|
|
}
|
|
}
|
|
|
|
func TestEvalEndpoint_Create(t *testing.T) {
|
|
t.Parallel()
|
|
s1 := testServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0 // Prevent automatic dequeue
|
|
})
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
return s1.evalBroker.Enabled(), nil
|
|
}, func(err error) {
|
|
t.Fatalf("should enable eval broker")
|
|
})
|
|
|
|
// Create the register request
|
|
prev := mock.Eval()
|
|
s1.evalBroker.Enqueue(prev)
|
|
out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out == nil {
|
|
t.Fatalf("missing eval")
|
|
}
|
|
|
|
// Create the register request
|
|
eval1 := mock.Eval()
|
|
eval1.PreviousEval = prev.ID
|
|
get := &structs.EvalUpdateRequest{
|
|
Evals: []*structs.Evaluation{eval1},
|
|
EvalToken: token,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var resp structs.GenericResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.Create", get, &resp); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Ensure created
|
|
ws := memdb.NewWatchSet()
|
|
outE, err := s1.fsm.State().EvalByID(ws, eval1.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
eval1.CreateIndex = resp.Index
|
|
eval1.ModifyIndex = resp.Index
|
|
if !reflect.DeepEqual(eval1, outE) {
|
|
t.Fatalf("Bad: %#v %#v", outE, eval1)
|
|
}
|
|
}
|
|
|
|
func TestEvalEndpoint_Reap(t *testing.T) {
|
|
t.Parallel()
|
|
s1 := testServer(t, nil)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create the register request
|
|
eval1 := mock.Eval()
|
|
s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1})
|
|
|
|
// Reap the eval
|
|
get := &structs.EvalDeleteRequest{
|
|
Evals: []string{eval1.ID},
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var resp structs.GenericResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.Reap", get, &resp); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if resp.Index == 0 {
|
|
t.Fatalf("Bad index: %d", resp.Index)
|
|
}
|
|
|
|
// Ensure deleted
|
|
ws := memdb.NewWatchSet()
|
|
outE, err := s1.fsm.State().EvalByID(ws, eval1.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outE != nil {
|
|
t.Fatalf("Bad: %#v", outE)
|
|
}
|
|
}
|
|
|
|
func TestEvalEndpoint_List(t *testing.T) {
|
|
t.Parallel()
|
|
s1 := testServer(t, nil)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create the register request
|
|
eval1 := mock.Eval()
|
|
eval1.ID = "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"
|
|
eval2 := mock.Eval()
|
|
eval2.ID = "aaaabbbb-3350-4b4b-d185-0e1992ed43e9"
|
|
s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1, eval2})
|
|
|
|
// Lookup the eval
|
|
get := &structs.EvalListRequest{
|
|
QueryOptions: structs.QueryOptions{
|
|
Region: "global",
|
|
Namespace: structs.DefaultNamespace,
|
|
},
|
|
}
|
|
var resp structs.EvalListResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if resp.Index != 1000 {
|
|
t.Fatalf("Bad index: %d %d", resp.Index, 1000)
|
|
}
|
|
|
|
if len(resp.Evaluations) != 2 {
|
|
t.Fatalf("bad: %#v", resp.Evaluations)
|
|
}
|
|
|
|
// Lookup the eval by prefix
|
|
get = &structs.EvalListRequest{
|
|
QueryOptions: structs.QueryOptions{
|
|
Region: "global",
|
|
Namespace: structs.DefaultNamespace,
|
|
Prefix: "aaaabb",
|
|
},
|
|
}
|
|
var resp2 structs.EvalListResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp2); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if resp2.Index != 1000 {
|
|
t.Fatalf("Bad index: %d %d", resp2.Index, 1000)
|
|
}
|
|
|
|
if len(resp2.Evaluations) != 1 {
|
|
t.Fatalf("bad: %#v", resp2.Evaluations)
|
|
}
|
|
|
|
}
|
|
|
|
func TestEvalEndpoint_List_ACL(t *testing.T) {
|
|
t.Parallel()
|
|
s1, root := testACLServer(t, nil)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
assert := assert.New(t)
|
|
|
|
// Create the register request
|
|
eval1 := mock.Eval()
|
|
eval1.ID = "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9"
|
|
eval2 := mock.Eval()
|
|
eval2.ID = "aaaabbbb-3350-4b4b-d185-0e1992ed43e9"
|
|
state := s1.fsm.State()
|
|
assert.Nil(state.UpsertEvals(1000, []*structs.Evaluation{eval1, eval2}))
|
|
|
|
// Create ACL tokens
|
|
validToken := mock.CreatePolicyAndToken(t, state, 1003, "test-valid",
|
|
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))
|
|
invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid",
|
|
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityListJobs}))
|
|
|
|
get := &structs.EvalListRequest{
|
|
QueryOptions: structs.QueryOptions{
|
|
Region: "global",
|
|
Namespace: structs.DefaultNamespace,
|
|
},
|
|
}
|
|
|
|
// Try without a token and expect permission denied
|
|
{
|
|
var resp structs.EvalListResponse
|
|
err := msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp)
|
|
assert.NotNil(err)
|
|
assert.Contains(err.Error(), structs.ErrPermissionDenied.Error())
|
|
}
|
|
|
|
// Try with an invalid token and expect permission denied
|
|
{
|
|
get.AuthToken = invalidToken.SecretID
|
|
var resp structs.EvalListResponse
|
|
err := msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp)
|
|
assert.NotNil(err)
|
|
assert.Contains(err.Error(), structs.ErrPermissionDenied.Error())
|
|
}
|
|
|
|
// List evals with a valid token
|
|
{
|
|
get.AuthToken = validToken.SecretID
|
|
var resp structs.EvalListResponse
|
|
assert.Nil(msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp))
|
|
assert.Equal(uint64(1000), resp.Index, "Bad index: %d %d", resp.Index, 1000)
|
|
assert.Lenf(resp.Evaluations, 2, "bad: %#v", resp.Evaluations)
|
|
}
|
|
|
|
// List evals with a root token
|
|
{
|
|
get.AuthToken = root.SecretID
|
|
var resp structs.EvalListResponse
|
|
assert.Nil(msgpackrpc.CallWithCodec(codec, "Eval.List", get, &resp))
|
|
assert.Equal(uint64(1000), resp.Index, "Bad index: %d %d", resp.Index, 1000)
|
|
assert.Lenf(resp.Evaluations, 2, "bad: %#v", resp.Evaluations)
|
|
}
|
|
}
|
|
|
|
func TestEvalEndpoint_List_Blocking(t *testing.T) {
|
|
t.Parallel()
|
|
s1 := testServer(t, nil)
|
|
defer s1.Shutdown()
|
|
state := s1.fsm.State()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create the ieval
|
|
eval := mock.Eval()
|
|
|
|
// Upsert eval triggers watches
|
|
time.AfterFunc(100*time.Millisecond, func() {
|
|
if err := state.UpsertEvals(2, []*structs.Evaluation{eval}); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
})
|
|
|
|
req := &structs.EvalListRequest{
|
|
QueryOptions: structs.QueryOptions{
|
|
Region: "global",
|
|
Namespace: structs.DefaultNamespace,
|
|
MinQueryIndex: 1,
|
|
},
|
|
}
|
|
start := time.Now()
|
|
var resp structs.EvalListResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.List", req, &resp); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
|
|
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
|
|
}
|
|
if resp.Index != 2 {
|
|
t.Fatalf("Bad index: %d %d", resp.Index, 2)
|
|
}
|
|
if len(resp.Evaluations) != 1 || resp.Evaluations[0].ID != eval.ID {
|
|
t.Fatalf("bad: %#v", resp.Evaluations)
|
|
}
|
|
|
|
// Eval deletion triggers watches
|
|
time.AfterFunc(100*time.Millisecond, func() {
|
|
if err := state.DeleteEval(3, []string{eval.ID}, nil); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
})
|
|
|
|
req.MinQueryIndex = 2
|
|
start = time.Now()
|
|
var resp2 structs.EvalListResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.List", req, &resp2); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
|
|
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
|
|
}
|
|
if resp2.Index != 3 {
|
|
t.Fatalf("Bad index: %d %d", resp2.Index, 3)
|
|
}
|
|
if len(resp2.Evaluations) != 0 {
|
|
t.Fatalf("bad: %#v", resp2.Evaluations)
|
|
}
|
|
}
|
|
|
|
func TestEvalEndpoint_Allocations(t *testing.T) {
|
|
t.Parallel()
|
|
s1 := testServer(t, nil)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create the register request
|
|
alloc1 := mock.Alloc()
|
|
alloc2 := mock.Alloc()
|
|
alloc2.EvalID = alloc1.EvalID
|
|
state := s1.fsm.State()
|
|
state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))
|
|
state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))
|
|
err := state.UpsertAllocs(1000,
|
|
[]*structs.Allocation{alloc1, alloc2})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Lookup the eval
|
|
get := &structs.EvalSpecificRequest{
|
|
EvalID: alloc1.EvalID,
|
|
QueryOptions: structs.QueryOptions{Region: "global"},
|
|
}
|
|
var resp structs.EvalAllocationsResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.Allocations", get, &resp); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if resp.Index != 1000 {
|
|
t.Fatalf("Bad index: %d %d", resp.Index, 1000)
|
|
}
|
|
|
|
if len(resp.Allocations) != 2 {
|
|
t.Fatalf("bad: %#v", resp.Allocations)
|
|
}
|
|
}
|
|
|
|
func TestEvalEndpoint_Allocations_ACL(t *testing.T) {
|
|
t.Parallel()
|
|
s1, root := testACLServer(t, nil)
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
assert := assert.New(t)
|
|
|
|
// Create the register request
|
|
alloc1 := mock.Alloc()
|
|
alloc2 := mock.Alloc()
|
|
alloc2.EvalID = alloc1.EvalID
|
|
state := s1.fsm.State()
|
|
assert.Nil(state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)))
|
|
assert.Nil(state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)))
|
|
assert.Nil(state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}))
|
|
|
|
// Create ACL tokens
|
|
validToken := mock.CreatePolicyAndToken(t, state, 1003, "test-valid",
|
|
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))
|
|
invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid",
|
|
mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityListJobs}))
|
|
|
|
get := &structs.EvalSpecificRequest{
|
|
EvalID: alloc1.EvalID,
|
|
QueryOptions: structs.QueryOptions{Region: "global"},
|
|
}
|
|
|
|
// Try with no token and expect permission denied
|
|
{
|
|
var resp structs.EvalAllocationsResponse
|
|
err := msgpackrpc.CallWithCodec(codec, "Eval.Allocations", get, &resp)
|
|
assert.NotNil(err)
|
|
assert.Contains(err.Error(), structs.ErrPermissionDenied.Error())
|
|
}
|
|
|
|
// Try with an invalid token and expect permission denied
|
|
{
|
|
get.AuthToken = invalidToken.SecretID
|
|
var resp structs.EvalAllocationsResponse
|
|
err := msgpackrpc.CallWithCodec(codec, "Eval.Allocations", get, &resp)
|
|
assert.NotNil(err)
|
|
assert.Contains(err.Error(), structs.ErrPermissionDenied.Error())
|
|
}
|
|
|
|
// Lookup the eval with a valid token
|
|
{
|
|
get.AuthToken = validToken.SecretID
|
|
var resp structs.EvalAllocationsResponse
|
|
assert.Nil(msgpackrpc.CallWithCodec(codec, "Eval.Allocations", get, &resp))
|
|
assert.Equal(uint64(1000), resp.Index, "Bad index: %d %d", resp.Index, 1000)
|
|
assert.Lenf(resp.Allocations, 2, "bad: %#v", resp.Allocations)
|
|
}
|
|
|
|
// Lookup the eval with a root token
|
|
{
|
|
get.AuthToken = root.SecretID
|
|
var resp structs.EvalAllocationsResponse
|
|
assert.Nil(msgpackrpc.CallWithCodec(codec, "Eval.Allocations", get, &resp))
|
|
assert.Equal(uint64(1000), resp.Index, "Bad index: %d %d", resp.Index, 1000)
|
|
assert.Lenf(resp.Allocations, 2, "bad: %#v", resp.Allocations)
|
|
}
|
|
}
|
|
|
|
func TestEvalEndpoint_Allocations_Blocking(t *testing.T) {
|
|
t.Parallel()
|
|
s1 := testServer(t, nil)
|
|
defer s1.Shutdown()
|
|
state := s1.fsm.State()
|
|
codec := rpcClient(t, s1)
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create the allocs
|
|
alloc1 := mock.Alloc()
|
|
alloc2 := mock.Alloc()
|
|
|
|
// Upsert an unrelated alloc first
|
|
time.AfterFunc(100*time.Millisecond, func() {
|
|
state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID))
|
|
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
})
|
|
|
|
// Upsert an alloc which will trigger the watch later
|
|
time.AfterFunc(200*time.Millisecond, func() {
|
|
state.UpsertJobSummary(199, mock.JobSummary(alloc2.JobID))
|
|
err := state.UpsertAllocs(200, []*structs.Allocation{alloc2})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
})
|
|
|
|
// Lookup the eval
|
|
get := &structs.EvalSpecificRequest{
|
|
EvalID: alloc2.EvalID,
|
|
QueryOptions: structs.QueryOptions{
|
|
Region: "global",
|
|
MinQueryIndex: 150,
|
|
},
|
|
}
|
|
var resp structs.EvalAllocationsResponse
|
|
start := time.Now()
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.Allocations", get, &resp); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
|
|
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
|
|
}
|
|
if resp.Index != 200 {
|
|
t.Fatalf("Bad index: %d %d", resp.Index, 200)
|
|
}
|
|
if len(resp.Allocations) != 1 || resp.Allocations[0].ID != alloc2.ID {
|
|
t.Fatalf("bad: %#v", resp.Allocations)
|
|
}
|
|
}
|
|
|
|
func TestEvalEndpoint_Reblock_NonExistent(t *testing.T) {
|
|
t.Parallel()
|
|
s1 := testServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0 // Prevent automatic dequeue
|
|
})
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
return s1.evalBroker.Enabled(), nil
|
|
}, func(err error) {
|
|
t.Fatalf("should enable eval broker")
|
|
})
|
|
|
|
// Create the register request
|
|
eval1 := mock.Eval()
|
|
s1.evalBroker.Enqueue(eval1)
|
|
out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out == nil {
|
|
t.Fatalf("missing eval")
|
|
}
|
|
|
|
get := &structs.EvalUpdateRequest{
|
|
Evals: []*structs.Evaluation{eval1},
|
|
EvalToken: token,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var resp structs.GenericResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.Reblock", get, &resp); err == nil {
|
|
t.Fatalf("expect error since eval does not exist")
|
|
}
|
|
}
|
|
|
|
func TestEvalEndpoint_Reblock_NonBlocked(t *testing.T) {
|
|
t.Parallel()
|
|
s1 := testServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0 // Prevent automatic dequeue
|
|
})
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
return s1.evalBroker.Enabled(), nil
|
|
}, func(err error) {
|
|
t.Fatalf("should enable eval broker")
|
|
})
|
|
|
|
// Create the eval
|
|
eval1 := mock.Eval()
|
|
s1.evalBroker.Enqueue(eval1)
|
|
|
|
// Insert it into the state store
|
|
if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
out, token, err := s1.evalBroker.Dequeue(defaultSched, 2*time.Second)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out == nil {
|
|
t.Fatalf("missing eval")
|
|
}
|
|
|
|
get := &structs.EvalUpdateRequest{
|
|
Evals: []*structs.Evaluation{eval1},
|
|
EvalToken: token,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var resp structs.GenericResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.Reblock", get, &resp); err == nil {
|
|
t.Fatalf("should error since eval was not in blocked state: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestEvalEndpoint_Reblock(t *testing.T) {
|
|
t.Parallel()
|
|
s1 := testServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0 // Prevent automatic dequeue
|
|
})
|
|
defer s1.Shutdown()
|
|
codec := rpcClient(t, s1)
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
return s1.evalBroker.Enabled(), nil
|
|
}, func(err error) {
|
|
t.Fatalf("should enable eval broker")
|
|
})
|
|
|
|
// Create the eval
|
|
eval1 := mock.Eval()
|
|
eval1.Status = structs.EvalStatusBlocked
|
|
s1.evalBroker.Enqueue(eval1)
|
|
|
|
// Insert it into the state store
|
|
if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
out, token, err := s1.evalBroker.Dequeue(defaultSched, 7*time.Second)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out == nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
|
|
get := &structs.EvalUpdateRequest{
|
|
Evals: []*structs.Evaluation{eval1},
|
|
EvalToken: token,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var resp structs.GenericResponse
|
|
if err := msgpackrpc.CallWithCodec(codec, "Eval.Reblock", get, &resp); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Check that it is blocked
|
|
bStats := s1.blockedEvals.Stats()
|
|
if bStats.TotalBlocked+bStats.TotalEscaped == 0 {
|
|
t.Fatalf("ReblockEval didn't insert eval into the blocked eval tracker")
|
|
}
|
|
}
|
|
|
|
// TestGenerateMigrateToken asserts the migrate token is valid for use in HTTP
|
|
// headers and CompareMigrateToken works as expected.
|
|
func TestGenerateMigrateToken(t *testing.T) {
|
|
assert := assert.New(t)
|
|
allocID := uuid.Generate()
|
|
nodeSecret := uuid.Generate()
|
|
token, err := GenerateMigrateToken(allocID, nodeSecret)
|
|
assert.Nil(err)
|
|
_, err = base64.URLEncoding.DecodeString(token)
|
|
assert.Nil(err)
|
|
|
|
assert.True(CompareMigrateToken(allocID, nodeSecret, token))
|
|
assert.False(CompareMigrateToken("x", nodeSecret, token))
|
|
assert.False(CompareMigrateToken(allocID, "x", token))
|
|
assert.False(CompareMigrateToken(allocID, nodeSecret, "x"))
|
|
|
|
token2, err := GenerateMigrateToken("x", nodeSecret)
|
|
assert.Nil(err)
|
|
assert.False(CompareMigrateToken(allocID, nodeSecret, token2))
|
|
assert.True(CompareMigrateToken("x", nodeSecret, token2))
|
|
}
|