diff --git a/api/resources.go b/api/resources.go index 610ea8a8e..c5fb8f49f 100644 --- a/api/resources.go +++ b/api/resources.go @@ -99,6 +99,7 @@ type NetworkResource struct { MBits *int ReservedPorts []Port DynamicPorts []Port + Services []*Service } func (n *NetworkResource) Canonicalize() { diff --git a/api/tasks.go b/api/tasks.go index e26c04015..e627dae09 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -372,6 +372,7 @@ type Service struct { AddressMode string `mapstructure:"address_mode"` Checks []ServiceCheck CheckRestart *CheckRestart `mapstructure:"check_restart"` + Connect *ConsulConnect } func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) { @@ -392,6 +393,25 @@ func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) { } } +type ConsulConnect struct { + SidecarService *ConsulSidecarService `mapstructure:"sidecar_service"` +} + +type ConsulSidecarService struct { + Port string + Proxy *ConsulProxy +} + +type ConsulProxy struct { + Upstreams []*ConsulUpstream +} + +type ConsulUpstream struct { + //FIXME Pointers? + DestinationName string `mapstructure:"destination_name"` + LocalBindPort int `mapstructure:"local_bind_port"` +} + // EphemeralDisk is an ephemeral disk object type EphemeralDisk struct { Sticky *bool @@ -495,6 +515,7 @@ type TaskGroup struct { Migrate *MigrateStrategy Networks []*NetworkResource Meta map[string]string + Services []*Service } // NewTaskGroup creates a new TaskGroup. diff --git a/client/allochealth/tracker.go b/client/allochealth/tracker.go index d9f943ccc..abc923648 100644 --- a/client/allochealth/tracker.go +++ b/client/allochealth/tracker.go @@ -238,7 +238,12 @@ func (t *Tracker) watchTaskEvents() { // Store the task states t.l.Lock() for task, state := range alloc.TaskStates { - t.taskHealth[task].state = state + //TODO(schmichael) for now skip unknown tasks as + //they're task group services which don't currently + //support checks anyway + if v, ok := t.taskHealth[task]; ok { + v.state = state + } } t.l.Unlock() @@ -355,7 +360,12 @@ OUTER: // Store the task registrations t.l.Lock() for task, reg := range allocReg.Tasks { - t.taskHealth[task].taskRegistrations = reg + //TODO(schmichael) for now skip unknown tasks as + //they're task group services which don't currently + //support checks anyway + if v, ok := t.taskHealth[task]; ok { + v.taskRegistrations = reg + } } t.l.Unlock() diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index afe4cbb98..a42ce63de 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -686,6 +686,7 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) { tg.Constraints = ApiConstraintsToStructs(taskGroup.Constraints) tg.Affinities = ApiAffinitiesToStructs(taskGroup.Affinities) tg.Networks = ApiNetworkResourceToStructs(taskGroup.Networks) + tg.Services = ApiServicesToStructs(taskGroup.Services) tg.RestartPolicy = &structs.RestartPolicy{ Attempts: *taskGroup.RestartPolicy.Attempts, @@ -926,6 +927,7 @@ func ApiNetworkResourceToStructs(in []*api.NetworkResource) []*structs.NetworkRe out[i].DynamicPorts[j] = structs.Port{ Label: dp.Label, Value: dp.Value, + To: dp.To, } } } @@ -936,6 +938,7 @@ func ApiNetworkResourceToStructs(in []*api.NetworkResource) []*structs.NetworkRe out[i].ReservedPorts[j] = structs.Port{ Label: rp.Label, Value: rp.Value, + To: rp.To, } } } @@ -944,6 +947,87 @@ func ApiNetworkResourceToStructs(in []*api.NetworkResource) []*structs.NetworkRe return out } +//TODO(schmichael) refactor and reuse in service parsing above +func ApiServicesToStructs(in []*api.Service) []*structs.Service { + if len(in) == 0 { + return nil + } + + out := make([]*structs.Service, len(in)) + for i, s := range in { + out[i] = &structs.Service{ + Name: s.Name, + PortLabel: s.PortLabel, + Tags: s.Tags, + CanaryTags: s.CanaryTags, + AddressMode: s.AddressMode, + } + + if l := len(s.Checks); l != 0 { + out[i].Checks = make([]*structs.ServiceCheck, l) + for j, check := range s.Checks { + out[i].Checks[j] = &structs.ServiceCheck{ + Name: check.Name, + Type: check.Type, + Command: check.Command, + Args: check.Args, + Path: check.Path, + Protocol: check.Protocol, + PortLabel: check.PortLabel, + AddressMode: check.AddressMode, + Interval: check.Interval, + Timeout: check.Timeout, + InitialStatus: check.InitialStatus, + TLSSkipVerify: check.TLSSkipVerify, + Header: check.Header, + Method: check.Method, + GRPCService: check.GRPCService, + GRPCUseTLS: check.GRPCUseTLS, + } + if check.CheckRestart != nil { + out[i].Checks[j].CheckRestart = &structs.CheckRestart{ + Limit: check.CheckRestart.Limit, + Grace: *check.CheckRestart.Grace, + IgnoreWarnings: check.CheckRestart.IgnoreWarnings, + } + } + } + } + + if s.Connect == nil { + continue + } + + out[i].Connect = &structs.ConsulConnect{} + + if s.Connect.SidecarService == nil { + continue + } + + out[i].Connect.SidecarService = &structs.ConsulSidecarService{ + Port: s.Connect.SidecarService.Port, + } + + if s.Connect.SidecarService.Proxy == nil { + continue + } + + out[i].Connect.SidecarService.Proxy = &structs.ConsulProxy{} + + upstreams := make([]*structs.ConsulUpstream, len(s.Connect.SidecarService.Proxy.Upstreams)) + for i, p := range s.Connect.SidecarService.Proxy.Upstreams { + upstreams[i] = &structs.ConsulUpstream{ + DestinationName: p.DestinationName, + LocalBindPort: p.LocalBindPort, + } + } + + out[i].Connect.SidecarService.Proxy.Upstreams = upstreams + } + + return out +} + func ApiConstraintsToStructs(in []*api.Constraint) []*structs.Constraint { if in == nil { return nil diff --git a/helper/funcs.go b/helper/funcs.go index 64d998aaf..7a6b4c151 100644 --- a/helper/funcs.go +++ b/helper/funcs.go @@ -190,6 +190,31 @@ func SliceSetDisjoint(first, second []string) (bool, []string) { return false, flattened } +// CompareSliceSetString returns true if the slices contain the same strings. +// Order is ignored. The slice may be copied but is never altered. The slice is +// assumed to be a set. Multiple instances of an entry are treated the same as +// a single instance. +func CompareSliceSetString(a, b []string) bool { + n := len(a) + if n != len(b) { + return false + } + + // Copy a into a map and compare b against it + amap := make(map[string]struct{}, n) + for i := range a { + amap[a[i]] = struct{}{} + } + + for i := range b { + if _, ok := amap[b[i]]; !ok { + return false + } + } + + return true +} + // CompareMapStringString returns true if the maps are equivalent. A nil and // empty map are considered not equal. func CompareMapStringString(a, b map[string]string) bool { diff --git a/helper/funcs_test.go b/helper/funcs_test.go index 774030be1..cff3e9c21 100644 --- a/helper/funcs_test.go +++ b/helper/funcs_test.go @@ -1,6 +1,7 @@ package helper import ( + "fmt" "reflect" "sort" "testing" @@ -21,6 +22,75 @@ func TestSliceStringIsSubset(t *testing.T) { } } +func TestCompareSliceSetString(t *testing.T) { + cases := []struct { + A []string + B []string + Result bool + }{ + { + A: []string{}, + B: []string{}, + Result: true, + }, + { + A: []string{}, + B: []string{"a"}, + Result: false, + }, + { + A: []string{"a"}, + B: []string{"a"}, + Result: true, + }, + { + A: []string{"a"}, + B: []string{"b"}, + Result: false, + }, + { + A: []string{"a", "b"}, + B: []string{"b"}, + Result: false, + }, + { + A: []string{"a", "b"}, + B: []string{"a"}, + Result: false, + }, + { + A: []string{"a", "b"}, + B: []string{"a", "b"}, + Result: true, + }, + { + A: []string{"a", "b"}, + B: []string{"b", "a"}, + Result: true, + }, + } + + for i, tc := range cases { + tc := tc + t.Run(fmt.Sprintf("case-%da", i), func(t *testing.T) { + if res := CompareSliceSetString(tc.A, tc.B); res != tc.Result { + t.Fatalf("expected %t but CompareSliceSetString(%v, %v) -> %t", + tc.Result, tc.A, tc.B, res, + ) + } + }) + + // Function is commutative so compare B and A + t.Run(fmt.Sprintf("case-%db", i), func(t *testing.T) { + if res := CompareSliceSetString(tc.B, tc.A); res != tc.Result { + t.Fatalf("expected %t but CompareSliceSetString(%v, %v) -> %t", + tc.Result, tc.B, tc.A, res, + ) + } + }) + } +} + func TestMapStringStringSliceValueSet(t *testing.T) { m := map[string][]string{ "foo": {"1", "2"}, diff --git a/jobspec/parse.go b/jobspec/parse.go index ca61bad84..c0a7dc608 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -315,6 +315,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error { "migrate", "spread", "network", + "service", } if err := helper.CheckHCLKeys(listVal, valid); err != nil { return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n)) @@ -335,6 +336,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error { delete(m, "migrate") delete(m, "spread") delete(m, "network") + delete(m, "service") // Build the group with the basic decode var g api.TaskGroup @@ -448,6 +450,12 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error { } } + if o := listVal.Filter("service"); len(o.Items) > 0 { + if err := parseGroupServices(*result.Name, *g.Name, &g, o); err != nil { + return multierror.Prefix(err, fmt.Sprintf("'%s',", n)) + } + } + collection = append(collection, &g) } @@ -1202,6 +1210,83 @@ func parseTemplates(result *[]*api.Template, list *ast.ObjectList) error { return nil } +//TODO(schmichael) combine with non-group services +func parseGroupServices(jobName string, taskGroupName string, g *api.TaskGroup, serviceObjs *ast.ObjectList) error { + g.Services = make([]*api.Service, len(serviceObjs.Items)) + for idx, o := range serviceObjs.Items { + // Check for invalid keys + valid := []string{ + "name", + "tags", + "canary_tags", + "port", + "check", + "address_mode", + "check_restart", + "connect", + } + if err := helper.CheckHCLKeys(o.Val, valid); err != nil { + return multierror.Prefix(err, fmt.Sprintf("service (%d) ->", idx)) + } + + var service api.Service + var m map[string]interface{} + if err := hcl.DecodeObject(&m, o.Val); err != nil { + return err + } + + delete(m, "check") + delete(m, "check_restart") + delete(m, "connect") + + if err := mapstructure.WeakDecode(m, &service); err != nil { + return err + } + + // Filter checks + var checkList *ast.ObjectList + if ot, ok := o.Val.(*ast.ObjectType); ok { + checkList = ot.List + } else { + return fmt.Errorf("service '%s': should be an object", service.Name) + } + + if co := checkList.Filter("check"); len(co.Items) > 0 { + if err := parseChecks(&service, co); err != nil { + return multierror.Prefix(err, fmt.Sprintf("service: '%s',", service.Name)) + } + } + + // Filter check_restart + if cro := checkList.Filter("check_restart"); len(cro.Items) > 0 { + if len(cro.Items) > 1 { + return fmt.Errorf("check_restart '%s': cannot have more than 1 check_restart", service.Name) + } + if cr, err := parseCheckRestart(cro.Items[0]); err != nil { + return multierror.Prefix(err, fmt.Sprintf("service: '%s',", service.Name)) + } else { + service.CheckRestart = cr + } + } + + // Filter connect + if co := checkList.Filter("connect"); len(co.Items) > 0 { + if len(co.Items) > 1 { + return fmt.Errorf("connect '%s': cannot have more than 1 connect", service.Name) + } + if c, err := parseConnect(co.Items[0]); err != nil { + return multierror.Prefix(err, fmt.Sprintf("service: '%s',", service.Name)) + } else { + service.Connect = c + } + } + + g.Services[idx] = &service + } + + return nil +} + func parseServices(jobName string, taskGroupName string, task *api.Task, serviceObjs *ast.ObjectList) error { task.Services = make([]*api.Service, len(serviceObjs.Items)) for idx, o := range serviceObjs.Items { @@ -1398,6 +1483,162 @@ func parseCheckRestart(cro *ast.ObjectItem) (*api.CheckRestart, error) { return &checkRestart, nil } +func parseConnect(co *ast.ObjectItem) (*api.ConsulConnect, error) { + valid := []string{ + "sidecar_service", + } + + if err := helper.CheckHCLKeys(co.Val, valid); err != nil { + return nil, multierror.Prefix(err, "connect ->") + } + + var connect api.ConsulConnect + + var connectList *ast.ObjectList + if ot, ok := co.Val.(*ast.ObjectType); ok { + connectList = ot.List + } else { + return nil, fmt.Errorf("connect should be an object") + } + + // Parse the sidecar_service + o := connectList.Filter("sidecar_service") + if len(o.Items) == 0 { + return nil, nil + } + if len(o.Items) > 1 { + return nil, fmt.Errorf("only one 'sidecar_service' block allowed per task") + } + + r, err := parseSidecarService(o.Items[0]) + if err != nil { + return nil, fmt.Errorf("sidecar_service, %v", err) + } + connect.SidecarService = r + + return &connect, nil +} + +func parseSidecarService(o *ast.ObjectItem) (*api.ConsulSidecarService, error) { + valid := []string{ + "port", + "proxy", + } + + if err := helper.CheckHCLKeys(o.Val, valid); err != nil { + return nil, multierror.Prefix(err, "sidecar_service ->") + } + + var sidecar api.ConsulSidecarService + var m map[string]interface{} + if err := hcl.DecodeObject(&m, o.Val); err != nil { + return nil, err + } + + delete(m, "proxy") + + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + WeaklyTypedInput: true, + Result: &sidecar, + }) + if err != nil { + return nil, err + } + if err := dec.Decode(m); err != nil { + return nil, fmt.Errorf("foo: %v", err) + } + + var proxyList *ast.ObjectList + if ot, ok := o.Val.(*ast.ObjectType); ok { + proxyList = ot.List + } else { + return nil, fmt.Errorf("sidecar_service: should be an object") + } + + // Parse the proxy + po := proxyList.Filter("proxy") + if len(po.Items) == 0 { + return &sidecar, nil + } + if len(po.Items) > 1 { + return nil, fmt.Errorf("only one 'proxy' block allowed per task") + } + + r, err := parseProxy(po.Items[0]) + if err != nil { + return nil, fmt.Errorf("proxy, %v", err) + } + sidecar.Proxy = r + + return &sidecar, nil +} + +func parseProxy(o *ast.ObjectItem) (*api.ConsulProxy, error) { + valid := []string{ + "upstreams", + } + + if err := helper.CheckHCLKeys(o.Val, valid); err != nil { + return nil, multierror.Prefix(err, "proxy ->") + } + + var proxy api.ConsulProxy + + var listVal *ast.ObjectList + if ot, ok := o.Val.(*ast.ObjectType); ok { + listVal = ot.List + } else { + return nil, fmt.Errorf("proxy: should be an object") + } + + // Parse the proxy + uo := listVal.Filter("upstreams") + proxy.Upstreams = make([]*api.ConsulUpstream, len(uo.Items)) + for i := range uo.Items { + u, err := parseUpstream(uo.Items[i]) + if err != nil { + return nil, err + } + + proxy.Upstreams[i] = u + } + + return &proxy, nil +} + +func parseUpstream(uo *ast.ObjectItem) (*api.ConsulUpstream, error) { + valid := []string{ + "destination_name", + "local_bind_port", + } + + if err := helper.CheckHCLKeys(uo.Val, valid); err != nil { + return nil, multierror.Prefix(err, "upstream ->") + } + + var upstream api.ConsulUpstream + var m map[string]interface{} + if err := hcl.DecodeObject(&m, uo.Val); err != nil { + return nil, err + } + + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + WeaklyTypedInput: true, + Result: &upstream, + }) + if err != nil { + return nil, err + } + + if err := dec.Decode(m); err != nil { + return nil, err + } + + return &upstream, nil +} + func parseResources(result *api.Resources, list *ast.ObjectList) error { list = list.Elem() if len(list.Items) == 0 { diff --git a/nomad/structs/consul.go b/nomad/structs/consul.go new file mode 100644 index 000000000..a184039d5 --- /dev/null +++ b/nomad/structs/consul.go @@ -0,0 +1,109 @@ +package structs + +type ConsulConnect struct { + SidecarService *ConsulSidecarService +} + +func (c *ConsulConnect) Copy() *ConsulConnect { + return &ConsulConnect{ + SidecarService: c.SidecarService.Copy(), + } +} + +func (c *ConsulConnect) Equals(o *ConsulConnect) bool { + if c == nil || o == nil { + return c == o + } + + return c.SidecarService.Equals(o.SidecarService) +} + +func (c *ConsulConnect) HasSidecar() bool { + return c != nil && c.SidecarService != nil +} + +type ConsulSidecarService struct { + Port string + Proxy *ConsulProxy +} + +func (s *ConsulSidecarService) Copy() *ConsulSidecarService { + return &ConsulSidecarService{ + Port: s.Port, + Proxy: s.Proxy.Copy(), + } +} + +func (s *ConsulSidecarService) Equals(o *ConsulSidecarService) bool { + if s == nil || o == nil { + return s == o + } + + if s.Port != o.Port { + return false + } + + return s.Proxy.Equals(o.Proxy) +} + +type ConsulProxy struct { + Upstreams []*ConsulUpstream +} + +func (p *ConsulProxy) Copy() *ConsulProxy { + upstreams := make([]*ConsulUpstream, len(p.Upstreams)) + + for i := range p.Upstreams { + upstreams[i] = p.Upstreams[i].Copy() + } + + return &ConsulProxy{ + Upstreams: upstreams, + } +} + +func (p *ConsulProxy) Equals(o *ConsulProxy) bool { + if p == nil || o == nil { + return p == o + } + + if len(p.Upstreams) != len(o.Upstreams) { + return false + } + + // Order doesn't matter +OUTER: + for _, up := range p.Upstreams { + for _, innerUp := range o.Upstreams { + if up.Equals(innerUp) { + // Match; find next upstream + continue OUTER + } + } + + // No match + return false + } + + return true +} + +type ConsulUpstream struct { + DestinationName string + LocalBindPort int +} + +func (u *ConsulUpstream) Copy() *ConsulUpstream { + return &ConsulUpstream{ + DestinationName: u.DestinationName, + LocalBindPort: u.LocalBindPort, + } +} + +func (u *ConsulUpstream) Equals(o *ConsulUpstream) bool { + if u == nil || o == nil { + return u == o + } + + return (*u) == (*o) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 5fe1814dc..adad12df9 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2071,6 +2071,7 @@ func (nr *NetworkResource) Equals(other *NetworkResource) bool { return false } } + return true } @@ -2148,12 +2149,22 @@ func (ns Networks) Port(label string) (string, int) { for _, n := range ns { for _, p := range n.ReservedPorts { if p.Label == label { - return n.IP, p.Value + //TODO(schmichael) this doesn't seem right + if p.Value == 0 { + return n.IP, p.To + } else { + return n.IP, p.Value + } } } for _, p := range n.DynamicPorts { if p.Label == label { - return n.IP, p.Value + //TODO(schmichael) this doesn't seem right + if p.Value == 0 { + return n.IP, p.To + } else { + return n.IP, p.Value + } } } } @@ -4646,6 +4657,9 @@ type TaskGroup struct { // Networks are the network configuration for the task group. This can be // overridden in the task. Networks Networks + + // Services this group provides + Services []*Service } func (tg *TaskGroup) Copy() *TaskGroup { @@ -4683,6 +4697,14 @@ func (tg *TaskGroup) Copy() *TaskGroup { if tg.EphemeralDisk != nil { ntg.EphemeralDisk = tg.EphemeralDisk.Copy() } + + if tg.Services != nil { + ntg.Services = make([]*Service, len(tg.Services)) + for i, s := range tg.Services { + ntg.Services[i] = s.Copy() + } + } + return ntg } @@ -4981,6 +5003,26 @@ func (c *CheckRestart) Copy() *CheckRestart { return nc } +func (c *CheckRestart) Equals(o *CheckRestart) bool { + if c == nil || o == nil { + return c == o + } + + if c.Limit != o.Limit { + return false + } + + if c.Grace != o.Grace { + return false + } + + if c.IgnoreWarnings != o.IgnoreWarnings { + return false + } + + return true +} + func (c *CheckRestart) Validate() error { if c == nil { return nil @@ -5048,6 +5090,83 @@ func (sc *ServiceCheck) Copy() *ServiceCheck { return nsc } +func (sc *ServiceCheck) Equals(o *ServiceCheck) bool { + if sc == nil || o == nil { + return sc == o + } + + if sc.Name != o.Name { + return false + } + + if sc.AddressMode != o.AddressMode { + return false + } + + if !helper.CompareSliceSetString(sc.Args, o.Args) { + return false + } + + if !sc.CheckRestart.Equals(o.CheckRestart) { + return false + } + + if sc.Command != o.Command { + return false + } + + if sc.GRPCService != o.GRPCService { + return false + } + + if sc.GRPCUseTLS != o.GRPCUseTLS { + return false + } + + // Use DeepEqual here as order of slice values could matter + if !reflect.DeepEqual(sc.Header, o.Header) { + return false + } + + if sc.InitialStatus != o.InitialStatus { + return false + } + + if sc.Interval != o.Interval { + return false + } + + if sc.Method != o.Method { + return false + } + + if sc.Path != o.Path { + return false + } + + if sc.PortLabel != o.Path { + return false + } + + if sc.Protocol != o.Protocol { + return false + } + + if sc.TLSSkipVerify != o.TLSSkipVerify { + return false + } + + if sc.Timeout != o.Timeout { + return false + } + + if sc.Type != o.Type { + return false + } + + return true +} + func (sc *ServiceCheck) Canonicalize(serviceName string) { // Ensure empty maps/slices are treated as null to avoid scheduling // issues when using DeepEquals. @@ -5224,6 +5343,7 @@ type Service struct { Tags []string // List of tags for the service CanaryTags []string // List of tags for the service when it is a canary Checks []*ServiceCheck // List of checks associated with the service + Connect *ConsulConnect // Consul Connect configuration } func (s *Service) Copy() *Service { @@ -5350,6 +5470,55 @@ func (s *Service) Hash(allocID, taskName string, canary bool) string { return b32.EncodeToString(h.Sum(nil)) } +func (s *Service) Equals(o *Service) bool { + if s == nil || o == nil { + return s == o + } + + if s.AddressMode != o.AddressMode { + return false + } + + if !helper.CompareSliceSetString(s.CanaryTags, o.CanaryTags) { + return false + } + + if len(s.Checks) != len(o.Checks) { + return false + } + +OUTER: + for i := range s.Checks { + for ii := range o.Checks { + if s.Checks[i].Equals(o.Checks[ii]) { + // Found match; continue with next check + continue OUTER + } + } + + // No match + return false + } + + if !s.Connect.Equals(o.Connect) { + return false + } + + if s.Name != o.Name { + return false + } + + if s.PortLabel != o.PortLabel { + return false + } + + if !helper.CompareSliceSetString(s.Tags, o.Tags) { + return false + } + + return true +} + const ( // DefaultKillTimeout is the default timeout between signaling a task it // will be killed and killing it.