Add check-index flag to nomad run

This commit is contained in:
Alex Dadgar 2016-06-08 16:48:02 -07:00
parent 2102855ae7
commit 5d181d203c
9 changed files with 339 additions and 9 deletions

View File

@ -14,6 +14,12 @@ const (
JobTypeBatch = "batch"
)
const (
// RegisterEnforceIndexErrPrefix is the prefix to use in errors caused by
// enforcing the job modify index during registers.
RegisterEnforceIndexErrPrefix = "Enforcing job modify index"
)
// Jobs is used to access the job-specific endpoints.
type Jobs struct {
client *Client
@ -27,9 +33,27 @@ func (c *Client) Jobs() *Jobs {
// Register is used to register a new job. It returns the ID
// of the evaluation, along with any errors encountered.
func (j *Jobs) Register(job *Job, q *WriteOptions) (string, *WriteMeta, error) {
var resp registerJobResponse
req := &RegisterJobRequest{job}
req := &RegisterJobRequest{Job: job}
wm, err := j.client.write("/v1/jobs", req, &resp, q)
if err != nil {
return "", nil, err
}
return resp.EvalID, wm, nil
}
// EnforceRegister is used to register a job enforcing its job modify index.
func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (string, *WriteMeta, error) {
var resp registerJobResponse
req := &RegisterJobRequest{
Job: job,
EnforceIndex: true,
JobModifyIndex: modifyIndex,
}
wm, err := j.client.write("/v1/jobs", req, &resp, q)
if err != nil {
return "", nil, err
@ -172,6 +196,7 @@ type Job struct {
StatusDescription string
CreateIndex uint64
ModifyIndex uint64
JobModifyIndex uint64
}
// JobListStub is used to return a subset of information about
@ -186,6 +211,7 @@ type JobListStub struct {
StatusDescription string
CreateIndex uint64
ModifyIndex uint64
JobModifyIndex uint64
}
// JobIDSort is used to sort jobs by their job ID's.
@ -263,7 +289,9 @@ func (j *Job) AddPeriodicConfig(cfg *PeriodicConfig) *Job {
// RegisterJobRequest is used to serialize a job registration
type RegisterJobRequest struct {
Job *Job
Job *Job
EnforceIndex bool
JobModifyIndex uint64
}
// registerJobResponse is used to deserialize a job response

View File

@ -50,6 +50,74 @@ func TestJobs_Register(t *testing.T) {
}
}
func TestJobs_EnforceRegister(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()
jobs := c.Jobs()
// Listing jobs before registering returns nothing
resp, qm, err := jobs.List(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if qm.LastIndex != 0 {
t.Fatalf("bad index: %d", qm.LastIndex)
}
if n := len(resp); n != 0 {
t.Fatalf("expected 0 jobs, got: %d", n)
}
// Create a job and attempt to register it with an incorrect index.
job := testJob()
eval, wm, err := jobs.EnforceRegister(job, 10, nil)
if err == nil || !strings.Contains(err.Error(), RegisterEnforceIndexErrPrefix) {
t.Fatalf("expected enforcement error: %v", err)
}
// Register
eval, wm, err = jobs.EnforceRegister(job, 0, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if eval == "" {
t.Fatalf("missing eval id")
}
assertWriteMeta(t, wm)
// Query the jobs back out again
resp, qm, err = jobs.List(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
assertQueryMeta(t, qm)
// Check that we got the expected response
if len(resp) != 1 {
t.Fatalf("bad length: %d", len(resp))
}
if resp[0].ID != job.ID {
t.Fatalf("bad: %#v", resp[0])
}
curIndex := resp[0].JobModifyIndex
// Fail at incorrect index
eval, wm, err = jobs.EnforceRegister(job, 123456, nil)
if err == nil || !strings.Contains(err.Error(), RegisterEnforceIndexErrPrefix) {
t.Fatalf("expected enforcement error: %v", err)
}
// Works at correct index
eval, wm, err = jobs.EnforceRegister(job, curIndex, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if eval == "" {
t.Fatalf("missing eval id")
}
assertWriteMeta(t, wm)
}
func TestJobs_Info(t *testing.T) {
c, s := makeClient(t, nil, nil)
defer s.Stop()

View File

@ -84,7 +84,7 @@ func (c *InspectCommand) Run(args []string) int {
}
// Print the contents of the job
req := api.RegisterJobRequest{job}
req := api.RegisterJobRequest{Job: job}
buf, err := json.MarshalIndent(req, "", " ")
if err != nil {
c.Ui.Error(fmt.Sprintf("Error converting job: %s", err))

View File

@ -5,6 +5,8 @@ import (
"encoding/gob"
"encoding/json"
"fmt"
"regexp"
"strconv"
"strings"
"time"
@ -13,6 +15,11 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
var (
// enforceIndexRegex is a regular expression which extracts the enforcement error
enforceIndexRegex = regexp.MustCompile(`\((Enforcing job modify index.*)\)`)
)
type RunCommand struct {
Meta
}
@ -46,6 +53,14 @@ General Options:
Run Options:
-check-index
If set, the job is only registered or updated if the the passed
job modify index matches the server side version. If a check-index value of
zero is passed, the job is only registered if it does not yet exist. If a
non-zero value is passed, it ensures that the job is being updated from a
known state. The use of this flag is most common in conjunction with plan
command.
-detach
Return immediately instead of entering monitor mode. After job submission,
the evaluation ID will be printed to the screen, which can be used to
@ -67,12 +82,14 @@ func (c *RunCommand) Synopsis() string {
func (c *RunCommand) Run(args []string) int {
var detach, verbose, output bool
var checkIndexStr string
flags := c.Meta.FlagSet("run", FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.BoolVar(&detach, "detach", false, "")
flags.BoolVar(&verbose, "verbose", false, "")
flags.BoolVar(&output, "output", false, "")
flags.StringVar(&checkIndexStr, "check-index", "", "")
if err := flags.Parse(args); err != nil {
return 1
@ -119,7 +136,7 @@ func (c *RunCommand) Run(args []string) int {
}
if output {
req := api.RegisterJobRequest{apiJob}
req := api.RegisterJobRequest{Job: apiJob}
buf, err := json.MarshalIndent(req, "", " ")
if err != nil {
c.Ui.Error(fmt.Sprintf("Error converting job: %s", err))
@ -142,9 +159,32 @@ func (c *RunCommand) Run(args []string) int {
client.SetRegion(r)
}
// Submit the job
evalID, _, err := client.Jobs().Register(apiJob, nil)
// Parse the check-index
checkIndex, enforce, err := parseCheckIndex(checkIndexStr)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing check-index value %q: %v", checkIndexStr, err))
return 1
}
// Submit the job
var evalID string
if enforce {
evalID, _, err = client.Jobs().EnforceRegister(apiJob, checkIndex, nil)
} else {
evalID, _, err = client.Jobs().Register(apiJob, nil)
}
if err != nil {
if strings.Contains(err.Error(), api.RegisterEnforceIndexErrPrefix) {
// Format the error specially if the error is due to index
// enforcement
matches := enforceIndexRegex.FindStringSubmatch(err.Error())
if len(matches) == 2 {
c.Ui.Error(matches[1]) // The matched group
c.Ui.Error("Job not updated")
return 1
}
}
c.Ui.Error(fmt.Sprintf("Error submitting job: %s", err))
return 1
}
@ -167,6 +207,17 @@ func (c *RunCommand) Run(args []string) int {
}
// parseCheckIndex parses the check-index flag and returns the index, whether it
// was set and potentially an error during parsing.
func parseCheckIndex(input string) (uint64, bool, error) {
if input == "" {
return 0, false, nil
}
u, err := strconv.ParseUint(input, 10, 64)
return u, true, err
}
// convertStructJob is used to take a *structs.Job and convert it to an *api.Job.
// This function is just a hammer and probably needs to be revisited.
func convertStructJob(in *structs.Job) (*api.Job, error) {

View File

@ -136,4 +136,14 @@ job "job1" {
if out := ui.ErrorWriter.String(); !strings.Contains(out, "Error submitting job") {
t.Fatalf("expected failed query error, got: %s", out)
}
// Fails on invalid check-index (requires a valid job)
if code := cmd.Run([]string{"-check-index=bad", fh3.Name()}); code != 1 {
t.Fatalf("expected exit code 1, got: %d", code)
}
if out := ui.ErrorWriter.String(); !strings.Contains(out, "parsing check-index") {
t.Fatalf("expected parse error, got: %s", out)
}
ui.ErrorWriter.Reset()
}

View File

@ -13,6 +13,12 @@ import (
"github.com/hashicorp/nomad/scheduler"
)
const (
// RegisterEnforceIndexErrPrefix is the prefix to use in errors caused by
// enforcing the job modify index during registers.
RegisterEnforceIndexErrPrefix = "Enforcing job modify index"
)
// Job endpoint is used for job interactions
type Job struct {
srv *Server
@ -38,6 +44,29 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return err
}
if args.EnforceIndex {
// Lookup the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
job, err := snap.JobByID(args.Job.ID)
if err != nil {
return err
}
jmi := args.JobModifyIndex
if job != nil {
if jmi == 0 {
return fmt.Errorf("%s 0: job already exists", RegisterEnforceIndexErrPrefix)
} else if jmi != job.JobModifyIndex {
return fmt.Errorf("%s %d: job exists with conflicting job modify index: %d",
RegisterEnforceIndexErrPrefix, jmi, job.JobModifyIndex)
}
} else if jmi != 0 {
return fmt.Errorf("%s %d: job does not exist", RegisterEnforceIndexErrPrefix, jmi)
}
}
// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err != nil {
@ -422,12 +451,14 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
}
var index uint64
var updatedIndex uint64
if oldJob != nil {
index = oldJob.JobModifyIndex + 1
index = oldJob.JobModifyIndex
updatedIndex = oldJob.JobModifyIndex + 1
}
// Insert the updated Job into the snapshot
snap.UpsertJob(index, args.Job)
snap.UpsertJob(updatedIndex, args.Job)
// Create an eval and mark it as requiring annotations and insert that as well
eval := &structs.Evaluation{
@ -436,7 +467,7 @@ func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse)
Type: args.Job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: args.Job.ID,
JobModifyIndex: index,
JobModifyIndex: updatedIndex,
Status: structs.EvalStatusPending,
AnnotatePlan: true,
}

View File

@ -248,6 +248,118 @@ func TestJobEndpoint_Register_Periodic(t *testing.T) {
}
}
func TestJobEndpoint_Register_EnforceIndex(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request and enforcing an incorrect index
job := mock.Job()
req := &structs.JobRegisterRequest{
Job: job,
EnforceIndex: true,
JobModifyIndex: 100, // Not registered yet so not possible
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.JobRegisterResponse
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
if err == nil || !strings.Contains(err.Error(), RegisterEnforceIndexErrPrefix) {
t.Fatalf("expected enforcement error")
}
// Create the register request and enforcing it is new
req = &structs.JobRegisterRequest{
Job: job,
EnforceIndex: true,
JobModifyIndex: 0,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
curIndex := resp.JobModifyIndex
// Check for the node in the FSM
state := s1.fsm.State()
out, err := state.JobByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected job")
}
if out.CreateIndex != resp.JobModifyIndex {
t.Fatalf("index mis-match")
}
// Reregister request and enforcing it be a new job
req = &structs.JobRegisterRequest{
Job: job,
EnforceIndex: true,
JobModifyIndex: 0,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
if err == nil || !strings.Contains(err.Error(), RegisterEnforceIndexErrPrefix) {
t.Fatalf("expected enforcement error")
}
// Reregister request and enforcing it be at an incorrect index
req = &structs.JobRegisterRequest{
Job: job,
EnforceIndex: true,
JobModifyIndex: curIndex - 1,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
if err == nil || !strings.Contains(err.Error(), RegisterEnforceIndexErrPrefix) {
t.Fatalf("expected enforcement error")
}
// Reregister request and enforcing it be at the correct index
job.Priority = job.Priority + 1
req = &structs.JobRegisterRequest{
Job: job,
EnforceIndex: true,
JobModifyIndex: curIndex,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
out, err = state.JobByID(job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected job")
}
if out.Priority != job.Priority {
t.Fatalf("priority mis-match")
}
}
func TestJobEndpoint_Evaluate(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue

View File

@ -182,6 +182,13 @@ type NodeSpecificRequest struct {
// to register a job as being a schedulable entity.
type JobRegisterRequest struct {
Job *Job
// If EnforceIndex is set then the job will only be registered if the passed
// JobModifyIndex matches the current Jobs index. If the index is zero, the
// register only occurs if the job is new.
EnforceIndex bool
JobModifyIndex uint64
WriteRequest
}
@ -1075,6 +1082,7 @@ func (j *Job) Stub() *JobListStub {
StatusDescription: j.StatusDescription,
CreateIndex: j.CreateIndex,
ModifyIndex: j.ModifyIndex,
JobModifyIndex: j.JobModifyIndex,
}
}
@ -1095,6 +1103,7 @@ type JobListStub struct {
StatusDescription string
CreateIndex uint64
ModifyIndex uint64
JobModifyIndex uint64
}
// UpdateStrategy is used to modify how updates are done

View File

@ -41,6 +41,13 @@ environment variable are overridden and the the job's region is used.
## Run Options
* `-check-index`: If set, the job is only registered or updated if the the
passed job modify index matches the server side version. If a check-index value
of zero is passed, the job is only registered if it does not yet exist. If a
non-zero value is passed, it ensures that the job is being updated from a known
state. The use of this flag is most common in conjunction with [plan
command](/docs/commands/plan.html).
* `-detach`: Return immediately instead of monitoring. A new evaluation ID
will be output, which can be used to examine the evaluation using the
[eval-status](/docs/commands/eval-status.html) command
@ -65,6 +72,20 @@ $ nomad run job1.nomad
==> Evaluation "52dee78a" finished with status "complete"
```
Update the job using check-index:
```
$ nomad run -check-index 5 example.nomad
Enforcing job modify index 5: job exists with conflicting job modify index: 6
Job not updated
$ nomad run -check-index 6 example.nomad
==> Monitoring evaluation "5ef16dff"
Evaluation triggered by job "example"
Allocation "6ec7d16f" modified: node "6e1f9bf6", group "cache"
Evaluation status changed: "pending" -> "complete"
==> Evaluation "5ef16dff" finished with status "complete"
```
Schedule the job contained in `job1.nomad` and return immediately:
```