open-nomad/nomad/periodic_endpoint_test.go
Seth Hoenig ba728f8f97
api: enable support for setting original job source (#16763)
* api: enable support for setting original source alongside job

This PR adds support for setting job source material along with
the registration of a job.

This includes a new HTTP endpoint and a new RPC endpoint for
making queries for the original source of a job. The
HTTP endpoint is /v1/job/<id>/submission?version=<version> and
the RPC method is Job.GetJobSubmission.

The job source (if submitted, and doing so is always optional), is
stored in the job_submission memdb table, separately from the
actual job. This way we do not incur overhead of reading the large
string field throughout normal job operations.

The server config now includes job_max_source_size for configuring
the maximum size the job source may be, before the server simply
drops the source material. This should help prevent Bad Things from
happening when huge jobs are submitted. If the value is set to 0,
all job source material will be dropped.

* api: avoid writing var content to disk for parsing

* api: move submission validation into RPC layer

* api: return an error if updating a job submission without namespace or job id

* api: be exact about the job index we associate a submission with (modify)

* api: reword api docs scheduling

* api: prune all but the last 6 job submissions

* api: protect against nil job submission in job validation

* api: set max job source size in test server

* api: fixups from pr
2023-04-11 08:45:08 -05:00

201 lines
5.6 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package nomad
import (
"testing"
memdb "github.com/hashicorp/go-memdb"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
)
// TestPeriodicEndpoint_Force exercises the Periodic.Force RPC against a
// periodic job that has been upserted into state and added to the server's
// periodic dispatcher. It checks that the response carries a non-zero index
// and that the evaluation named in the response exists in the state store
// with a CreateIndex equal to the response's EvalCreateIndex.
func TestPeriodicEndpoint_Force(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer cleanupS1()
	state := s1.fsm.State()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Create and insert a periodic job.
	job := mock.PeriodicJob()
	job.Periodic.ProhibitOverlap = true // Shouldn't affect anything.
	if err := state.UpsertJob(structs.MsgTypeTestSetup, 100, nil, job); err != nil {
		t.Fatalf("err: %v", err)
	}
	// Fail fast if the dispatcher rejects the job; otherwise a broken setup
	// would only show up later as a confusing RPC failure.
	if err := s1.periodicDispatcher.Add(job); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Force launch it.
	req := &structs.PeriodicForceRequest{
		JobID: job.ID,
		WriteRequest: structs.WriteRequest{
			Region:    "global",
			Namespace: job.Namespace,
		},
	}

	// Fetch the response
	var resp structs.PeriodicForceResponse
	if err := msgpackrpc.CallWithCodec(codec, "Periodic.Force", req, &resp); err != nil {
		t.Fatalf("err: %v", err)
	}
	if resp.Index == 0 {
		t.Fatalf("bad index: %d", resp.Index)
	}

	// Lookup the evaluation
	ws := memdb.NewWatchSet()
	eval, err := state.EvalByID(ws, resp.EvalID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if eval == nil {
		t.Fatalf("expected eval")
	}
	if eval.CreateIndex != resp.EvalCreateIndex {
		t.Fatalf("index mis-match")
	}
}
// TestPeriodicEndpoint_Force_ACL exercises the Periodic.Force RPC on a server
// created with TestACLServer. It walks through five cases in order:
//
//   - no token: expects a permission-denied error
//   - token backed by a node-write policy: expects a permission-denied error
//   - token with the submit-job namespace capability: expects success
//   - token with the dispatch-job namespace capability: expects success
//   - the root token returned by TestACLServer: expects success
//
// Each successful case also checks that the evaluation referenced by the
// response exists in state with a matching create index.
func TestPeriodicEndpoint_Force_ACL(t *testing.T) {
	ci.Parallel(t)

	s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer cleanupS1()
	state := s1.fsm.State()
	assert := assert.New(t)
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Create and insert a periodic job.
	job := mock.PeriodicJob()
	job.Periodic.ProhibitOverlap = true // Shouldn't affect anything.
	assert.NoError(state.UpsertJob(structs.MsgTypeTestSetup, 100, nil, job))
	assert.NoError(s1.periodicDispatcher.Add(job))

	// Force launch it. The same request is reused below; only AuthToken changes.
	req := &structs.PeriodicForceRequest{
		JobID: job.ID,
		WriteRequest: structs.WriteRequest{
			Region:    "global",
			Namespace: job.Namespace,
		},
	}

	// Try with no token and expect permission denied
	{
		var resp structs.PeriodicForceResponse
		err := msgpackrpc.CallWithCodec(codec, "Periodic.Force", req, &resp)
		// Guard the message check: calling err.Error() on a nil error would
		// panic rather than report a clean assertion failure.
		if assert.Error(err) {
			assert.Contains(err.Error(), structs.ErrPermissionDenied.Error())
		}
	}

	// Try with an invalid token and expect permission denied
	{
		invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "invalid", mock.NodePolicy(acl.PolicyWrite))
		req.AuthToken = invalidToken.SecretID

		var resp structs.PeriodicForceResponse
		err := msgpackrpc.CallWithCodec(codec, "Periodic.Force", req, &resp)
		if assert.Error(err) {
			assert.Contains(err.Error(), structs.ErrPermissionDenied.Error())
		}
	}

	// Fetch the response with a valid token
	{
		policy := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilitySubmitJob})
		token := mock.CreatePolicyAndToken(t, state, 1005, "valid", policy)
		req.AuthToken = token.SecretID

		var resp structs.PeriodicForceResponse
		assert.NoError(msgpackrpc.CallWithCodec(codec, "Periodic.Force", req, &resp))
		assert.NotEqual(uint64(0), resp.Index)

		// Lookup the evaluation
		ws := memdb.NewWatchSet()
		eval, err := state.EvalByID(ws, resp.EvalID)
		assert.NoError(err)
		if assert.NotNil(eval) {
			assert.Equal(eval.CreateIndex, resp.EvalCreateIndex)
		}
	}

	// Fetch the response with a valid token having dispatch permission
	{
		policy := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityDispatchJob})
		// NOTE(review): this reuses index 1005 and the name "valid" from the
		// previous case — presumably intentional; confirm against
		// mock.CreatePolicyAndToken.
		token := mock.CreatePolicyAndToken(t, state, 1005, "valid", policy)
		req.AuthToken = token.SecretID

		var resp structs.PeriodicForceResponse
		assert.NoError(msgpackrpc.CallWithCodec(codec, "Periodic.Force", req, &resp))
		assert.NotEqual(uint64(0), resp.Index)

		// Lookup the evaluation
		ws := memdb.NewWatchSet()
		eval, err := state.EvalByID(ws, resp.EvalID)
		assert.NoError(err)
		if assert.NotNil(eval) {
			assert.Equal(eval.CreateIndex, resp.EvalCreateIndex)
		}
	}

	// Fetch the response with management token
	{
		req.AuthToken = root.SecretID

		var resp structs.PeriodicForceResponse
		assert.NoError(msgpackrpc.CallWithCodec(codec, "Periodic.Force", req, &resp))
		assert.NotEqual(uint64(0), resp.Index)

		// Lookup the evaluation
		ws := memdb.NewWatchSet()
		eval, err := state.EvalByID(ws, resp.EvalID)
		assert.NoError(err)
		if assert.NotNil(eval) {
			assert.Equal(eval.CreateIndex, resp.EvalCreateIndex)
		}
	}
}
// TestPeriodicEndpoint_Force_NonPeriodic checks that the Periodic.Force RPC
// returns an error when it is pointed at a job created with mock.Job instead
// of mock.PeriodicJob.
func TestPeriodicEndpoint_Force_NonPeriodic(t *testing.T) {
	ci.Parallel(t)

	srv, shutdown := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer shutdown()

	store := srv.fsm.State()
	rpcCodec := rpcClient(t, srv)
	testutil.WaitForLeader(t, srv.RPC)

	// Store a non-periodic job directly in state.
	regular := mock.Job()
	upsertErr := store.UpsertJob(structs.MsgTypeTestSetup, 100, nil, regular)
	if upsertErr != nil {
		t.Fatalf("err: %v", upsertErr)
	}

	// Build a force-launch request that targets that job.
	target := structs.WriteRequest{
		Region:    "global",
		Namespace: regular.Namespace,
	}
	forceReq := &structs.PeriodicForceRequest{
		JobID:        regular.ID,
		WriteRequest: target,
	}

	// The call must not succeed.
	var forceResp structs.PeriodicForceResponse
	callErr := msgpackrpc.CallWithCodec(rpcCodec, "Periodic.Force", forceReq, &forceResp)
	if callErr == nil {
		t.Fatalf("Force on non-periodic job should err")
	}
}