connect: add group.service stanza support

This commit is contained in:
Michael Schurter 2019-06-24 08:29:26 -07:00 committed by Nick Ethier
parent 533b2850fc
commit fb487358fb
No known key found for this signature in database
GPG Key ID: 07C1A3ECED90D24A
9 changed files with 734 additions and 4 deletions

View File

@ -99,6 +99,7 @@ type NetworkResource struct {
MBits *int
ReservedPorts []Port
DynamicPorts []Port
Services []*Service
}
func (n *NetworkResource) Canonicalize() {

View File

@ -372,6 +372,7 @@ type Service struct {
AddressMode string `mapstructure:"address_mode"`
Checks []ServiceCheck
CheckRestart *CheckRestart `mapstructure:"check_restart"`
Connect *ConsulConnect
}
func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) {
@ -392,6 +393,25 @@ func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) {
}
}
type ConsulConnect struct {
SidecarService *ConsulSidecarService `mapstructure:"sidecar_service"`
}
type ConsulSidecarService struct {
Port string
Proxy *ConsulProxy
}
type ConsulProxy struct {
Upstreams []*ConsulUpstream
}
type ConsulUpstream struct {
//FIXME Pointers?
DestinationName string `mapstructure:"destination_name"`
LocalBindPort int `mapstructure:"local_bind_port"`
}
// EphemeralDisk is an ephemeral disk object
type EphemeralDisk struct {
Sticky *bool
@ -495,6 +515,7 @@ type TaskGroup struct {
Migrate *MigrateStrategy
Networks []*NetworkResource
Meta map[string]string
Services []*Service
}
// NewTaskGroup creates a new TaskGroup.

View File

@ -238,7 +238,12 @@ func (t *Tracker) watchTaskEvents() {
// Store the task states
t.l.Lock()
for task, state := range alloc.TaskStates {
t.taskHealth[task].state = state
//TODO(schmichael) for now skip unknown tasks as
//they're task group services which don't currently
//support checks anyway
if v, ok := t.taskHealth[task]; ok {
v.state = state
}
}
t.l.Unlock()
@ -355,7 +360,12 @@ OUTER:
// Store the task registrations
t.l.Lock()
for task, reg := range allocReg.Tasks {
t.taskHealth[task].taskRegistrations = reg
//TODO(schmichael) for now skip unknown tasks as
//they're task group services which don't currently
//support checks anyway
if v, ok := t.taskHealth[task]; ok {
v.taskRegistrations = reg
}
}
t.l.Unlock()

View File

@ -686,6 +686,7 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
tg.Constraints = ApiConstraintsToStructs(taskGroup.Constraints)
tg.Affinities = ApiAffinitiesToStructs(taskGroup.Affinities)
tg.Networks = ApiNetworkResourceToStructs(taskGroup.Networks)
tg.Services = ApiServicesToStructs(taskGroup.Services)
tg.RestartPolicy = &structs.RestartPolicy{
Attempts: *taskGroup.RestartPolicy.Attempts,
@ -926,6 +927,7 @@ func ApiNetworkResourceToStructs(in []*api.NetworkResource) []*structs.NetworkRe
out[i].DynamicPorts[j] = structs.Port{
Label: dp.Label,
Value: dp.Value,
To: dp.To,
}
}
}
@ -936,6 +938,7 @@ func ApiNetworkResourceToStructs(in []*api.NetworkResource) []*structs.NetworkRe
out[i].ReservedPorts[j] = structs.Port{
Label: rp.Label,
Value: rp.Value,
To: rp.To,
}
}
}
@ -944,6 +947,87 @@ func ApiNetworkResourceToStructs(in []*api.NetworkResource) []*structs.NetworkRe
return out
}
//TODO(schmichael) refactor and reuse in service parsing above
func ApiServicesToStructs(in []*api.Service) []*structs.Service {
if len(in) == 0 {
return nil
}
out := make([]*structs.Service, len(in))
for i, s := range in {
out[i] = &structs.Service{
Name: s.Name,
PortLabel: s.PortLabel,
Tags: s.Tags,
CanaryTags: s.CanaryTags,
AddressMode: s.AddressMode,
}
if l := len(s.Checks); l != 0 {
out[i].Checks = make([]*structs.ServiceCheck, l)
for j, check := range s.Checks {
out[i].Checks[j] = &structs.ServiceCheck{
Name: check.Name,
Type: check.Type,
Command: check.Command,
Args: check.Args,
Path: check.Path,
Protocol: check.Protocol,
PortLabel: check.PortLabel,
AddressMode: check.AddressMode,
Interval: check.Interval,
Timeout: check.Timeout,
InitialStatus: check.InitialStatus,
TLSSkipVerify: check.TLSSkipVerify,
Header: check.Header,
Method: check.Method,
GRPCService: check.GRPCService,
GRPCUseTLS: check.GRPCUseTLS,
}
if check.CheckRestart != nil {
out[i].Checks[j].CheckRestart = &structs.CheckRestart{
Limit: check.CheckRestart.Limit,
Grace: *check.CheckRestart.Grace,
IgnoreWarnings: check.CheckRestart.IgnoreWarnings,
}
}
}
}
if s.Connect == nil {
continue
}
out[i].Connect = &structs.ConsulConnect{}
if s.Connect.SidecarService == nil {
continue
}
out[i].Connect.SidecarService = &structs.ConsulSidecarService{
Port: s.Connect.SidecarService.Port,
}
if s.Connect.SidecarService.Proxy == nil {
continue
}
out[i].Connect.SidecarService.Proxy = &structs.ConsulProxy{}
upstreams := make([]*structs.ConsulUpstream, len(s.Connect.SidecarService.Proxy.Upstreams))
for i, p := range s.Connect.SidecarService.Proxy.Upstreams {
upstreams[i] = &structs.ConsulUpstream{
DestinationName: p.DestinationName,
LocalBindPort: p.LocalBindPort,
}
}
out[i].Connect.SidecarService.Proxy.Upstreams = upstreams
}
return out
}
func ApiConstraintsToStructs(in []*api.Constraint) []*structs.Constraint {
if in == nil {
return nil

View File

@ -190,6 +190,31 @@ func SliceSetDisjoint(first, second []string) (bool, []string) {
return false, flattened
}
// CompareSliceSetString returns true if the slices contain the same strings.
// Order is ignored. The slice may be copied but is never altered. The slice is
// assumed to be a set. Multiple instances of an entry are treated the same as
// a single instance.
func CompareSliceSetString(a, b []string) bool {
n := len(a)
if n != len(b) {
return false
}
// Copy a into a map and compare b against it
amap := make(map[string]struct{}, n)
for i := range a {
amap[a[i]] = struct{}{}
}
for i := range b {
if _, ok := amap[b[i]]; !ok {
return false
}
}
return true
}
// CompareMapStringString returns true if the maps are equivalent. A nil and
// empty map are considered not equal.
func CompareMapStringString(a, b map[string]string) bool {

View File

@ -1,6 +1,7 @@
package helper
import (
"fmt"
"reflect"
"sort"
"testing"
@ -21,6 +22,75 @@ func TestSliceStringIsSubset(t *testing.T) {
}
}
func TestCompareSliceSetString(t *testing.T) {
cases := []struct {
A []string
B []string
Result bool
}{
{
A: []string{},
B: []string{},
Result: true,
},
{
A: []string{},
B: []string{"a"},
Result: false,
},
{
A: []string{"a"},
B: []string{"a"},
Result: true,
},
{
A: []string{"a"},
B: []string{"b"},
Result: false,
},
{
A: []string{"a", "b"},
B: []string{"b"},
Result: false,
},
{
A: []string{"a", "b"},
B: []string{"a"},
Result: false,
},
{
A: []string{"a", "b"},
B: []string{"a", "b"},
Result: true,
},
{
A: []string{"a", "b"},
B: []string{"b", "a"},
Result: true,
},
}
for i, tc := range cases {
tc := tc
t.Run(fmt.Sprintf("case-%da", i), func(t *testing.T) {
if res := CompareSliceSetString(tc.A, tc.B); res != tc.Result {
t.Fatalf("expected %t but CompareSliceSetString(%v, %v) -> %t",
tc.Result, tc.A, tc.B, res,
)
}
})
// Function is commutative so compare B and A
t.Run(fmt.Sprintf("case-%db", i), func(t *testing.T) {
if res := CompareSliceSetString(tc.B, tc.A); res != tc.Result {
t.Fatalf("expected %t but CompareSliceSetString(%v, %v) -> %t",
tc.Result, tc.B, tc.A, res,
)
}
})
}
}
func TestMapStringStringSliceValueSet(t *testing.T) {
m := map[string][]string{
"foo": {"1", "2"},

View File

@ -315,6 +315,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
"migrate",
"spread",
"network",
"service",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n))
@ -335,6 +336,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
delete(m, "migrate")
delete(m, "spread")
delete(m, "network")
delete(m, "service")
// Build the group with the basic decode
var g api.TaskGroup
@ -448,6 +450,12 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
}
}
if o := listVal.Filter("service"); len(o.Items) > 0 {
if err := parseGroupServices(*result.Name, *g.Name, &g, o); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s',", n))
}
}
collection = append(collection, &g)
}
@ -1202,6 +1210,83 @@ func parseTemplates(result *[]*api.Template, list *ast.ObjectList) error {
return nil
}
//TODO(schmichael) combine with non-group services
func parseGroupServices(jobName string, taskGroupName string, g *api.TaskGroup, serviceObjs *ast.ObjectList) error {
g.Services = make([]*api.Service, len(serviceObjs.Items))
for idx, o := range serviceObjs.Items {
// Check for invalid keys
valid := []string{
"name",
"tags",
"canary_tags",
"port",
"check",
"address_mode",
"check_restart",
"connect",
}
if err := helper.CheckHCLKeys(o.Val, valid); err != nil {
return multierror.Prefix(err, fmt.Sprintf("service (%d) ->", idx))
}
var service api.Service
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return err
}
delete(m, "check")
delete(m, "check_restart")
delete(m, "connect")
if err := mapstructure.WeakDecode(m, &service); err != nil {
return err
}
// Filter checks
var checkList *ast.ObjectList
if ot, ok := o.Val.(*ast.ObjectType); ok {
checkList = ot.List
} else {
return fmt.Errorf("service '%s': should be an object", service.Name)
}
if co := checkList.Filter("check"); len(co.Items) > 0 {
if err := parseChecks(&service, co); err != nil {
return multierror.Prefix(err, fmt.Sprintf("service: '%s',", service.Name))
}
}
// Filter check_restart
if cro := checkList.Filter("check_restart"); len(cro.Items) > 0 {
if len(cro.Items) > 1 {
return fmt.Errorf("check_restart '%s': cannot have more than 1 check_restart", service.Name)
}
if cr, err := parseCheckRestart(cro.Items[0]); err != nil {
return multierror.Prefix(err, fmt.Sprintf("service: '%s',", service.Name))
} else {
service.CheckRestart = cr
}
}
// Filter connect
if co := checkList.Filter("connect"); len(co.Items) > 0 {
if len(co.Items) > 1 {
return fmt.Errorf("connect '%s': cannot have more than 1 connect", service.Name)
}
if c, err := parseConnect(co.Items[0]); err != nil {
return multierror.Prefix(err, fmt.Sprintf("service: '%s',", service.Name))
} else {
service.Connect = c
}
}
g.Services[idx] = &service
}
return nil
}
func parseServices(jobName string, taskGroupName string, task *api.Task, serviceObjs *ast.ObjectList) error {
task.Services = make([]*api.Service, len(serviceObjs.Items))
for idx, o := range serviceObjs.Items {
@ -1398,6 +1483,162 @@ func parseCheckRestart(cro *ast.ObjectItem) (*api.CheckRestart, error) {
return &checkRestart, nil
}
func parseConnect(co *ast.ObjectItem) (*api.ConsulConnect, error) {
valid := []string{
"sidecar_service",
}
if err := helper.CheckHCLKeys(co.Val, valid); err != nil {
return nil, multierror.Prefix(err, "connect ->")
}
var connect api.ConsulConnect
var connectList *ast.ObjectList
if ot, ok := co.Val.(*ast.ObjectType); ok {
connectList = ot.List
} else {
return nil, fmt.Errorf("connect should be an object")
}
// Parse the sidecar_service
o := connectList.Filter("sidecar_service")
if len(o.Items) == 0 {
return nil, nil
}
if len(o.Items) > 1 {
return nil, fmt.Errorf("only one 'sidecar_service' block allowed per task")
}
r, err := parseSidecarService(o.Items[0])
if err != nil {
return nil, fmt.Errorf("sidecar_service, %v", err)
}
connect.SidecarService = r
return &connect, nil
}
func parseSidecarService(o *ast.ObjectItem) (*api.ConsulSidecarService, error) {
valid := []string{
"port",
"proxy",
}
if err := helper.CheckHCLKeys(o.Val, valid); err != nil {
return nil, multierror.Prefix(err, "sidecar_service ->")
}
var sidecar api.ConsulSidecarService
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return nil, err
}
delete(m, "proxy")
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &sidecar,
})
if err != nil {
return nil, err
}
if err := dec.Decode(m); err != nil {
return nil, fmt.Errorf("foo: %v", err)
}
var proxyList *ast.ObjectList
if ot, ok := o.Val.(*ast.ObjectType); ok {
proxyList = ot.List
} else {
return nil, fmt.Errorf("sidecar_service: should be an object")
}
// Parse the proxy
po := proxyList.Filter("proxy")
if len(po.Items) == 0 {
return &sidecar, nil
}
if len(po.Items) > 1 {
return nil, fmt.Errorf("only one 'proxy' block allowed per task")
}
r, err := parseProxy(po.Items[0])
if err != nil {
return nil, fmt.Errorf("proxy, %v", err)
}
sidecar.Proxy = r
return &sidecar, nil
}
func parseProxy(o *ast.ObjectItem) (*api.ConsulProxy, error) {
valid := []string{
"upstreams",
}
if err := helper.CheckHCLKeys(o.Val, valid); err != nil {
return nil, multierror.Prefix(err, "proxy ->")
}
var proxy api.ConsulProxy
var listVal *ast.ObjectList
if ot, ok := o.Val.(*ast.ObjectType); ok {
listVal = ot.List
} else {
return nil, fmt.Errorf("proxy: should be an object")
}
// Parse the proxy
uo := listVal.Filter("upstreams")
proxy.Upstreams = make([]*api.ConsulUpstream, len(uo.Items))
for i := range uo.Items {
u, err := parseUpstream(uo.Items[i])
if err != nil {
return nil, err
}
proxy.Upstreams[i] = u
}
return &proxy, nil
}
func parseUpstream(uo *ast.ObjectItem) (*api.ConsulUpstream, error) {
valid := []string{
"destination_name",
"local_bind_port",
}
if err := helper.CheckHCLKeys(uo.Val, valid); err != nil {
return nil, multierror.Prefix(err, "upstream ->")
}
var upstream api.ConsulUpstream
var m map[string]interface{}
if err := hcl.DecodeObject(&m, uo.Val); err != nil {
return nil, err
}
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: &upstream,
})
if err != nil {
return nil, err
}
if err := dec.Decode(m); err != nil {
return nil, err
}
return &upstream, nil
}
func parseResources(result *api.Resources, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) == 0 {

109
nomad/structs/consul.go Normal file
View File

@ -0,0 +1,109 @@
package structs
type ConsulConnect struct {
SidecarService *ConsulSidecarService
}
func (c *ConsulConnect) Copy() *ConsulConnect {
return &ConsulConnect{
SidecarService: c.SidecarService.Copy(),
}
}
func (c *ConsulConnect) Equals(o *ConsulConnect) bool {
if c == nil || o == nil {
return c == o
}
return c.SidecarService.Equals(o.SidecarService)
}
func (c *ConsulConnect) HasSidecar() bool {
return c != nil && c.SidecarService != nil
}
type ConsulSidecarService struct {
Port string
Proxy *ConsulProxy
}
func (s *ConsulSidecarService) Copy() *ConsulSidecarService {
return &ConsulSidecarService{
Port: s.Port,
Proxy: s.Proxy.Copy(),
}
}
func (s *ConsulSidecarService) Equals(o *ConsulSidecarService) bool {
if s == nil || o == nil {
return s == o
}
if s.Port != o.Port {
return false
}
return s.Proxy.Equals(o.Proxy)
}
type ConsulProxy struct {
Upstreams []*ConsulUpstream
}
func (p *ConsulProxy) Copy() *ConsulProxy {
upstreams := make([]*ConsulUpstream, len(p.Upstreams))
for i := range p.Upstreams {
upstreams[i] = p.Upstreams[i].Copy()
}
return &ConsulProxy{
Upstreams: upstreams,
}
}
func (p *ConsulProxy) Equals(o *ConsulProxy) bool {
if p == nil || o == nil {
return p == o
}
if len(p.Upstreams) != len(o.Upstreams) {
return false
}
// Order doesn't matter
OUTER:
for _, up := range p.Upstreams {
for _, innerUp := range o.Upstreams {
if up.Equals(innerUp) {
// Match; find next upstream
continue OUTER
}
}
// No match
return false
}
return true
}
type ConsulUpstream struct {
DestinationName string
LocalBindPort int
}
func (u *ConsulUpstream) Copy() *ConsulUpstream {
return &ConsulUpstream{
DestinationName: u.DestinationName,
LocalBindPort: u.LocalBindPort,
}
}
func (u *ConsulUpstream) Equals(o *ConsulUpstream) bool {
if u == nil || o == nil {
return u == o
}
return (*u) == (*o)
}

View File

@ -2071,6 +2071,7 @@ func (nr *NetworkResource) Equals(other *NetworkResource) bool {
return false
}
}
return true
}
@ -2148,12 +2149,22 @@ func (ns Networks) Port(label string) (string, int) {
for _, n := range ns {
for _, p := range n.ReservedPorts {
if p.Label == label {
return n.IP, p.Value
//TODO(schmichael) this doesn't seem right
if p.Value == 0 {
return n.IP, p.To
} else {
return n.IP, p.Value
}
}
}
for _, p := range n.DynamicPorts {
if p.Label == label {
return n.IP, p.Value
//TODO(schmichael) this doesn't seem right
if p.Value == 0 {
return n.IP, p.To
} else {
return n.IP, p.Value
}
}
}
}
@ -4646,6 +4657,9 @@ type TaskGroup struct {
// Networks are the network configuration for the task group. This can be
// overridden in the task.
Networks Networks
// Services this group provides
Services []*Service
}
func (tg *TaskGroup) Copy() *TaskGroup {
@ -4683,6 +4697,14 @@ func (tg *TaskGroup) Copy() *TaskGroup {
if tg.EphemeralDisk != nil {
ntg.EphemeralDisk = tg.EphemeralDisk.Copy()
}
if tg.Services != nil {
ntg.Services = make([]*Service, len(tg.Services))
for i, s := range tg.Services {
ntg.Services[i] = s.Copy()
}
}
return ntg
}
@ -4981,6 +5003,26 @@ func (c *CheckRestart) Copy() *CheckRestart {
return nc
}
func (c *CheckRestart) Equals(o *CheckRestart) bool {
if c == nil || o == nil {
return c == o
}
if c.Limit != o.Limit {
return false
}
if c.Grace != o.Grace {
return false
}
if c.IgnoreWarnings != o.IgnoreWarnings {
return false
}
return true
}
func (c *CheckRestart) Validate() error {
if c == nil {
return nil
@ -5048,6 +5090,83 @@ func (sc *ServiceCheck) Copy() *ServiceCheck {
return nsc
}
func (sc *ServiceCheck) Equals(o *ServiceCheck) bool {
if sc == nil || o == nil {
return sc == o
}
if sc.Name != o.Name {
return false
}
if sc.AddressMode != o.AddressMode {
return false
}
if !helper.CompareSliceSetString(sc.Args, o.Args) {
return false
}
if !sc.CheckRestart.Equals(o.CheckRestart) {
return false
}
if sc.Command != o.Command {
return false
}
if sc.GRPCService != o.GRPCService {
return false
}
if sc.GRPCUseTLS != o.GRPCUseTLS {
return false
}
// Use DeepEqual here as order of slice values could matter
if !reflect.DeepEqual(sc.Header, o.Header) {
return false
}
if sc.InitialStatus != o.InitialStatus {
return false
}
if sc.Interval != o.Interval {
return false
}
if sc.Method != o.Method {
return false
}
if sc.Path != o.Path {
return false
}
if sc.PortLabel != o.Path {
return false
}
if sc.Protocol != o.Protocol {
return false
}
if sc.TLSSkipVerify != o.TLSSkipVerify {
return false
}
if sc.Timeout != o.Timeout {
return false
}
if sc.Type != o.Type {
return false
}
return true
}
func (sc *ServiceCheck) Canonicalize(serviceName string) {
// Ensure empty maps/slices are treated as null to avoid scheduling
// issues when using DeepEquals.
@ -5224,6 +5343,7 @@ type Service struct {
Tags []string // List of tags for the service
CanaryTags []string // List of tags for the service when it is a canary
Checks []*ServiceCheck // List of checks associated with the service
Connect *ConsulConnect // Consul Connect configuration
}
func (s *Service) Copy() *Service {
@ -5350,6 +5470,55 @@ func (s *Service) Hash(allocID, taskName string, canary bool) string {
return b32.EncodeToString(h.Sum(nil))
}
func (s *Service) Equals(o *Service) bool {
if s == nil || o == nil {
return s == o
}
if s.AddressMode != o.AddressMode {
return false
}
if !helper.CompareSliceSetString(s.CanaryTags, o.CanaryTags) {
return false
}
if len(s.Checks) != len(o.Checks) {
return false
}
OUTER:
for i := range s.Checks {
for ii := range o.Checks {
if s.Checks[i].Equals(o.Checks[ii]) {
// Found match; continue with next check
continue OUTER
}
}
// No match
return false
}
if !s.Connect.Equals(o.Connect) {
return false
}
if s.Name != o.Name {
return false
}
if s.PortLabel != o.PortLabel {
return false
}
if !helper.CompareSliceSetString(s.Tags, o.Tags) {
return false
}
return true
}
const (
// DefaultKillTimeout is the default timeout between signaling a task it
// will be killed and killing it.