package nomad import ( "fmt" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) const ( // vaultConstraintLTarget is the lefthand side of the Vault constraint // injected when Vault policies are used. If an existing constraint // with this target exists it overrides the injected constraint. vaultConstraintLTarget = "${attr.vault.version}" ) var ( // vaultConstraint is the implicit constraint added to jobs requesting a // Vault token vaultConstraint = &structs.Constraint{ LTarget: vaultConstraintLTarget, RTarget: ">= 0.6.1", Operand: structs.ConstraintSemver, } // consulServiceDiscoveryConstraint is the implicit constraint added to // task groups which include services utilising the Consul provider. The // Consul version is pinned to a minimum of that which introduced the // namespace feature. consulServiceDiscoveryConstraint = &structs.Constraint{ LTarget: "${attr.consul.version}", RTarget: ">= 1.7.0", Operand: structs.ConstraintSemver, } // nativeServiceDiscoveryConstraint is the constraint injected into task // groups that utilise Nomad's native service discovery feature. This is // needed, as operators can disable the client functionality, and therefore // we need to ensure task groups are placed where they can run // successfully. nativeServiceDiscoveryConstraint = &structs.Constraint{ LTarget: "${attr.nomad.service_discovery}", RTarget: "true", Operand: "=", } ) type admissionController interface { Name() string } type jobMutator interface { admissionController Mutate(*structs.Job) (out *structs.Job, warnings []error, err error) } type jobValidator interface { admissionController Validate(*structs.Job) (warnings []error, err error) } func (j *Job) admissionControllers(job *structs.Job) (out *structs.Job, warnings []error, err error) { // Mutators run first before validators, so validators view the final rendered job. // So, mutators must handle invalid jobs. out, warnings, err = j.admissionMutators(job) if err != nil { return nil, nil, err } validateWarnings, err := j.admissionValidators(job) if err != nil { return nil, nil, err } warnings = append(warnings, validateWarnings...) return out, warnings, nil } // admissionMutator returns an updated job as well as warnings or an error. func (j *Job) admissionMutators(job *structs.Job) (_ *structs.Job, warnings []error, err error) { var w []error for _, mutator := range j.mutators { job, w, err = mutator.Mutate(job) j.logger.Trace("job mutate results", "mutator", mutator.Name(), "warnings", w, "error", err) if err != nil { return nil, nil, fmt.Errorf("error in job mutator %s: %v", mutator.Name(), err) } warnings = append(warnings, w...) } return job, warnings, err } // admissionValidators returns a slice of validation warnings and a multierror // of validation failures. func (j *Job) admissionValidators(origJob *structs.Job) ([]error, error) { // ensure job is not mutated job := origJob.Copy() var warnings []error var errs error for _, validator := range j.validators { w, err := validator.Validate(job) j.logger.Trace("job validate results", "validator", validator.Name(), "warnings", w, "error", err) if err != nil { errs = multierror.Append(errs, err) } warnings = append(warnings, w...) } return warnings, errs } // jobCanonicalizer calls job.Canonicalize (sets defaults and initializes // fields) and returns any errors as warnings. type jobCanonicalizer struct{} func (jobCanonicalizer) Name() string { return "canonicalize" } func (jobCanonicalizer) Mutate(job *structs.Job) (*structs.Job, []error, error) { job.Canonicalize() return job, nil, nil } // jobImpliedConstraints adds constraints to a job implied by other job fields // and stanzas. type jobImpliedConstraints struct{} func (jobImpliedConstraints) Name() string { return "constraints" } func (jobImpliedConstraints) Mutate(j *structs.Job) (*structs.Job, []error, error) { // Get the Vault blocks in the job vaultBlocks := j.Vault() // Get the required signals signals := j.RequiredSignals() // Identify which task groups are utilising Nomad native service discovery. nativeServiceDisco := j.RequiredNativeServiceDiscovery() // Identify which task groups are utilising Consul service discovery. consulServiceDisco := j.RequiredConsulServiceDiscovery() // Hot path if len(signals) == 0 && len(vaultBlocks) == 0 && len(nativeServiceDisco) == 0 && len(consulServiceDisco) == 0 { return j, nil, nil } // Iterate through all the task groups within the job and add any required // constraints. When adding new implicit constraints, they should go inside // this single loop, with a new constraintMatcher if needed. for _, tg := range j.TaskGroups { // If the task group utilises Vault, run the mutator. if _, ok := vaultBlocks[tg.Name]; ok { mutateConstraint(constraintMatcherLeft, tg, vaultConstraint) } // Check whether the task group is using signals. In the case that it // is, we flatten the signals and build a constraint, then run the // mutator. if tgSignals, ok := signals[tg.Name]; ok { required := helper.UniqueMapSliceValues(tgSignals) sigConstraint := getSignalConstraint(required) mutateConstraint(constraintMatcherFull, tg, sigConstraint) } // If the task group utilises Nomad service discovery, run the mutator. if ok := nativeServiceDisco[tg.Name]; ok { mutateConstraint(constraintMatcherFull, tg, nativeServiceDiscoveryConstraint) } // If the task group utilises Consul service discovery, run the mutator. if ok := consulServiceDisco[tg.Name]; ok { mutateConstraint(constraintMatcherLeft, tg, consulServiceDiscoveryConstraint) } } return j, nil, nil } // constraintMatcher is a custom type which helps control how constraints are // identified as being present within a task group. type constraintMatcher uint const ( // constraintMatcherFull ensures that a constraint is only considered found // when they match totally. This check is performed using the // structs.Constraint Equals function. constraintMatcherFull constraintMatcher = iota // constraintMatcherLeft ensure that a constraint is considered found if // the constraints LTarget is matched only. This allows an existing // constraint to override the proposed implicit one. constraintMatcherLeft ) // mutateConstraint is a generic mutator used to set implicit constraints // within the task group if they are needed. func mutateConstraint(matcher constraintMatcher, taskGroup *structs.TaskGroup, constraint *structs.Constraint) { var found bool // It's possible to switch on the matcher within the constraint loop to // reduce repetition. This, however, means switching per constraint, // therefore we do it here. switch matcher { case constraintMatcherFull: for _, c := range taskGroup.Constraints { if c.Equals(constraint) { found = true break } } case constraintMatcherLeft: for _, c := range taskGroup.Constraints { if c.LTarget == constraint.LTarget { found = true break } } } // If we didn't find a suitable constraint match, add one. if !found { taskGroup.Constraints = append(taskGroup.Constraints, constraint) } } // jobValidate validates a Job and task drivers and returns an error if there is // a validation problem or if the Job is of a type a user is not allowed to // submit. type jobValidate struct{} func (jobValidate) Name() string { return "validate" } func (jobValidate) Validate(job *structs.Job) (warnings []error, err error) { validationErrors := new(multierror.Error) if err := job.Validate(); err != nil { multierror.Append(validationErrors, err) } // Get any warnings jobWarnings := job.Warnings() if jobWarnings != nil { if multi, ok := jobWarnings.(*multierror.Error); ok { // Unpack multiple warnings warnings = append(warnings, multi.Errors...) } else { warnings = append(warnings, jobWarnings) } } // TODO: Validate the driver configurations. These had to be removed in 0.9 // to support driver plugins, but see issue: #XXXX for more info. if job.Type == structs.JobTypeCore { multierror.Append(validationErrors, fmt.Errorf("job type cannot be core")) } if len(job.Payload) != 0 { multierror.Append(validationErrors, fmt.Errorf("job can't be submitted with a payload, only dispatched")) } return warnings, validationErrors.ErrorOrNil() } type memoryOversubscriptionValidate struct { srv *Server } func (*memoryOversubscriptionValidate) Name() string { return "memory_oversubscription" } func (v *memoryOversubscriptionValidate) Validate(job *structs.Job) (warnings []error, err error) { _, c, err := v.srv.State().SchedulerConfig() if err != nil { return nil, err } if c != nil && c.MemoryOversubscriptionEnabled { return nil, nil } for _, tg := range job.TaskGroups { for _, t := range tg.Tasks { if t.Resources != nil && t.Resources.MemoryMaxMB != 0 { warnings = append(warnings, fmt.Errorf("Memory oversubscription is not enabled; Task \"%v.%v\" memory_max value will be ignored. Update the Scheduler Configuration to allow oversubscription.", tg.Name, t.Name)) } } } return warnings, err }