Rename structs
This commit is contained in:
parent
1235fc6581
commit
4a5c3c8db0
24
api/jobs.go
24
api/jobs.go
|
@ -169,12 +169,12 @@ func (j *Jobs) Summary(jobID string, q *QueryOptions) (*JobSummary, *QueryMeta,
|
|||
}
|
||||
|
||||
func (j *Jobs) Dispatch(jobID string, meta map[string]string,
|
||||
inputData []byte, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) {
|
||||
payload []byte, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) {
|
||||
var resp JobDispatchResponse
|
||||
req := &JobDispatchRequest{
|
||||
JobID: jobID,
|
||||
Meta: meta,
|
||||
InputData: inputData,
|
||||
JobID: jobID,
|
||||
Meta: meta,
|
||||
Payload: payload,
|
||||
}
|
||||
wm, err := j.client.write("/v1/job/"+jobID+"/dispatch", req, &resp, q)
|
||||
if err != nil {
|
||||
|
@ -202,9 +202,9 @@ type PeriodicConfig struct {
|
|||
ProhibitOverlap bool
|
||||
}
|
||||
|
||||
// DispatchConfig is used to configure the dispatch template
|
||||
type DispatchConfig struct {
|
||||
InputData string
|
||||
// ConstructorConfig is used to configure the constructor job
|
||||
type ConstructorConfig struct {
|
||||
Payload string
|
||||
MetaRequired []string
|
||||
MetaOptional []string
|
||||
}
|
||||
|
@ -223,8 +223,8 @@ type Job struct {
|
|||
TaskGroups []*TaskGroup
|
||||
Update *UpdateStrategy
|
||||
Periodic *PeriodicConfig
|
||||
Dispatch *DispatchConfig
|
||||
InputData []byte
|
||||
Constructor *ConstructorConfig
|
||||
Payload []byte
|
||||
Meta map[string]string
|
||||
VaultToken string
|
||||
Status string
|
||||
|
@ -436,9 +436,9 @@ type DesiredUpdates struct {
|
|||
}
|
||||
|
||||
type JobDispatchRequest struct {
|
||||
JobID string
|
||||
InputData []byte
|
||||
Meta map[string]string
|
||||
JobID string
|
||||
Payload []byte
|
||||
Meta map[string]string
|
||||
}
|
||||
|
||||
type JobDispatchResponse struct {
|
||||
|
|
|
@ -143,8 +143,7 @@ type LogConfig struct {
|
|||
|
||||
// DispatchInputConfig configures how a task gets its input from a job dispatch
|
||||
type DispatchInputConfig struct {
|
||||
Stdin bool
|
||||
File string
|
||||
File string
|
||||
}
|
||||
|
||||
// Task is a single process in a task group.
|
||||
|
|
|
@ -59,15 +59,15 @@ func (s *HTTPServer) AllocSpecificRequest(resp http.ResponseWriter, req *http.Re
|
|||
return nil, CodedError(404, "alloc not found")
|
||||
}
|
||||
|
||||
// Decode the input data if there is any
|
||||
// Decode the payload if there is any
|
||||
alloc := out.Alloc
|
||||
if alloc.Job != nil && len(alloc.Job.InputData) != 0 {
|
||||
decoded, err := snappy.Decode(nil, alloc.Job.InputData)
|
||||
if alloc.Job != nil && len(alloc.Job.Payload) != 0 {
|
||||
decoded, err := snappy.Decode(nil, alloc.Job.Payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
alloc = alloc.Copy()
|
||||
alloc.Job.InputData = decoded
|
||||
alloc.Job.Payload = decoded
|
||||
}
|
||||
|
||||
return alloc, nil
|
||||
|
|
|
@ -163,7 +163,7 @@ func TestHTTP_AllocQuery(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestHTTP_AllocQuery_InputData(t *testing.T) {
|
||||
func TestHTTP_AllocQuery_Payload(t *testing.T) {
|
||||
httpTest(t, nil, func(s *TestServer) {
|
||||
// Directly manipulate the state
|
||||
state := s.Agent.server.State()
|
||||
|
@ -172,10 +172,10 @@ func TestHTTP_AllocQuery_InputData(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Insert InputData compressed
|
||||
// Insert Payload compressed
|
||||
expected := []byte("hello world")
|
||||
compressed := snappy.Encode(nil, expected)
|
||||
alloc.Job.InputData = compressed
|
||||
alloc.Job.Payload = compressed
|
||||
|
||||
err := state.UpsertAllocs(1000, []*structs.Allocation{alloc})
|
||||
if err != nil {
|
||||
|
@ -212,9 +212,9 @@ func TestHTTP_AllocQuery_InputData(t *testing.T) {
|
|||
t.Fatalf("bad: %#v", a)
|
||||
}
|
||||
|
||||
// Check the input data is decompressed
|
||||
if !reflect.DeepEqual(a.Job.InputData, expected) {
|
||||
t.Fatalf("InputData not decompressed properly; got %#v; want %#v", a.Job.InputData, expected)
|
||||
// Check the payload is decompressed
|
||||
if !reflect.DeepEqual(a.Job.Payload, expected) {
|
||||
t.Fatalf("Payload not decompressed properly; got %#v; want %#v", a.Job.Payload, expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -210,15 +210,15 @@ func (s *HTTPServer) jobQuery(resp http.ResponseWriter, req *http.Request,
|
|||
return nil, CodedError(404, "job not found")
|
||||
}
|
||||
|
||||
// Decode the input data if there is any
|
||||
// Decode the payload if there is any
|
||||
job := out.Job
|
||||
if len(job.InputData) != 0 {
|
||||
decoded, err := snappy.Decode(nil, out.Job.InputData)
|
||||
if len(job.Payload) != 0 {
|
||||
decoded, err := snappy.Decode(nil, out.Job.Payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
job = job.Copy()
|
||||
job.InputData = decoded
|
||||
job.Payload = decoded
|
||||
}
|
||||
|
||||
return job, nil
|
||||
|
|
|
@ -207,15 +207,15 @@ func TestHTTP_JobQuery(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestHTTP_JobQuery_InputData(t *testing.T) {
|
||||
func TestHTTP_JobQuery_Payload(t *testing.T) {
|
||||
httpTest(t, nil, func(s *TestServer) {
|
||||
// Create the job
|
||||
job := mock.Job()
|
||||
|
||||
// Insert InputData compressed
|
||||
// Insert Payload compressed
|
||||
expected := []byte("hello world")
|
||||
compressed := snappy.Encode(nil, expected)
|
||||
job.InputData = compressed
|
||||
job.Payload = compressed
|
||||
|
||||
args := structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
|
@ -256,9 +256,9 @@ func TestHTTP_JobQuery_InputData(t *testing.T) {
|
|||
t.Fatalf("bad: %#v", j)
|
||||
}
|
||||
|
||||
// Check the input data is decompressed
|
||||
if !reflect.DeepEqual(j.InputData, expected) {
|
||||
t.Fatalf("InputData not decompressed properly; got %#v; want %#v", j.InputData, expected)
|
||||
// Check the payload is decompressed
|
||||
if !reflect.DeepEqual(j.Payload, expected) {
|
||||
t.Fatalf("Payload not decompressed properly; got %#v; want %#v", j.Payload, expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -582,9 +582,9 @@ func TestHTTP_JobPlan(t *testing.T) {
|
|||
|
||||
func TestHTTP_JobDispatch(t *testing.T) {
|
||||
httpTest(t, nil, func(s *TestServer) {
|
||||
// Create the dispatch template job
|
||||
// Create the constructor job
|
||||
job := mock.Job()
|
||||
job.Dispatch = &structs.DispatchConfig{}
|
||||
job.Constructor = &structs.ConstructorConfig{}
|
||||
|
||||
args := structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
|
|
|
@ -75,16 +75,16 @@ func (c *JobDispatchCommand) Run(args []string) int {
|
|||
}
|
||||
|
||||
templateJob := args[0]
|
||||
var inputData []byte
|
||||
var payload []byte
|
||||
var readErr error
|
||||
|
||||
// Read the input
|
||||
if len(args) == 2 {
|
||||
switch args[1] {
|
||||
case "-":
|
||||
inputData, readErr = ioutil.ReadAll(os.Stdin)
|
||||
payload, readErr = ioutil.ReadAll(os.Stdin)
|
||||
default:
|
||||
inputData, readErr = ioutil.ReadFile(args[1])
|
||||
payload, readErr = ioutil.ReadFile(args[1])
|
||||
}
|
||||
if readErr != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Error reading input data: %v", readErr))
|
||||
|
@ -112,7 +112,7 @@ func (c *JobDispatchCommand) Run(args []string) int {
|
|||
}
|
||||
|
||||
// Dispatch the job
|
||||
resp, _, err := client.Jobs().Dispatch(templateJob, metaMap, inputData, nil)
|
||||
resp, _, err := client.Jobs().Dispatch(templateJob, metaMap, payload, nil)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to dispatch job: %s", err))
|
||||
return 1
|
||||
|
|
|
@ -147,9 +147,9 @@ func (c *RunCommand) Run(args []string) int {
|
|||
return 1
|
||||
}
|
||||
|
||||
// Check if the job is periodic or is a dispatch template
|
||||
// Check if the job is periodic or is a constructor job
|
||||
periodic := job.IsPeriodic()
|
||||
template := job.IsDispatchTemplate()
|
||||
constructor := job.IsConstructor()
|
||||
|
||||
// Parse the Vault token
|
||||
if vaultToken == "" {
|
||||
|
@ -240,14 +240,14 @@ func (c *RunCommand) Run(args []string) int {
|
|||
}
|
||||
|
||||
// Check if we should enter monitor mode
|
||||
if detach || periodic || template {
|
||||
if detach || periodic || constructor {
|
||||
c.Ui.Output("Job registration successful")
|
||||
if periodic {
|
||||
now := time.Now().UTC()
|
||||
next := job.Periodic.Next(now)
|
||||
c.Ui.Output(fmt.Sprintf("Approximate next launch time: %s (%s from now)",
|
||||
formatTime(next), formatTimeDifference(now, next, time.Second)))
|
||||
} else if !template {
|
||||
} else if !constructor {
|
||||
c.Ui.Output("Evaluation ID: " + evalID)
|
||||
}
|
||||
|
||||
|
|
|
@ -102,7 +102,7 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
|
|||
delete(m, "update")
|
||||
delete(m, "periodic")
|
||||
delete(m, "vault")
|
||||
delete(m, "dispatch")
|
||||
delete(m, "constructor")
|
||||
|
||||
// Set the ID and name to the object key
|
||||
result.ID = obj.Keys[0].Token.Value().(string)
|
||||
|
@ -131,7 +131,7 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
|
|||
"all_at_once",
|
||||
"constraint",
|
||||
"datacenters",
|
||||
"dispatch",
|
||||
"constructor",
|
||||
"group",
|
||||
"id",
|
||||
"meta",
|
||||
|
@ -170,10 +170,10 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
|
|||
}
|
||||
}
|
||||
|
||||
// If we have a dispatch definition, then parse that
|
||||
if o := listVal.Filter("dispatch"); len(o.Items) > 0 {
|
||||
if err := parseDispatch(&result.Dispatch, o); err != nil {
|
||||
return multierror.Prefix(err, "dispatch ->")
|
||||
// If we have a constructor definition, then parse that
|
||||
if o := listVal.Filter("constructor"); len(o.Items) > 0 {
|
||||
if err := parseConstructor(&result.Constructor, o); err != nil {
|
||||
return multierror.Prefix(err, "constructor ->")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -747,14 +747,13 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l
|
|||
// If we have a dispatch_input block parse that
|
||||
if o := listVal.Filter("dispatch_input"); len(o.Items) > 0 {
|
||||
if len(o.Items) > 1 {
|
||||
return fmt.Errorf("only one dispatch_input block is allowed in a task. Number of logs block found: %d", len(o.Items))
|
||||
return fmt.Errorf("only one dispatch_input block is allowed in a task. Number of dispatch_input blocks found: %d", len(o.Items))
|
||||
}
|
||||
var m map[string]interface{}
|
||||
dispatchBlock := o.Items[0]
|
||||
|
||||
// Check for invalid keys
|
||||
valid := []string{
|
||||
"stdin",
|
||||
"file",
|
||||
}
|
||||
if err := checkHCLKeys(dispatchBlock.Val, valid); err != nil {
|
||||
|
@ -1243,10 +1242,10 @@ func parseVault(result *structs.Vault, list *ast.ObjectList) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func parseDispatch(result **structs.DispatchConfig, list *ast.ObjectList) error {
|
||||
func parseConstructor(result **structs.ConstructorConfig, list *ast.ObjectList) error {
|
||||
list = list.Elem()
|
||||
if len(list.Items) > 1 {
|
||||
return fmt.Errorf("only one 'dispatch' block allowed per job")
|
||||
return fmt.Errorf("only one 'constructor' block allowed per job")
|
||||
}
|
||||
|
||||
// Get our resource object
|
||||
|
@ -1261,16 +1260,15 @@ func parseDispatch(result **structs.DispatchConfig, list *ast.ObjectList) error
|
|||
|
||||
// Check for invalid keys
|
||||
valid := []string{
|
||||
"input_data",
|
||||
"payload",
|
||||
"meta_keys",
|
||||
"paused",
|
||||
}
|
||||
if err := checkHCLKeys(o.Val, valid); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Build the dispatch block
|
||||
var d structs.DispatchConfig
|
||||
// Build the constructor block
|
||||
var d structs.ConstructorConfig
|
||||
if err := mapstructure.WeakDecode(m, &d); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1279,7 +1277,7 @@ func parseDispatch(result **structs.DispatchConfig, list *ast.ObjectList) error
|
|||
if ot, ok := o.Val.(*ast.ObjectType); ok {
|
||||
listVal = ot.List
|
||||
} else {
|
||||
return fmt.Errorf("dispatch block should be an object")
|
||||
return fmt.Errorf("constructor block should be an object")
|
||||
}
|
||||
|
||||
// Parse the meta block
|
||||
|
|
|
@ -539,16 +539,16 @@ func TestParse(t *testing.T) {
|
|||
},
|
||||
|
||||
{
|
||||
"dispatch.hcl",
|
||||
"constructor.hcl",
|
||||
&structs.Job{
|
||||
ID: "dispatch",
|
||||
Name: "dispatch",
|
||||
ID: "constructor",
|
||||
Name: "constructor",
|
||||
Type: "service",
|
||||
Priority: 50,
|
||||
Region: "global",
|
||||
|
||||
Dispatch: &structs.DispatchConfig{
|
||||
InputData: "required",
|
||||
Constructor: &structs.ConstructorConfig{
|
||||
Payload: "required",
|
||||
MetaRequired: []string{"foo", "bar"},
|
||||
MetaOptional: []string{"baz", "bam"},
|
||||
},
|
||||
|
@ -572,8 +572,7 @@ func TestParse(t *testing.T) {
|
|||
MaxFileSizeMB: 10,
|
||||
},
|
||||
DispatchInput: &structs.DispatchInputConfig{
|
||||
Stdin: true,
|
||||
File: "foo/bar",
|
||||
File: "foo/bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
job "dispatch" {
|
||||
dispatch {
|
||||
input_data = "required"
|
||||
job "constructor" {
|
||||
constructor {
|
||||
payload = "required"
|
||||
meta_keys {
|
||||
required = ["foo", "bar"]
|
||||
optional = ["baz", "bam"]
|
||||
|
@ -12,7 +12,6 @@ job "dispatch" {
|
|||
resources {}
|
||||
|
||||
dispatch_input {
|
||||
stdin = true
|
||||
file = "foo/bar"
|
||||
}
|
||||
}
|
|
@ -22,9 +22,9 @@ const (
|
|||
// enforcing the job modify index during registers.
|
||||
RegisterEnforceIndexErrPrefix = "Enforcing job modify index"
|
||||
|
||||
// DispatchInputDataSizeLimit is the maximum size of the uncompressed input
|
||||
// DispatchPayloadSizeLimit is the maximum size of the uncompressed input
|
||||
// data payload.
|
||||
DispatchInputDataSizeLimit = 16 * 1024
|
||||
DispatchPayloadSizeLimit = 16 * 1024
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -138,8 +138,8 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
|
|||
// Populate the reply with job information
|
||||
reply.JobModifyIndex = index
|
||||
|
||||
// If the job is periodic or a dispatch template, we don't create an eval.
|
||||
if args.Job.IsPeriodic() || args.Job.IsDispatchTemplate() {
|
||||
// If the job is periodic or a constructor, we don't create an eval.
|
||||
if args.Job.IsPeriodic() || args.Job.IsConstructor() {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -316,8 +316,8 @@ func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegis
|
|||
|
||||
if job.IsPeriodic() {
|
||||
return fmt.Errorf("can't evaluate periodic job")
|
||||
} else if job.IsDispatchTemplate() {
|
||||
return fmt.Errorf("can't evaluate dispatch template job")
|
||||
} else if job.IsConstructor() {
|
||||
return fmt.Errorf("can't evaluate constructor job")
|
||||
}
|
||||
|
||||
// Create a new evaluation
|
||||
|
@ -382,8 +382,8 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
|
|||
// Populate the reply with job information
|
||||
reply.JobModifyIndex = index
|
||||
|
||||
// If the job is periodic or a dispatch template, we don't create an eval.
|
||||
if job != nil && (job.IsPeriodic() || job.IsDispatchTemplate()) {
|
||||
// If the job is periodic or a construcotr, we don't create an eval.
|
||||
if job != nil && (job.IsPeriodic() || job.IsConstructor()) {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -776,7 +776,7 @@ func validateJob(job *structs.Job) error {
|
|||
return validationErrors.ErrorOrNil()
|
||||
}
|
||||
|
||||
// Dispatch is used to dispatch a job based on a dispatch job template.
|
||||
// Dispatch is used to dispatch a job based on a constructor job.
|
||||
func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispatchResponse) error {
|
||||
if done, err := j.srv.forward("Job.Dispatch", args, args, reply); done {
|
||||
return err
|
||||
|
@ -785,35 +785,35 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
|
|||
|
||||
// Lookup the job
|
||||
if args.JobID == "" {
|
||||
return fmt.Errorf("missing dispatch template job ID")
|
||||
return fmt.Errorf("missing constructor job ID")
|
||||
}
|
||||
|
||||
snap, err := j.srv.fsm.State().Snapshot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tmpl, err := snap.JobByID(args.JobID)
|
||||
constructor, err := snap.JobByID(args.JobID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if tmpl == nil {
|
||||
return fmt.Errorf("dispatch template job not found")
|
||||
if constructor == nil {
|
||||
return fmt.Errorf("constructor job not found")
|
||||
}
|
||||
|
||||
if !tmpl.IsDispatchTemplate() {
|
||||
return fmt.Errorf("Specified job %q is not a dispatch template", args.JobID)
|
||||
if !constructor.IsConstructor() {
|
||||
return fmt.Errorf("Specified job %q is not a constructor job", args.JobID)
|
||||
}
|
||||
|
||||
// Validate the arguments
|
||||
if err := validateDispatchRequest(args, tmpl); err != nil {
|
||||
if err := validateDispatchRequest(args, constructor); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Derive the child job and commit it via Raft
|
||||
dispatchJob := tmpl.Copy()
|
||||
dispatchJob.Dispatch = nil
|
||||
dispatchJob.ID = structs.DispatchedID(tmpl.ID, time.Now())
|
||||
dispatchJob.ParentID = tmpl.ID
|
||||
dispatchJob := constructor.Copy()
|
||||
dispatchJob.Constructor = nil
|
||||
dispatchJob.ID = structs.DispatchedID(constructor.ID, time.Now())
|
||||
dispatchJob.ParentID = constructor.ID
|
||||
dispatchJob.Name = dispatchJob.ID
|
||||
|
||||
// Merge in the meta data
|
||||
|
@ -821,8 +821,8 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
|
|||
dispatchJob.Meta[k] = v
|
||||
}
|
||||
|
||||
// Compress the input
|
||||
dispatchJob.InputData = snappy.Encode(nil, args.InputData)
|
||||
// Compress the payload
|
||||
dispatchJob.Payload = snappy.Encode(nil, args.Payload)
|
||||
|
||||
regReq := &structs.JobRegisterRequest{
|
||||
Job: dispatchJob,
|
||||
|
@ -868,19 +868,19 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
|
|||
}
|
||||
|
||||
// validateDispatchRequest returns whether the request is valid given the
|
||||
// dispatch configuration of the template job
|
||||
func validateDispatchRequest(req *structs.JobDispatchRequest, tmpl *structs.Job) error {
|
||||
// Check the input data constraint is met
|
||||
hasInputData := len(req.InputData) != 0
|
||||
if tmpl.Dispatch.InputData == structs.DispatchInputDataRequired && !hasInputData {
|
||||
return fmt.Errorf("Input data is not provided but required by dispatch template")
|
||||
} else if tmpl.Dispatch.InputData == structs.DispatchInputDataForbidden && hasInputData {
|
||||
return fmt.Errorf("Input data provided but forbidden by dispatch template")
|
||||
// jobs constructor
|
||||
func validateDispatchRequest(req *structs.JobDispatchRequest, job *structs.Job) error {
|
||||
// Check the payload constraint is met
|
||||
hasInputData := len(req.Payload) != 0
|
||||
if job.Constructor.Payload == structs.DispatchPayloadRequired && !hasInputData {
|
||||
return fmt.Errorf("Payload is not provided but required by constructor")
|
||||
} else if job.Constructor.Payload == structs.DispatchPayloadForbidden && hasInputData {
|
||||
return fmt.Errorf("Payload provided but forbidden by constructor")
|
||||
}
|
||||
|
||||
// Check the input data doesn't exceed the size limit
|
||||
if l := len(req.InputData); l > DispatchInputDataSizeLimit {
|
||||
return fmt.Errorf("Input data exceeds maximum size; %d > %d", l, DispatchInputDataSizeLimit)
|
||||
// Check the payload doesn't exceed the size limit
|
||||
if l := len(req.Payload); l > DispatchPayloadSizeLimit {
|
||||
return fmt.Errorf("Payload exceeds maximum size; %d > %d", l, DispatchPayloadSizeLimit)
|
||||
}
|
||||
|
||||
// Check if the metadata is a set
|
||||
|
@ -892,8 +892,8 @@ func validateDispatchRequest(req *structs.JobDispatchRequest, tmpl *structs.Job)
|
|||
keys[k] = struct{}{}
|
||||
}
|
||||
|
||||
required := structs.SliceStringToSet(tmpl.Dispatch.MetaRequired)
|
||||
optional := structs.SliceStringToSet(tmpl.Dispatch.MetaOptional)
|
||||
required := structs.SliceStringToSet(job.Constructor.MetaRequired)
|
||||
optional := structs.SliceStringToSet(job.Constructor.MetaOptional)
|
||||
|
||||
// Check the metadata key constraints are met
|
||||
unpermitted := make(map[string]struct{})
|
||||
|
@ -915,7 +915,7 @@ func validateDispatchRequest(req *structs.JobDispatchRequest, tmpl *structs.Job)
|
|||
}
|
||||
|
||||
missing := make(map[string]struct{})
|
||||
for _, k := range tmpl.Dispatch.MetaRequired {
|
||||
for _, k := range job.Constructor.MetaRequired {
|
||||
if _, ok := req.Meta[k]; !ok {
|
||||
missing[k] = struct{}{}
|
||||
}
|
||||
|
|
|
@ -249,7 +249,7 @@ func TestJobEndpoint_Register_Periodic(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestJobEndpoint_Register_Dispatch_Template(t *testing.T) {
|
||||
func TestJobEndpoint_Register_Constructor(t *testing.T) {
|
||||
s1 := testServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
})
|
||||
|
@ -257,9 +257,9 @@ func TestJobEndpoint_Register_Dispatch_Template(t *testing.T) {
|
|||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create the register request for a dispatch template job.
|
||||
// Create the register request for a constructor job.
|
||||
job := mock.Job()
|
||||
job.Dispatch = &structs.DispatchConfig{}
|
||||
job.Constructor = &structs.ConstructorConfig{}
|
||||
req := &structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
|
@ -274,7 +274,7 @@ func TestJobEndpoint_Register_Dispatch_Template(t *testing.T) {
|
|||
t.Fatalf("bad index: %d", resp.Index)
|
||||
}
|
||||
|
||||
// Check for the node in the FSM
|
||||
// Check for the job in the FSM
|
||||
state := s1.fsm.State()
|
||||
out, err := state.JobByID(job.ID)
|
||||
if err != nil {
|
||||
|
@ -287,7 +287,7 @@ func TestJobEndpoint_Register_Dispatch_Template(t *testing.T) {
|
|||
t.Fatalf("index mis-match")
|
||||
}
|
||||
if resp.EvalID != "" {
|
||||
t.Fatalf("Register created an eval for a dispatch template job")
|
||||
t.Fatalf("Register created an eval for a constructor job")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -756,7 +756,7 @@ func TestJobEndpoint_Evaluate_Periodic(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestJobEndpoint_Evaluate_Dispatch_Template(t *testing.T) {
|
||||
func TestJobEndpoint_Evaluate_Constructor(t *testing.T) {
|
||||
s1 := testServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
})
|
||||
|
@ -766,7 +766,7 @@ func TestJobEndpoint_Evaluate_Dispatch_Template(t *testing.T) {
|
|||
|
||||
// Create the register request
|
||||
job := mock.Job()
|
||||
job.Dispatch = &structs.DispatchConfig{}
|
||||
job.Constructor = &structs.ConstructorConfig{}
|
||||
req := &structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
|
@ -973,7 +973,7 @@ func TestJobEndpoint_Deregister_Periodic(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestJobEndpoint_Deregister_Dispatch_Template(t *testing.T) {
|
||||
func TestJobEndpoint_Deregister_Constructor(t *testing.T) {
|
||||
s1 := testServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
})
|
||||
|
@ -983,7 +983,7 @@ func TestJobEndpoint_Deregister_Dispatch_Template(t *testing.T) {
|
|||
|
||||
// Create the register request
|
||||
job := mock.Job()
|
||||
job.Dispatch = &structs.DispatchConfig{}
|
||||
job.Constructor = &structs.ConstructorConfig{}
|
||||
reg := &structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
|
@ -1019,7 +1019,7 @@ func TestJobEndpoint_Deregister_Dispatch_Template(t *testing.T) {
|
|||
}
|
||||
|
||||
if resp.EvalID != "" {
|
||||
t.Fatalf("Deregister created an eval for a dispatch template job")
|
||||
t.Fatalf("Deregister created an eval for a constructor job")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1859,35 +1859,35 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
|
|||
|
||||
// No requirements
|
||||
d1 := mock.Job()
|
||||
d1.Dispatch = &structs.DispatchConfig{}
|
||||
d1.Constructor = &structs.ConstructorConfig{}
|
||||
|
||||
// Require input data
|
||||
d2 := mock.Job()
|
||||
d2.Dispatch = &structs.DispatchConfig{
|
||||
InputData: structs.DispatchInputDataRequired,
|
||||
d2.Constructor = &structs.ConstructorConfig{
|
||||
Payload: structs.DispatchPayloadRequired,
|
||||
}
|
||||
|
||||
// Disallow input data
|
||||
d3 := mock.Job()
|
||||
d3.Dispatch = &structs.DispatchConfig{
|
||||
InputData: structs.DispatchInputDataForbidden,
|
||||
d3.Constructor = &structs.ConstructorConfig{
|
||||
Payload: structs.DispatchPayloadForbidden,
|
||||
}
|
||||
|
||||
// Require meta
|
||||
d4 := mock.Job()
|
||||
d4.Dispatch = &structs.DispatchConfig{
|
||||
d4.Constructor = &structs.ConstructorConfig{
|
||||
MetaRequired: []string{"foo", "bar"},
|
||||
}
|
||||
|
||||
// Optional meta
|
||||
d5 := mock.Job()
|
||||
d5.Dispatch = &structs.DispatchConfig{
|
||||
d5.Constructor = &structs.ConstructorConfig{
|
||||
MetaOptional: []string{"foo", "bar"},
|
||||
}
|
||||
|
||||
reqNoInputNoMeta := &structs.JobDispatchRequest{}
|
||||
reqInputDataNoMeta := &structs.JobDispatchRequest{
|
||||
InputData: []byte("hello world"),
|
||||
Payload: []byte("hello world"),
|
||||
}
|
||||
reqNoInputDataMeta := &structs.JobDispatchRequest{
|
||||
Meta: map[string]string{
|
||||
|
@ -1896,14 +1896,14 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
|
|||
},
|
||||
}
|
||||
reqInputDataMeta := &structs.JobDispatchRequest{
|
||||
InputData: []byte("hello world"),
|
||||
Payload: []byte("hello world"),
|
||||
Meta: map[string]string{
|
||||
"foo": "f1",
|
||||
"bar": "f2",
|
||||
},
|
||||
}
|
||||
reqBadMeta := &structs.JobDispatchRequest{
|
||||
InputData: []byte("hello world"),
|
||||
Payload: []byte("hello world"),
|
||||
Meta: map[string]string{
|
||||
"foo": "f1",
|
||||
"bar": "f2",
|
||||
|
@ -1911,93 +1911,93 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
|
|||
},
|
||||
}
|
||||
reqInputDataTooLarge := &structs.JobDispatchRequest{
|
||||
InputData: make([]byte, DispatchInputDataSizeLimit+100),
|
||||
Payload: make([]byte, DispatchPayloadSizeLimit+100),
|
||||
}
|
||||
|
||||
type testCase struct {
|
||||
name string
|
||||
dispatchTemplate *structs.Job
|
||||
dispatchReq *structs.JobDispatchRequest
|
||||
err bool
|
||||
errStr string
|
||||
name string
|
||||
constructor *structs.Job
|
||||
dispatchReq *structs.JobDispatchRequest
|
||||
err bool
|
||||
errStr string
|
||||
}
|
||||
cases := []testCase{
|
||||
{
|
||||
name: "optional input data w/ data",
|
||||
dispatchTemplate: d1,
|
||||
dispatchReq: reqInputDataNoMeta,
|
||||
err: false,
|
||||
name: "optional input data w/ data",
|
||||
constructor: d1,
|
||||
dispatchReq: reqInputDataNoMeta,
|
||||
err: false,
|
||||
},
|
||||
{
|
||||
name: "optional input data w/o data",
|
||||
dispatchTemplate: d1,
|
||||
dispatchReq: reqNoInputNoMeta,
|
||||
err: false,
|
||||
name: "optional input data w/o data",
|
||||
constructor: d1,
|
||||
dispatchReq: reqNoInputNoMeta,
|
||||
err: false,
|
||||
},
|
||||
{
|
||||
name: "require input data w/ data",
|
||||
dispatchTemplate: d2,
|
||||
dispatchReq: reqInputDataNoMeta,
|
||||
err: false,
|
||||
name: "require input data w/ data",
|
||||
constructor: d2,
|
||||
dispatchReq: reqInputDataNoMeta,
|
||||
err: false,
|
||||
},
|
||||
{
|
||||
name: "require input data w/o data",
|
||||
dispatchTemplate: d2,
|
||||
dispatchReq: reqNoInputNoMeta,
|
||||
err: true,
|
||||
errStr: "not provided but required",
|
||||
name: "require input data w/o data",
|
||||
constructor: d2,
|
||||
dispatchReq: reqNoInputNoMeta,
|
||||
err: true,
|
||||
errStr: "not provided but required",
|
||||
},
|
||||
{
|
||||
name: "disallow input data w/o data",
|
||||
dispatchTemplate: d3,
|
||||
dispatchReq: reqNoInputNoMeta,
|
||||
err: false,
|
||||
name: "disallow input data w/o data",
|
||||
constructor: d3,
|
||||
dispatchReq: reqNoInputNoMeta,
|
||||
err: false,
|
||||
},
|
||||
{
|
||||
name: "disallow input data w/ data",
|
||||
dispatchTemplate: d3,
|
||||
dispatchReq: reqInputDataNoMeta,
|
||||
err: true,
|
||||
errStr: "provided but forbidden",
|
||||
name: "disallow input data w/ data",
|
||||
constructor: d3,
|
||||
dispatchReq: reqInputDataNoMeta,
|
||||
err: true,
|
||||
errStr: "provided but forbidden",
|
||||
},
|
||||
{
|
||||
name: "require meta w/ meta",
|
||||
dispatchTemplate: d4,
|
||||
dispatchReq: reqInputDataMeta,
|
||||
err: false,
|
||||
name: "require meta w/ meta",
|
||||
constructor: d4,
|
||||
dispatchReq: reqInputDataMeta,
|
||||
err: false,
|
||||
},
|
||||
{
|
||||
name: "require meta w/o meta",
|
||||
dispatchTemplate: d4,
|
||||
dispatchReq: reqNoInputNoMeta,
|
||||
err: true,
|
||||
errStr: "did not provide required meta keys",
|
||||
name: "require meta w/o meta",
|
||||
constructor: d4,
|
||||
dispatchReq: reqNoInputNoMeta,
|
||||
err: true,
|
||||
errStr: "did not provide required meta keys",
|
||||
},
|
||||
{
|
||||
name: "optional meta w/ meta",
|
||||
dispatchTemplate: d5,
|
||||
dispatchReq: reqNoInputDataMeta,
|
||||
err: false,
|
||||
name: "optional meta w/ meta",
|
||||
constructor: d5,
|
||||
dispatchReq: reqNoInputDataMeta,
|
||||
err: false,
|
||||
},
|
||||
{
|
||||
name: "optional meta w/o meta",
|
||||
dispatchTemplate: d5,
|
||||
dispatchReq: reqNoInputNoMeta,
|
||||
err: false,
|
||||
name: "optional meta w/o meta",
|
||||
constructor: d5,
|
||||
dispatchReq: reqNoInputNoMeta,
|
||||
err: false,
|
||||
},
|
||||
{
|
||||
name: "optional meta w/ bad meta",
|
||||
dispatchTemplate: d5,
|
||||
dispatchReq: reqBadMeta,
|
||||
err: true,
|
||||
errStr: "unpermitted metadata keys",
|
||||
name: "optional meta w/ bad meta",
|
||||
constructor: d5,
|
||||
dispatchReq: reqBadMeta,
|
||||
err: true,
|
||||
errStr: "unpermitted metadata keys",
|
||||
},
|
||||
{
|
||||
name: "optional input w/ too big of input",
|
||||
dispatchTemplate: d1,
|
||||
dispatchReq: reqInputDataTooLarge,
|
||||
err: true,
|
||||
errStr: "data exceeds maximum size",
|
||||
name: "optional input w/ too big of input",
|
||||
constructor: d1,
|
||||
dispatchReq: reqInputDataTooLarge,
|
||||
err: true,
|
||||
errStr: "Payload exceeds maximum size",
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -2012,7 +2012,7 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
|
|||
|
||||
// Create the register request
|
||||
regReq := &structs.JobRegisterRequest{
|
||||
Job: tc.dispatchTemplate,
|
||||
Job: tc.constructor,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
}
|
||||
|
||||
|
@ -2023,7 +2023,7 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
|
|||
}
|
||||
|
||||
// Now try to dispatch
|
||||
tc.dispatchReq.JobID = tc.dispatchTemplate.ID
|
||||
tc.dispatchReq.JobID = tc.constructor.ID
|
||||
tc.dispatchReq.WriteRequest = structs.WriteRequest{Region: "global"}
|
||||
|
||||
var dispatchResp structs.JobDispatchResponse
|
||||
|
@ -2050,7 +2050,7 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
|
|||
if out.CreateIndex != dispatchResp.JobCreateIndex {
|
||||
t.Fatalf("index mis-match")
|
||||
}
|
||||
if out.ParentID != tc.dispatchTemplate.ID {
|
||||
if out.ParentID != tc.constructor.ID {
|
||||
t.Fatalf("bad parent ID")
|
||||
}
|
||||
|
||||
|
|
|
@ -1605,9 +1605,9 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b
|
|||
}
|
||||
|
||||
// If there are no allocations or evaluations it is a new job. If the job is
|
||||
// periodic or is a dispatch template, we mark it as running as it will
|
||||
// never have an allocation/evaluation against it.
|
||||
if job.IsPeriodic() || job.IsDispatchTemplate() {
|
||||
// periodic or is a constructor, we mark it as running as it will never have
|
||||
// an allocation/evaluation against it.
|
||||
if job.IsPeriodic() || job.IsConstructor() {
|
||||
return structs.JobStatusRunning, nil
|
||||
}
|
||||
return structs.JobStatusPending, nil
|
||||
|
|
|
@ -272,11 +272,11 @@ type JobSummaryRequest struct {
|
|||
QueryOptions
|
||||
}
|
||||
|
||||
// JobDispatchRequest is used to dispatch a job based on a job dispatch template
|
||||
// JobDispatchRequest is used to dispatch a job based on a constructor job
|
||||
type JobDispatchRequest struct {
|
||||
JobID string
|
||||
InputData []byte
|
||||
Meta map[string]string
|
||||
JobID string
|
||||
Payload []byte
|
||||
Meta map[string]string
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
|
@ -1129,11 +1129,11 @@ type Job struct {
|
|||
// Periodic is used to define the interval the job is run at.
|
||||
Periodic *PeriodicConfig
|
||||
|
||||
// Dispatch is used to specify the job as a template job for dispatching.
|
||||
Dispatch *DispatchConfig
|
||||
// Constructor is used to specify the job as a constructor job for dispatching.
|
||||
Constructor *ConstructorConfig
|
||||
|
||||
// InputData is the input data supplied when the job was dispatched.
|
||||
InputData []byte
|
||||
// Payload is the payload supplied when the job was dispatched.
|
||||
Payload []byte
|
||||
|
||||
// Meta is used to associate arbitrary metadata with this
|
||||
// job. This is opaque to Nomad.
|
||||
|
@ -1169,8 +1169,8 @@ func (j *Job) Canonicalize() {
|
|||
tg.Canonicalize(j)
|
||||
}
|
||||
|
||||
if j.Dispatch != nil {
|
||||
j.Dispatch.Canonicalize()
|
||||
if j.Constructor != nil {
|
||||
j.Constructor.Canonicalize()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1195,7 +1195,7 @@ func (j *Job) Copy() *Job {
|
|||
|
||||
nj.Periodic = nj.Periodic.Copy()
|
||||
nj.Meta = CopyMapStringString(nj.Meta)
|
||||
nj.Dispatch = nj.Dispatch.Copy()
|
||||
nj.Constructor = nj.Constructor.Copy()
|
||||
return nj
|
||||
}
|
||||
|
||||
|
@ -1270,8 +1270,8 @@ func (j *Job) Validate() error {
|
|||
}
|
||||
}
|
||||
|
||||
if j.IsDispatchTemplate() {
|
||||
if err := j.Dispatch.Validate(); err != nil {
|
||||
if j.IsConstructor() {
|
||||
if err := j.Constructor.Validate(); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
}
|
||||
|
@ -1311,9 +1311,9 @@ func (j *Job) IsPeriodic() bool {
|
|||
return j.Periodic != nil
|
||||
}
|
||||
|
||||
// IsDispatchTemplate returns whether a job is dispatch template.
|
||||
func (j *Job) IsDispatchTemplate() bool {
|
||||
return j.Dispatch != nil
|
||||
// IsConstructor returns whether a job is constructor job.
|
||||
func (j *Job) IsConstructor() bool {
|
||||
return j.Constructor != nil
|
||||
}
|
||||
|
||||
// VaultPolicies returns the set of Vault policies per task group, per task
|
||||
|
@ -1589,19 +1589,19 @@ type PeriodicLaunch struct {
|
|||
}
|
||||
|
||||
const (
|
||||
DispatchInputDataForbidden = "forbidden"
|
||||
DispatchInputDataOptional = "optional"
|
||||
DispatchInputDataRequired = "required"
|
||||
DispatchPayloadForbidden = "forbidden"
|
||||
DispatchPayloadOptional = "optional"
|
||||
DispatchPayloadRequired = "required"
|
||||
|
||||
// DispatchLaunchSuffic is the string appended to the dispatch job
|
||||
// templates's ID when dispatching instances of it.
|
||||
// DispatchLaunchSuffic is the string appended to the constructor job's ID
|
||||
// when dispatching instances of it.
|
||||
DispatchLaunchSuffic = "/dispatch-"
|
||||
)
|
||||
|
||||
// DispatchConfig is used to configure the dispatch template
|
||||
type DispatchConfig struct {
|
||||
// InputData configure the input data requirements
|
||||
InputData string `mapstructure:"input_data"`
|
||||
// ConstructorConfig is used to configure the constructor job
|
||||
type ConstructorConfig struct {
|
||||
// Payload configure the payload requirements
|
||||
Payload string
|
||||
|
||||
// MetaRequired is metadata keys that must be specified by the dispatcher
|
||||
MetaRequired []string `mapstructure:"required"`
|
||||
|
@ -1610,12 +1610,12 @@ type DispatchConfig struct {
|
|||
MetaOptional []string `mapstructure:"optional"`
|
||||
}
|
||||
|
||||
func (d *DispatchConfig) Validate() error {
|
||||
func (d *ConstructorConfig) Validate() error {
|
||||
var mErr multierror.Error
|
||||
switch d.InputData {
|
||||
case DispatchInputDataOptional, DispatchInputDataRequired, DispatchInputDataForbidden:
|
||||
switch d.Payload {
|
||||
case DispatchPayloadOptional, DispatchPayloadRequired, DispatchPayloadForbidden:
|
||||
default:
|
||||
multierror.Append(&mErr, fmt.Errorf("Unknown input data requirement: %q", d.InputData))
|
||||
multierror.Append(&mErr, fmt.Errorf("Unknown payload requirement: %q", d.Payload))
|
||||
}
|
||||
|
||||
// Check that the meta configurations are disjoint sets
|
||||
|
@ -1627,17 +1627,17 @@ func (d *DispatchConfig) Validate() error {
|
|||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
func (d *DispatchConfig) Canonicalize() {
|
||||
if d.InputData == "" {
|
||||
d.InputData = DispatchInputDataOptional
|
||||
func (d *ConstructorConfig) Canonicalize() {
|
||||
if d.Payload == "" {
|
||||
d.Payload = DispatchPayloadOptional
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DispatchConfig) Copy() *DispatchConfig {
|
||||
func (d *ConstructorConfig) Copy() *ConstructorConfig {
|
||||
if d == nil {
|
||||
return nil
|
||||
}
|
||||
nd := new(DispatchConfig)
|
||||
nd := new(ConstructorConfig)
|
||||
*nd = *d
|
||||
nd.MetaOptional = CopySliceString(nd.MetaOptional)
|
||||
nd.MetaRequired = CopySliceString(nd.MetaRequired)
|
||||
|
@ -1645,7 +1645,7 @@ func (d *DispatchConfig) Copy() *DispatchConfig {
|
|||
}
|
||||
|
||||
// DispatchedID returns an ID appropriate for a job dispatched against a
|
||||
// particular template
|
||||
// particular constructor
|
||||
func DispatchedID(templateID string, t time.Time) string {
|
||||
u := GenerateUUID()[:8]
|
||||
return fmt.Sprintf("%s%s%d-%s", templateID, DispatchLaunchSuffic, t.Unix(), u)
|
||||
|
@ -1653,9 +1653,6 @@ func DispatchedID(templateID string, t time.Time) string {
|
|||
|
||||
// DispatchInputConfig configures how a task gets its input from a job dispatch
|
||||
type DispatchInputConfig struct {
|
||||
// Stdin specifies whether the input should be written to the task's stdin
|
||||
Stdin bool
|
||||
|
||||
// File specifies a relative path to where the input data should be written
|
||||
File string
|
||||
}
|
||||
|
@ -2221,7 +2218,6 @@ type Task struct {
|
|||
Resources *Resources
|
||||
|
||||
// DispatchInput configures how the task retrieves its input from a dispatch
|
||||
// template
|
||||
DispatchInput *DispatchInputConfig
|
||||
|
||||
// Meta is used to associate arbitrary metadata with this
|
||||
|
|
Loading…
Reference in New Issue