open-nomad/nomad/job_endpoint.go

1529 lines
45 KiB
Go
Raw Normal View History

package nomad
import (
2016-08-18 20:52:15 +00:00
"context"
"fmt"
2017-07-01 00:23:34 +00:00
"sort"
2016-08-17 00:50:14 +00:00
"strings"
"time"
2019-01-15 19:46:12 +00:00
metrics "github.com/armon/go-metrics"
2018-09-15 23:23:13 +00:00
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
2019-01-15 19:46:12 +00:00
multierror "github.com/hashicorp/go-multierror"
2018-09-15 23:23:13 +00:00
"github.com/golang/snappy"
2016-09-01 19:05:08 +00:00
"github.com/hashicorp/consul/lib"
2017-08-21 04:31:45 +00:00
"github.com/hashicorp/nomad/acl"
2017-01-18 23:55:14 +00:00
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
2017-02-08 04:31:23 +00:00
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
"github.com/pkg/errors"
)
2016-06-08 23:48:02 +00:00
const (
// RegisterEnforceIndexErrPrefix is the prefix to use in errors caused by
// enforcing the job modify index during registers.
RegisterEnforceIndexErrPrefix = "Enforcing job modify index"
2016-11-26 02:04:55 +00:00
2016-12-14 20:50:08 +00:00
// DispatchPayloadSizeLimit is the maximum size of the uncompressed input
2016-11-26 02:04:55 +00:00
// data payload.
2016-12-14 20:50:08 +00:00
DispatchPayloadSizeLimit = 16 * 1024
2016-06-08 23:48:02 +00:00
)
2016-09-01 21:23:40 +00:00
var (
// allowRescheduleTransition is the transition that allows failed
// allocations to be force rescheduled. We create a one off
// variable to avoid creating a new object for every request.
allowForceRescheduleTransition = &structs.DesiredTransition{
ForceReschedule: helper.BoolToPtr(true),
}
2016-09-01 21:23:40 +00:00
)
// Job endpoint is used for job interactions
type Job struct {
2018-09-15 23:23:13 +00:00
srv *Server
logger log.Logger
// builtin admission controllers
mutators []jobMutator
validators []jobValidator
}
// NewJobEndpoints creates a new job endpoint with builtin admission controllers
func NewJobEndpoints(s *Server) *Job {
return &Job{
srv: s,
logger: s.logger.Named("job"),
mutators: []jobMutator{
jobCanonicalizer{},
jobConnectHook{},
jobImpliedConstraints{},
},
validators: []jobValidator{
jobConnectHook{},
jobValidate{},
},
}
}
// Register is used to upsert a job for scheduling
func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Register", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "register"}, time.Now())
// Validate the arguments
if args.Job == nil {
return fmt.Errorf("missing job for registration")
}
// defensive check; http layer and RPC requester should ensure namespaces are set consistently
if args.RequestNamespace() != args.Job.Namespace {
return fmt.Errorf("mismatched request namespace in request: %q, %q", args.RequestNamespace(), args.Job.Namespace)
}
// Run admission controllers
job, warnings, err := j.admissionControllers(args.Job)
if err != nil {
return err
}
args.Job = job
// Set the warning message
reply.Warnings = structs.MergeMultierrorWarnings(warnings...)
2017-08-21 04:31:45 +00:00
// Check job submission permissions
2017-10-12 22:16:33 +00:00
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
2017-08-21 04:31:45 +00:00
return err
2017-09-19 14:47:10 +00:00
} else if aclObj != nil {
if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) {
2017-09-19 14:47:10 +00:00
return structs.ErrPermissionDenied
}
// Validate Volume Permissions
for _, tg := range args.Job.TaskGroups {
for _, vol := range tg.Volumes {
if vol.Type != structs.VolumeTypeHost {
return structs.ErrPermissionDenied
}
// If a volume is readonly, then we allow access if the user has ReadOnly
// or ReadWrite access to the volume. Otherwise we only allow access if
// they have ReadWrite access.
if vol.ReadOnly {
config: Hoist volume.config.source into volume Currently, using a Volume in a job uses the following configuration: ``` volume "alias-name" { type = "volume-type" read_only = true config { source = "host_volume_name" } } ``` This commit migrates to the following: ``` volume "alias-name" { type = "volume-type" source = "host_volume_name" read_only = true } ``` The original design was based due to being uncertain about the future of storage plugins, and to allow maxium flexibility. However, this causes a few issues, namely: - We frequently need to parse this configuration during submission, scheduling, and mounting - It complicates the configuration from and end users perspective - It complicates the ability to do validation As we understand the problem space of CSI a little more, it has become clear that we won't need the `source` to be in config, as it will be used in the majority of cases: - Host Volumes: Always need a source - Preallocated CSI Volumes: Always needs a source from a volume or claim name - Dynamic Persistent CSI Volumes*: Always needs a source to attach the volumes to for managing upgrades and to avoid dangling. - Dynamic Ephemeral CSI Volumes*: Less thought out, but `source` will probably point to the plugin name, and a `config` block will allow you to pass meta to the plugin. Or will point to a pre-configured ephemeral config. *If implemented The new design simplifies this by merging the source into the volume stanza to solve the above issues with usability, performance, and error handling.
2019-09-13 02:09:58 +00:00
if !aclObj.AllowHostVolumeOperation(vol.Source, acl.HostVolumeCapabilityMountReadOnly) &&
!aclObj.AllowHostVolumeOperation(vol.Source, acl.HostVolumeCapabilityMountReadWrite) {
return structs.ErrPermissionDenied
}
} else {
config: Hoist volume.config.source into volume Currently, using a Volume in a job uses the following configuration: ``` volume "alias-name" { type = "volume-type" read_only = true config { source = "host_volume_name" } } ``` This commit migrates to the following: ``` volume "alias-name" { type = "volume-type" source = "host_volume_name" read_only = true } ``` The original design was based due to being uncertain about the future of storage plugins, and to allow maxium flexibility. However, this causes a few issues, namely: - We frequently need to parse this configuration during submission, scheduling, and mounting - It complicates the configuration from and end users perspective - It complicates the ability to do validation As we understand the problem space of CSI a little more, it has become clear that we won't need the `source` to be in config, as it will be used in the majority of cases: - Host Volumes: Always need a source - Preallocated CSI Volumes: Always needs a source from a volume or claim name - Dynamic Persistent CSI Volumes*: Always needs a source to attach the volumes to for managing upgrades and to avoid dangling. - Dynamic Ephemeral CSI Volumes*: Less thought out, but `source` will probably point to the plugin name, and a `config` block will allow you to pass meta to the plugin. Or will point to a pre-configured ephemeral config. *If implemented The new design simplifies this by merging the source into the volume stanza to solve the above issues with usability, performance, and error handling.
2019-09-13 02:09:58 +00:00
if !aclObj.AllowHostVolumeOperation(vol.Source, acl.HostVolumeCapabilityMountReadWrite) {
return structs.ErrPermissionDenied
}
}
}
for _, t := range tg.Tasks {
for _, vm := range t.VolumeMounts {
vol := tg.Volumes[vm.Volume]
if vm.PropagationMode == structs.VolumeMountPropagationBidirectional &&
!aclObj.AllowHostVolumeOperation(vol.Source, acl.HostVolumeCapabilityMountReadWrite) {
return structs.ErrPermissionDenied
}
}
}
}
2017-09-19 14:47:10 +00:00
// Check if override is set and we do not have permissions
if args.PolicyOverride {
if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySentinelOverride) {
2018-09-15 23:23:13 +00:00
j.logger.Warn("policy override attempted without permissions for job", "job", args.Job.ID)
2017-09-19 14:47:10 +00:00
return structs.ErrPermissionDenied
}
2018-09-15 23:23:13 +00:00
j.logger.Warn("policy override set for job", "job", args.Job.ID)
2017-09-19 14:47:10 +00:00
}
2017-08-21 04:31:45 +00:00
}
// Lookup the job
2017-09-19 14:47:10 +00:00
snap, err := j.srv.State().Snapshot()
if err != nil {
return err
}
ws := memdb.NewWatchSet()
2017-09-07 23:56:15 +00:00
existingJob, err := snap.JobByID(ws, args.RequestNamespace(), args.Job.ID)
if err != nil {
return err
}
// If EnforceIndex set, check it before trying to apply
2016-06-08 23:48:02 +00:00
if args.EnforceIndex {
jmi := args.JobModifyIndex
if existingJob != nil {
2016-06-08 23:48:02 +00:00
if jmi == 0 {
return fmt.Errorf("%s 0: job already exists", RegisterEnforceIndexErrPrefix)
} else if jmi != existingJob.JobModifyIndex {
2016-06-08 23:48:02 +00:00
return fmt.Errorf("%s %d: job exists with conflicting job modify index: %d",
RegisterEnforceIndexErrPrefix, jmi, existingJob.JobModifyIndex)
2016-06-08 23:48:02 +00:00
}
} else if jmi != 0 {
return fmt.Errorf("%s %d: job does not exist", RegisterEnforceIndexErrPrefix, jmi)
}
}
// Validate job transitions if its an update
if err := validateJobUpdate(existingJob, args.Job); err != nil {
return err
}
2016-08-17 00:50:14 +00:00
// Ensure that the job has permissions for the requested Vault tokens
2016-09-01 21:23:40 +00:00
policies := args.Job.VaultPolicies()
if len(policies) != 0 {
2016-08-17 00:50:14 +00:00
vconf := j.srv.config.VaultConfig
2016-10-11 01:04:39 +00:00
if !vconf.IsEnabled() {
2016-08-17 00:50:14 +00:00
return fmt.Errorf("Vault not enabled and Vault policies requested")
}
// Have to check if the user has permissions
2016-10-11 01:04:39 +00:00
if !vconf.AllowsUnauthenticated() {
2016-08-17 00:50:14 +00:00
if args.Job.VaultToken == "" {
return fmt.Errorf("Vault policies requested but missing Vault Token")
}
vault := j.srv.vault
2016-08-18 20:52:15 +00:00
s, err := vault.LookupToken(context.Background(), args.Job.VaultToken)
2016-08-17 00:50:14 +00:00
if err != nil {
return err
}
allowedPolicies, err := PoliciesFrom(s)
if err != nil {
return err
}
2016-09-01 19:05:08 +00:00
// If we are given a root token it can access all policies
if !lib.StrContains(allowedPolicies, "root") {
2016-09-01 21:23:40 +00:00
flatPolicies := structs.VaultPoliciesSet(policies)
2017-01-18 23:55:14 +00:00
subset, offending := helper.SliceStringIsSubset(allowedPolicies, flatPolicies)
2016-09-01 19:05:08 +00:00
if !subset {
return fmt.Errorf("Passed Vault Token doesn't allow access to the following policies: %s",
strings.Join(offending, ", "))
}
2016-08-17 00:50:14 +00:00
}
}
}
// helper function that checks if the "operator token" supplied with the
// job has sufficient ACL permissions for establishing consul connect services
checkOperatorToken := func(task string) error {
if j.srv.config.ConsulConfig.AllowsUnauthenticated() {
// if consul.allow_unauthenticated is enabled (which is the default)
// just let the Job through without checking anything.
return nil
}
proxiedTask := strings.TrimPrefix(task, structs.ConnectProxyPrefix+"-")
ctx := context.Background()
if err := j.srv.consulACLs.CheckSIPolicy(ctx, proxiedTask, args.Job.ConsulToken); err != nil {
// not much in the way of exported error types, we could parse
// the content, but all errors are going to be failures anyway
return errors.Wrap(err, "operator token denied")
}
return nil
}
// Enforce that the operator has necessary Consul ACL permissions
for _, tg := range args.Job.ConnectTasks() {
for _, task := range tg {
if err := checkOperatorToken(task); err != nil {
return err
}
}
}
2017-09-19 14:47:10 +00:00
// Enforce Sentinel policies
policyWarnings, err := j.enforceSubmitJob(args.PolicyOverride, args.Job)
if err != nil {
return err
}
if policyWarnings != nil {
warnings = append(warnings, policyWarnings)
reply.Warnings = structs.MergeMultierrorWarnings(warnings...)
2017-09-19 14:47:10 +00:00
}
2016-08-17 00:50:14 +00:00
// Clear the Vault token
args.Job.VaultToken = ""
// Check if the job has changed at all
2017-07-01 00:23:34 +00:00
if existingJob == nil || existingJob.SpecChanged(args.Job) {
2017-06-30 02:08:25 +00:00
// Set the submit time
args.Job.SetSubmitTime()
// Commit this update via Raft
2017-09-19 14:47:10 +00:00
fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err, ok := fsmErr.(error); ok && err != nil {
2018-09-15 23:23:13 +00:00
j.logger.Error("registering job failed", "error", err, "fsm", true)
2017-09-19 14:47:10 +00:00
return err
}
if err != nil {
2018-09-15 23:23:13 +00:00
j.logger.Error("registering job failed", "error", err, "raft", true)
return err
}
// Populate the reply with job information
reply.JobModifyIndex = index
} else {
2017-07-01 00:23:34 +00:00
reply.JobModifyIndex = existingJob.JobModifyIndex
}
2015-12-01 19:40:40 +00:00
// If the job is periodic or parameterized, we don't create an eval.
if args.Job.IsPeriodic() || args.Job.IsParameterized() {
2015-12-01 19:40:40 +00:00
return nil
}
// Create a new evaluation
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
2017-09-07 23:56:15 +00:00
Namespace: args.RequestNamespace(),
Priority: args.Job.Priority,
Type: args.Job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: args.Job.ID,
JobModifyIndex: reply.JobModifyIndex,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
// Commit this evaluation via Raft
2015-08-16 01:03:05 +00:00
// XXX: There is a risk of partial failure where the JobRegister succeeds
// but that the EvalUpdate does not.
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
2018-09-15 23:23:13 +00:00
j.logger.Error("eval create failed", "error", err, "method", "register")
return err
}
2015-12-01 19:40:40 +00:00
// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
return nil
}
// getSignalConstraint builds a suitable constraint based on the required
// signals
func getSignalConstraint(signals []string) *structs.Constraint {
sort.Strings(signals)
return &structs.Constraint{
Operand: structs.ConstraintSetContains,
LTarget: "${attr.os.signals}",
RTarget: strings.Join(signals, ","),
}
}
2017-12-13 17:36:03 +00:00
// Summary retrieves the summary of a job
func (j *Job) Summary(args *structs.JobSummaryRequest,
2016-07-21 21:43:21 +00:00
reply *structs.JobSummaryResponse) error {
if done, err := j.srv.forward("Job.Summary", args, args, reply); done {
2016-07-21 20:04:38 +00:00
return err
}
defer metrics.MeasureSince([]string{"nomad", "job_summary", "get_job_summary"}, time.Now())
2017-09-14 14:52:50 +00:00
// Check for read-job permissions
2017-10-12 22:16:33 +00:00
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
2017-09-14 14:52:50 +00:00
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
2016-07-21 20:04:38 +00:00
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
2017-02-08 04:31:23 +00:00
run: func(ws memdb.WatchSet, state *state.StateStore) error {
2016-07-21 21:43:21 +00:00
// Look for job summary
2017-09-07 23:56:15 +00:00
out, err := state.JobSummaryByID(ws, args.RequestNamespace(), args.JobID)
2016-07-21 20:04:38 +00:00
if err != nil {
return err
}
// Setup the output
reply.JobSummary = out
if out != nil {
reply.Index = out.ModifyIndex
} else {
2016-07-22 06:13:07 +00:00
// Use the last index that affected the job_summary table
2017-02-08 04:31:23 +00:00
index, err := state.Index("job_summary")
2016-07-21 20:04:38 +00:00
if err != nil {
return err
}
reply.Index = index
}
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
}
2017-02-06 19:48:28 +00:00
// Validate validates a job
2017-04-18 20:09:24 +00:00
func (j *Job) Validate(args *structs.JobValidateRequest, reply *structs.JobValidateResponse) error {
defer metrics.MeasureSince([]string{"nomad", "job", "validate"}, time.Now())
2017-02-06 19:48:28 +00:00
// defensive check; http layer and RPC requester should ensure namespaces are set consistently
if args.RequestNamespace() != args.Job.Namespace {
return fmt.Errorf("mismatched request namespace in request: %q, %q", args.RequestNamespace(), args.Job.Namespace)
}
job, mutateWarnings, err := j.admissionMutators(args.Job)
if err != nil {
return err
}
args.Job = job
2017-09-25 17:30:31 +00:00
// Check for read-job permissions
2017-10-12 22:16:33 +00:00
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
2017-09-25 17:30:31 +00:00
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
// Validate the job and capture any warnings
validateWarnings, err := j.admissionValidators(args.Job)
if err != nil {
2017-02-06 19:48:28 +00:00
if merr, ok := err.(*multierror.Error); ok {
for _, err := range merr.Errors {
reply.ValidationErrors = append(reply.ValidationErrors, err.Error())
}
reply.Error = merr.Error()
2017-02-06 19:48:28 +00:00
} else {
reply.ValidationErrors = append(reply.ValidationErrors, err.Error())
reply.Error = err.Error()
2017-02-06 19:48:28 +00:00
}
}
2017-04-18 20:09:24 +00:00
validateWarnings = append(validateWarnings, mutateWarnings...)
// Set the warning message
reply.Warnings = structs.MergeMultierrorWarnings(validateWarnings...)
2017-02-06 19:48:28 +00:00
reply.DriverConfigValidated = true
return nil
}
2017-04-18 22:11:33 +00:00
// Revert is used to revert the job to a prior version
func (j *Job) Revert(args *structs.JobRevertRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Revert", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "revert"}, time.Now())
2017-09-25 21:36:22 +00:00
// Check for submit-job permissions
2017-10-12 22:16:33 +00:00
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
2017-09-25 21:36:22 +00:00
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) {
return structs.ErrPermissionDenied
}
2017-04-18 22:11:33 +00:00
// Validate the arguments
if args.JobID == "" {
2017-07-06 19:49:13 +00:00
return fmt.Errorf("missing job ID for revert")
2017-04-18 22:11:33 +00:00
}
// Lookup the job by version
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
ws := memdb.NewWatchSet()
2017-09-07 23:56:15 +00:00
cur, err := snap.JobByID(ws, args.RequestNamespace(), args.JobID)
2017-04-19 20:28:29 +00:00
if err != nil {
return err
}
if cur == nil {
return fmt.Errorf("job %q not found", args.JobID)
}
if args.JobVersion == cur.Version {
return fmt.Errorf("can't revert to current version")
}
2017-09-07 23:56:15 +00:00
jobV, err := snap.JobByIDAndVersion(ws, args.RequestNamespace(), args.JobID, args.JobVersion)
2017-04-27 17:51:28 +00:00
if err != nil {
return err
}
if jobV == nil {
2017-09-07 23:56:15 +00:00
return fmt.Errorf("job %q in namespace %q at version %d not found", args.JobID, args.RequestNamespace(), args.JobVersion)
2017-04-27 17:51:28 +00:00
}
2017-04-18 22:11:33 +00:00
// Build the register request
revJob := jobV.Copy()
// Use Vault Token from revert request to perform registration of reverted job.
revJob.VaultToken = args.VaultToken
2017-04-18 22:11:33 +00:00
reg := &structs.JobRegisterRequest{
Job: revJob,
2017-04-18 22:11:33 +00:00
WriteRequest: args.WriteRequest,
}
// If the request is enforcing the existing version do a check.
if args.EnforcePriorVersion != nil {
if cur.Version != *args.EnforcePriorVersion {
return fmt.Errorf("Current job has version %d; enforcing version %d", cur.Version, *args.EnforcePriorVersion)
}
reg.EnforceIndex = true
reg.JobModifyIndex = cur.JobModifyIndex
}
// Register the version.
return j.Register(reg, reply)
}
2017-07-06 19:49:13 +00:00
// Stable is used to mark the job version as stable
func (j *Job) Stable(args *structs.JobStabilityRequest, reply *structs.JobStabilityResponse) error {
if done, err := j.srv.forward("Job.Stable", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "stable"}, time.Now())
2017-09-25 22:17:58 +00:00
// Check for read-job permissions
2017-10-12 22:16:33 +00:00
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
2017-09-25 22:17:58 +00:00
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) {
return structs.ErrPermissionDenied
}
2017-07-06 19:49:13 +00:00
// Validate the arguments
if args.JobID == "" {
return fmt.Errorf("missing job ID for marking job as stable")
}
// Lookup the job by version
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
ws := memdb.NewWatchSet()
2017-09-07 23:56:15 +00:00
jobV, err := snap.JobByIDAndVersion(ws, args.RequestNamespace(), args.JobID, args.JobVersion)
2017-07-06 19:49:13 +00:00
if err != nil {
return err
}
if jobV == nil {
2017-09-07 23:56:15 +00:00
return fmt.Errorf("job %q in namespace %q at version %d not found", args.JobID, args.RequestNamespace(), args.JobVersion)
2017-07-06 19:49:13 +00:00
}
2017-07-06 22:19:07 +00:00
// Commit this stability request via Raft
2017-07-06 19:49:13 +00:00
_, modifyIndex, err := j.srv.raftApply(structs.JobStabilityRequestType, args)
if err != nil {
2018-09-15 23:23:13 +00:00
j.logger.Error("submitting job stability request failed", "error", err)
2017-07-06 19:49:13 +00:00
return err
}
// Setup the reply
reply.Index = modifyIndex
return nil
}
// Evaluate is used to force a job for re-evaluation
func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Evaluate", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "evaluate"}, time.Now())
// Check for read-job permissions
2017-10-12 22:16:33 +00:00
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
// Validate the arguments
if args.JobID == "" {
return fmt.Errorf("missing job ID for evaluation")
}
// Lookup the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
2017-09-07 23:56:15 +00:00
job, err := snap.JobByID(ws, args.RequestNamespace(), args.JobID)
if err != nil {
return err
}
if job == nil {
return fmt.Errorf("job not found")
}
2015-12-01 19:40:40 +00:00
if job.IsPeriodic() {
return fmt.Errorf("can't evaluate periodic job")
} else if job.IsParameterized() {
return fmt.Errorf("can't evaluate parameterized job")
2015-12-01 19:40:40 +00:00
}
forceRescheduleAllocs := make(map[string]*structs.DesiredTransition)
if args.EvalOptions.ForceReschedule {
// Find any failed allocs that could be force rescheduled
allocs, err := snap.AllocsByJob(ws, args.RequestNamespace(), args.JobID, false)
if err != nil {
return err
}
for _, alloc := range allocs {
taskGroup := job.LookupTaskGroup(alloc.TaskGroup)
// Forcing rescheduling is only allowed if task group has rescheduling enabled
2018-05-10 19:42:24 +00:00
if taskGroup == nil || !taskGroup.ReschedulePolicy.Enabled() {
continue
}
2018-05-10 19:42:24 +00:00
if alloc.NextAllocation == "" && alloc.ClientStatus == structs.AllocClientStatusFailed && !alloc.DesiredTransition.ShouldForceReschedule() {
forceRescheduleAllocs[alloc.ID] = allowForceRescheduleTransition
}
}
}
// Create a new evaluation
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
2017-09-07 23:56:15 +00:00
Namespace: args.RequestNamespace(),
Priority: job.Priority,
Type: job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
JobModifyIndex: job.ModifyIndex,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
// Create a AllocUpdateDesiredTransitionRequest request with the eval and any forced rescheduled allocs
updateTransitionReq := &structs.AllocUpdateDesiredTransitionRequest{
Allocs: forceRescheduleAllocs,
Evals: []*structs.Evaluation{eval},
}
_, evalIndex, err := j.srv.raftApply(structs.AllocUpdateDesiredTransitionRequestType, updateTransitionReq)
if err != nil {
2018-09-15 23:23:13 +00:00
j.logger.Error("eval create failed", "error", err, "method", "evaluate")
return err
}
// Setup the reply
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.JobModifyIndex = job.ModifyIndex
reply.Index = evalIndex
return nil
}
// Deregister is used to remove a job the cluster.
func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobDeregisterResponse) error {
if done, err := j.srv.forward("Job.Deregister", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "deregister"}, time.Now())
2017-09-27 19:19:14 +00:00
// Check for submit-job permissions
2017-10-12 22:16:33 +00:00
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
2017-09-27 19:19:14 +00:00
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) {
return structs.ErrPermissionDenied
}
2015-12-01 19:40:40 +00:00
// Validate the arguments
if args.JobID == "" {
2017-07-06 19:49:13 +00:00
return fmt.Errorf("missing job ID for deregistering")
2015-12-01 19:40:40 +00:00
}
// Lookup the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
2017-09-07 23:56:15 +00:00
job, err := snap.JobByID(ws, args.RequestNamespace(), args.JobID)
2015-12-01 19:40:40 +00:00
if err != nil {
return err
}
// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args)
if err != nil {
2018-09-15 23:23:13 +00:00
j.logger.Error("deregister failed", "error", err)
return err
}
2015-12-01 19:40:40 +00:00
// Populate the reply with job information
reply.JobModifyIndex = index
// If the job is periodic or parameterized, we don't create an eval.
if job != nil && (job.IsPeriodic() || job.IsParameterized()) {
2015-12-01 19:40:40 +00:00
return nil
}
// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
2018-03-14 22:32:18 +00:00
// priority even if the job was.
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
2017-09-07 23:56:15 +00:00
Namespace: args.RequestNamespace(),
Priority: structs.JobDefaultPriority,
Type: structs.JobTypeService,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: args.JobID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
2018-09-15 23:23:13 +00:00
j.logger.Error("eval create failed", "error", err, "method", "deregister")
return err
}
2015-12-01 19:40:40 +00:00
// Populate the reply with eval information
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
return nil
}
2018-03-14 22:32:18 +00:00
// BatchDeregister is used to remove a set of jobs from the cluster.
func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *structs.JobBatchDeregisterResponse) error {
if done, err := j.srv.forward("Job.BatchDeregister", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "batch_deregister"}, time.Now())
// Resolve the ACL token
aclObj, err := j.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
}
// Validate the arguments
if len(args.Jobs) == 0 {
return fmt.Errorf("given no jobs to deregister")
}
if len(args.Evals) != 0 {
return fmt.Errorf("evaluations should not be populated")
}
// Loop through checking for permissions
for jobNS := range args.Jobs {
// Check for submit-job permissions
if aclObj != nil && !aclObj.AllowNsOp(jobNS.Namespace, acl.NamespaceCapabilitySubmitJob) {
return structs.ErrPermissionDenied
}
}
2018-03-16 17:52:19 +00:00
// Grab a snapshot
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
2018-03-14 22:32:18 +00:00
// Loop through to create evals
for jobNS, options := range args.Jobs {
if options == nil {
return fmt.Errorf("no deregister options provided for %v", jobNS)
}
job, err := snap.JobByID(nil, jobNS.Namespace, jobNS.ID)
if err != nil {
return err
}
// If the job is periodic or parameterized, we don't create an eval.
if job != nil && (job.IsPeriodic() || job.IsParameterized()) {
continue
}
priority := structs.JobDefaultPriority
jtype := structs.JobTypeService
if job != nil {
priority = job.Priority
jtype = job.Type
}
// Create a new evaluation
now := time.Now().UTC().UnixNano()
2018-03-14 22:32:18 +00:00
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: jobNS.Namespace,
Priority: priority,
Type: jtype,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: jobNS.ID,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
2018-03-14 22:32:18 +00:00
}
args.Evals = append(args.Evals, eval)
}
// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobBatchDeregisterRequestType, args)
if err != nil {
2018-09-15 23:23:13 +00:00
j.logger.Error("batch deregister failed", "error", err)
2018-03-14 22:32:18 +00:00
return err
}
reply.Index = index
return nil
}
// GetJob is used to request information about a specific job
func (j *Job) GetJob(args *structs.JobSpecificRequest,
reply *structs.SingleJobResponse) error {
if done, err := j.srv.forward("Job.GetJob", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "get_job"}, time.Now())
2017-09-26 17:38:03 +00:00
// Check for read-job permissions
2017-10-12 22:16:33 +00:00
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
2017-09-26 17:38:03 +00:00
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
2017-02-08 04:31:23 +00:00
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Look for the job
2017-09-07 23:56:15 +00:00
out, err := state.JobByID(ws, args.RequestNamespace(), args.JobID)
if err != nil {
return err
}
// Setup the output
2015-10-30 02:00:02 +00:00
reply.Job = out
if out != nil {
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
2017-02-08 04:31:23 +00:00
index, err := state.Index("jobs")
if err != nil {
return err
}
reply.Index = index
}
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
}
2015-09-06 19:18:45 +00:00
2017-04-13 22:47:59 +00:00
// GetJobVersions is used to retrieve all tracked versions of a job.
func (j *Job) GetJobVersions(args *structs.JobVersionsRequest,
2017-04-13 22:47:59 +00:00
reply *structs.JobVersionsResponse) error {
if done, err := j.srv.forward("Job.GetJobVersions", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "get_job_versions"}, time.Now())
// Check for read-job permissions
2017-10-12 22:16:33 +00:00
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
2017-04-13 22:47:59 +00:00
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Look for the job
2017-09-07 23:56:15 +00:00
out, err := state.JobVersionsByID(ws, args.RequestNamespace(), args.JobID)
2017-04-13 22:47:59 +00:00
if err != nil {
return err
}
// Setup the output
reply.Versions = out
if len(out) != 0 {
reply.Index = out[0].ModifyIndex
// Compute the diffs
2017-06-30 01:42:37 +00:00
if args.Diffs {
for i := 0; i < len(out)-1; i++ {
old, new := out[i+1], out[i]
d, err := old.Diff(new, true)
if err != nil {
return fmt.Errorf("failed to create job diff: %v", err)
}
reply.Diffs = append(reply.Diffs, d)
}
}
2017-04-13 22:47:59 +00:00
} else {
// Use the last index that affected the nodes table
index, err := state.Index("job_version")
2017-04-13 22:47:59 +00:00
if err != nil {
return err
}
reply.Index = index
}
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
}
2015-09-06 19:18:45 +00:00
// List is used to list the jobs registered in the system
func (j *Job) List(args *structs.JobListRequest,
reply *structs.JobListResponse) error {
if done, err := j.srv.forward("Job.List", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "list"}, time.Now())
2017-09-14 22:46:00 +00:00
// Check for list-job permissions
2017-10-12 22:16:33 +00:00
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
2017-09-14 22:46:00 +00:00
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityListJobs) {
return structs.ErrPermissionDenied
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
2017-02-08 04:31:23 +00:00
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Capture all the jobs
2017-02-08 04:31:23 +00:00
var err error
var iter memdb.ResultIterator
if prefix := args.QueryOptions.Prefix; prefix != "" {
2017-09-07 23:56:15 +00:00
iter, err = state.JobsByIDPrefix(ws, args.RequestNamespace(), prefix)
} else {
2017-09-07 23:56:15 +00:00
iter, err = state.JobsByNamespace(ws, args.RequestNamespace())
}
if err != nil {
return err
}
var jobs []*structs.JobListStub
for {
raw := iter.Next()
if raw == nil {
break
}
job := raw.(*structs.Job)
2017-09-07 23:56:15 +00:00
summary, err := state.JobSummaryByID(ws, args.RequestNamespace(), job.ID)
2016-07-22 06:13:07 +00:00
if err != nil {
return fmt.Errorf("unable to look up summary for job: %v", job.ID)
}
2016-07-21 20:21:47 +00:00
jobs = append(jobs, job.Stub(summary))
}
reply.Jobs = jobs
2018-03-15 17:22:03 +00:00
// Use the last index that affected the jobs table or summary
jindex, err := state.Index("jobs")
if err != nil {
return err
}
2018-03-15 17:22:03 +00:00
sindex, err := state.Index("job_summary")
if err != nil {
return err
}
reply.Index = helper.Uint64Max(jindex, sindex)
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
2015-09-06 19:18:45 +00:00
}
// Allocations is used to list the allocations for a job
func (j *Job) Allocations(args *structs.JobSpecificRequest,
reply *structs.JobAllocationsResponse) error {
if done, err := j.srv.forward("Job.Allocations", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "allocations"}, time.Now())
2017-09-26 18:01:23 +00:00
// Check for read-job permissions
2017-10-12 22:16:33 +00:00
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
2017-09-26 18:01:23 +00:00
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
// Ensure JobID is set otherwise everything works and never returns
// allocations which can hide bugs in request code.
if args.JobID == "" {
return fmt.Errorf("missing job ID")
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
2017-02-08 04:31:23 +00:00
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Capture the allocations
allocs, err := state.AllocsByJob(ws, args.RequestNamespace(), args.JobID, args.All)
if err != nil {
return err
}
2015-09-06 19:18:45 +00:00
// Convert to stubs
if len(allocs) > 0 {
reply.Allocations = make([]*structs.AllocListStub, 0, len(allocs))
for _, alloc := range allocs {
reply.Allocations = append(reply.Allocations, alloc.Stub())
}
}
// Use the last index that affected the allocs table
2017-02-08 04:31:23 +00:00
index, err := state.Index("allocs")
if err != nil {
return err
}
reply.Index = index
2015-09-06 19:24:25 +00:00
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
2015-09-06 19:18:45 +00:00
}
// Evaluations is used to list the evaluations for a job
func (j *Job) Evaluations(args *structs.JobSpecificRequest,
reply *structs.JobEvaluationsResponse) error {
if done, err := j.srv.forward("Job.Evaluations", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "evaluations"}, time.Now())
2017-09-26 20:12:37 +00:00
// Check for read-job permissions
2017-10-12 22:16:33 +00:00
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
2017-09-26 20:12:37 +00:00
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
2017-02-08 04:31:23 +00:00
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Capture the evals
2017-02-08 04:31:23 +00:00
var err error
2017-09-07 23:56:15 +00:00
reply.Evaluations, err = state.EvalsByJob(ws, args.RequestNamespace(), args.JobID)
if err != nil {
return err
}
2015-09-06 19:24:25 +00:00
// Use the last index that affected the evals table
2017-02-08 04:31:23 +00:00
index, err := state.Index("evals")
if err != nil {
return err
}
reply.Index = index
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
2015-09-06 19:18:45 +00:00
}
2017-07-01 00:23:34 +00:00
// Deployments is used to list the deployments for a job
func (j *Job) Deployments(args *structs.JobSpecificRequest,
reply *structs.DeploymentListResponse) error {
if done, err := j.srv.forward("Job.Deployments", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "deployments"}, time.Now())
2017-09-26 20:33:03 +00:00
// Check for read-job permissions
2017-10-12 22:16:33 +00:00
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
2017-09-26 20:33:03 +00:00
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
2017-07-01 00:23:34 +00:00
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Capture the deployments
deploys, err := state.DeploymentsByJobID(ws, args.RequestNamespace(), args.JobID, args.All)
2017-07-01 00:23:34 +00:00
if err != nil {
return err
}
// Use the last index that affected the deployment table
index, err := state.Index("deployment")
if err != nil {
return err
}
reply.Index = index
reply.Deployments = deploys
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
}
// LatestDeployment is used to retrieve the latest deployment for a job
func (j *Job) LatestDeployment(args *structs.JobSpecificRequest,
reply *structs.SingleDeploymentResponse) error {
if done, err := j.srv.forward("Job.LatestDeployment", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "latest_deployment"}, time.Now())
2017-09-26 20:53:43 +00:00
// Check for read-job permissions
2017-10-12 22:16:33 +00:00
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
2017-09-26 20:53:43 +00:00
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob) {
return structs.ErrPermissionDenied
}
2017-07-01 00:23:34 +00:00
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Capture the deployments
deploys, err := state.DeploymentsByJobID(ws, args.RequestNamespace(), args.JobID, args.All)
2017-07-01 00:23:34 +00:00
if err != nil {
return err
}
// Use the last index that affected the deployment table
index, err := state.Index("deployment")
if err != nil {
return err
}
reply.Index = index
if len(deploys) > 0 {
sort.Slice(deploys, func(i, j int) bool {
return deploys[i].CreateIndex > deploys[j].CreateIndex
})
reply.Deployment = deploys[0]
}
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
}
// Plan is used to cause a dry-run evaluation of the Job and return the results
// with a potential diff containing annotations.
func (j *Job) Plan(args *structs.JobPlanRequest, reply *structs.JobPlanResponse) error {
if done, err := j.srv.forward("Job.Plan", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "plan"}, time.Now())
// Validate the arguments
if args.Job == nil {
return fmt.Errorf("Job required for plan")
}
// Run admission controllers
job, warnings, err := j.admissionControllers(args.Job)
if err != nil {
return err
}
args.Job = job
// Set the warning message
reply.Warnings = structs.MergeMultierrorWarnings(warnings...)
2017-09-19 14:47:10 +00:00
// Check job submission permissions, which we assume is the same for plan
2017-10-12 22:16:33 +00:00
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
2017-09-19 14:47:10 +00:00
return err
} else if aclObj != nil {
if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySubmitJob) {
2017-09-19 14:47:10 +00:00
return structs.ErrPermissionDenied
}
// Check if override is set and we do not have permissions
if args.PolicyOverride {
if !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilitySentinelOverride) {
2017-09-19 14:47:10 +00:00
return structs.ErrPermissionDenied
}
}
}
// Enforce Sentinel policies
policyWarnings, err := j.enforceSubmitJob(args.PolicyOverride, args.Job)
if err != nil {
return err
}
if policyWarnings != nil {
warnings = append(warnings, policyWarnings)
reply.Warnings = structs.MergeMultierrorWarnings(warnings...)
2017-09-19 14:47:10 +00:00
}
// Acquire a snapshot of the state
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
// Get the original job
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
2017-09-07 23:56:15 +00:00
oldJob, err := snap.JobByID(ws, args.RequestNamespace(), args.Job.ID)
if err != nil {
return err
}
var index uint64
2016-06-08 23:48:02 +00:00
var updatedIndex uint64
2017-05-23 23:33:55 +00:00
2017-06-29 18:01:41 +00:00
if oldJob != nil {
2016-06-08 23:48:02 +00:00
index = oldJob.JobModifyIndex
2017-06-29 18:01:41 +00:00
// We want to reuse deployments where possible, so only insert the job if
// it has changed or the job didn't exist
if oldJob.SpecChanged(args.Job) {
// Insert the updated Job into the snapshot
updatedIndex = oldJob.JobModifyIndex + 1
snap.UpsertJob(updatedIndex, args.Job)
}
2017-05-23 23:33:55 +00:00
} else if oldJob == nil {
// Insert the updated Job into the snapshot
snap.UpsertJob(100, args.Job)
}
// Create an eval and mark it as requiring annotations and insert that as well
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
2017-09-07 23:56:15 +00:00
Namespace: args.RequestNamespace(),
Priority: args.Job.Priority,
Type: args.Job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: args.Job.ID,
2016-06-08 23:48:02 +00:00
JobModifyIndex: updatedIndex,
Status: structs.EvalStatusPending,
AnnotatePlan: true,
// Timestamps are added for consistency but this eval is never persisted
CreateTime: now,
ModifyTime: now,
}
snap.UpsertEvals(100, []*structs.Evaluation{eval})
// Create an in-memory Planner that returns no errors and stores the
// submitted plan and created evals.
2016-05-16 19:49:18 +00:00
planner := &scheduler.Harness{
State: &snap.StateStore,
}
// Create the scheduler and run it
2018-09-15 23:23:13 +00:00
sched, err := scheduler.NewScheduler(eval.Type, j.logger, snap, planner)
if err != nil {
return err
}
if err := sched.Process(eval); err != nil {
return err
}
// Annotate and store the diff
2016-05-16 19:49:18 +00:00
if plans := len(planner.Plans); plans != 1 {
return fmt.Errorf("scheduler resulted in an unexpected number of plans: %v", plans)
2016-05-16 19:49:18 +00:00
}
annotations := planner.Plans[0].Annotations
if args.Diff {
2016-05-11 22:36:28 +00:00
jobDiff, err := oldJob.Diff(args.Job, true)
if err != nil {
return fmt.Errorf("failed to create job diff: %v", err)
}
2016-05-12 18:29:38 +00:00
if err := scheduler.Annotate(jobDiff, annotations); err != nil {
2016-05-11 22:36:28 +00:00
return fmt.Errorf("failed to annotate job diff: %v", err)
}
reply.Diff = jobDiff
}
// Grab the failures
if len(planner.Evals) != 1 {
return fmt.Errorf("scheduler resulted in an unexpected number of eval updates: %v", planner.Evals)
}
updatedEval := planner.Evals[0]
// If it is a periodic job calculate the next launch
if args.Job.IsPeriodic() && args.Job.Periodic.Enabled {
2018-04-26 20:57:45 +00:00
reply.NextPeriodicLaunch, err = args.Job.Periodic.Next(time.Now().In(args.Job.Periodic.GetLocation()))
if err != nil {
2018-04-26 22:15:43 +00:00
return fmt.Errorf("Failed to parse cron expression: %v", err)
2018-04-26 20:57:45 +00:00
}
}
reply.FailedTGAllocs = updatedEval.FailedTGAllocs
2016-05-16 18:48:44 +00:00
reply.JobModifyIndex = index
2016-05-12 18:29:38 +00:00
reply.Annotations = annotations
2016-05-16 19:49:18 +00:00
reply.CreatedEvals = planner.CreateEvals
2016-05-12 01:51:48 +00:00
reply.Index = index
return nil
}
// validateJobUpdate ensures updates to a job are valid.
func validateJobUpdate(old, new *structs.Job) error {
2018-06-11 17:27:48 +00:00
// Validate Dispatch not set on new Jobs
if old == nil {
if new.Dispatched {
return fmt.Errorf("job can't be submitted with 'Dispatched' set")
}
return nil
}
// Type transitions are disallowed
if old.Type != new.Type {
return fmt.Errorf("cannot update job from type %q to %q", old.Type, new.Type)
}
// Transitioning to/from periodic is disallowed
if old.IsPeriodic() && !new.IsPeriodic() {
return fmt.Errorf("cannot update periodic job to being non-periodic")
}
if new.IsPeriodic() && !old.IsPeriodic() {
return fmt.Errorf("cannot update non-periodic job to being periodic")
}
// Transitioning to/from parameterized is disallowed
if old.IsParameterized() && !new.IsParameterized() {
return fmt.Errorf("cannot update non-parameterized job to being parameterized")
}
if new.IsParameterized() && !old.IsParameterized() {
return fmt.Errorf("cannot update parameterized job to being non-parameterized")
}
if old.Dispatched != new.Dispatched {
return fmt.Errorf("field 'Dispatched' is read-only")
}
return nil
}
// Dispatch a parameterized job.
2016-11-26 02:04:55 +00:00
func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispatchResponse) error {
if done, err := j.srv.forward("Job.Dispatch", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "dispatch"}, time.Now())
2017-09-27 16:30:13 +00:00
// Check for submit-job permissions
2017-10-12 22:16:33 +00:00
if aclObj, err := j.srv.ResolveToken(args.AuthToken); err != nil {
2017-09-27 16:30:13 +00:00
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityDispatchJob) {
2017-09-27 16:30:13 +00:00
return structs.ErrPermissionDenied
}
// Lookup the parameterized job
2016-11-26 02:04:55 +00:00
if args.JobID == "" {
return fmt.Errorf("missing parameterized job ID")
2016-11-26 02:04:55 +00:00
}
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
2017-02-08 04:31:23 +00:00
ws := memdb.NewWatchSet()
2017-09-07 23:56:15 +00:00
parameterizedJob, err := snap.JobByID(ws, args.RequestNamespace(), args.JobID)
2016-11-26 02:04:55 +00:00
if err != nil {
return err
}
if parameterizedJob == nil {
return fmt.Errorf("parameterized job not found")
2016-11-26 02:04:55 +00:00
}
if !parameterizedJob.IsParameterized() {
return fmt.Errorf("Specified job %q is not a parameterized job", args.JobID)
2016-11-26 02:04:55 +00:00
}
2017-04-15 23:47:19 +00:00
if parameterizedJob.Stop {
return fmt.Errorf("Specified job %q is stopped", args.JobID)
}
2016-11-26 02:04:55 +00:00
// Validate the arguments
if err := validateDispatchRequest(args, parameterizedJob); err != nil {
2016-11-26 02:04:55 +00:00
return err
}
// Derive the child job and commit it via Raft
dispatchJob := parameterizedJob.Copy()
dispatchJob.ID = structs.DispatchedID(parameterizedJob.ID, time.Now())
dispatchJob.ParentID = parameterizedJob.ID
2016-11-26 02:04:55 +00:00
dispatchJob.Name = dispatchJob.ID
2017-06-30 02:08:25 +00:00
dispatchJob.SetSubmitTime()
dispatchJob.Dispatched = true
2016-11-26 02:04:55 +00:00
2016-11-29 00:05:56 +00:00
// Merge in the meta data
for k, v := range args.Meta {
if dispatchJob.Meta == nil {
dispatchJob.Meta = make(map[string]string, len(args.Meta))
}
2016-11-29 00:05:56 +00:00
dispatchJob.Meta[k] = v
}
2016-12-14 20:50:08 +00:00
// Compress the payload
dispatchJob.Payload = snappy.Encode(nil, args.Payload)
2016-11-26 02:04:55 +00:00
regReq := &structs.JobRegisterRequest{
Job: dispatchJob,
WriteRequest: args.WriteRequest,
}
// Commit this update via Raft
2017-09-19 14:47:10 +00:00
fsmErr, jobCreateIndex, err := j.srv.raftApply(structs.JobRegisterRequestType, regReq)
if err, ok := fsmErr.(error); ok && err != nil {
2018-09-15 23:23:13 +00:00
j.logger.Error("dispatched job register failed", "error", err, "fsm", true)
2017-09-19 14:47:10 +00:00
return err
}
2016-11-26 02:04:55 +00:00
if err != nil {
2018-09-15 23:23:13 +00:00
j.logger.Error("dispatched job register failed", "error", err, "raft", true)
2016-11-26 02:04:55 +00:00
return err
}
reply.JobCreateIndex = jobCreateIndex
reply.DispatchedJobID = dispatchJob.ID
reply.Index = jobCreateIndex
// If the job is periodic, we don't create an eval.
if !dispatchJob.IsPeriodic() {
// Create a new evaluation
now := time.Now().UTC().UnixNano()
eval := &structs.Evaluation{
ID: uuid.Generate(),
2017-09-07 23:56:15 +00:00
Namespace: args.RequestNamespace(),
Priority: dispatchJob.Priority,
Type: dispatchJob.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: dispatchJob.ID,
JobModifyIndex: jobCreateIndex,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
2016-11-26 02:04:55 +00:00
// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
2018-09-15 23:23:13 +00:00
j.logger.Error("eval create failed", "error", err, "method", "dispatch")
return err
}
// Setup the reply
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.Index = evalIndex
2016-11-26 02:04:55 +00:00
}
return nil
}
// validateDispatchRequest returns whether the request is valid given the
// parameterized job.
2016-12-14 20:50:08 +00:00
func validateDispatchRequest(req *structs.JobDispatchRequest, job *structs.Job) error {
// Check the payload constraint is met
hasInputData := len(req.Payload) != 0
if job.ParameterizedJob.Payload == structs.DispatchPayloadRequired && !hasInputData {
return fmt.Errorf("Payload is not provided but required by parameterized job")
} else if job.ParameterizedJob.Payload == structs.DispatchPayloadForbidden && hasInputData {
return fmt.Errorf("Payload provided but forbidden by parameterized job")
2016-11-26 02:04:55 +00:00
}
2016-12-14 20:50:08 +00:00
// Check the payload doesn't exceed the size limit
if l := len(req.Payload); l > DispatchPayloadSizeLimit {
return fmt.Errorf("Payload exceeds maximum size; %d > %d", l, DispatchPayloadSizeLimit)
2016-11-26 02:04:55 +00:00
}
// Check if the metadata is a set
keys := make(map[string]struct{}, len(req.Meta))
for k := range keys {
if _, ok := keys[k]; ok {
return fmt.Errorf("Duplicate key %q in passed metadata", k)
}
keys[k] = struct{}{}
}
required := helper.SliceStringToSet(job.ParameterizedJob.MetaRequired)
optional := helper.SliceStringToSet(job.ParameterizedJob.MetaOptional)
2016-11-26 02:04:55 +00:00
// Check the metadata key constraints are met
unpermitted := make(map[string]struct{})
for k := range req.Meta {
_, req := required[k]
_, opt := optional[k]
if !req && !opt {
unpermitted[k] = struct{}{}
}
}
if len(unpermitted) != 0 {
flat := make([]string, 0, len(unpermitted))
for k := range unpermitted {
flat = append(flat, k)
}
return fmt.Errorf("Dispatch request included unpermitted metadata keys: %v", flat)
}
missing := make(map[string]struct{})
for _, k := range job.ParameterizedJob.MetaRequired {
2016-11-26 02:04:55 +00:00
if _, ok := req.Meta[k]; !ok {
missing[k] = struct{}{}
}
}
if len(missing) != 0 {
flat := make([]string, 0, len(missing))
for k := range missing {
flat = append(flat, k)
}
2016-12-02 23:37:26 +00:00
return fmt.Errorf("Dispatch did not provide required meta keys: %v", flat)
2016-11-26 02:04:55 +00:00
}
return nil
}