Add idempotency token to dispatch request instead of special meta key

This commit is contained in:
Alex Munda 2021-06-29 15:52:12 -05:00
parent 122136b657
commit ca86c7ba0c
No known key found for this signature in database
GPG Key ID: D7BE1AD2403C2655
5 changed files with 65 additions and 33 deletions

View File

@ -397,6 +397,22 @@ func (j *Jobs) Dispatch(jobID string, meta map[string]string,
return &resp, wm, nil
}
func (j *Jobs) DispatchIdempotent(jobID string, meta map[string]string,
payload []byte, idempotencyToken string, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) {
var resp JobDispatchResponse
req := &JobDispatchRequest{
JobID: jobID,
Meta: meta,
Payload: payload,
IdempotencyToken: idempotencyToken,
}
wm, err := j.client.write("/v1/job/"+url.PathEscape(jobID)+"/dispatch", req, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, wm, nil
}
// Revert is used to revert the given job to the passed version. If
// enforceVersion is set, the job is only reverted if the current version is at
// the passed version.
@ -1262,9 +1278,10 @@ type DesiredUpdates struct {
}
type JobDispatchRequest struct {
JobID string
Payload []byte
Meta map[string]string
JobID string
Payload []byte
Meta map[string]string
IdempotencyToken string
}
type JobDispatchResponse struct {

View File

@ -32,11 +32,6 @@ const (
// DispatchPayloadSizeLimit is the maximum size of the uncompressed input
// data payload.
DispatchPayloadSizeLimit = 16 * 1024
// MetaDispatchIdempotencyKey is the meta key that when provided, is used
// to perform an idempotency check to ensure only 1 child of a parameterized job
// with the supplied key may be running (or pending) at a time.
MetaDispatchIdempotencyKey = "nomad_dispatch_idempotency_key"
)
// ErrMultipleNamespaces is send when multiple namespaces are used in the OSS setup
@ -1904,6 +1899,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
dispatchJob.Dispatched = true
dispatchJob.Status = ""
dispatchJob.StatusDescription = ""
dispatchJob.DispatchIdempotencyToken = args.IdempotencyToken
// Merge in the meta data
for k, v := range args.Meta {
@ -1913,8 +1909,8 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
dispatchJob.Meta[k] = v
}
// Check to see if an idempotency key was provided on the meta
if idempotencyKey, ok := dispatchJob.Meta[MetaDispatchIdempotencyKey]; ok {
// Check to see if an idempotency token was specified on the request
if args.IdempotencyToken != "" {
// Fetch all jobs that match the parameterized job ID prefix
iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID)
if err != nil {
@ -1934,12 +1930,12 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
continue
}
// Idempotency keys match. Ensure existing job is not currently running.
if ik, ok := existingJob.Meta[MetaDispatchIdempotencyKey]; ok && ik == idempotencyKey {
// Idempotency tokens match. Ensure existing job is terminal.
if existingJob.DispatchIdempotencyToken == args.IdempotencyToken {
// The existing job is either pending or running.
// Registering a new job would violate the idempotency key.
// Registering a new job would violate the idempotency token.
if existingJob.Status != structs.JobStatusDead {
return fmt.Errorf("dispatch violates idempotency key of non-terminal child job: %s", existingJob.ID)
return fmt.Errorf("dispatch violates idempotency token of non-terminal child job: %s", existingJob.ID)
}
}
}
@ -2039,8 +2035,7 @@ func validateDispatchRequest(req *structs.JobDispatchRequest, job *structs.Job)
for k := range req.Meta {
_, req := required[k]
_, opt := optional[k]
// Always allow the idempotency key
if !req && !opt && k != MetaDispatchIdempotencyKey {
if !req && !opt {
unpermitted[k] = struct{}{}
}
}

View File

@ -6133,10 +6133,8 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
reqInputDataTooLarge := &structs.JobDispatchRequest{
Payload: make([]byte, DispatchPayloadSizeLimit+100),
}
reqIdempotentMeta := &structs.JobDispatchRequest{
Meta: map[string]string{
MetaDispatchIdempotencyKey: "foo",
},
reqIdempotentToken := &structs.JobDispatchRequest{
IdempotencyToken: "foo",
}
type existingIdempotentChildJob struct {
@ -6244,26 +6242,26 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
errStr: "stopped",
},
{
name: "idempotent meta key, no existing child job",
name: "idempotency token, no existing child job",
parameterizedJob: d1,
dispatchReq: reqIdempotentMeta,
dispatchReq: reqIdempotentToken,
err: false,
existingIdempotentJob: nil,
},
{
name: "idempotent meta key, w/ existing non-terminal child job",
name: "idempotency token, w/ existing non-terminal child job",
parameterizedJob: d1,
dispatchReq: reqIdempotentMeta,
dispatchReq: reqIdempotentToken,
err: true,
errStr: "dispatch violates idempotency key of non-terminal child job",
errStr: "dispatch violates idempotency token of non-terminal child job",
existingIdempotentJob: &existingIdempotentChildJob{
isTerminal: false,
},
},
{
name: "idempotent meta key, w/ existing terminal job",
name: "idempotency token, w/ existing terminal job",
parameterizedJob: d1,
dispatchReq: reqIdempotentMeta,
dispatchReq: reqIdempotentToken,
err: false,
existingIdempotentJob: &existingIdempotentChildJob{
isTerminal: true,

View File

@ -728,9 +728,10 @@ type JobScaleStatusRequest struct {
// JobDispatchRequest is used to dispatch a job based on a parameterized job
type JobDispatchRequest struct {
JobID string
Payload []byte
Meta map[string]string
JobID string
Payload []byte
Meta map[string]string
IdempotencyToken string
WriteRequest
}
@ -4016,6 +4017,10 @@ type Job struct {
// parameterized job.
Dispatched bool
// DispatchIdempotencyToken is optionally used to ensure that a dispatched job does not have any
// non-terminal siblings which have the same token value.
DispatchIdempotencyToken string
// Payload is the payload supplied when the job was dispatched.
Payload []byte

View File

@ -397,6 +397,22 @@ func (j *Jobs) Dispatch(jobID string, meta map[string]string,
return &resp, wm, nil
}
func (j *Jobs) DispatchIdempotent(jobID string, meta map[string]string,
payload []byte, idempotencyToken string, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) {
var resp JobDispatchResponse
req := &JobDispatchRequest{
JobID: jobID,
Meta: meta,
Payload: payload,
IdempotencyToken: idempotencyToken,
}
wm, err := j.client.write("/v1/job/"+url.PathEscape(jobID)+"/dispatch", req, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, wm, nil
}
// Revert is used to revert the given job to the passed version. If
// enforceVersion is set, the job is only reverted if the current version is at
// the passed version.
@ -1262,9 +1278,10 @@ type DesiredUpdates struct {
}
type JobDispatchRequest struct {
JobID string
Payload []byte
Meta map[string]string
JobID string
Payload []byte
Meta map[string]string
IdempotencyToken string
}
type JobDispatchResponse struct {