open-nomad/api/jobs.go

620 lines
15 KiB
Go
Raw Normal View History

2015-09-08 23:24:26 +00:00
package api
2015-09-17 19:40:51 +00:00
import (
2016-05-12 01:51:48 +00:00
"fmt"
"net/url"
2015-09-17 19:40:51 +00:00
"sort"
"strconv"
2015-10-20 23:42:53 +00:00
"time"
2017-02-06 19:48:28 +00:00
"github.com/gorhill/cronexpr"
"github.com/hashicorp/nomad/helper"
2015-09-17 19:40:51 +00:00
)
2015-09-10 00:29:43 +00:00
const (
	// JobTypeService indicates a long-running process.
	JobTypeService = "service"

	// JobTypeBatch indicates a short-lived process.
	JobTypeBatch = "batch"

	// PeriodicSpecCron is used for a cron spec.
	PeriodicSpecCron = "cron"
)
2016-06-08 23:48:02 +00:00
const (
// RegisterEnforceIndexErrPrefix is the prefix to use in errors caused by
// enforcing the job modify index during job registration.
RegisterEnforceIndexErrPrefix = "Enforcing job modify index"
)
2015-09-08 23:24:26 +00:00
// Jobs is used to access the job-specific endpoints.
// Every method on Jobs issues its request through the wrapped client.
type Jobs struct {
// client is the API client used for the query/write/delete calls.
client *Client
}
// Jobs returns a handle on the jobs endpoints that is bound to this client.
func (c *Client) Jobs() *Jobs {
	handle := Jobs{client: c}
	return &handle
}
2017-02-06 19:48:28 +00:00
// Validate sends the job to the /v1/validate/job endpoint and returns the
// decoded validation response. When q is non-nil, its Region is copied into
// the request body. The response pointer is returned as-is together with
// the error, so callers must check err before relying on the response.
func (j *Jobs) Validate(job *Job, q *WriteOptions) (*JobValidateResponse, *WriteMeta, error) {
	req := &JobValidateRequest{Job: job}
	if q != nil {
		req.WriteRequest.Region = q.Region
	}

	out := new(JobValidateResponse)
	meta, err := j.client.write("/v1/validate/job", req, out, q)
	return out, meta, err
}
2015-09-08 23:24:26 +00:00
// Register is used to register a new job. It returns the ID
// of the evaluation, along with any errors encountered.
func (j *Jobs) Register(job *Job, q *WriteOptions) (string, *WriteMeta, error) {
2016-06-08 23:48:02 +00:00
var resp registerJobResponse
req := &RegisterJobRequest{Job: job}
wm, err := j.client.write("/v1/jobs", req, &resp, q)
if err != nil {
return "", nil, err
}
return resp.EvalID, wm, nil
}
// EnforceRegister is used to register a job enforcing its job modify index.
func (j *Jobs) EnforceRegister(job *Job, modifyIndex uint64, q *WriteOptions) (string, *WriteMeta, error) {
2015-09-08 23:24:26 +00:00
var resp registerJobResponse
2016-06-08 23:48:02 +00:00
req := &RegisterJobRequest{
Job: job,
EnforceIndex: true,
JobModifyIndex: modifyIndex,
}
2015-09-08 23:24:26 +00:00
wm, err := j.client.write("/v1/jobs", req, &resp, q)
if err != nil {
return "", nil, err
}
return resp.EvalID, wm, nil
}
// List is used to list all of the existing jobs.
2015-09-14 02:55:47 +00:00
func (j *Jobs) List(q *QueryOptions) ([]*JobListStub, *QueryMeta, error) {
var resp []*JobListStub
2015-09-09 20:18:50 +00:00
qm, err := j.client.query("/v1/jobs", &resp, q)
2015-09-08 23:24:26 +00:00
if err != nil {
2015-09-08 23:45:16 +00:00
return nil, qm, err
2015-09-08 23:24:26 +00:00
}
2015-09-17 19:40:51 +00:00
sort.Sort(JobIDSort(resp))
2015-09-08 23:45:16 +00:00
return resp, qm, nil
2015-09-08 23:24:26 +00:00
}
// PrefixList is used to list all existing jobs that match the prefix.
// It delegates to List with query options that carry only the prefix.
func (j *Jobs) PrefixList(prefix string) ([]*JobListStub, *QueryMeta, error) {
	opts := &QueryOptions{Prefix: prefix}
	return j.List(opts)
}
2015-09-09 00:49:31 +00:00
// Info is used to retrieve information about a particular
2015-09-09 00:20:52 +00:00
// job given its unique ID.
2015-09-09 20:18:50 +00:00
func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) {
2015-09-09 00:20:52 +00:00
var resp Job
2015-09-09 20:18:50 +00:00
qm, err := j.client.query("/v1/job/"+jobID, &resp, q)
2015-09-09 00:20:52 +00:00
if err != nil {
return nil, nil, err
}
return &resp, qm, nil
}
2015-09-09 00:49:31 +00:00
// Allocations is used to return the allocs for a given job ID.
func (j *Jobs) Allocations(jobID string, allAllocs bool, q *QueryOptions) ([]*AllocationListStub, *QueryMeta, error) {
2015-09-14 02:55:47 +00:00
var resp []*AllocationListStub
u, err := url.Parse("/v1/job/" + jobID + "/allocations")
if err != nil {
return nil, nil, err
}
v := u.Query()
v.Add("all", strconv.FormatBool(allAllocs))
u.RawQuery = v.Encode()
qm, err := j.client.query(u.String(), &resp, q)
2015-09-09 00:49:31 +00:00
if err != nil {
return nil, nil, err
}
2015-09-17 19:40:51 +00:00
sort.Sort(AllocIndexSort(resp))
2015-09-09 00:49:31 +00:00
return resp, qm, nil
}
2015-09-09 01:42:34 +00:00
// Evaluations is used to query the evaluations associated with
// the given job ID.
2015-09-09 20:18:50 +00:00
func (j *Jobs) Evaluations(jobID string, q *QueryOptions) ([]*Evaluation, *QueryMeta, error) {
2015-09-09 01:42:34 +00:00
var resp []*Evaluation
2015-09-09 20:18:50 +00:00
qm, err := j.client.query("/v1/job/"+jobID+"/evaluations", &resp, q)
2015-09-09 01:42:34 +00:00
if err != nil {
return nil, nil, err
}
2015-09-17 19:40:51 +00:00
sort.Sort(EvalIndexSort(resp))
2015-09-09 01:42:34 +00:00
return resp, qm, nil
}
// Deregister is used to remove an existing job.
func (j *Jobs) Deregister(jobID string, q *WriteOptions) (string, *WriteMeta, error) {
2015-09-17 00:12:48 +00:00
var resp deregisterJobResponse
wm, err := j.client.delete("/v1/job/"+jobID, &resp, q)
2015-09-09 01:42:34 +00:00
if err != nil {
2015-09-17 00:12:48 +00:00
return "", nil, err
2015-09-09 01:42:34 +00:00
}
2015-09-17 00:12:48 +00:00
return resp.EvalID, wm, nil
2015-09-09 01:42:34 +00:00
}
2015-09-10 01:39:24 +00:00
// ForceEvaluate is used to force-evaluate an existing job. It writes to the
// job's evaluate endpoint with no request body and returns the EvalID from
// the response. On error the ID is empty and the write metadata is nil.
func (j *Jobs) ForceEvaluate(jobID string, q *WriteOptions) (string, *WriteMeta, error) {
	endpoint := "/v1/job/" + jobID + "/evaluate"
	out := new(registerJobResponse)

	meta, err := j.client.write(endpoint, nil, out, q)
	if err != nil {
		return "", nil, err
	}
	return out.EvalID, meta, nil
}
2016-01-19 19:09:36 +00:00
// PeriodicForce spawns a new instance of the periodic job and returns the
// eval ID. The write carries no request body. On error the ID is empty and
// the write metadata is nil.
func (j *Jobs) PeriodicForce(jobID string, q *WriteOptions) (string, *WriteMeta, error) {
	endpoint := "/v1/job/" + jobID + "/periodic/force"
	out := new(periodicForceResponse)

	meta, err := j.client.write(endpoint, nil, out, q)
	if err != nil {
		return "", nil, err
	}
	return out.EvalID, meta, nil
}
2016-05-12 01:51:48 +00:00
func (j *Jobs) Plan(job *Job, diff bool, q *WriteOptions) (*JobPlanResponse, *WriteMeta, error) {
if job == nil {
return nil, nil, fmt.Errorf("must pass non-nil job")
}
var resp JobPlanResponse
req := &JobPlanRequest{
Job: job,
Diff: diff,
}
2017-02-06 19:48:28 +00:00
wm, err := j.client.write("/v1/job/"+*job.ID+"/plan", req, &resp, q)
2016-05-12 01:51:48 +00:00
if err != nil {
return nil, nil, err
}
return &resp, wm, nil
}
// Summary queries the summary endpoint of the job with the given ID and
// returns the decoded JobSummary. On error both the summary and the query
// metadata are nil.
func (j *Jobs) Summary(jobID string, q *QueryOptions) (*JobSummary, *QueryMeta, error) {
	endpoint := "/v1/job/" + jobID + "/summary"
	out := new(JobSummary)

	meta, err := j.client.query(endpoint, out, q)
	if err != nil {
		return nil, nil, err
	}
	return out, meta, nil
}
2016-12-05 05:22:13 +00:00
func (j *Jobs) Dispatch(jobID string, meta map[string]string,
2016-12-14 20:50:08 +00:00
payload []byte, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) {
2016-12-05 05:22:13 +00:00
var resp JobDispatchResponse
req := &JobDispatchRequest{
2016-12-14 20:50:08 +00:00
JobID: jobID,
Meta: meta,
Payload: payload,
2016-12-05 05:22:13 +00:00
}
wm, err := j.client.write("/v1/job/"+jobID+"/dispatch", req, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, wm, nil
}
2016-01-19 19:09:36 +00:00
// periodicForceResponse is used to deserialize a force response
type periodicForceResponse struct {
// EvalID is the value PeriodicForce returns to its caller.
EvalID string
}
// UpdateStrategy is for serializing update strategy for a job.
type UpdateStrategy struct {
// NOTE(review): presumably the delay between successive update batches — confirm.
Stagger time.Duration
// MaxParallel is decoded from the "max_parallel" key (mapstructure tag).
MaxParallel int `mapstructure:"max_parallel"`
}
// PeriodicConfig is for serializing periodic config for a job.
// All fields are pointers so that an unset value (nil) can be told apart
// from an explicit zero value; Canonicalize fills in the defaults.
type PeriodicConfig struct {
	Enabled         *bool
	Spec            *string
	SpecType        *string
	ProhibitOverlap *bool   `mapstructure:"prohibit_overlap"`
	TimeZone        *string `mapstructure:"time_zone"`
}
func (p *PeriodicConfig) Canonicalize() {
if p.Enabled == nil {
p.Enabled = helper.BoolToPtr(true)
}
2017-02-24 22:18:09 +00:00
if p.Spec == nil {
p.Spec = helper.StringToPtr("")
}
2017-02-06 19:48:28 +00:00
if p.SpecType == nil {
p.SpecType = helper.StringToPtr(PeriodicSpecCron)
}
if p.ProhibitOverlap == nil {
p.ProhibitOverlap = helper.BoolToPtr(false)
}
if p.TimeZone == nil || *p.TimeZone == "" {
p.TimeZone = helper.StringToPtr("UTC")
}
2017-02-06 19:48:28 +00:00
}
// Next returns the closest time instant matching the spec that is after the
// passed time. If no matching instance exists, the zero value of time.Time is
// returned. The `time.Location` of the returned value matches that of the
// passed time.
//
// The zero time is also returned when the receiver, Spec or SpecType is nil
// (for example when Canonicalize has not been called), when SpecType is not
// PeriodicSpecCron, or when the spec fails to parse.
func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
	// Spec and SpecType are pointers; guard them so a config that was never
	// canonicalized yields "no next launch" instead of a nil dereference.
	if p == nil || p.Spec == nil || p.SpecType == nil {
		return time.Time{}
	}
	if *p.SpecType != PeriodicSpecCron {
		return time.Time{}
	}

	e, err := cronexpr.Parse(*p.Spec)
	if err != nil {
		return time.Time{}
	}
	return e.Next(fromTime)
}
2017-02-15 22:37:06 +00:00
// GetLocation resolves the configured TimeZone into a *time.Location using
// time.LoadLocation. When TimeZone is unset or empty, time.UTC is returned
// with a nil error.
func (p *PeriodicConfig) GetLocation() (*time.Location, error) {
	if p.TimeZone != nil && *p.TimeZone != "" {
		return time.LoadLocation(*p.TimeZone)
	}
	return time.UTC, nil
}
// ParameterizedJobConfig is used to configure the parameterized job.
// MetaRequired and MetaOptional are decoded from the "meta_required" and
// "meta_optional" keys respectively (mapstructure tags).
type ParameterizedJobConfig struct {
	Payload      string
	MetaRequired []string `mapstructure:"meta_required"`
	MetaOptional []string `mapstructure:"meta_optional"`
}
2015-09-08 23:24:26 +00:00
// Job is used to serialize a job.
type Job struct {
2017-02-06 19:48:28 +00:00
Region *string
ID *string
ParentID *string
Name *string
Type *string
Priority *int
AllAtOnce *bool `mapstructure:"all_at_once"`
2015-09-08 23:24:26 +00:00
Datacenters []string
2015-09-09 02:27:04 +00:00
Constraints []*Constraint
2015-09-10 00:29:43 +00:00
TaskGroups []*TaskGroup
Update *UpdateStrategy
Periodic *PeriodicConfig
ParameterizedJob *ParameterizedJobConfig
2016-12-14 20:50:08 +00:00
Payload []byte
2015-09-08 23:24:26 +00:00
Meta map[string]string
VaultToken *string `mapstructure:"vault_token"`
2017-02-06 19:48:28 +00:00
Status *string
StatusDescription *string
CreateIndex *uint64
ModifyIndex *uint64
JobModifyIndex *uint64
}
// IsPeriodic returns whether a job is periodic, i.e. whether it carries a
// non-nil Periodic configuration.
func (j *Job) IsPeriodic() bool {
	cfg := j.Periodic
	return cfg != nil
}
// IsParameterized returns whether a job is a parameterized job, i.e. whether
// it carries a non-nil ParameterizedJob configuration.
func (j *Job) IsParameterized() bool {
	cfg := j.ParameterizedJob
	return cfg != nil
}
func (j *Job) Canonicalize() {
if j.ID == nil {
j.ID = helper.StringToPtr("")
}
if j.Name == nil {
j.Name = j.ID
}
2017-02-13 23:18:17 +00:00
if j.ParentID == nil {
j.ParentID = helper.StringToPtr("")
}
2017-02-06 19:48:28 +00:00
if j.Priority == nil {
j.Priority = helper.IntToPtr(50)
}
if j.Region == nil {
j.Region = helper.StringToPtr("global")
}
if j.Type == nil {
j.Type = helper.StringToPtr("service")
}
if j.AllAtOnce == nil {
j.AllAtOnce = helper.BoolToPtr(false)
}
if j.VaultToken == nil {
j.VaultToken = helper.StringToPtr("")
}
if j.Status == nil {
j.Status = helper.StringToPtr("")
}
if j.StatusDescription == nil {
j.StatusDescription = helper.StringToPtr("")
}
if j.CreateIndex == nil {
j.CreateIndex = helper.Uint64ToPtr(0)
}
if j.ModifyIndex == nil {
j.ModifyIndex = helper.Uint64ToPtr(0)
}
if j.JobModifyIndex == nil {
j.JobModifyIndex = helper.Uint64ToPtr(0)
}
if j.Periodic != nil {
j.Periodic.Canonicalize()
}
for _, tg := range j.TaskGroups {
tg.Canonicalize(j)
2017-02-06 19:48:28 +00:00
}
2015-09-14 02:55:47 +00:00
}
// JobSummary summarizes the state of the allocations of a job
type JobSummary struct {
2016-12-06 01:24:37 +00:00
JobID string
Summary map[string]TaskGroupSummary
Children *JobChildrenSummary
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
}
2016-12-06 01:24:37 +00:00
// JobChildrenSummary contains the summary of children job status
type JobChildrenSummary struct {
	Pending int64
	Running int64
	Dead    int64
}

// Sum returns the total of the Pending, Running and Dead counters as an int.
// It is safe to call on a nil receiver, in which case it returns 0.
func (jc *JobChildrenSummary) Sum() int {
	if jc == nil {
		return 0
	}

	return int(jc.Pending + jc.Running + jc.Dead)
}
// TaskGroupSummary summarizes the state of all the allocations of a
// particular TaskGroup. Each field is a counter for one state.
type TaskGroupSummary struct {
Queued int
Complete int
Failed int
Running int
Starting int
Lost int
}
2015-09-14 02:55:47 +00:00
// JobListStub is used to return a subset of information about
// jobs during list operations.
type JobListStub struct {
ID string
ParentID string
2015-09-14 02:55:47 +00:00
Name string
Type string
Priority int
Status string
StatusDescription string
JobSummary *JobSummary
2015-09-14 02:55:47 +00:00
CreateIndex uint64
ModifyIndex uint64
2016-06-08 23:48:02 +00:00
JobModifyIndex uint64
2015-09-08 23:24:26 +00:00
}
2015-09-17 19:40:51 +00:00
// JobIDSort is used to sort jobs by their job ID's.
// It provides the Len, Less and Swap methods required by sort.Sort.
type JobIDSort []*JobListStub

// Len reports the number of job stubs.
func (j JobIDSort) Len() int {
	return len(j)
}

// Less reports whether the ID of the stub at index a orders before the ID
// of the stub at index b.
func (j JobIDSort) Less(a, b int) bool {
	lhs, rhs := j[a], j[b]
	return lhs.ID < rhs.ID
}

// Swap exchanges the stubs at indexes a and b.
func (j JobIDSort) Swap(a, b int) {
	tmp := j[a]
	j[a] = j[b]
	j[b] = tmp
}
2015-09-10 00:29:43 +00:00
// NewServiceJob creates and returns a new service-style job
// for long-lived processes using the provided name, ID, and
// relative job priority.
2015-09-16 18:42:08 +00:00
func NewServiceJob(id, name, region string, pri int) *Job {
return newJob(id, name, region, JobTypeService, pri)
2015-09-10 00:29:43 +00:00
}
// NewBatchJob creates and returns a new batch-style job for
// short-lived processes using the provided name and ID along
// with the relative job priority.
2015-09-16 18:42:08 +00:00
func NewBatchJob(id, name, region string, pri int) *Job {
return newJob(id, name, region, JobTypeBatch, pri)
2015-09-10 00:29:43 +00:00
}
// newJob is used to create a new Job struct.
2015-09-16 18:42:08 +00:00
func newJob(id, name, region, typ string, pri int) *Job {
2015-09-10 00:29:43 +00:00
return &Job{
2017-02-06 19:48:28 +00:00
Region: &region,
ID: &id,
Name: &name,
Type: &typ,
Priority: &pri,
2015-09-10 00:29:43 +00:00
}
}
// SetMeta is used to set arbitrary k/v pairs of metadata on a job.
// The Meta map is created on first use. The job is returned so that calls
// can be chained.
func (j *Job) SetMeta(key, val string) *Job {
	if j.Meta != nil {
		j.Meta[key] = val
		return j
	}

	j.Meta = map[string]string{key: val}
	return j
}
// AddDatacenter is used to add a datacenter to a job. The datacenter is
// appended to Datacenters and the job is returned so calls can be chained.
func (j *Job) AddDatacenter(dc string) *Job {
	dcs := append(j.Datacenters, dc)
	j.Datacenters = dcs
	return j
}
// Constrain is used to add a constraint to a job. The constraint is
// appended to Constraints and the job is returned so calls can be chained.
func (j *Job) Constrain(c *Constraint) *Job {
	constraints := append(j.Constraints, c)
	j.Constraints = constraints
	return j
}
// AddTaskGroup adds a task group to an existing job. The group is appended
// to TaskGroups and the job is returned so calls can be chained.
func (j *Job) AddTaskGroup(grp *TaskGroup) *Job {
	groups := append(j.TaskGroups, grp)
	j.TaskGroups = groups
	return j
}
2016-01-13 18:19:53 +00:00
// AddPeriodicConfig adds a periodic config to an existing job, replacing
// any config already set. The job is returned so calls can be chained.
func (j *Job) AddPeriodicConfig(cfg *PeriodicConfig) *Job {
	job := j
	job.Periodic = cfg
	return job
}
2017-02-06 19:48:28 +00:00
// WriteRequest holds fields common to write requests; it is embedded in the
// request types of this file.
type WriteRequest struct {
// The target region for this write
Region string
}
// JobValidateRequest is used to validate a job
type JobValidateRequest struct {
// Job is the job to validate.
Job *Job
// WriteRequest is embedded, so its Region field is promoted onto the request.
WriteRequest
}
// JobValidateResponse is the response from a validate request.
type JobValidateResponse struct {
// DriverConfigValidated indicates whether the agent validated the driver
// config
DriverConfigValidated bool
// ValidationErrors is a list of validation errors
ValidationErrors []string
}
2017-02-16 00:56:36 +00:00
// JobUpdateRequest is used to update a job
type JobRegisterRequest struct {
2017-02-16 00:56:36 +00:00
Job *Job
// If EnforceIndex is set then the job will only be registered if the passed
// JobModifyIndex matches the current Jobs index. If the index is zero, the
// register only occurs if the job is new.
EnforceIndex bool
JobModifyIndex uint64
WriteRequest
}
2016-03-29 22:02:14 +00:00
// RegisterJobRequest is used to serialize a job registration
type RegisterJobRequest struct {
2016-06-08 23:48:02 +00:00
Job *Job
2016-07-28 21:48:25 +00:00
EnforceIndex bool `json:",omitempty"`
JobModifyIndex uint64 `json:",omitempty"`
2015-09-08 23:24:26 +00:00
}
// registerJobResponse is used to deserialize a job response
type registerJobResponse struct {
// EvalID is the evaluation ID handed back to callers of the register helpers.
EvalID string
}
2015-09-17 00:12:48 +00:00
// deregisterJobResponse is used to decode a deregister response
type deregisterJobResponse struct {
// EvalID is the evaluation ID handed back to callers of Deregister.
EvalID string
}
2016-05-12 01:51:48 +00:00
type JobPlanRequest struct {
Job *Job
Diff bool
WriteRequest
2016-05-12 01:51:48 +00:00
}
type JobPlanResponse struct {
JobModifyIndex uint64
CreatedEvals []*Evaluation
Diff *JobDiff
Annotations *PlanAnnotations
FailedTGAllocs map[string]*AllocationMetric
NextPeriodicLaunch time.Time
2016-05-12 01:51:48 +00:00
}
// JobDiff is the top-level diff of a job, as returned in
// JobPlanResponse.Diff. It holds field and object diffs for the job itself
// plus one TaskGroupDiff per task group.
type JobDiff struct {
Type string
ID string
Fields []*FieldDiff
Objects []*ObjectDiff
TaskGroups []*TaskGroupDiff
}
// TaskGroupDiff is the diff of a single task group, including one TaskDiff
// per task.
type TaskGroupDiff struct {
Type string
Name string
Fields []*FieldDiff
Objects []*ObjectDiff
Tasks []*TaskDiff
// NOTE(review): presumably counts per update kind — confirm against the server.
Updates map[string]uint64
}
// TaskDiff is the diff of a single task.
type TaskDiff struct {
Type string
Name string
Fields []*FieldDiff
Objects []*ObjectDiff
Annotations []string
}
// FieldDiff is the diff of a single named field, holding its old and new
// values as strings.
type FieldDiff struct {
Type string
Name string
Old, New string
Annotations []string
}
// ObjectDiff is the diff of a named object; it nests further field and
// object diffs.
type ObjectDiff struct {
Type string
Name string
Fields []*FieldDiff
Objects []*ObjectDiff
}
// PlanAnnotations holds the annotations returned in
// JobPlanResponse.Annotations.
type PlanAnnotations struct {
// NOTE(review): presumably keyed by task group name — confirm.
DesiredTGUpdates map[string]*DesiredUpdates
}
// DesiredUpdates holds one unsigned counter per kind of update.
type DesiredUpdates struct {
Ignore uint64
Place uint64
Migrate uint64
Stop uint64
InPlaceUpdate uint64
DestructiveUpdate uint64
}
2016-12-05 05:22:13 +00:00
// JobDispatchRequest is the request body sent by Jobs.Dispatch. It carries
// the ID of the job to dispatch together with the caller-supplied payload
// and meta.
type JobDispatchRequest struct {
	JobID   string
	Payload []byte
	Meta    map[string]string
}
type JobDispatchResponse struct {
DispatchedJobID string
EvalID string
EvalCreateIndex uint64
JobCreateIndex uint64
2017-02-16 23:47:36 +00:00
WriteMeta
2016-12-05 05:22:13 +00:00
}