2019-08-15 15:22:37 +00:00
package nomad
import (
"fmt"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)
2019-11-12 18:42:31 +00:00
const (
// vaultConstraintLTarget is the lefthand side of the Vault constraint
// injected when Vault policies are used. If an existing constraint
// with this target exists it overrides the injected constraint.
vaultConstraintLTarget = "${attr.vault.version}"
)
var (
// vaultConstraint is the implicit constraint added to jobs requesting a
// Vault token
vaultConstraint = & structs . Constraint {
LTarget : vaultConstraintLTarget ,
RTarget : ">= 0.6.1" ,
2019-11-13 23:36:15 +00:00
Operand : structs . ConstraintSemver ,
2019-11-12 18:42:31 +00:00
}
2022-03-14 11:42:12 +00:00
2022-04-20 12:09:13 +00:00
// consulServiceDiscoveryConstraint is the implicit constraint added to
// task groups which include services utilising the Consul provider. The
// Consul version is pinned to a minimum of that which introduced the
// namespace feature.
consulServiceDiscoveryConstraint = & structs . Constraint {
LTarget : "${attr.consul.version}" ,
RTarget : ">= 1.7.0" ,
Operand : structs . ConstraintSemver ,
}
2022-03-14 11:42:12 +00:00
// nativeServiceDiscoveryConstraint is the constraint injected into task
// groups that utilise Nomad's native service discovery feature. This is
// needed, as operators can disable the client functionality, and therefore
// we need to ensure task groups are placed where they can run
// successfully.
nativeServiceDiscoveryConstraint = & structs . Constraint {
LTarget : "${attr.nomad.service_discovery}" ,
RTarget : "true" ,
Operand : "=" ,
}
2019-11-12 18:42:31 +00:00
)
2019-08-15 15:22:37 +00:00
type admissionController interface {
Name ( ) string
}
type jobMutator interface {
admissionController
Mutate ( * structs . Job ) ( out * structs . Job , warnings [ ] error , err error )
}
type jobValidator interface {
admissionController
Validate ( * structs . Job ) ( warnings [ ] error , err error )
}
func ( j * Job ) admissionControllers ( job * structs . Job ) ( out * structs . Job , warnings [ ] error , err error ) {
2021-03-10 14:12:46 +00:00
// Mutators run first before validators, so validators view the final rendered job.
// So, mutators must handle invalid jobs.
2019-08-15 15:22:37 +00:00
out , warnings , err = j . admissionMutators ( job )
if err != nil {
return nil , nil , err
}
validateWarnings , err := j . admissionValidators ( job )
if err != nil {
return nil , nil , err
}
warnings = append ( warnings , validateWarnings ... )
return out , warnings , nil
}
// admissionMutator returns an updated job as well as warnings or an error.
func ( j * Job ) admissionMutators ( job * structs . Job ) ( _ * structs . Job , warnings [ ] error , err error ) {
var w [ ] error
for _ , mutator := range j . mutators {
job , w , err = mutator . Mutate ( job )
j . logger . Trace ( "job mutate results" , "mutator" , mutator . Name ( ) , "warnings" , w , "error" , err )
if err != nil {
return nil , nil , fmt . Errorf ( "error in job mutator %s: %v" , mutator . Name ( ) , err )
}
warnings = append ( warnings , w ... )
}
return job , warnings , err
}
// admissionValidators returns a slice of validation warnings and a multierror
// of validation failures.
2019-10-28 14:49:08 +00:00
func ( j * Job ) admissionValidators ( origJob * structs . Job ) ( [ ] error , error ) {
2019-08-15 15:22:37 +00:00
// ensure job is not mutated
job := origJob . Copy ( )
2019-10-28 14:49:08 +00:00
var warnings [ ] error
var errs error
2019-08-15 15:22:37 +00:00
for _ , validator := range j . validators {
2019-10-28 14:49:08 +00:00
w , err := validator . Validate ( job )
2019-08-15 15:22:37 +00:00
j . logger . Trace ( "job validate results" , "validator" , validator . Name ( ) , "warnings" , w , "error" , err )
if err != nil {
2019-10-28 14:49:08 +00:00
errs = multierror . Append ( errs , err )
2019-08-15 15:22:37 +00:00
}
warnings = append ( warnings , w ... )
}
2019-10-28 14:49:08 +00:00
return warnings , errs
2019-08-15 15:22:37 +00:00
}
// jobCanonicalizer calls job.Canonicalize (sets defaults and initializes
// fields) and returns any errors as warnings.
type jobCanonicalizer struct { }
func ( jobCanonicalizer ) Name ( ) string {
return "canonicalize"
}
func ( jobCanonicalizer ) Mutate ( job * structs . Job ) ( * structs . Job , [ ] error , error ) {
2021-01-14 20:46:35 +00:00
job . Canonicalize ( )
return job , nil , nil
2019-08-15 15:22:37 +00:00
}
// jobImpliedConstraints adds constraints to a job implied by other job fields
// and stanzas.
type jobImpliedConstraints struct { }
func ( jobImpliedConstraints ) Name ( ) string {
return "constraints"
}
func ( jobImpliedConstraints ) Mutate ( j * structs . Job ) ( * structs . Job , [ ] error , error ) {
2022-04-05 18:18:10 +00:00
// Get the Vault blocks in the job
vaultBlocks := j . Vault ( )
2019-08-15 15:22:37 +00:00
// Get the required signals
signals := j . RequiredSignals ( )
2022-03-14 11:42:12 +00:00
// Identify which task groups are utilising Nomad native service discovery.
nativeServiceDisco := j . RequiredNativeServiceDiscovery ( )
2022-04-20 12:09:13 +00:00
// Identify which task groups are utilising Consul service discovery.
consulServiceDisco := j . RequiredConsulServiceDiscovery ( )
2019-08-15 15:22:37 +00:00
// Hot path
2022-04-20 12:09:13 +00:00
if len ( signals ) == 0 && len ( vaultBlocks ) == 0 &&
len ( nativeServiceDisco ) == 0 && len ( consulServiceDisco ) == 0 {
2019-08-15 15:22:37 +00:00
return j , nil , nil
}
2022-04-20 12:09:13 +00:00
// Iterate through all the task groups within the job and add any required
// constraints. When adding new implicit constraints, they should go inside
// this single loop, with a new constraintMatcher if needed.
2019-08-15 15:22:37 +00:00
for _ , tg := range j . TaskGroups {
2022-04-20 12:09:13 +00:00
// If the task group utilises Vault, run the mutator.
if _ , ok := vaultBlocks [ tg . Name ] ; ok {
mutateConstraint ( constraintMatcherLeft , tg , vaultConstraint )
2019-08-15 15:22:37 +00:00
}
2022-04-20 12:09:13 +00:00
// Check whether the task group is using signals. In the case that it
// is, we flatten the signals and build a constraint, then run the
// mutator.
if tgSignals , ok := signals [ tg . Name ] ; ok {
required := helper . MapStringStringSliceValueSet ( tgSignals )
sigConstraint := getSignalConstraint ( required )
mutateConstraint ( constraintMatcherFull , tg , sigConstraint )
2019-08-15 15:22:37 +00:00
}
2022-04-20 12:09:13 +00:00
// If the task group utilises Nomad service discovery, run the mutator.
if ok := nativeServiceDisco [ tg . Name ] ; ok {
mutateConstraint ( constraintMatcherFull , tg , nativeServiceDiscoveryConstraint )
2019-08-15 15:22:37 +00:00
}
2022-04-20 12:09:13 +00:00
// If the task group utilises Consul service discovery, run the mutator.
if ok := consulServiceDisco [ tg . Name ] ; ok {
mutateConstraint ( constraintMatcherLeft , tg , consulServiceDiscoveryConstraint )
2019-08-15 15:22:37 +00:00
}
2022-04-20 12:09:13 +00:00
}
2019-08-15 15:22:37 +00:00
2022-04-20 12:09:13 +00:00
return j , nil , nil
}
2019-08-15 15:22:37 +00:00
2022-04-20 12:09:13 +00:00
// constraintMatcher is a custom type which helps control how constraints are
// identified as being present within a task group.
type constraintMatcher uint
2019-08-15 15:22:37 +00:00
2022-04-20 12:09:13 +00:00
const (
// constraintMatcherFull ensures that a constraint is only considered found
// when they match totally. This check is performed using the
// structs.Constraint Equals function.
constraintMatcherFull constraintMatcher = iota
// constraintMatcherLeft ensure that a constraint is considered found if
// the constraints LTarget is matched only. This allows an existing
// constraint to override the proposed implicit one.
constraintMatcherLeft
)
2019-08-15 15:22:37 +00:00
2022-04-20 12:09:13 +00:00
// mutateConstraint is a generic mutator used to set implicit constraints
// within the task group if they are needed.
func mutateConstraint ( matcher constraintMatcher , taskGroup * structs . TaskGroup , constraint * structs . Constraint ) {
2022-03-14 11:42:12 +00:00
2022-04-20 12:09:13 +00:00
var found bool
// It's possible to switch on the matcher within the constraint loop to
// reduce repetition. This, however, means switching per constraint,
// therefore we do it here.
switch matcher {
case constraintMatcherFull :
for _ , c := range taskGroup . Constraints {
if c . Equals ( constraint ) {
2022-03-14 11:42:12 +00:00
found = true
break
}
}
2022-04-20 12:09:13 +00:00
case constraintMatcherLeft :
for _ , c := range taskGroup . Constraints {
if c . LTarget == constraint . LTarget {
found = true
break
}
2022-03-14 11:42:12 +00:00
}
}
2022-04-20 12:09:13 +00:00
// If we didn't find a suitable constraint match, add one.
if ! found {
taskGroup . Constraints = append ( taskGroup . Constraints , constraint )
}
2019-08-15 15:22:37 +00:00
}
// jobValidate validates a Job and task drivers and returns an error if there is
// a validation problem or if the Job is of a type a user is not allowed to
// submit.
type jobValidate struct { }
func ( jobValidate ) Name ( ) string {
return "validate"
}
func ( jobValidate ) Validate ( job * structs . Job ) ( warnings [ ] error , err error ) {
validationErrors := new ( multierror . Error )
if err := job . Validate ( ) ; err != nil {
multierror . Append ( validationErrors , err )
}
// Get any warnings
jobWarnings := job . Warnings ( )
if jobWarnings != nil {
if multi , ok := jobWarnings . ( * multierror . Error ) ; ok {
// Unpack multiple warnings
warnings = append ( warnings , multi . Errors ... )
} else {
warnings = append ( warnings , jobWarnings )
}
}
// TODO: Validate the driver configurations. These had to be removed in 0.9
// to support driver plugins, but see issue: #XXXX for more info.
if job . Type == structs . JobTypeCore {
multierror . Append ( validationErrors , fmt . Errorf ( "job type cannot be core" ) )
}
if len ( job . Payload ) != 0 {
multierror . Append ( validationErrors , fmt . Errorf ( "job can't be submitted with a payload, only dispatched" ) )
}
return warnings , validationErrors . ErrorOrNil ( )
}
2021-04-30 02:09:56 +00:00
type memoryOversubscriptionValidate struct {
srv * Server
}
func ( * memoryOversubscriptionValidate ) Name ( ) string {
return "memory_oversubscription"
}
func ( v * memoryOversubscriptionValidate ) Validate ( job * structs . Job ) ( warnings [ ] error , err error ) {
_ , c , err := v . srv . State ( ) . SchedulerConfig ( )
if err != nil {
return nil , err
}
if c != nil && c . MemoryOversubscriptionEnabled {
return nil , nil
}
for _ , tg := range job . TaskGroups {
for _ , t := range tg . Tasks {
if t . Resources != nil && t . Resources . MemoryMaxMB != 0 {
warnings = append ( warnings , fmt . Errorf ( "Memory oversubscription is not enabled; Task \"%v.%v\" memory_max value will be ignored. Update the Scheduler Configuration to allow oversubscription." , tg . Name , t . Name ) )
}
}
}
return warnings , err
}