Merge pull request #425 from hashicorp/f-service-block

Parsing logic for service block
This commit is contained in:
Diptanu Choudhury 2015-11-17 17:18:16 -08:00
commit 21e9c33dc6
8 changed files with 322 additions and 8 deletions

View file

@ -20,6 +20,28 @@ func NewRestartPolicy() *RestartPolicy {
}
}
// The ServiceCheck data model represents the consul health check that
// Nomad registers for a Task
type ServiceCheck struct {
Id string
Name string
Type string
Script string
Http string
Protocol string
Interval time.Duration
Timeout time.Duration
}
// The Service model represents a Consul service defintion
type Service struct {
Id string
Name string
Tags []string
PortLabel string `mapstructure:"port"`
Checks []ServiceCheck
}
// TaskGroup is the unit of scheduling.
type TaskGroup struct {
Name string
@ -68,6 +90,7 @@ type Task struct {
Config map[string]interface{}
Constraints []*Constraint
Env map[string]string
Services []Service
Resources *Resources
Meta map[string]string
}

View file

@ -128,6 +128,18 @@ job "example" {
}
}
service {
# name = redis
tags = ["global", "cache"]
port = "db"
check {
name = "alive"
type = "tcp"
interval = "10s"
timeout = "2s"
}
}
# We must specify the resources required for
# this task to ensure it runs on a machine with
# enough capacity.

View file

@ -144,7 +144,7 @@ func parseJob(result *structs.Job, list *ast.ObjectList) error {
// If we have tasks outside, create TaskGroups for them
if o := listVal.Filter("task"); len(o.Items) > 0 {
var tasks []*structs.Task
if err := parseTasks(&tasks, o); err != nil {
if err := parseTasks(result.Name, "", &tasks, o); err != nil {
return err
}
@ -247,7 +247,7 @@ func parseGroups(result *structs.Job, list *ast.ObjectList) error {
// Parse tasks
if o := listVal.Filter("task"); len(o.Items) > 0 {
if err := parseTasks(&g.Tasks, o); err != nil {
if err := parseTasks(result.Name, g.Name, &g.Tasks, o); err != nil {
return err
}
}
@ -346,7 +346,7 @@ func parseConstraints(result *[]*structs.Constraint, list *ast.ObjectList) error
return nil
}
func parseTasks(result *[]*structs.Task, list *ast.ObjectList) error {
func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, list *ast.ObjectList) error {
list = list.Children()
if len(list.Items) == 0 {
return nil
@ -378,12 +378,16 @@ func parseTasks(result *[]*structs.Task, list *ast.ObjectList) error {
delete(m, "config")
delete(m, "env")
delete(m, "constraint")
delete(m, "service")
delete(m, "meta")
delete(m, "resources")
// Build the task
var t structs.Task
t.Name = n
if taskGroupName == "" {
taskGroupName = n
}
if err := mapstructure.WeakDecode(m, &t); err != nil {
return err
}
@ -401,6 +405,12 @@ func parseTasks(result *[]*structs.Task, list *ast.ObjectList) error {
}
}
if o := listVal.Filter("service"); len(o.Items) > 0 {
if err := parseServices(jobName, taskGroupName, &t, o); err != nil {
return err
}
}
// If we have config, then parse that
if o := listVal.Filter("config"); len(o.Items) > 0 {
for _, o := range o.Elem().Items {
@ -452,6 +462,79 @@ func parseTasks(result *[]*structs.Task, list *ast.ObjectList) error {
return nil
}
func parseServices(jobName string, taskGroupName string, task *structs.Task, serviceObjs *ast.ObjectList) error {
task.Services = make([]structs.Service, len(serviceObjs.Items))
var defaultServiceName bool
for idx, o := range serviceObjs.Items {
var service structs.Service
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return err
}
delete(m, "check")
if err := mapstructure.WeakDecode(m, &service); err != nil {
return err
}
if defaultServiceName && service.Name == "" {
return fmt.Errorf("Only one service block may omit the Name field")
}
if service.Name == "" {
defaultServiceName = true
service.Name = fmt.Sprintf("%s-%s-%s", jobName, taskGroupName, task.Name)
} else {
service.Name = fmt.Sprintf("%s-%s-%s-%s", jobName, taskGroupName, task.Name, service.Name)
}
// Fileter checks
var checkList *ast.ObjectList
if ot, ok := o.Val.(*ast.ObjectType); ok {
checkList = ot.List
} else {
return fmt.Errorf("service '%s': should be an object", service.Name)
}
if co := checkList.Filter("check"); len(co.Items) > 0 {
if err := parseChecks(&service, co); err != nil {
return err
}
}
task.Services[idx] = service
}
return nil
}
func parseChecks(service *structs.Service, checkObjs *ast.ObjectList) error {
service.Checks = make([]structs.ServiceCheck, len(checkObjs.Items))
for idx, co := range checkObjs.Items {
var check structs.ServiceCheck
var cm map[string]interface{}
if err := hcl.DecodeObject(&cm, co.Val); err != nil {
return err
}
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &check,
})
if err != nil {
return err
}
if err := dec.Decode(cm); err != nil {
return err
}
service.Checks[idx] = check
}
return nil
}
func parseResources(result *structs.Resources, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) == 0 {

View file

@ -94,6 +94,23 @@ func TestParse(t *testing.T) {
Config: map[string]interface{}{
"image": "hashicorp/binstore",
},
Services: []structs.Service{
{
Id: "",
Name: "binstore-storagelocker-binsl-binstore",
Tags: []string{"foo", "bar"},
PortLabel: "http",
Checks: []structs.ServiceCheck{
{
Id: "",
Name: "check-name",
Type: "tcp",
Interval: 10 * time.Second,
Timeout: 2 * time.Second,
},
},
},
},
Env: map[string]string{
"HELLO": "world",
"LOREM": "ipsum",
@ -301,7 +318,7 @@ func TestBadPorts(t *testing.T) {
func TestOverlappingPorts(t *testing.T) {
path, err := filepath.Abs(filepath.Join("./test-fixtures", "overlapping-ports.hcl"))
if err != nil {
t.Fatalf("Can't get absoluate path for file: %s", err)
t.Fatalf("Can't get absolute path for file: %s", err)
}
_, err = ParseFile(path)
@ -314,3 +331,20 @@ func TestOverlappingPorts(t *testing.T) {
t.Fatalf("Expected collision error; got %v", err)
}
}
func TestIncompleteServiceDefn(t *testing.T) {
path, err := filepath.Abs(filepath.Join("./test-fixtures", "incorrect-service-def.hcl"))
if err != nil {
t.Fatalf("Can't get absolute path for file: %s", err)
}
_, err = ParseFile(path)
if err == nil {
t.Fatalf("Expected an error")
}
if !strings.Contains(err.Error(), "Only one service block may omit the Name field") {
t.Fatalf("Expected collision error; got %v", err)
}
}

View file

@ -45,6 +45,16 @@ job "binstore-storagelocker" {
HELLO = "world"
LOREM = "ipsum"
}
service {
tags = ["foo", "bar"]
port = "http"
check {
name = "check-name"
type = "tcp"
interval = "10s"
timeout = "2s"
}
}
resources {
cpu = 500
memory = 128

View file

@ -0,0 +1,77 @@
job "binstore-storagelocker" {
region = "global"
type = "service"
priority = 50
all_at_once = true
datacenters = ["us2", "eu1"]
meta {
foo = "bar"
}
constraint {
attribute = "kernel.os"
value = "windows"
}
update {
stagger = "60s"
max_parallel = 2
}
task "outside" {
driver = "java"
config {
jar = "s3://my-cool-store/foo.jar"
}
meta {
my-cool-key = "foobar"
}
}
group "binsl" {
count = 5
restart {
attempts = 5
interval = "10m"
delay = "15s"
}
task "binstore" {
driver = "docker"
config {
image = "hashicorp/binstore"
}
env {
HELLO = "world"
LOREM = "ipsum"
}
service {
tags = ["foo", "bar"]
port = "http"
check {
name = "check-name"
type = "http"
interval = "10s"
timeout = "2s"
}
}
service {
port = "one"
}
resources {
cpu = 500
memory = 128
network {
mbits = "100"
port "one" {
static = 1
}
port "three" {
static = 3
}
port "http" {}
}
}
}
}

View file

@ -995,6 +995,60 @@ func (tg *TaskGroup) GoString() string {
return fmt.Sprintf("*%#v", *tg)
}
const (
ServiceCheckHTTP = "http"
ServiceCheckTCP = "tcp"
ServiceCheckDocker = "docker"
ServiceCheckScript = "script"
)
// The ServiceCheck data model represents the consul health check that
// Nomad registers for a Task
type ServiceCheck struct {
Id string // Id of the check, must be unique and it is autogenrated
Name string // Name of the check, defaults to id
Type string // Type of the check - tcp, http, docker and script
Script string // Script to invoke for script check
Http string // path of the health check url for http type check
Protocol string // Protocol to use if check is http, defaults to http
Interval time.Duration // Interval of the check
Timeout time.Duration // Timeout of the response from the check before consul fails the check
}
func (sc *ServiceCheck) Validate() error {
t := strings.ToLower(sc.Type)
if sc.Type == ServiceCheckHTTP && sc.Http == "" {
return fmt.Errorf("http checks needs the Http path information.")
}
if sc.Type == ServiceCheckScript && sc.Script == "" {
return fmt.Errorf("Script checks need the script to invoke")
}
if t != ServiceCheckTCP && t != ServiceCheckHTTP && t != ServiceCheckDocker && t != ServiceCheckScript {
return fmt.Errorf("Check with name %v has invalid check type: %s ", sc.Name, sc.Type)
}
return nil
}
// The Service model represents a Consul service defintion
type Service struct {
Id string // Id of the service, this needs to be unique on a local machine
Name string // Name of the service, defaults to id
Tags []string // List of tags for the service
PortLabel string `mapstructure:"port"` // port for the service
Checks []ServiceCheck // List of checks associated with the service
}
func (s *Service) Validate() error {
var mErr multierror.Error
for _, c := range s.Checks {
if err := c.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil()
}
// Task is a single process typically that is executed as part of a task group.
type Task struct {
// Name of the task
@ -1009,6 +1063,9 @@ type Task struct {
// Map of environment variables to be used by the driver
Env map[string]string
// List of service definitions exposed by the Task
Services []Service
// Constraints can be specified at a task level and apply only to
// the particular task.
Constraints []*Constraint
@ -1132,6 +1189,12 @@ func (t *Task) Validate() error {
mErr.Errors = append(mErr.Errors, outer)
}
}
for _, service := range t.Services {
if err := service.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil()
}

View file

@ -357,9 +357,21 @@ func TestEncodeDecode(t *testing.T) {
}
}
func TestBatchRestartPolicyValidate(t *testing.T) {
rp := RestartPolicy{Attempts: 10, Delay: 25 * time.Second}
if err := rp.Validate(); err != nil {
t.Fatalf("err: %v", err)
func TestInvalidServiceCheck(t *testing.T) {
s := Service{
Id: "service-id",
Name: "service-name",
PortLabel: "bar",
Checks: []ServiceCheck{
{
Id: "check-id",
Name: "check-name",
Type: "lol",
},
},
}
if err := s.Validate(); err == nil {
t.Fatalf("Service should be invalid")
}
}