Dispatch structs

This commit is contained in:
Alex Dadgar 2016-11-23 14:56:50 -08:00
parent 1198a6fc88
commit 54bcde8e36
5 changed files with 178 additions and 14 deletions

View File

@ -187,6 +187,14 @@ type PeriodicConfig struct {
ProhibitOverlap bool ProhibitOverlap bool
} }
// DispatchConfig is used to configure the dispatch template
type DispatchConfig struct {
Paused bool
InputData string
MetaRequired []string
MetaOptional []string
}
// Job is used to serialize a job. // Job is used to serialize a job.
type Job struct { type Job struct {
Region string Region string
@ -201,6 +209,7 @@ type Job struct {
TaskGroups []*TaskGroup TaskGroups []*TaskGroup
Update *UpdateStrategy Update *UpdateStrategy
Periodic *PeriodicConfig Periodic *PeriodicConfig
Dispatch *DispatchConfig
Meta map[string]string Meta map[string]string
VaultToken string VaultToken string
Status string Status string

View File

@ -141,22 +141,29 @@ type LogConfig struct {
MaxFileSizeMB int MaxFileSizeMB int
} }
// DispatchInputConfig configures how a task gets its input from a job dispatch
type DispatchInputConfig struct {
Stdin bool
File string
}
// Task is a single process in a task group. // Task is a single process in a task group.
type Task struct { type Task struct {
Name string Name string
Driver string Driver string
User string User string
Config map[string]interface{} Config map[string]interface{}
Constraints []*Constraint Constraints []*Constraint
Env map[string]string Env map[string]string
Services []Service Services []Service
Resources *Resources Resources *Resources
Meta map[string]string Meta map[string]string
KillTimeout time.Duration KillTimeout time.Duration
LogConfig *LogConfig LogConfig *LogConfig
Artifacts []*TaskArtifact Artifacts []*TaskArtifact
Vault *Vault Vault *Vault
Templates []*Template Templates []*Template
DispatchInput *DispatchInputConfig
} }
// TaskArtifact is used to download artifacts before running a task. // TaskArtifact is used to download artifacts before running a task.

View File

@ -269,6 +269,30 @@ func SliceStringIsSubset(larger, smaller []string) (bool, []string) {
return subset, offending return subset, offending
} }
func SliceSetDisjoint(first, second []string) (bool, []string) {
contained := make(map[string]struct{}, len(first))
for _, k := range first {
contained[k] = struct{}{}
}
offending := make(map[string]struct{})
for _, k := range second {
if _, ok := contained[k]; ok {
offending[k] = struct{}{}
}
}
if len(offending) == 0 {
return true, nil
}
flattened := make([]string, 0, len(offending))
for k := range offending {
flattened = append(flattened, k)
}
return false, flattened
}
// VaultPoliciesSet takes the structure returned by VaultPolicies and returns // VaultPoliciesSet takes the structure returned by VaultPolicies and returns
// the set of required policies // the set of required policies
func VaultPoliciesSet(policies map[string]map[string]*Vault) []string { func VaultPoliciesSet(policies map[string]map[string]*Vault) []string {

View File

@ -1146,6 +1146,9 @@ type Job struct {
// Periodic is used to define the interval the job is run at. // Periodic is used to define the interval the job is run at.
Periodic *PeriodicConfig Periodic *PeriodicConfig
// Dispatch is used to specify the job as a template job for dispatching.
Dispatch *DispatchConfig
// Meta is used to associate arbitrary metadata with this // Meta is used to associate arbitrary metadata with this
// job. This is opaque to Nomad. // job. This is opaque to Nomad.
Meta map[string]string Meta map[string]string
@ -1179,6 +1182,10 @@ func (j *Job) Canonicalize() {
for _, tg := range j.TaskGroups { for _, tg := range j.TaskGroups {
tg.Canonicalize(j) tg.Canonicalize(j)
} }
if j.Dispatch != nil {
j.Dispatch.Canonicalize()
}
} }
// Copy returns a deep copy of the Job. It is expected that callers use recover. // Copy returns a deep copy of the Job. It is expected that callers use recover.
@ -1202,6 +1209,7 @@ func (j *Job) Copy() *Job {
nj.Periodic = nj.Periodic.Copy() nj.Periodic = nj.Periodic.Copy()
nj.Meta = CopyMapStringString(nj.Meta) nj.Meta = CopyMapStringString(nj.Meta)
nj.Dispatch = nj.Dispatch.Copy()
return nj return nj
} }
@ -1276,6 +1284,12 @@ func (j *Job) Validate() error {
} }
} }
if j.IsDispatchTemplate() {
if err := j.Dispatch.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil() return mErr.ErrorOrNil()
} }
@ -1311,6 +1325,11 @@ func (j *Job) IsPeriodic() bool {
return j.Periodic != nil return j.Periodic != nil
} }
// IsDispatchTemplate returns whether a job is dispatch template.
func (j *Job) IsDispatchTemplate() bool {
return j.Dispatch != nil
}
// VaultPolicies returns the set of Vault policies per task group, per task // VaultPolicies returns the set of Vault policies per task group, per task
func (j *Job) VaultPolicies() map[string]map[string]*Vault { func (j *Job) VaultPolicies() map[string]map[string]*Vault {
policies := make(map[string]map[string]*Vault, len(j.TaskGroups)) policies := make(map[string]map[string]*Vault, len(j.TaskGroups))
@ -1525,6 +1544,80 @@ type PeriodicLaunch struct {
ModifyIndex uint64 ModifyIndex uint64
} }
const (
DispatchInputDataForbidden = "forbidden"
DispatchInputDataOptional = "optional"
DispatchInputDataRequired = "required"
)
// DispatchConfig is used to configure the dispatch template
type DispatchConfig struct {
// Paused specifies whether jobs can be dispatched based on the template or
// if the job is paused.
Paused bool
// InputData configure the input data requirements
InputData string
// MetaRequired is metadata keys that must be specified by the dispatcher
MetaRequired []string
// MetaOptional is metadata keys that may be specified by the dispatcher
MetaOptional []string
}
func (d *DispatchConfig) Validate() error {
var mErr multierror.Error
switch d.InputData {
case DispatchInputDataOptional, DispatchInputDataRequired, DispatchInputDataForbidden:
default:
multierror.Append(&mErr, fmt.Errorf("Unknown input data requirement: %q", d.InputData))
}
// Check that the meta configurations are disjoint sets
disjoint, offending := SliceSetDisjoint(d.MetaRequired, d.MetaOptional)
if !disjoint {
multierror.Append(&mErr, fmt.Errorf("Required and optional meta keys should be disjoint. Following keys exist in both: %v", offending))
}
return mErr.ErrorOrNil()
}
func (d *DispatchConfig) Canonicalize() {
if d.InputData == "" {
d.InputData = DispatchInputDataOptional
}
}
func (d *DispatchConfig) Copy() *DispatchConfig {
if d == nil {
return nil
}
nd := new(DispatchConfig)
*nd = *d
nd.MetaOptional = CopySliceString(nd.MetaOptional)
nd.MetaRequired = CopySliceString(nd.MetaRequired)
return nd
}
// DispatchInputConfig configures how a task gets its input from a job dispatch
type DispatchInputConfig struct {
// Stdin specifies whether the input should be written to the task's stdin
Stdin bool
// File specifies a relative path to where the input data should be written
File string
}
func (d *DispatchInputConfig) Copy() *DispatchInputConfig {
if d == nil {
return nil
}
nd := new(DispatchInputConfig)
*nd = *d
return nd
}
var ( var (
defaultServiceJobRestartPolicy = RestartPolicy{ defaultServiceJobRestartPolicy = RestartPolicy{
Delay: 15 * time.Second, Delay: 15 * time.Second,
@ -2076,6 +2169,10 @@ type Task struct {
// Resources is the resources needed by this task // Resources is the resources needed by this task
Resources *Resources Resources *Resources
// DispatchInput configures how the task retrieves its input from a dispatch
// template
DispatchInput *DispatchInputConfig
// Meta is used to associate arbitrary metadata with this // Meta is used to associate arbitrary metadata with this
// task. This is opaque to Nomad. // task. This is opaque to Nomad.
Meta map[string]string Meta map[string]string
@ -2113,6 +2210,7 @@ func (t *Task) Copy() *Task {
nt.Vault = nt.Vault.Copy() nt.Vault = nt.Vault.Copy()
nt.Resources = nt.Resources.Copy() nt.Resources = nt.Resources.Copy()
nt.Meta = CopyMapStringString(nt.Meta) nt.Meta = CopyMapStringString(nt.Meta)
nt.DispatchInput = nt.DispatchInput.Copy()
if t.Artifacts != nil { if t.Artifacts != nil {
artifacts := make([]*TaskArtifact, 0, len(t.Artifacts)) artifacts := make([]*TaskArtifact, 0, len(t.Artifacts))

View File

@ -1444,3 +1444,29 @@ func TestVault_Validate(t *testing.T) {
t.Fatalf("Expected signal empty error") t.Fatalf("Expected signal empty error")
} }
} }
func TestDispatchConfig_Validate(t *testing.T) {
d := &DispatchConfig{
InputData: "foo",
}
if err := d.Validate(); err == nil || !strings.Contains(err.Error(), "input data") {
t.Fatalf("Expected unknown input data requirement: %v", err)
}
d.InputData = DispatchInputDataOptional
d.MetaOptional = []string{"foo", "bar"}
d.MetaRequired = []string{"bar", "baz"}
if err := d.Validate(); err == nil || !strings.Contains(err.Error(), "disjoint") {
t.Fatalf("Expected meta not being disjoint error: %v", err)
}
}
func TestDispatchConfig_Canonicalize(t *testing.T) {
d := &DispatchConfig{}
d.Canonicalize()
if d.InputData != DispatchInputDataOptional {
t.Fatalf("Canonicalize failed")
}
}