// Copyright (c) HashiCorp, Inc. // SPDX-License-Identifier: MPL-2.0 package api import ( "errors" "fmt" "net/url" "sort" "strconv" "time" "github.com/hashicorp/cronexpr" "golang.org/x/exp/maps" ) const ( // JobTypeService indicates a long-running processes JobTypeService = "service" // JobTypeBatch indicates a short-lived process JobTypeBatch = "batch" // JobTypeSystem indicates a system process that should run on all clients JobTypeSystem = "system" // JobTypeSysbatch indicates a short-lived system process that should run // on all clients. JobTypeSysbatch = "sysbatch" // JobDefaultPriority is the default priority if not specified. JobDefaultPriority = 50 // PeriodicSpecCron is used for a cron spec. PeriodicSpecCron = "cron" // DefaultNamespace is the default namespace. DefaultNamespace = "default" // NodePoolDefault is the default node pool. NodePoolDefault = "default" // For Job configuration, GlobalRegion is a sentinel region value // that users may specify to indicate the job should be run on // the region of the node that the job was submitted to. // For Client configuration, if no region information is given, // the client node will default to be part of the GlobalRegion. GlobalRegion = "global" ) const ( // RegisterEnforceIndexErrPrefix is the prefix to use in errors caused by // enforcing the job modify index during registers. RegisterEnforceIndexErrPrefix = "Enforcing job modify index" ) const ( // JobPeriodicLaunchSuffix is the string appended to the periodic jobs ID // when launching derived instances of it. JobPeriodicLaunchSuffix = "/periodic-" // JobDispatchLaunchSuffix is the string appended to the parameterized job's ID // when dispatching instances of it. JobDispatchLaunchSuffix = "/dispatch-" ) // Jobs is used to access the job-specific endpoints. type Jobs struct { client *Client } // JobsParseRequest is used for arguments of the /v1/jobs/parse endpoint type JobsParseRequest struct { // JobHCL is an hcl jobspec JobHCL string // HCLv1 indicates whether the JobHCL should be parsed with the hcl v1 parser HCLv1 bool `json:"hclv1,omitempty"` // Variables are HCL2 variables associated with the job. Only works with hcl2. // // Interpreted as if it were the content of a variables file. Variables string // Canonicalize is a flag as to if the server should return default values // for unset fields Canonicalize bool } // Jobs returns a handle on the jobs endpoints. func (c *Client) Jobs() *Jobs { return &Jobs{client: c} } // ParseHCL is used to convert the HCL representation of a Job to JSON server side. // To parse the HCL client side see package github.com/hashicorp/nomad/jobspec // Use ParseHCLOpts if you need to customize JobsParseRequest. func (j *Jobs) ParseHCL(jobHCL string, canonicalize bool) (*Job, error) { req := &JobsParseRequest{ JobHCL: jobHCL, Canonicalize: canonicalize, } return j.ParseHCLOpts(req) } // ParseHCLOpts is used to request the server convert the HCL representation of a // Job to JSON on our behalf. Accepts HCL1 or HCL2 jobs as input. func (j *Jobs) ParseHCLOpts(req *JobsParseRequest) (*Job, error) { var job Job _, err := j.client.put("/v1/jobs/parse", req, &job, nil) return &job, err } func (j *Jobs) Validate(job *Job, q *WriteOptions) (*JobValidateResponse, *WriteMeta, error) { var resp JobValidateResponse req := &JobValidateRequest{Job: job} if q != nil { req.WriteRequest = WriteRequest{Region: q.Region} } wm, err := j.client.put("/v1/validate/job", req, &resp, q) return &resp, wm, err } // RegisterOptions is used to pass through job registration parameters type RegisterOptions struct { EnforceIndex bool ModifyIndex uint64 PolicyOverride bool PreserveCounts bool EvalPriority int Submission *JobSubmission } // Register is used to register a new job. It returns the ID // of the evaluation, along with any errors encountered. func (j *Jobs) Register(job *Job, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) { return j.RegisterOpts(job, nil, q) } // EnforceRegister is used to register a job enforcing its job modify index. func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) { opts := RegisterOptions{EnforceIndex: true, ModifyIndex: modifyIndex} return j.RegisterOpts(job, &opts, q) } // RegisterOpts is used to register a new job with the passed RegisterOpts. It // returns the ID of the evaluation, along with any errors encountered. func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) { // Format the request req := &JobRegisterRequest{Job: job} if opts != nil { if opts.EnforceIndex { req.EnforceIndex = true req.JobModifyIndex = opts.ModifyIndex } req.PolicyOverride = opts.PolicyOverride req.PreserveCounts = opts.PreserveCounts req.EvalPriority = opts.EvalPriority req.Submission = opts.Submission } var resp JobRegisterResponse wm, err := j.client.put("/v1/jobs", req, &resp, q) if err != nil { return nil, nil, err } return &resp, wm, nil } type JobListFields struct { Meta bool } type JobListOptions struct { Fields *JobListFields } // List is used to list all of the existing jobs. func (j *Jobs) List(q *QueryOptions) ([]*JobListStub, *QueryMeta, error) { return j.ListOptions(nil, q) } // List is used to list all of the existing jobs. func (j *Jobs) ListOptions(opts *JobListOptions, q *QueryOptions) ([]*JobListStub, *QueryMeta, error) { var resp []*JobListStub destinationURL := "/v1/jobs" if opts != nil && opts.Fields != nil { qp := url.Values{} qp.Add("meta", fmt.Sprint(opts.Fields.Meta)) destinationURL = destinationURL + "?" + qp.Encode() } qm, err := j.client.query(destinationURL, &resp, q) if err != nil { return nil, qm, err } sort.Sort(JobIDSort(resp)) return resp, qm, nil } // PrefixList is used to list all existing jobs that match the prefix. func (j *Jobs) PrefixList(prefix string) ([]*JobListStub, *QueryMeta, error) { return j.List(&QueryOptions{Prefix: prefix}) } // Info is used to retrieve information about a particular // job given its unique ID. func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) { var resp Job qm, err := j.client.query("/v1/job/"+url.PathEscape(jobID), &resp, q) if err != nil { return nil, nil, err } return &resp, qm, nil } // Scale is used to retrieve information about a particular // job given its unique ID. func (j *Jobs) Scale(jobID, group string, count *int, message string, error bool, meta map[string]interface{}, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) { var count64 *int64 if count != nil { count64 = pointerOf(int64(*count)) } req := &ScalingRequest{ Count: count64, Target: map[string]string{ "Job": jobID, "Group": group, }, Error: error, Message: message, Meta: meta, } var resp JobRegisterResponse qm, err := j.client.put(fmt.Sprintf("/v1/job/%s/scale", url.PathEscape(jobID)), req, &resp, q) if err != nil { return nil, nil, err } return &resp, qm, nil } // ScaleStatus is used to retrieve information about a particular // job given its unique ID. func (j *Jobs) ScaleStatus(jobID string, q *QueryOptions) (*JobScaleStatusResponse, *QueryMeta, error) { var resp JobScaleStatusResponse qm, err := j.client.query(fmt.Sprintf("/v1/job/%s/scale", url.PathEscape(jobID)), &resp, q) if err != nil { return nil, nil, err } return &resp, qm, nil } // Versions is used to retrieve all versions of a particular job given its // unique ID. func (j *Jobs) Versions(jobID string, diffs bool, q *QueryOptions) ([]*Job, []*JobDiff, *QueryMeta, error) { var resp JobVersionsResponse qm, err := j.client.query(fmt.Sprintf("/v1/job/%s/versions?diffs=%v", url.PathEscape(jobID), diffs), &resp, q) if err != nil { return nil, nil, nil, err } return resp.Versions, resp.Diffs, qm, nil } // Submission is used to retrieve the original submitted source of a job given its // namespace, jobID, and version number. The original source might not be available, // which case nil is returned with no error. func (j *Jobs) Submission(jobID string, version int, q *QueryOptions) (*JobSubmission, *QueryMeta, error) { var sub JobSubmission s := fmt.Sprintf("/v1/job/%s/submission?version=%d", url.PathEscape(jobID), version) qm, err := j.client.query(s, &sub, q) if err != nil { return nil, nil, err } return &sub, qm, nil } // Allocations is used to return the allocs for a given job ID. func (j *Jobs) Allocations(jobID string, allAllocs bool, q *QueryOptions) ([]*AllocationListStub, *QueryMeta, error) { var resp []*AllocationListStub u, err := url.Parse("/v1/job/" + url.PathEscape(jobID) + "/allocations") if err != nil { return nil, nil, err } v := u.Query() v.Add("all", strconv.FormatBool(allAllocs)) u.RawQuery = v.Encode() qm, err := j.client.query(u.String(), &resp, q) if err != nil { return nil, nil, err } sort.Sort(AllocIndexSort(resp)) return resp, qm, nil } // Deployments is used to query the deployments associated with the given job // ID. func (j *Jobs) Deployments(jobID string, all bool, q *QueryOptions) ([]*Deployment, *QueryMeta, error) { var resp []*Deployment u, err := url.Parse("/v1/job/" + url.PathEscape(jobID) + "/deployments") if err != nil { return nil, nil, err } v := u.Query() v.Add("all", strconv.FormatBool(all)) u.RawQuery = v.Encode() qm, err := j.client.query(u.String(), &resp, q) if err != nil { return nil, nil, err } sort.Sort(DeploymentIndexSort(resp)) return resp, qm, nil } // LatestDeployment is used to query for the latest deployment associated with // the given job ID. func (j *Jobs) LatestDeployment(jobID string, q *QueryOptions) (*Deployment, *QueryMeta, error) { var resp *Deployment qm, err := j.client.query("/v1/job/"+url.PathEscape(jobID)+"/deployment", &resp, q) if err != nil { return nil, nil, err } return resp, qm, nil } // Evaluations is used to query the evaluations associated with the given job // ID. func (j *Jobs) Evaluations(jobID string, q *QueryOptions) ([]*Evaluation, *QueryMeta, error) { var resp []*Evaluation qm, err := j.client.query("/v1/job/"+url.PathEscape(jobID)+"/evaluations", &resp, q) if err != nil { return nil, nil, err } sort.Sort(EvalIndexSort(resp)) return resp, qm, nil } // Deregister is used to remove an existing job. If purge is set to true, the job // is deregistered and purged from the system versus still being queryable and // eventually GC'ed from the system. Most callers should not specify purge. func (j *Jobs) Deregister(jobID string, purge bool, q *WriteOptions) (string, *WriteMeta, error) { var resp JobDeregisterResponse wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t", url.PathEscape(jobID), purge), nil, &resp, q) if err != nil { return "", nil, err } return resp.EvalID, wm, nil } // DeregisterOptions is used to pass through job deregistration parameters type DeregisterOptions struct { // If Purge is set to true, the job is deregistered and purged from the // system versus still being queryable and eventually GC'ed from the // system. Most callers should not specify purge. Purge bool // If Global is set to true, all regions of a multiregion job will be // stopped. Global bool // EvalPriority is an optional priority to use on any evaluation created as // a result on this job deregistration. This value must be between 1-100 // inclusively, where a larger value corresponds to a higher priority. This // is useful when an operator wishes to push through a job deregistration // in busy clusters with a large evaluation backlog. EvalPriority int // NoShutdownDelay, if set to true, will override the group and // task shutdown_delay configuration and ignore the delay for any // allocations stopped as a result of this Deregister call. NoShutdownDelay bool } // DeregisterOpts is used to remove an existing job. See DeregisterOptions // for parameters. func (j *Jobs) DeregisterOpts(jobID string, opts *DeregisterOptions, q *WriteOptions) (string, *WriteMeta, error) { var resp JobDeregisterResponse // The base endpoint to add query params to. endpoint := "/v1/job/" + url.PathEscape(jobID) // Protect against nil opts. url.Values expects a string, and so using // fmt.Sprintf is the best way to do this. if opts != nil { endpoint += fmt.Sprintf("?purge=%t&global=%t&eval_priority=%v&no_shutdown_delay=%t", opts.Purge, opts.Global, opts.EvalPriority, opts.NoShutdownDelay) } wm, err := j.client.delete(endpoint, nil, &resp, q) if err != nil { return "", nil, err } return resp.EvalID, wm, nil } // ForceEvaluate is used to force-evaluate an existing job. func (j *Jobs) ForceEvaluate(jobID string, q *WriteOptions) (string, *WriteMeta, error) { var resp JobRegisterResponse wm, err := j.client.put("/v1/job/"+url.PathEscape(jobID)+"/evaluate", nil, &resp, q) if err != nil { return "", nil, err } return resp.EvalID, wm, nil } // EvaluateWithOpts is used to force-evaluate an existing job and takes additional options // for whether to force reschedule failed allocations func (j *Jobs) EvaluateWithOpts(jobID string, opts EvalOptions, q *WriteOptions) (string, *WriteMeta, error) { req := &JobEvaluateRequest{ JobID: jobID, EvalOptions: opts, } var resp JobRegisterResponse wm, err := j.client.put("/v1/job/"+url.PathEscape(jobID)+"/evaluate", req, &resp, q) if err != nil { return "", nil, err } return resp.EvalID, wm, nil } // PeriodicForce spawns a new instance of the periodic job and returns the eval ID func (j *Jobs) PeriodicForce(jobID string, q *WriteOptions) (string, *WriteMeta, error) { var resp periodicForceResponse wm, err := j.client.put("/v1/job/"+url.PathEscape(jobID)+"/periodic/force", nil, &resp, q) if err != nil { return "", nil, err } return resp.EvalID, wm, nil } // PlanOptions is used to pass through job planning parameters type PlanOptions struct { Diff bool PolicyOverride bool } func (j *Jobs) Plan(job *Job, diff bool, q *WriteOptions) (*JobPlanResponse, *WriteMeta, error) { opts := PlanOptions{Diff: diff} return j.PlanOpts(job, &opts, q) } func (j *Jobs) PlanOpts(job *Job, opts *PlanOptions, q *WriteOptions) (*JobPlanResponse, *WriteMeta, error) { if job == nil { return nil, nil, errors.New("must pass non-nil job") } // Setup the request req := &JobPlanRequest{ Job: job, } if opts != nil { req.Diff = opts.Diff req.PolicyOverride = opts.PolicyOverride } var resp JobPlanResponse wm, err := j.client.put("/v1/job/"+url.PathEscape(*job.ID)+"/plan", req, &resp, q) if err != nil { return nil, nil, err } return &resp, wm, nil } func (j *Jobs) Summary(jobID string, q *QueryOptions) (*JobSummary, *QueryMeta, error) { var resp JobSummary qm, err := j.client.query("/v1/job/"+url.PathEscape(jobID)+"/summary", &resp, q) if err != nil { return nil, nil, err } return &resp, qm, nil } func (j *Jobs) Dispatch(jobID string, meta map[string]string, payload []byte, idPrefixTemplate string, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) { var resp JobDispatchResponse req := &JobDispatchRequest{ JobID: jobID, Meta: meta, Payload: payload, IdPrefixTemplate: idPrefixTemplate, } wm, err := j.client.put("/v1/job/"+url.PathEscape(jobID)+"/dispatch", req, &resp, q) if err != nil { return nil, nil, err } return &resp, wm, nil } // Revert is used to revert the given job to the passed version. If // enforceVersion is set, the job is only reverted if the current version is at // the passed version. func (j *Jobs) Revert(jobID string, version uint64, enforcePriorVersion *uint64, q *WriteOptions, consulToken, vaultToken string) (*JobRegisterResponse, *WriteMeta, error) { var resp JobRegisterResponse req := &JobRevertRequest{ JobID: jobID, JobVersion: version, EnforcePriorVersion: enforcePriorVersion, ConsulToken: consulToken, VaultToken: vaultToken, } wm, err := j.client.put("/v1/job/"+url.PathEscape(jobID)+"/revert", req, &resp, q) if err != nil { return nil, nil, err } return &resp, wm, nil } // Stable is used to mark a job version's stability. func (j *Jobs) Stable(jobID string, version uint64, stable bool, q *WriteOptions) (*JobStabilityResponse, *WriteMeta, error) { var resp JobStabilityResponse req := &JobStabilityRequest{ JobID: jobID, JobVersion: version, Stable: stable, } wm, err := j.client.put("/v1/job/"+url.PathEscape(jobID)+"/stable", req, &resp, q) if err != nil { return nil, nil, err } return &resp, wm, nil } // Services is used to return a list of service registrations associated to the // specified jobID. func (j *Jobs) Services(jobID string, q *QueryOptions) ([]*ServiceRegistration, *QueryMeta, error) { var resp []*ServiceRegistration qm, err := j.client.query("/v1/job/"+jobID+"/services", &resp, q) return resp, qm, err } // periodicForceResponse is used to deserialize a force response type periodicForceResponse struct { EvalID string } // UpdateStrategy defines a task groups update strategy. type UpdateStrategy struct { Stagger *time.Duration `mapstructure:"stagger" hcl:"stagger,optional"` MaxParallel *int `mapstructure:"max_parallel" hcl:"max_parallel,optional"` HealthCheck *string `mapstructure:"health_check" hcl:"health_check,optional"` MinHealthyTime *time.Duration `mapstructure:"min_healthy_time" hcl:"min_healthy_time,optional"` HealthyDeadline *time.Duration `mapstructure:"healthy_deadline" hcl:"healthy_deadline,optional"` ProgressDeadline *time.Duration `mapstructure:"progress_deadline" hcl:"progress_deadline,optional"` Canary *int `mapstructure:"canary" hcl:"canary,optional"` AutoRevert *bool `mapstructure:"auto_revert" hcl:"auto_revert,optional"` AutoPromote *bool `mapstructure:"auto_promote" hcl:"auto_promote,optional"` } // DefaultUpdateStrategy provides a baseline that can be used to upgrade // jobs with the old policy or for populating field defaults. func DefaultUpdateStrategy() *UpdateStrategy { return &UpdateStrategy{ Stagger: pointerOf(30 * time.Second), MaxParallel: pointerOf(1), HealthCheck: pointerOf("checks"), MinHealthyTime: pointerOf(10 * time.Second), HealthyDeadline: pointerOf(5 * time.Minute), ProgressDeadline: pointerOf(10 * time.Minute), AutoRevert: pointerOf(false), Canary: pointerOf(0), AutoPromote: pointerOf(false), } } func (u *UpdateStrategy) Copy() *UpdateStrategy { if u == nil { return nil } copy := new(UpdateStrategy) if u.Stagger != nil { copy.Stagger = pointerOf(*u.Stagger) } if u.MaxParallel != nil { copy.MaxParallel = pointerOf(*u.MaxParallel) } if u.HealthCheck != nil { copy.HealthCheck = pointerOf(*u.HealthCheck) } if u.MinHealthyTime != nil { copy.MinHealthyTime = pointerOf(*u.MinHealthyTime) } if u.HealthyDeadline != nil { copy.HealthyDeadline = pointerOf(*u.HealthyDeadline) } if u.ProgressDeadline != nil { copy.ProgressDeadline = pointerOf(*u.ProgressDeadline) } if u.AutoRevert != nil { copy.AutoRevert = pointerOf(*u.AutoRevert) } if u.Canary != nil { copy.Canary = pointerOf(*u.Canary) } if u.AutoPromote != nil { copy.AutoPromote = pointerOf(*u.AutoPromote) } return copy } func (u *UpdateStrategy) Merge(o *UpdateStrategy) { if o == nil { return } if o.Stagger != nil { u.Stagger = pointerOf(*o.Stagger) } if o.MaxParallel != nil { u.MaxParallel = pointerOf(*o.MaxParallel) } if o.HealthCheck != nil { u.HealthCheck = pointerOf(*o.HealthCheck) } if o.MinHealthyTime != nil { u.MinHealthyTime = pointerOf(*o.MinHealthyTime) } if o.HealthyDeadline != nil { u.HealthyDeadline = pointerOf(*o.HealthyDeadline) } if o.ProgressDeadline != nil { u.ProgressDeadline = pointerOf(*o.ProgressDeadline) } if o.AutoRevert != nil { u.AutoRevert = pointerOf(*o.AutoRevert) } if o.Canary != nil { u.Canary = pointerOf(*o.Canary) } if o.AutoPromote != nil { u.AutoPromote = pointerOf(*o.AutoPromote) } } func (u *UpdateStrategy) Canonicalize() { d := DefaultUpdateStrategy() if u.MaxParallel == nil { u.MaxParallel = d.MaxParallel } if u.Stagger == nil { u.Stagger = d.Stagger } if u.HealthCheck == nil { u.HealthCheck = d.HealthCheck } if u.HealthyDeadline == nil { u.HealthyDeadline = d.HealthyDeadline } if u.ProgressDeadline == nil { u.ProgressDeadline = d.ProgressDeadline } if u.MinHealthyTime == nil { u.MinHealthyTime = d.MinHealthyTime } if u.AutoRevert == nil { u.AutoRevert = d.AutoRevert } if u.Canary == nil { u.Canary = d.Canary } if u.AutoPromote == nil { u.AutoPromote = d.AutoPromote } } // Empty returns whether the UpdateStrategy is empty or has user defined values. func (u *UpdateStrategy) Empty() bool { if u == nil { return true } if u.Stagger != nil && *u.Stagger != 0 { return false } if u.MaxParallel != nil && *u.MaxParallel != 0 { return false } if u.HealthCheck != nil && *u.HealthCheck != "" { return false } if u.MinHealthyTime != nil && *u.MinHealthyTime != 0 { return false } if u.HealthyDeadline != nil && *u.HealthyDeadline != 0 { return false } if u.ProgressDeadline != nil && *u.ProgressDeadline != 0 { return false } if u.AutoRevert != nil && *u.AutoRevert { return false } if u.AutoPromote != nil && *u.AutoPromote { return false } if u.Canary != nil && *u.Canary != 0 { return false } return true } type Multiregion struct { Strategy *MultiregionStrategy `hcl:"strategy,block"` Regions []*MultiregionRegion `hcl:"region,block"` } func (m *Multiregion) Canonicalize() { if m.Strategy == nil { m.Strategy = &MultiregionStrategy{ MaxParallel: pointerOf(0), OnFailure: pointerOf(""), } } else { if m.Strategy.MaxParallel == nil { m.Strategy.MaxParallel = pointerOf(0) } if m.Strategy.OnFailure == nil { m.Strategy.OnFailure = pointerOf("") } } if m.Regions == nil { m.Regions = []*MultiregionRegion{} } for _, region := range m.Regions { if region.Count == nil { region.Count = pointerOf(1) } if region.Datacenters == nil { region.Datacenters = []string{} } if region.Meta == nil { region.Meta = map[string]string{} } } } func (m *Multiregion) Copy() *Multiregion { if m == nil { return nil } copy := new(Multiregion) if m.Strategy != nil { copy.Strategy = new(MultiregionStrategy) copy.Strategy.MaxParallel = pointerOf(*m.Strategy.MaxParallel) copy.Strategy.OnFailure = pointerOf(*m.Strategy.OnFailure) } for _, region := range m.Regions { copyRegion := new(MultiregionRegion) copyRegion.Name = region.Name copyRegion.Count = pointerOf(*region.Count) copyRegion.Datacenters = append(copyRegion.Datacenters, region.Datacenters...) copyRegion.NodePool = region.NodePool for k, v := range region.Meta { copyRegion.Meta[k] = v } copy.Regions = append(copy.Regions, copyRegion) } return copy } type MultiregionStrategy struct { MaxParallel *int `mapstructure:"max_parallel" hcl:"max_parallel,optional"` OnFailure *string `mapstructure:"on_failure" hcl:"on_failure,optional"` } type MultiregionRegion struct { Name string `hcl:",label"` Count *int `hcl:"count,optional"` Datacenters []string `hcl:"datacenters,optional"` NodePool string `hcl:"node_pool,optional"` Meta map[string]string `hcl:"meta,block"` } // PeriodicConfig is for serializing periodic config for a job. type PeriodicConfig struct { Enabled *bool `hcl:"enabled,optional"` Spec *string `hcl:"cron,optional"` SpecType *string ProhibitOverlap *bool `mapstructure:"prohibit_overlap" hcl:"prohibit_overlap,optional"` TimeZone *string `mapstructure:"time_zone" hcl:"time_zone,optional"` } func (p *PeriodicConfig) Canonicalize() { if p.Enabled == nil { p.Enabled = pointerOf(true) } if p.Spec == nil { p.Spec = pointerOf("") } if p.SpecType == nil { p.SpecType = pointerOf(PeriodicSpecCron) } if p.ProhibitOverlap == nil { p.ProhibitOverlap = pointerOf(false) } if p.TimeZone == nil || *p.TimeZone == "" { p.TimeZone = pointerOf("UTC") } } // Next returns the closest time instant matching the spec that is after the // passed time. If no matching instance exists, the zero value of time.Time is // returned. The `time.Location` of the returned value matches that of the // passed time. func (p *PeriodicConfig) Next(fromTime time.Time) (time.Time, error) { if p != nil && *p.SpecType == PeriodicSpecCron { e, err := cronexpr.Parse(*p.Spec) if err != nil { return time.Time{}, fmt.Errorf("failed parsing cron expression %q: %v", *p.Spec, err) } return cronParseNext(e, fromTime, *p.Spec) } return time.Time{}, nil } // cronParseNext is a helper that parses the next time for the given expression // but captures any panic that may occur in the underlying library. // --- THIS FUNCTION IS REPLICATED IN nomad/structs/structs.go // and should be kept in sync. func cronParseNext(e *cronexpr.Expression, fromTime time.Time, spec string) (t time.Time, err error) { defer func() { if recover() != nil { t = time.Time{} err = fmt.Errorf("failed parsing cron expression: %q", spec) } }() return e.Next(fromTime), nil } func (p *PeriodicConfig) GetLocation() (*time.Location, error) { if p.TimeZone == nil || *p.TimeZone == "" { return time.UTC, nil } return time.LoadLocation(*p.TimeZone) } // ParameterizedJobConfig is used to configure the parameterized job. type ParameterizedJobConfig struct { Payload string `hcl:"payload,optional"` MetaRequired []string `mapstructure:"meta_required" hcl:"meta_required,optional"` MetaOptional []string `mapstructure:"meta_optional" hcl:"meta_optional,optional"` } // JobSubmission is used to hold information about the original content of a job // specification being submitted to Nomad. // // At any time a JobSubmission may be nil, indicating no information is known about // the job submission. type JobSubmission struct { // Source contains the original job definition (may be in the format of // hcl1, hcl2, or json). Source string // Format indicates what the Source content was (hcl1, hcl2, or json). Format string // VariableFlags contains the CLI "-var" flag arguments as submitted with the // job (hcl2 only). VariableFlags map[string]string // Variables contains the opaque variables configuration as coming from // a var-file or the WebUI variables input (hcl2 only). Variables string } func (js *JobSubmission) Canonicalize() { if js == nil { return } if len(js.VariableFlags) == 0 { js.VariableFlags = nil } } func (js *JobSubmission) Copy() *JobSubmission { if js == nil { return nil } return &JobSubmission{ Source: js.Source, Format: js.Format, VariableFlags: maps.Clone(js.VariableFlags), Variables: js.Variables, } } // Job is used to serialize a job. type Job struct { /* Fields parsed from HCL config */ Region *string `hcl:"region,optional"` Namespace *string `hcl:"namespace,optional"` ID *string `hcl:"id,optional"` Name *string `hcl:"name,optional"` Type *string `hcl:"type,optional"` Priority *int `hcl:"priority,optional"` AllAtOnce *bool `mapstructure:"all_at_once" hcl:"all_at_once,optional"` Datacenters []string `hcl:"datacenters,optional"` NodePool *string `hcl:"node_pool,optional"` Constraints []*Constraint `hcl:"constraint,block"` Affinities []*Affinity `hcl:"affinity,block"` TaskGroups []*TaskGroup `hcl:"group,block"` Update *UpdateStrategy `hcl:"update,block"` Multiregion *Multiregion `hcl:"multiregion,block"` Spreads []*Spread `hcl:"spread,block"` Periodic *PeriodicConfig `hcl:"periodic,block"` ParameterizedJob *ParameterizedJobConfig `hcl:"parameterized,block"` Reschedule *ReschedulePolicy `hcl:"reschedule,block"` Migrate *MigrateStrategy `hcl:"migrate,block"` Meta map[string]string `hcl:"meta,block"` ConsulToken *string `mapstructure:"consul_token" hcl:"consul_token,optional"` VaultToken *string `mapstructure:"vault_token" hcl:"vault_token,optional"` /* Fields set by server, not sourced from job config file */ Stop *bool ParentID *string Dispatched bool DispatchIdempotencyToken *string Payload []byte ConsulNamespace *string `mapstructure:"consul_namespace"` VaultNamespace *string `mapstructure:"vault_namespace"` NomadTokenID *string `mapstructure:"nomad_token_id"` Status *string StatusDescription *string Stable *bool Version *uint64 SubmitTime *int64 CreateIndex *uint64 ModifyIndex *uint64 JobModifyIndex *uint64 } // IsPeriodic returns whether a job is periodic. func (j *Job) IsPeriodic() bool { return j.Periodic != nil } // IsParameterized returns whether a job is parameterized job. func (j *Job) IsParameterized() bool { return j.ParameterizedJob != nil && !j.Dispatched } // IsMultiregion returns whether a job is a multiregion job func (j *Job) IsMultiregion() bool { return j.Multiregion != nil && j.Multiregion.Regions != nil && len(j.Multiregion.Regions) > 0 } func (j *Job) Canonicalize() { if j.ID == nil { j.ID = pointerOf("") } if j.Name == nil { j.Name = pointerOf(*j.ID) } if j.ParentID == nil { j.ParentID = pointerOf("") } if j.Namespace == nil { j.Namespace = pointerOf(DefaultNamespace) } if j.Priority == nil { j.Priority = pointerOf(JobDefaultPriority) } if j.Stop == nil { j.Stop = pointerOf(false) } if j.Region == nil { j.Region = pointerOf(GlobalRegion) } if j.NodePool == nil { j.NodePool = pointerOf(NodePoolDefault) } if j.Type == nil { j.Type = pointerOf("service") } if j.AllAtOnce == nil { j.AllAtOnce = pointerOf(false) } if j.ConsulToken == nil { j.ConsulToken = pointerOf("") } if j.ConsulNamespace == nil { j.ConsulNamespace = pointerOf("") } if j.VaultToken == nil { j.VaultToken = pointerOf("") } if j.VaultNamespace == nil { j.VaultNamespace = pointerOf("") } if j.NomadTokenID == nil { j.NomadTokenID = pointerOf("") } if j.Status == nil { j.Status = pointerOf("") } if j.StatusDescription == nil { j.StatusDescription = pointerOf("") } if j.Stable == nil { j.Stable = pointerOf(false) } if j.Version == nil { j.Version = pointerOf(uint64(0)) } if j.CreateIndex == nil { j.CreateIndex = pointerOf(uint64(0)) } if j.ModifyIndex == nil { j.ModifyIndex = pointerOf(uint64(0)) } if j.JobModifyIndex == nil { j.JobModifyIndex = pointerOf(uint64(0)) } if j.Periodic != nil { j.Periodic.Canonicalize() } if j.Update != nil { j.Update.Canonicalize() } else if *j.Type == JobTypeService { j.Update = DefaultUpdateStrategy() } if j.Multiregion != nil { j.Multiregion.Canonicalize() } for _, tg := range j.TaskGroups { tg.Canonicalize(j) } for _, spread := range j.Spreads { spread.Canonicalize() } for _, a := range j.Affinities { a.Canonicalize() } } // LookupTaskGroup finds a task group by name func (j *Job) LookupTaskGroup(name string) *TaskGroup { for _, tg := range j.TaskGroups { if *tg.Name == name { return tg } } return nil } // JobSummary summarizes the state of the allocations of a job type JobSummary struct { JobID string Namespace string Summary map[string]TaskGroupSummary Children *JobChildrenSummary // Raft Indexes CreateIndex uint64 ModifyIndex uint64 } // JobChildrenSummary contains the summary of children job status type JobChildrenSummary struct { Pending int64 Running int64 Dead int64 } func (jc *JobChildrenSummary) Sum() int { if jc == nil { return 0 } return int(jc.Pending + jc.Running + jc.Dead) } // TaskGroup summarizes the state of all the allocations of a particular // TaskGroup type TaskGroupSummary struct { Queued int Complete int Failed int Running int Starting int Lost int Unknown int } // JobListStub is used to return a subset of information about // jobs during list operations. type JobListStub struct { ID string ParentID string Name string Namespace string `json:",omitempty"` Datacenters []string Type string Priority int Periodic bool ParameterizedJob bool Stop bool Status string StatusDescription string JobSummary *JobSummary CreateIndex uint64 ModifyIndex uint64 JobModifyIndex uint64 SubmitTime int64 Meta map[string]string `json:",omitempty"` } // JobIDSort is used to sort jobs by their job ID's. type JobIDSort []*JobListStub func (j JobIDSort) Len() int { return len(j) } func (j JobIDSort) Less(a, b int) bool { return j[a].ID < j[b].ID } func (j JobIDSort) Swap(a, b int) { j[a], j[b] = j[b], j[a] } // NewServiceJob creates and returns a new service-style job // for long-lived processes using the provided name, ID, and // relative job priority. func NewServiceJob(id, name, region string, pri int) *Job { return newJob(id, name, region, JobTypeService, pri) } // NewBatchJob creates and returns a new batch-style job for // short-lived processes using the provided name and ID along // with the relative job priority. func NewBatchJob(id, name, region string, pri int) *Job { return newJob(id, name, region, JobTypeBatch, pri) } // NewSystemJob creates and returns a new system-style job for processes // designed to run on all clients, using the provided name and ID along with // the relative job priority. func NewSystemJob(id, name, region string, pri int) *Job { return newJob(id, name, region, JobTypeSystem, pri) } // NewSysbatchJob creates and returns a new sysbatch-style job for short-lived // processes designed to run on all clients, using the provided name and ID // along with the relative job priority. func NewSysbatchJob(id, name, region string, pri int) *Job { return newJob(id, name, region, JobTypeSysbatch, pri) } // newJob is used to create a new Job struct. func newJob(id, name, region, typ string, pri int) *Job { return &Job{ Region: ®ion, ID: &id, Name: &name, Type: &typ, Priority: &pri, } } // SetMeta is used to set arbitrary k/v pairs of metadata on a job. func (j *Job) SetMeta(key, val string) *Job { if j.Meta == nil { j.Meta = make(map[string]string) } j.Meta[key] = val return j } // AddDatacenter is used to add a datacenter to a job. func (j *Job) AddDatacenter(dc string) *Job { j.Datacenters = append(j.Datacenters, dc) return j } // Constrain is used to add a constraint to a job. func (j *Job) Constrain(c *Constraint) *Job { j.Constraints = append(j.Constraints, c) return j } // AddAffinity is used to add an affinity to a job. func (j *Job) AddAffinity(a *Affinity) *Job { j.Affinities = append(j.Affinities, a) return j } // AddTaskGroup adds a task group to an existing job. func (j *Job) AddTaskGroup(grp *TaskGroup) *Job { j.TaskGroups = append(j.TaskGroups, grp) return j } // AddPeriodicConfig adds a periodic config to an existing job. func (j *Job) AddPeriodicConfig(cfg *PeriodicConfig) *Job { j.Periodic = cfg return j } func (j *Job) AddSpread(s *Spread) *Job { j.Spreads = append(j.Spreads, s) return j } type WriteRequest struct { // The target region for this write Region string // Namespace is the target namespace for this write Namespace string // SecretID is the secret ID of an ACL token SecretID string } // JobValidateRequest is used to validate a job type JobValidateRequest struct { Job *Job WriteRequest } // JobValidateResponse is the response from validate request type JobValidateResponse struct { // DriverConfigValidated indicates whether the agent validated the driver // config DriverConfigValidated bool // ValidationErrors is a list of validation errors ValidationErrors []string // Error is a string version of any error that may have occurred Error string // Warnings contains any warnings about the given job. These may include // deprecation warnings. Warnings string } // JobRevertRequest is used to revert a job to a prior version. type JobRevertRequest struct { // JobID is the ID of the job being reverted JobID string // JobVersion the version to revert to. JobVersion uint64 // EnforcePriorVersion if set will enforce that the job is at the given // version before reverting. EnforcePriorVersion *uint64 // ConsulToken is the Consul token that proves the submitter of the job revert // has access to the Service Identity policies associated with the job's // Consul Connect enabled services. This field is only used to transfer the // token and is not stored after the Job revert. ConsulToken string `json:",omitempty"` // VaultToken is the Vault token that proves the submitter of the job revert // has access to any Vault policies specified in the targeted job version. This // field is only used to authorize the revert and is not stored after the Job // revert. VaultToken string `json:",omitempty"` WriteRequest } // JobRegisterRequest is used to update a job type JobRegisterRequest struct { Submission *JobSubmission Job *Job // If EnforceIndex is set then the job will only be registered if the passed // JobModifyIndex matches the current Jobs index. If the index is zero, the // register only occurs if the job is new. EnforceIndex bool `json:",omitempty"` JobModifyIndex uint64 `json:",omitempty"` PolicyOverride bool `json:",omitempty"` PreserveCounts bool `json:",omitempty"` // EvalPriority is an optional priority to use on any evaluation created as // a result on this job registration. This value must be between 1-100 // inclusively, where a larger value corresponds to a higher priority. This // is useful when an operator wishes to push through a job registration in // busy clusters with a large evaluation backlog. This avoids needing to // change the job priority which also impacts preemption. EvalPriority int `json:",omitempty"` WriteRequest } // JobRegisterResponse is used to respond to a job registration type JobRegisterResponse struct { EvalID string EvalCreateIndex uint64 JobModifyIndex uint64 // Warnings contains any warnings about the given job. These may include // deprecation warnings. Warnings string QueryMeta } // JobDeregisterResponse is used to respond to a job deregistration type JobDeregisterResponse struct { EvalID string EvalCreateIndex uint64 JobModifyIndex uint64 QueryMeta } type JobPlanRequest struct { Job *Job Diff bool PolicyOverride bool WriteRequest } type JobPlanResponse struct { JobModifyIndex uint64 CreatedEvals []*Evaluation Diff *JobDiff Annotations *PlanAnnotations FailedTGAllocs map[string]*AllocationMetric NextPeriodicLaunch time.Time // Warnings contains any warnings about the given job. These may include // deprecation warnings. Warnings string } type JobDiff struct { Type string ID string Fields []*FieldDiff Objects []*ObjectDiff TaskGroups []*TaskGroupDiff } type TaskGroupDiff struct { Type string Name string Fields []*FieldDiff Objects []*ObjectDiff Tasks []*TaskDiff Updates map[string]uint64 } type TaskDiff struct { Type string Name string Fields []*FieldDiff Objects []*ObjectDiff Annotations []string } type FieldDiff struct { Type string Name string Old, New string Annotations []string } type ObjectDiff struct { Type string Name string Fields []*FieldDiff Objects []*ObjectDiff } type PlanAnnotations struct { DesiredTGUpdates map[string]*DesiredUpdates PreemptedAllocs []*AllocationListStub } type DesiredUpdates struct { Ignore uint64 Place uint64 Migrate uint64 Stop uint64 InPlaceUpdate uint64 DestructiveUpdate uint64 Canary uint64 Preemptions uint64 } type JobDispatchRequest struct { JobID string Payload []byte Meta map[string]string IdPrefixTemplate string } type JobDispatchResponse struct { DispatchedJobID string EvalID string EvalCreateIndex uint64 JobCreateIndex uint64 WriteMeta } // JobVersionsResponse is used for a job get versions request type JobVersionsResponse struct { Versions []*Job Diffs []*JobDiff QueryMeta } // JobSubmissionResponse is used for a job get submission request type JobSubmissionResponse struct { Submission *JobSubmission QueryMeta } // JobStabilityRequest is used to marked a job as stable. type JobStabilityRequest struct { // Job to set the stability on JobID string JobVersion uint64 // Set the stability Stable bool WriteRequest } // JobStabilityResponse is the response when marking a job as stable. type JobStabilityResponse struct { JobModifyIndex uint64 WriteMeta } // JobEvaluateRequest is used when we just need to re-evaluate a target job type JobEvaluateRequest struct { JobID string EvalOptions EvalOptions WriteRequest } // EvalOptions is used to encapsulate options when forcing a job evaluation type EvalOptions struct { ForceReschedule bool }