package jobspec import ( "fmt" "strings" "time" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/hcl" "github.com/hashicorp/hcl/hcl/ast" "github.com/hashicorp/nomad/api" "github.com/mitchellh/mapstructure" ) var ( commonTaskKeys = []string{ "driver", "user", "config", "env", "resources", "meta", "logs", "kill_timeout", "shutdown_delay", "kill_signal", "scaling", } normalTaskKeys = append(commonTaskKeys, "artifact", "constraint", "affinity", "dispatch_payload", "lifecycle", "leader", "restart", "service", "template", "vault", "kind", "volume_mount", "csi_plugin", ) sidecarTaskKeys = append(commonTaskKeys, "name", ) ) func parseTasks(result *[]*api.Task, list *ast.ObjectList) error { list = list.Children() if len(list.Items) == 0 { return nil } // Go through each object and turn it into an actual result. seen := make(map[string]struct{}) for _, item := range list.Items { n := item.Keys[0].Token.Value().(string) // Make sure we haven't already found this if _, ok := seen[n]; ok { return fmt.Errorf("task '%s' defined more than once", n) } seen[n] = struct{}{} t, err := parseTask(item, normalTaskKeys) if err != nil { return multierror.Prefix(err, fmt.Sprintf("'%s',", n)) } t.Name = n *result = append(*result, t) } return nil } func parseTask(item *ast.ObjectItem, keys []string) (*api.Task, error) { // We need this later var listVal *ast.ObjectList if ot, ok := item.Val.(*ast.ObjectType); ok { listVal = ot.List } else { return nil, fmt.Errorf("should be an object") } // Check for invalid keys if err := checkHCLKeys(listVal, keys); err != nil { return nil, err } var m map[string]interface{} if err := hcl.DecodeObject(&m, item.Val); err != nil { return nil, err } delete(m, "artifact") delete(m, "config") delete(m, "constraint") delete(m, "affinity") delete(m, "dispatch_payload") delete(m, "lifecycle") delete(m, "env") delete(m, "logs") delete(m, "meta") delete(m, "resources") delete(m, "restart") delete(m, "service") delete(m, "template") delete(m, "vault") delete(m, "volume_mount") delete(m, "csi_plugin") delete(m, "scaling") // Build the task var t api.Task dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ DecodeHook: mapstructure.StringToTimeDurationHookFunc(), WeaklyTypedInput: true, Result: &t, }) if err != nil { return nil, err } if err := dec.Decode(m); err != nil { return nil, err } // If we have env, then parse them if o := listVal.Filter("env"); len(o.Items) > 0 { for _, o := range o.Elem().Items { var m map[string]interface{} if err := hcl.DecodeObject(&m, o.Val); err != nil { return nil, err } if err := mapstructure.WeakDecode(m, &t.Env); err != nil { return nil, err } } } if o := listVal.Filter("service"); len(o.Items) > 0 { services, err := parseServices(o) if err != nil { return nil, err } t.Services = services } if o := listVal.Filter("csi_plugin"); len(o.Items) > 0 { if len(o.Items) != 1 { return nil, fmt.Errorf("csi_plugin -> Expected single stanza, got %d", len(o.Items)) } i := o.Elem().Items[0] var m map[string]interface{} if err := hcl.DecodeObject(&m, i.Val); err != nil { return nil, err } var cfg api.TaskCSIPluginConfig if err := mapstructure.WeakDecode(m, &cfg); err != nil { return nil, err } t.CSIPluginConfig = &cfg } // If we have config, then parse that if o := listVal.Filter("config"); len(o.Items) > 0 { for _, o := range o.Elem().Items { var m map[string]interface{} if err := hcl.DecodeObject(&m, o.Val); err != nil { return nil, err } if err := mapstructure.WeakDecode(m, &t.Config); err != nil { return nil, err } } } // Parse constraints if o := listVal.Filter("constraint"); len(o.Items) > 0 { if err := parseConstraints(&t.Constraints, o); err != nil { return nil, multierror.Prefix(err, "constraint ->") } } // Parse affinities if o := listVal.Filter("affinity"); len(o.Items) > 0 { if err := parseAffinities(&t.Affinities, o); err != nil { return nil, multierror.Prefix(err, "affinity ->") } } // Parse out meta fields. These are in HCL as a list so we need // to iterate over them and merge them. if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 { for _, o := range metaO.Elem().Items { var m map[string]interface{} if err := hcl.DecodeObject(&m, o.Val); err != nil { return nil, err } if err := mapstructure.WeakDecode(m, &t.Meta); err != nil { return nil, err } } } // Parse volume mounts if o := listVal.Filter("volume_mount"); len(o.Items) > 0 { if err := parseVolumeMounts(&t.VolumeMounts, o); err != nil { return nil, multierror.Prefix(err, "volume_mount ->") } } // If we have resources, then parse that if o := listVal.Filter("resources"); len(o.Items) > 0 { var r api.Resources if err := parseResources(&r, o); err != nil { return nil, multierror.Prefix(err, "resources ->") } t.Resources = &r } // Parse restart policy if o := listVal.Filter("restart"); len(o.Items) > 0 { if err := parseRestartPolicy(&t.RestartPolicy, o); err != nil { return nil, multierror.Prefix(err, "restart ->") } } // If we have logs then parse that if o := listVal.Filter("logs"); len(o.Items) > 0 { if len(o.Items) > 1 { return nil, fmt.Errorf("only one logs block is allowed in a Task. Number of logs block found: %d", len(o.Items)) } var m map[string]interface{} logsBlock := o.Items[0] // Check for invalid keys valid := []string{ "max_files", "max_file_size", } if err := checkHCLKeys(logsBlock.Val, valid); err != nil { return nil, multierror.Prefix(err, "logs ->") } if err := hcl.DecodeObject(&m, logsBlock.Val); err != nil { return nil, err } var log api.LogConfig if err := mapstructure.WeakDecode(m, &log); err != nil { return nil, err } t.LogConfig = &log } // Parse artifacts if o := listVal.Filter("artifact"); len(o.Items) > 0 { if err := parseArtifacts(&t.Artifacts, o); err != nil { return nil, multierror.Prefix(err, "artifact ->") } } // Parse templates if o := listVal.Filter("template"); len(o.Items) > 0 { if err := parseTemplates(&t.Templates, o); err != nil { return nil, multierror.Prefix(err, "template ->") } } // Parse scaling policies if o := listVal.Filter("scaling"); len(o.Items) > 0 { if err := parseTaskScalingPolicies(&t.ScalingPolicies, o); err != nil { return nil, err } } // If we have a vault block, then parse that if o := listVal.Filter("vault"); len(o.Items) > 0 { v := &api.Vault{ Env: boolToPtr(true), ChangeMode: stringToPtr("restart"), } if err := parseVault(v, o); err != nil { return nil, multierror.Prefix(err, "vault ->") } t.Vault = v } // If we have a dispatch_payload block parse that if o := listVal.Filter("dispatch_payload"); len(o.Items) > 0 { if len(o.Items) > 1 { return nil, fmt.Errorf("only one dispatch_payload block is allowed in a task. Number of dispatch_payload blocks found: %d", len(o.Items)) } var m map[string]interface{} dispatchBlock := o.Items[0] // Check for invalid keys valid := []string{ "file", } if err := checkHCLKeys(dispatchBlock.Val, valid); err != nil { return nil, multierror.Prefix(err, "dispatch_payload ->") } if err := hcl.DecodeObject(&m, dispatchBlock.Val); err != nil { return nil, err } t.DispatchPayload = &api.DispatchPayloadConfig{} if err := mapstructure.WeakDecode(m, t.DispatchPayload); err != nil { return nil, err } } // If we have a lifecycle block parse that if o := listVal.Filter("lifecycle"); len(o.Items) > 0 { if len(o.Items) > 1 { return nil, fmt.Errorf("only one lifecycle block is allowed in a task. Number of lifecycle blocks found: %d", len(o.Items)) } var m map[string]interface{} lifecycleBlock := o.Items[0] // Check for invalid keys valid := []string{ "hook", "sidecar", } if err := checkHCLKeys(lifecycleBlock.Val, valid); err != nil { return nil, multierror.Prefix(err, "lifecycle ->") } if err := hcl.DecodeObject(&m, lifecycleBlock.Val); err != nil { return nil, err } t.Lifecycle = &api.TaskLifecycle{} if err := mapstructure.WeakDecode(m, t.Lifecycle); err != nil { return nil, err } } return &t, nil } func parseArtifacts(result *[]*api.TaskArtifact, list *ast.ObjectList) error { for _, o := range list.Elem().Items { // Check for invalid keys valid := []string{ "source", "options", "mode", "destination", } if err := checkHCLKeys(o.Val, valid); err != nil { return err } var m map[string]interface{} if err := hcl.DecodeObject(&m, o.Val); err != nil { return err } delete(m, "options") var ta api.TaskArtifact if err := mapstructure.WeakDecode(m, &ta); err != nil { return err } var optionList *ast.ObjectList if ot, ok := o.Val.(*ast.ObjectType); ok { optionList = ot.List } else { return fmt.Errorf("artifact should be an object") } if oo := optionList.Filter("options"); len(oo.Items) > 0 { options := make(map[string]string) if err := parseArtifactOption(options, oo); err != nil { return multierror.Prefix(err, "options: ") } ta.GetterOptions = options } *result = append(*result, &ta) } return nil } func parseArtifactOption(result map[string]string, list *ast.ObjectList) error { list = list.Elem() if len(list.Items) > 1 { return fmt.Errorf("only one 'options' block allowed per artifact") } // Get our resource object o := list.Items[0] var m map[string]interface{} if err := hcl.DecodeObject(&m, o.Val); err != nil { return err } if err := mapstructure.WeakDecode(m, &result); err != nil { return err } return nil } func parseTemplates(result *[]*api.Template, list *ast.ObjectList) error { for _, o := range list.Elem().Items { // Check for invalid keys valid := []string{ "change_mode", "change_signal", "data", "destination", "left_delimiter", "perms", "right_delimiter", "source", "splay", "env", "vault_grace", //COMPAT(0.12) not used; emits warning in 0.11. } if err := checkHCLKeys(o.Val, valid); err != nil { return err } var m map[string]interface{} if err := hcl.DecodeObject(&m, o.Val); err != nil { return err } templ := &api.Template{ ChangeMode: stringToPtr("restart"), Splay: timeToPtr(5 * time.Second), Perms: stringToPtr("0644"), } dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ DecodeHook: mapstructure.StringToTimeDurationHookFunc(), WeaklyTypedInput: true, Result: templ, }) if err != nil { return err } if err := dec.Decode(m); err != nil { return err } *result = append(*result, templ) } return nil } func parseTaskScalingPolicies(result *[]*api.ScalingPolicy, list *ast.ObjectList) error { if len(list.Items) == 0 { return nil } errPrefix := "scaling ->" // Go through each object and turn it into an actual result. seen := make(map[string]bool) for _, item := range list.Items { if l := len(item.Keys); l == 0 { return multierror.Prefix(fmt.Errorf("task scaling policy missing name"), errPrefix) } else if l > 1 { return multierror.Prefix(fmt.Errorf("task scaling policy should only have one name"), errPrefix) } n := item.Keys[0].Token.Value().(string) errPrefix = fmt.Sprintf("scaling[%v] ->", n) var policyType string switch strings.ToLower(n) { case "cpu": policyType = "vertical_cpu" case "mem": policyType = "vertical_mem" default: return multierror.Prefix(fmt.Errorf(`scaling policy name must be "cpu" or "mem"`), errPrefix) } // Make sure we haven't already found this if seen[n] { return multierror.Prefix(fmt.Errorf("scaling policy cannot be defined more than once"), errPrefix) } seen[n] = true p, err := parseScalingPolicy(item) if err != nil { return multierror.Prefix(err, errPrefix) } if p.Type == "" { p.Type = policyType } else if p.Type != policyType { return multierror.Prefix(fmt.Errorf("policy had invalid 'type': %q", p.Type), errPrefix) } *result = append(*result, p) } return nil } func parseResources(result *api.Resources, list *ast.ObjectList) error { list = list.Elem() if len(list.Items) == 0 { return nil } if len(list.Items) > 1 { return fmt.Errorf("only one 'resource' block allowed per task") } // Get our resource object o := list.Items[0] // We need this later var listVal *ast.ObjectList if ot, ok := o.Val.(*ast.ObjectType); ok { listVal = ot.List } else { return fmt.Errorf("resource: should be an object") } // Check for invalid keys valid := []string{ "cpu", "iops", // COMPAT(0.10): Remove after one release to allow it to be removed from jobspecs "disk", "memory", "network", "device", } if err := checkHCLKeys(listVal, valid); err != nil { return multierror.Prefix(err, "resources ->") } var m map[string]interface{} if err := hcl.DecodeObject(&m, o.Val); err != nil { return err } delete(m, "network") delete(m, "device") if err := mapstructure.WeakDecode(m, result); err != nil { return err } // Parse the network resources if o := listVal.Filter("network"); len(o.Items) > 0 { r, err := ParseNetwork(o) if err != nil { return fmt.Errorf("resource, %v", err) } result.Networks = []*api.NetworkResource{r} } // Parse the device resources if o := listVal.Filter("device"); len(o.Items) > 0 { result.Devices = make([]*api.RequestedDevice, len(o.Items)) for idx, do := range o.Items { if l := len(do.Keys); l == 0 { return multierror.Prefix(fmt.Errorf("missing device name"), fmt.Sprintf("resources, device[%d]->", idx)) } else if l > 1 { return multierror.Prefix(fmt.Errorf("only one name may be specified"), fmt.Sprintf("resources, device[%d]->", idx)) } name := do.Keys[0].Token.Value().(string) // Value should be an object var listVal *ast.ObjectList if ot, ok := do.Val.(*ast.ObjectType); ok { listVal = ot.List } else { return fmt.Errorf("device should be an object") } // Check for invalid keys valid := []string{ "name", "count", "affinity", "constraint", } if err := checkHCLKeys(do.Val, valid); err != nil { return multierror.Prefix(err, fmt.Sprintf("resources, device[%d]->", idx)) } // Set the name var r api.RequestedDevice r.Name = name var m map[string]interface{} if err := hcl.DecodeObject(&m, do.Val); err != nil { return err } delete(m, "constraint") delete(m, "affinity") if err := mapstructure.WeakDecode(m, &r); err != nil { return err } // Parse constraints if o := listVal.Filter("constraint"); len(o.Items) > 0 { if err := parseConstraints(&r.Constraints, o); err != nil { return multierror.Prefix(err, "constraint ->") } } // Parse affinities if o := listVal.Filter("affinity"); len(o.Items) > 0 { if err := parseAffinities(&r.Affinities, o); err != nil { return multierror.Prefix(err, "affinity ->") } } result.Devices[idx] = &r } } return nil } func parseVolumeMounts(out *[]*api.VolumeMount, list *ast.ObjectList) error { mounts := make([]*api.VolumeMount, len(list.Items)) for i, item := range list.Items { valid := []string{ "volume", "read_only", "destination", "propagation_mode", } if err := checkHCLKeys(item.Val, valid); err != nil { return err } var m map[string]interface{} if err := hcl.DecodeObject(&m, item.Val); err != nil { return err } var result api.VolumeMount dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ WeaklyTypedInput: true, Result: &result, }) if err != nil { return err } if err := dec.Decode(m); err != nil { return err } mounts[i] = &result } *out = mounts return nil }