open-nomad/nomad/job_endpoint.go

685 lines
18 KiB
Go
Raw Normal View History

package nomad
import (
2016-08-18 20:52:15 +00:00
"context"
"fmt"
2016-08-17 00:50:14 +00:00
"strings"
"time"
"github.com/armon/go-metrics"
2016-09-01 19:05:08 +00:00
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-memdb"
2016-04-13 22:55:46 +00:00
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
"github.com/hashicorp/nomad/scheduler"
)
2016-06-08 23:48:02 +00:00
const (
// RegisterEnforceIndexErrPrefix is the prefix to use in errors caused by
// enforcing the job modify index during registers.
RegisterEnforceIndexErrPrefix = "Enforcing job modify index"
)
2016-09-01 21:23:40 +00:00
var (
// vaultConstraint is the implicit constraint added to jobs requesting a
// Vault token
vaultConstraint = &structs.Constraint{
LTarget: "${attr.vault.version}",
RTarget: ">= 0.6.1",
Operand: structs.ConstraintVersion,
}
)
// Job endpoint is used for job interactions
type Job struct {
srv *Server
}
// Register is used to upsert a job for scheduling
func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Register", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "register"}, time.Now())
// Validate the arguments
if args.Job == nil {
return fmt.Errorf("missing job for registration")
}
// Initialize the job fields (sets defaults and any necessary init work).
2016-07-20 23:07:15 +00:00
args.Job.Canonicalize()
// Validate the job.
if err := validateJob(args.Job); err != nil {
return err
}
2016-06-08 23:48:02 +00:00
if args.EnforceIndex {
// Lookup the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
job, err := snap.JobByID(args.Job.ID)
if err != nil {
return err
}
jmi := args.JobModifyIndex
if job != nil {
if jmi == 0 {
return fmt.Errorf("%s 0: job already exists", RegisterEnforceIndexErrPrefix)
} else if jmi != job.JobModifyIndex {
return fmt.Errorf("%s %d: job exists with conflicting job modify index: %d",
RegisterEnforceIndexErrPrefix, jmi, job.JobModifyIndex)
}
} else if jmi != 0 {
return fmt.Errorf("%s %d: job does not exist", RegisterEnforceIndexErrPrefix, jmi)
}
}
2016-08-17 00:50:14 +00:00
// Ensure that the job has permissions for the requested Vault tokens
2016-09-01 21:23:40 +00:00
policies := args.Job.VaultPolicies()
if len(policies) != 0 {
2016-08-17 00:50:14 +00:00
vconf := j.srv.config.VaultConfig
2016-10-11 01:04:39 +00:00
if !vconf.IsEnabled() {
2016-08-17 00:50:14 +00:00
return fmt.Errorf("Vault not enabled and Vault policies requested")
}
// Have to check if the user has permissions
2016-10-11 01:04:39 +00:00
if !vconf.AllowsUnauthenticated() {
2016-08-17 00:50:14 +00:00
if args.Job.VaultToken == "" {
return fmt.Errorf("Vault policies requested but missing Vault Token")
}
vault := j.srv.vault
2016-08-18 20:52:15 +00:00
s, err := vault.LookupToken(context.Background(), args.Job.VaultToken)
2016-08-17 00:50:14 +00:00
if err != nil {
return err
}
allowedPolicies, err := PoliciesFrom(s)
if err != nil {
return err
}
2016-09-01 19:05:08 +00:00
// If we are given a root token it can access all policies
if !lib.StrContains(allowedPolicies, "root") {
2016-09-01 21:23:40 +00:00
flatPolicies := structs.VaultPoliciesSet(policies)
subset, offending := structs.SliceStringIsSubset(allowedPolicies, flatPolicies)
2016-09-01 19:05:08 +00:00
if !subset {
return fmt.Errorf("Passed Vault Token doesn't allow access to the following policies: %s",
strings.Join(offending, ", "))
}
2016-08-17 00:50:14 +00:00
}
}
2016-09-01 21:23:40 +00:00
// Add implicit constraints that the task groups are run on a Node with
// Vault
for _, tg := range args.Job.TaskGroups {
_, ok := policies[tg.Name]
if !ok {
// Not requesting Vault
continue
}
found := false
for _, c := range tg.Constraints {
if c.Equal(vaultConstraint) {
found = true
break
}
}
if !found {
tg.Constraints = append(tg.Constraints, vaultConstraint)
}
}
2016-08-17 00:50:14 +00:00
}
// Clear the Vault token
args.Job.VaultToken = ""
// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: Register failed: %v", err)
return err
}
2015-12-01 19:40:40 +00:00
// Populate the reply with job information
reply.JobModifyIndex = index
// If the job is periodic, we don't create an eval.
if args.Job.IsPeriodic() {
return nil
}
// Create a new evaluation
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: args.Job.Priority,
Type: args.Job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: args.Job.ID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
// Commit this evaluation via Raft
2015-08-16 01:03:05 +00:00
// XXX: There is a risk of partial failure where the JobRegister succeeds
// but that the EvalUpdate does not.
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: Eval create failed: %v", err)
return err
}
2015-12-01 19:40:40 +00:00
// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
return nil
}
// Summary retreives the summary of a job
func (j *Job) Summary(args *structs.JobSummaryRequest,
2016-07-21 21:43:21 +00:00
reply *structs.JobSummaryResponse) error {
if done, err := j.srv.forward("Job.Summary", args, args, reply); done {
2016-07-21 20:04:38 +00:00
return err
}
defer metrics.MeasureSince([]string{"nomad", "job_summary", "get_job_summary"}, time.Now())
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{JobSummary: args.JobID}),
run: func() error {
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
2016-07-21 21:43:21 +00:00
// Look for job summary
2016-07-21 20:04:38 +00:00
out, err := snap.JobSummaryByID(args.JobID)
if err != nil {
return err
}
// Setup the output
reply.JobSummary = out
if out != nil {
reply.Index = out.ModifyIndex
} else {
2016-07-22 06:13:07 +00:00
// Use the last index that affected the job_summary table
2016-07-21 20:04:38 +00:00
index, err := snap.Index("job_summary")
if err != nil {
return err
}
reply.Index = index
}
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
}
// Evaluate is used to force a job for re-evaluation
func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Evaluate", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "evaluate"}, time.Now())
// Validate the arguments
if args.JobID == "" {
return fmt.Errorf("missing job ID for evaluation")
}
// Lookup the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
2015-09-07 03:56:38 +00:00
job, err := snap.JobByID(args.JobID)
if err != nil {
return err
}
if job == nil {
return fmt.Errorf("job not found")
}
2015-12-01 19:40:40 +00:00
if job.IsPeriodic() {
return fmt.Errorf("can't evaluate periodic job")
}
// Create a new evaluation
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
Type: job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
JobModifyIndex: job.ModifyIndex,
Status: structs.EvalStatusPending,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: Eval create failed: %v", err)
return err
}
// Setup the reply
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.JobModifyIndex = job.ModifyIndex
reply.Index = evalIndex
return nil
}
// Deregister is used to remove a job the cluster.
func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobDeregisterResponse) error {
if done, err := j.srv.forward("Job.Deregister", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "deregister"}, time.Now())
2015-12-01 19:40:40 +00:00
// Validate the arguments
if args.JobID == "" {
return fmt.Errorf("missing job ID for evaluation")
}
// Lookup the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
job, err := snap.JobByID(args.JobID)
if err != nil {
return err
}
// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: Deregister failed: %v", err)
return err
}
2015-12-01 19:40:40 +00:00
// Populate the reply with job information
reply.JobModifyIndex = index
// If the job is periodic, we don't create an eval.
if job != nil && job.IsPeriodic() {
2015-12-01 19:40:40 +00:00
return nil
}
// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was. The scheduler itself also doesn't matter,
// since all should be able to handle deregistration in the same way.
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: Eval create failed: %v", err)
return err
}
2015-12-01 19:40:40 +00:00
// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
return nil
}
// GetJob is used to request information about a specific job
func (j *Job) GetJob(args *structs.JobSpecificRequest,
reply *structs.SingleJobResponse) error {
if done, err := j.srv.forward("Job.GetJob", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "get_job"}, time.Now())
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Job: args.JobID}),
run: func() error {
// Look for the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
out, err := snap.JobByID(args.JobID)
if err != nil {
return err
}
// Setup the output
2015-10-30 02:00:02 +00:00
reply.Job = out
if out != nil {
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
index, err := snap.Index("jobs")
if err != nil {
return err
}
reply.Index = index
}
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
}
2015-09-06 19:18:45 +00:00
// List is used to list the jobs registered in the system
func (j *Job) List(args *structs.JobListRequest,
reply *structs.JobListResponse) error {
if done, err := j.srv.forward("Job.List", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "list"}, time.Now())
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Table: "jobs"}),
run: func() error {
// Capture all the jobs
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
var iter memdb.ResultIterator
if prefix := args.QueryOptions.Prefix; prefix != "" {
iter, err = snap.JobsByIDPrefix(prefix)
} else {
iter, err = snap.Jobs()
}
if err != nil {
return err
}
var jobs []*structs.JobListStub
for {
raw := iter.Next()
if raw == nil {
break
}
job := raw.(*structs.Job)
2016-07-22 06:13:07 +00:00
summary, err := snap.JobSummaryByID(job.ID)
if err != nil {
return fmt.Errorf("unable to look up summary for job: %v", job.ID)
}
2016-07-21 20:21:47 +00:00
jobs = append(jobs, job.Stub(summary))
}
reply.Jobs = jobs
// Use the last index that affected the jobs table
index, err := snap.Index("jobs")
if err != nil {
return err
}
reply.Index = index
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
2015-09-06 19:18:45 +00:00
}
// Allocations is used to list the allocations for a job
func (j *Job) Allocations(args *structs.JobSpecificRequest,
reply *structs.JobAllocationsResponse) error {
if done, err := j.srv.forward("Job.Allocations", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "allocations"}, time.Now())
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{AllocJob: args.JobID}),
run: func() error {
// Capture the allocations
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
allocs, err := snap.AllocsByJob(args.JobID)
if err != nil {
return err
}
2015-09-06 19:18:45 +00:00
// Convert to stubs
if len(allocs) > 0 {
reply.Allocations = make([]*structs.AllocListStub, 0, len(allocs))
for _, alloc := range allocs {
reply.Allocations = append(reply.Allocations, alloc.Stub())
}
}
// Use the last index that affected the allocs table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
2015-09-06 19:24:25 +00:00
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
2015-09-06 19:18:45 +00:00
}
// Evaluations is used to list the evaluations for a job
func (j *Job) Evaluations(args *structs.JobSpecificRequest,
reply *structs.JobEvaluationsResponse) error {
if done, err := j.srv.forward("Job.Evaluations", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "evaluations"}, time.Now())
// Capture the evaluations
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
reply.Evaluations, err = snap.EvalsByJob(args.JobID)
if err != nil {
return err
}
// Use the last index that affected the evals table
2015-09-07 03:56:38 +00:00
index, err := snap.Index("evals")
2015-09-06 19:18:45 +00:00
if err != nil {
return err
}
reply.Index = index
2015-09-06 19:24:25 +00:00
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
2015-09-06 19:18:45 +00:00
return nil
}
// Plan is used to cause a dry-run evaluation of the Job and return the results
// with a potential diff containing annotations.
func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse) error {
if done, err := j.srv.forward("Job.Plan", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "plan"}, time.Now())
// Validate the arguments
if args.Job == nil {
return fmt.Errorf("Job required for plan")
}
// Initialize the job fields (sets defaults and any necessary init work).
2016-07-20 23:07:15 +00:00
args.Job.Canonicalize()
// Validate the job.
if err := validateJob(args.Job); err != nil {
return err
}
// Acquire a snapshot of the state
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
// Get the original job
oldJob, err := snap.JobByID(args.Job.ID)
if err != nil {
return err
}
var index uint64
2016-06-08 23:48:02 +00:00
var updatedIndex uint64
if oldJob != nil {
2016-06-08 23:48:02 +00:00
index = oldJob.JobModifyIndex
updatedIndex = oldJob.JobModifyIndex + 1
}
// Insert the updated Job into the snapshot
2016-06-08 23:48:02 +00:00
snap.UpsertJob(updatedIndex, args.Job)
// Create an eval and mark it as requiring annotations and insert that as well
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: args.Job.Priority,
Type: args.Job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: args.Job.ID,
2016-06-08 23:48:02 +00:00
JobModifyIndex: updatedIndex,
Status: structs.EvalStatusPending,
AnnotatePlan: true,
}
// Create an in-memory Planner that returns no errors and stores the
// submitted plan and created evals.
2016-05-16 19:49:18 +00:00
planner := &scheduler.Harness{
State: &snap.StateStore,
}
// Create the scheduler and run it
sched, err := scheduler.NewScheduler(eval.Type, j.srv.logger, snap, planner)
if err != nil {
return err
}
if err := sched.Process(eval); err != nil {
return err
}
// Annotate and store the diff
2016-05-16 19:49:18 +00:00
if plans := len(planner.Plans); plans != 1 {
return fmt.Errorf("scheduler resulted in an unexpected number of plans: %v", plans)
2016-05-16 19:49:18 +00:00
}
annotations := planner.Plans[0].Annotations
if args.Diff {
2016-05-11 22:36:28 +00:00
jobDiff, err := oldJob.Diff(args.Job, true)
if err != nil {
return fmt.Errorf("failed to create job diff: %v", err)
}
2016-05-12 18:29:38 +00:00
if err := scheduler.Annotate(jobDiff, annotations); err != nil {
2016-05-11 22:36:28 +00:00
return fmt.Errorf("failed to annotate job diff: %v", err)
}
reply.Diff = jobDiff
}
// Grab the failures
if len(planner.Evals) != 1 {
return fmt.Errorf("scheduler resulted in an unexpected number of eval updates: %v", planner.Evals)
}
updatedEval := planner.Evals[0]
// If it is a periodic job calculate the next launch
if args.Job.IsPeriodic() && args.Job.Periodic.Enabled {
reply.NextPeriodicLaunch = args.Job.Periodic.Next(time.Now().UTC())
}
reply.FailedTGAllocs = updatedEval.FailedTGAllocs
2016-05-16 18:48:44 +00:00
reply.JobModifyIndex = index
2016-05-12 18:29:38 +00:00
reply.Annotations = annotations
2016-05-16 19:49:18 +00:00
reply.CreatedEvals = planner.CreateEvals
2016-05-12 01:51:48 +00:00
reply.Index = index
return nil
}
// validateJob validates a Job and task drivers and returns an error if there is
// a validation problem or if the Job is of a type a user is not allowed to
// submit.
func validateJob(job *structs.Job) error {
validationErrors := new(multierror.Error)
if err := job.Validate(); err != nil {
multierror.Append(validationErrors, err)
}
// Validate the driver configurations.
for _, tg := range job.TaskGroups {
for _, task := range tg.Tasks {
d, err := driver.NewDriver(
task.Driver,
driver.NewEmptyDriverContext(),
)
if err != nil {
msg := "failed to create driver for task %q in group %q for validation: %v"
multierror.Append(validationErrors, fmt.Errorf(msg, tg.Name, task.Name, err))
continue
}
if err := d.Validate(task.Config); err != nil {
formatted := fmt.Errorf("group %q -> task %q -> config: %v", tg.Name, task.Name, err)
multierror.Append(validationErrors, formatted)
}
}
}
if job.Type == structs.JobTypeCore {
multierror.Append(validationErrors, fmt.Errorf("job type cannot be core"))
}
return validationErrors.ErrorOrNil()
}