Add Namespace, Job and Group to envoy stats (#14311)

This commit is contained in:
Jorge Marey 2022-09-22 16:38:21 +02:00 committed by GitHub
parent e144d2bd00
commit 584ddfe859
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 308 additions and 214 deletions

3
.changelog/14311.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
connect: add namespace, job, and group to Envoy stats
```

View File

@ -153,7 +153,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv),
newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
namespace: alloc.ServiceProviderNamespace(),
providerNamespace: alloc.ServiceProviderNamespace(),
serviceRegWrapper: ar.serviceRegWrapper,
restarter: ar,
taskEnvBuilder: envBuilder,

View File

@ -24,15 +24,16 @@ type groupServiceHook struct {
allocID string
jobID string
group string
namespace string
restarter serviceregistration.WorkloadRestarter
prerun bool
deregistered bool
networkStatus structs.NetworkStatus
shutdownDelayCtx context.Context
// namespace is the Nomad or Consul namespace in which service
// providerNamespace is the Nomad or Consul namespace in which service
// registrations will be made. This field may be updated.
namespace string
providerNamespace string
// serviceRegWrapper is the handler wrapper that is used to perform service
// and check registration and deregistration.
@ -61,9 +62,9 @@ type groupServiceHookConfig struct {
shutdownDelayCtx context.Context
logger log.Logger
// namespace is the Nomad or Consul namespace in which service
// providerNamespace is the Nomad or Consul namespace in which service
// registrations will be made.
namespace string
providerNamespace string
// serviceRegWrapper is the handler wrapper that is used to perform service
// and check registration and deregistration.
@ -82,8 +83,9 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
allocID: cfg.alloc.ID,
jobID: cfg.alloc.JobID,
group: cfg.alloc.TaskGroup,
namespace: cfg.alloc.Namespace,
restarter: cfg.restarter,
namespace: cfg.namespace,
providerNamespace: cfg.providerNamespace,
taskEnvBuilder: cfg.taskEnvBuilder,
delay: shutdownDelay,
networkStatus: cfg.networkStatus,
@ -161,7 +163,7 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
// An update may change the service provider, therefore we need to account
// for how namespaces work across providers also.
h.namespace = req.Alloc.ServiceProviderNamespace()
h.providerNamespace = req.Alloc.ServiceProviderNamespace()
// Create new task services struct with those new values
newWorkloadServices := h.getWorkloadServices()
@ -244,12 +246,17 @@ func (h *groupServiceHook) getWorkloadServices() *serviceregistration.WorkloadSe
netStatus = h.networkStatus.NetworkStatus()
}
// Create task services struct with request's driver metadata
return &serviceregistration.WorkloadServices{
info := structs.AllocInfo{
AllocID: h.allocID,
JobID: h.jobID,
Group: h.group,
Namespace: h.namespace,
}
// Create task services struct with request's driver metadata
return &serviceregistration.WorkloadServices{
AllocInfo: info,
ProviderNamespace: h.providerNamespace,
Restarter: h.restarter,
Services: interpolatedServices,
Networks: h.networks,

View File

@ -31,7 +31,7 @@ type serviceHookConfig struct {
// namespace is the Nomad or Consul namespace in which service
// registrations will be made.
namespace string
providerNamespace string
// serviceRegWrapper is the handler wrapper that is used to perform service
// and check registration and deregistration.
@ -48,6 +48,7 @@ type serviceHook struct {
jobID string
groupName string
taskName string
namespace string
restarter serviceregistration.WorkloadRestarter
logger log.Logger
@ -60,9 +61,9 @@ type serviceHook struct {
ports structs.AllocatedPorts
taskEnv *taskenv.TaskEnv
// namespace is the Nomad or Consul namespace in which service
// providerNamespace is the Nomad or Consul namespace in which service
// registrations will be made. This field may be updated.
namespace string
providerNamespace string
// serviceRegWrapper is the handler wrapper that is used to perform service
// and check registration and deregistration.
@ -87,7 +88,8 @@ func newServiceHook(c serviceHookConfig) *serviceHook {
jobID: c.alloc.JobID,
groupName: c.alloc.TaskGroup,
taskName: c.task.Name,
namespace: c.namespace,
namespace: c.alloc.Namespace,
providerNamespace: c.providerNamespace,
serviceRegWrapper: c.serviceRegWrapper,
services: c.task.Services,
restarter: c.restarter,
@ -176,7 +178,7 @@ func (h *serviceHook) updateHookFields(req *interfaces.TaskUpdateRequest) error
// An update may change the service provider, therefore we need to account
// for how namespaces work across providers also.
h.namespace = req.Alloc.ServiceProviderNamespace()
h.providerNamespace = req.Alloc.ServiceProviderNamespace()
return nil
}
@ -219,13 +221,17 @@ func (h *serviceHook) getWorkloadServices() *serviceregistration.WorkloadService
// Interpolate with the task's environment
interpolatedServices := taskenv.InterpolateServices(h.taskEnv, h.services)
// Create task services struct with request's driver metadata
return &serviceregistration.WorkloadServices{
info := structs.AllocInfo{
AllocID: h.allocID,
JobID: h.jobID,
Group: h.groupName,
Task: h.taskName,
Namespace: h.namespace,
}
// Create task services struct with request's driver metadata
return &serviceregistration.WorkloadServices{
AllocInfo: info,
ProviderNamespace: h.providerNamespace,
Restarter: h.restarter,
Services: interpolatedServices,
DriverExec: h.driverExec,

View File

@ -177,7 +177,7 @@ func Test_serviceHook_Nomad(t *testing.T) {
h := newServiceHook(serviceHookConfig{
alloc: alloc,
task: alloc.LookupTask("web"),
namespace: "default",
providerNamespace: "default",
serviceRegWrapper: regWrapper,
restarter: agentconsul.NoopRestarter(),
logger: logger,

View File

@ -123,7 +123,7 @@ func (tr *TaskRunner) initHooks() {
tr.runnerHooks = append(tr.runnerHooks, newServiceHook(serviceHookConfig{
alloc: tr.Alloc(),
task: tr.Task(),
namespace: serviceProviderNamespace,
providerNamespace: serviceProviderNamespace,
serviceRegWrapper: tr.serviceRegWrapper,
restarter: tr,
logger: hookLogger,

View File

@ -42,10 +42,10 @@ func (h *ServiceRegistrationHandler) RegisterWorkload(services *serviceregistrat
h.mu.Lock()
defer h.mu.Unlock()
h.log.Trace("RegisterWorkload", "alloc_id", services.AllocID,
h.log.Trace("RegisterWorkload", "alloc_id", services.AllocInfo.AllocID,
"name", services.Name(), "services", len(services.Services))
h.ops = append(h.ops, newOperation("add", services.AllocID, services.Name()))
h.ops = append(h.ops, newOperation("add", services.AllocInfo.AllocID, services.Name()))
return nil
}
@ -53,20 +53,20 @@ func (h *ServiceRegistrationHandler) RemoveWorkload(services *serviceregistratio
h.mu.Lock()
defer h.mu.Unlock()
h.log.Trace("RemoveWorkload", "alloc_id", services.AllocID,
h.log.Trace("RemoveWorkload", "alloc_id", services.AllocInfo.AllocID,
"name", services.Name(), "services", len(services.Services))
h.ops = append(h.ops, newOperation("remove", services.AllocID, services.Name()))
h.ops = append(h.ops, newOperation("remove", services.AllocInfo.AllocID, services.Name()))
}
func (h *ServiceRegistrationHandler) UpdateWorkload(old, newServices *serviceregistration.WorkloadServices) error {
h.mu.Lock()
defer h.mu.Unlock()
h.log.Trace("UpdateWorkload", "alloc_id", newServices.AllocID, "name", newServices.Name(),
h.log.Trace("UpdateWorkload", "alloc_id", newServices.AllocInfo.AllocID, "name", newServices.Name(),
"old_services", len(old.Services), "new_services", len(newServices.Services))
h.ops = append(h.ops, newOperation("update", newServices.AllocID, newServices.Name()))
h.ops = append(h.ops, newOperation("update", newServices.AllocInfo.AllocID, newServices.Name()))
return nil
}

View File

@ -110,8 +110,8 @@ func (s *ServiceRegistrationHandler) RegisterWorkload(workload *serviceregistrat
for _, service := range workload.Services {
for _, check := range service.Checks {
if check.TriggersRestarts() {
checkID := string(structs.NomadCheckID(workload.AllocID, workload.Group, check))
s.checkWatcher.Watch(workload.AllocID, workload.Name(), checkID, check, workload.Restarter)
checkID := string(structs.NomadCheckID(workload.AllocInfo.AllocID, workload.AllocInfo.Group, check))
s.checkWatcher.Watch(workload.AllocInfo.AllocID, workload.Name(), checkID, check, workload.Restarter)
}
}
}
@ -147,19 +147,19 @@ func (s *ServiceRegistrationHandler) removeWorkload(
// Stop check watcher
for _, service := range workload.Services {
for _, check := range service.Checks {
checkID := string(structs.NomadCheckID(workload.AllocID, workload.Group, check))
checkID := string(structs.NomadCheckID(workload.AllocInfo.AllocID, workload.AllocInfo.Group, check))
s.checkWatcher.Unwatch(checkID)
}
}
// Generate the consistent ID for this service, so we know what to remove.
id := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), serviceSpec)
id := serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), serviceSpec)
deleteArgs := structs.ServiceRegistrationDeleteByIDRequest{
ID: id,
WriteRequest: structs.WriteRequest{
Region: s.cfg.Region,
Namespace: workload.Namespace,
Namespace: workload.ProviderNamespace,
AuthToken: s.cfg.NodeSecret,
},
}
@ -178,14 +178,14 @@ func (s *ServiceRegistrationHandler) removeWorkload(
// while ensuring the operator can see.
if strings.Contains(err.Error(), "service registration not found") {
s.log.Info("attempted to delete non-existent service registration",
"service_id", id, "namespace", workload.Namespace)
"service_id", id, "namespace", workload.ProviderNamespace)
return
}
// Log the error as there is nothing left to do, so the operator can see it
// and identify any problems.
s.log.Error("failed to delete service registration",
"error", err, "service_id", id, "namespace", workload.Namespace)
"error", err, "service_id", id, "namespace", workload.ProviderNamespace)
}
func (s *ServiceRegistrationHandler) UpdateWorkload(old, new *serviceregistration.WorkloadServices) error {
@ -231,7 +231,7 @@ func (s *ServiceRegistrationHandler) dedupUpdatedWorkload(
newIDs := make(map[string]*structs.Service, len(newWork.Services))
for _, s := range newWork.Services {
newIDs[serviceregistration.MakeAllocServiceID(newWork.AllocID, newWork.Name(), s)] = s
newIDs[serviceregistration.MakeAllocServiceID(newWork.AllocInfo.AllocID, newWork.Name(), s)] = s
}
// Iterate through the old services in order to identify whether they can
@ -240,7 +240,7 @@ func (s *ServiceRegistrationHandler) dedupUpdatedWorkload(
// Generate the service ID of the old service. If this is not found
// within the new mapping then we need to remove it.
oldID := serviceregistration.MakeAllocServiceID(oldWork.AllocID, oldWork.Name(), oldService)
oldID := serviceregistration.MakeAllocServiceID(oldWork.AllocInfo.AllocID, oldWork.Name(), oldService)
newSvc, ok := newIDs[oldID]
if !ok {
oldCopy.Services = append(oldCopy.Services, oldService)
@ -319,12 +319,12 @@ func (s *ServiceRegistrationHandler) generateNomadServiceRegistration(
}
return &structs.ServiceRegistration{
ID: serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), serviceSpec),
ID: serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), serviceSpec),
ServiceName: serviceSpec.Name,
NodeID: s.cfg.NodeID,
JobID: workload.JobID,
AllocID: workload.AllocID,
Namespace: workload.Namespace,
JobID: workload.AllocInfo.JobID,
AllocID: workload.AllocInfo.AllocID,
Namespace: workload.ProviderNamespace,
Datacenter: s.cfg.Datacenter,
Tags: tags,
Address: ip,

View File

@ -178,12 +178,14 @@ func TestServiceRegistrationHandler_UpdateWorkload(t *testing.T) {
},
inputOldWorkload: mockWorkload(),
inputNewWorkload: &serviceregistration.WorkloadServices{
AllocInfo: structs.AllocInfo{
AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c",
Task: "redis",
Group: "cache",
JobID: "example",
},
Canary: false,
Namespace: "default",
ProviderNamespace: "default",
Services: []*structs.Service{
{
Name: "changed-redis-db",
@ -232,12 +234,14 @@ func TestServiceRegistrationHandler_UpdateWorkload(t *testing.T) {
},
inputOldWorkload: mockWorkload(),
inputNewWorkload: &serviceregistration.WorkloadServices{
AllocInfo: structs.AllocInfo{
AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c",
Task: "redis",
Group: "cache",
JobID: "example",
},
Canary: false,
Namespace: "default",
ProviderNamespace: "default",
Services: []*structs.Service{
{
Name: "redis-db",
@ -329,12 +333,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) {
{
inputOldWorkload: mockWorkload(),
inputNewWorkload: &serviceregistration.WorkloadServices{
AllocInfo: structs.AllocInfo{
AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c",
Task: "redis",
Group: "cache",
JobID: "example",
},
Canary: false,
Namespace: "default",
ProviderNamespace: "default",
Services: []*structs.Service{
{
Name: "changed-redis-db",
@ -362,12 +368,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) {
},
expectedOldOutput: mockWorkload(),
expectedNewOutput: &serviceregistration.WorkloadServices{
AllocInfo: structs.AllocInfo{
AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c",
Task: "redis",
Group: "cache",
JobID: "example",
},
Canary: false,
Namespace: "default",
ProviderNamespace: "default",
Services: []*structs.Service{
{
Name: "changed-redis-db",
@ -398,12 +406,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) {
{
inputOldWorkload: mockWorkload(),
inputNewWorkload: &serviceregistration.WorkloadServices{
AllocInfo: structs.AllocInfo{
AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c",
Task: "redis",
Group: "cache",
JobID: "example",
},
Canary: false,
Namespace: "default",
ProviderNamespace: "default",
Services: []*structs.Service{
{
Name: "redis-db",
@ -432,12 +442,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) {
},
},
expectedOldOutput: &serviceregistration.WorkloadServices{
AllocInfo: structs.AllocInfo{
AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c",
Task: "redis",
Group: "cache",
JobID: "example",
},
Canary: false,
Namespace: "default",
ProviderNamespace: "default",
Services: []*structs.Service{},
Ports: []structs.AllocatedPortMapping{
{
@ -453,12 +465,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) {
},
},
expectedNewOutput: &serviceregistration.WorkloadServices{
AllocInfo: structs.AllocInfo{
AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c",
Task: "redis",
Group: "cache",
JobID: "example",
},
Canary: false,
Namespace: "default",
ProviderNamespace: "default",
Services: []*structs.Service{
{
Name: "redis-db",
@ -491,12 +505,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) {
{
inputOldWorkload: mockWorkload(),
inputNewWorkload: &serviceregistration.WorkloadServices{
AllocInfo: structs.AllocInfo{
AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c",
Task: "redis",
Group: "cache",
JobID: "example",
},
Canary: false,
Namespace: "default",
ProviderNamespace: "default",
Services: []*structs.Service{
{
Name: "redis-db",
@ -524,12 +540,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) {
},
expectedOldOutput: mockWorkload(),
expectedNewOutput: &serviceregistration.WorkloadServices{
AllocInfo: structs.AllocInfo{
AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c",
Task: "redis",
Group: "cache",
JobID: "example",
},
Canary: false,
Namespace: "default",
ProviderNamespace: "default",
Services: []*structs.Service{
{
Name: "redis-db",
@ -572,12 +590,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) {
func mockWorkload() *serviceregistration.WorkloadServices {
return &serviceregistration.WorkloadServices{
AllocInfo: structs.AllocInfo{
AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c",
Task: "redis",
Group: "cache",
JobID: "example",
},
Canary: false,
Namespace: "default",
ProviderNamespace: "default",
Services: []*structs.Service{
{
Name: "redis-db",

View File

@ -9,27 +9,15 @@ import (
// WorkloadServices describes services defined in either a Task or TaskGroup
// that need to be syncronized with a service registration provider.
type WorkloadServices struct {
AllocID string
// Group in which the service belongs for a group-level service, or the
// group in which task belongs for a task-level service.
Group string
// Task in which the service belongs for task-level service. Will be empty
// for a group-level service.
Task string
// JobID provides additional context for providers regarding which job
// caused this registration.
JobID string
AllocInfo structs.AllocInfo
// Canary indicates whether, or not the allocation is a canary. This is
// used to build the correct tags mapping.
Canary bool
// Namespace is the provider namespace in which services will be
// ProviderNamespace is the provider namespace in which services will be
// registered, if the provider supports this functionality.
Namespace string
ProviderNamespace string
// Restarter allows restarting the task or task group depending on the
// check_restart stanzas.
@ -88,8 +76,8 @@ func (ws *WorkloadServices) Copy() *WorkloadServices {
}
func (ws *WorkloadServices) Name() string {
if ws.Task != "" {
return ws.Task
if ws.AllocInfo.Task != "" {
return ws.AllocInfo.Task
}
return "group-" + ws.Group
return "group-" + ws.AllocInfo.Group
}

View File

@ -3,6 +3,7 @@ package consul
import (
"fmt"
"net"
"sort"
"strconv"
"strings"
@ -14,7 +15,7 @@ import (
// newConnect creates a new Consul AgentServiceConnect struct based on a Nomad
// Connect struct. If the nomad Connect struct is nil, nil will be returned to
// disable Connect for this service.
func newConnect(serviceID, allocID string, serviceName string, nc *structs.ConsulConnect, networks structs.Networks, ports structs.AllocatedPorts) (*api.AgentServiceConnect, error) {
func newConnect(serviceID string, info structs.AllocInfo, serviceName string, nc *structs.ConsulConnect, networks structs.Networks, ports structs.AllocatedPorts) (*api.AgentServiceConnect, error) {
switch {
case nc == nil:
// no connect stanza means there is no connect service to register
@ -33,7 +34,7 @@ func newConnect(serviceID, allocID string, serviceName string, nc *structs.Consu
if nc.SidecarService.Port == "" {
nc.SidecarService.Port = fmt.Sprintf("%s-%s", structs.ConnectProxyPrefix, serviceName)
}
sidecarReg, err := connectSidecarRegistration(serviceID, allocID, nc.SidecarService, networks, ports)
sidecarReg, err := connectSidecarRegistration(serviceID, info, nc.SidecarService, networks, ports)
if err != nil {
return nil, err
}
@ -90,7 +91,7 @@ func newConnectGateway(connect *structs.ConsulConnect) *api.AgentServiceConnectP
return &api.AgentServiceConnectProxyConfig{Config: envoyConfig}
}
func connectSidecarRegistration(serviceID, allocID string, css *structs.ConsulSidecarService, networks structs.Networks, ports structs.AllocatedPorts) (*api.AgentServiceRegistration, error) {
func connectSidecarRegistration(serviceID string, info structs.AllocInfo, css *structs.ConsulSidecarService, networks structs.Networks, ports structs.AllocatedPorts) (*api.AgentServiceRegistration, error) {
if css == nil {
// no sidecar stanza means there is no sidecar service to register
return nil, nil
@ -101,7 +102,7 @@ func connectSidecarRegistration(serviceID, allocID string, css *structs.ConsulSi
return nil, err
}
proxy, err := connectSidecarProxy(allocID, css.Proxy, cMapping.To, networks)
proxy, err := connectSidecarProxy(info, css.Proxy, cMapping.To, networks)
if err != nil {
return nil, err
}
@ -130,7 +131,7 @@ func connectSidecarRegistration(serviceID, allocID string, css *structs.ConsulSi
}, nil
}
func connectSidecarProxy(allocID string, proxy *structs.ConsulProxy, cPort int, networks structs.Networks) (*api.AgentServiceConnectProxyConfig, error) {
func connectSidecarProxy(info structs.AllocInfo, proxy *structs.ConsulProxy, cPort int, networks structs.Networks) (*api.AgentServiceConnectProxyConfig, error) {
if proxy == nil {
proxy = new(structs.ConsulProxy)
}
@ -143,7 +144,7 @@ func connectSidecarProxy(allocID string, proxy *structs.ConsulProxy, cPort int,
return &api.AgentServiceConnectProxyConfig{
LocalServiceAddress: proxy.LocalServiceAddress,
LocalServicePort: proxy.LocalServicePort,
Config: connectProxyConfig(proxy.Config, cPort, allocID),
Config: connectProxyConfig(proxy.Config, cPort, info),
Upstreams: connectUpstreams(proxy.Upstreams),
Expose: expose,
}, nil
@ -226,40 +227,59 @@ func connectMeshGateway(in structs.ConsulMeshGateway) api.MeshGatewayConfig {
return gw
}
func connectProxyConfig(cfg map[string]interface{}, port int, allocID string) map[string]interface{} {
func connectProxyConfig(cfg map[string]interface{}, port int, info structs.AllocInfo) map[string]interface{} {
if cfg == nil {
cfg = make(map[string]interface{})
}
cfg["bind_address"] = "0.0.0.0"
cfg["bind_port"] = port
injectAllocID(cfg, allocID)
tags := map[string]string{
"nomad.group=": info.Group,
"nomad.job=": info.JobID,
"nomad.namespace=": info.Namespace,
"nomad.alloc_id=": info.AllocID,
}
injectNomadInfo(cfg, tags)
return cfg
}
// injectAllocID merges allocID into cfg=>envoy_stats_tags
// injectNomadInfo merges nomad information into cfg=>envoy_stats_tags
//
// cfg must not be nil
func injectAllocID(cfg map[string]interface{}, allocID string) {
const key = "envoy_stats_tags"
const prefix = "nomad.alloc_id="
pair := prefix + allocID
tags, exists := cfg[key]
if !exists {
cfg[key] = []string{pair}
func injectNomadInfo(cfg map[string]interface{}, defaultTags map[string]string) {
const configKey = "envoy_stats_tags"
existingTagsI := cfg[configKey]
switch existingTags := existingTagsI.(type) {
case []string:
if len(existingTags) == 0 {
break
}
OUTER:
for key, value := range defaultTags {
for _, tag := range existingTags {
if strings.HasPrefix(tag, key) {
continue OUTER
}
}
existingTags = append(existingTags, key+value)
}
cfg[configKey] = existingTags
return
}
switch v := tags.(type) {
case []string:
// scan the existing tags to see if alloc_id= is already set
for _, s := range v {
if strings.HasPrefix(s, prefix) {
return
// common case.
var tags []string
for key, value := range defaultTags {
if value == "" {
continue
}
tag := key + value
tags = append(tags, tag)
}
v = append(v, pair)
cfg[key] = v
}
sort.Strings(tags) // mostly for test stability
cfg[configKey] = tags
}
func connectNetworkInvariants(networks structs.Networks) error {

View File

@ -37,15 +37,18 @@ func TestConnect_newConnect(t *testing.T) {
service := "redis"
redisID := uuid.Generate()
allocID := uuid.Generate()
info := structs.AllocInfo{
AllocID: allocID,
}
t.Run("nil", func(t *testing.T) {
asr, err := newConnect("", "", "", nil, nil, nil)
asr, err := newConnect("", structs.AllocInfo{}, "", nil, nil, nil)
require.NoError(t, err)
require.Nil(t, asr)
})
t.Run("native", func(t *testing.T) {
asr, err := newConnect(redisID, allocID, service, &structs.ConsulConnect{
asr, err := newConnect(redisID, info, service, &structs.ConsulConnect{
Native: true,
}, nil, nil)
require.NoError(t, err)
@ -54,7 +57,7 @@ func TestConnect_newConnect(t *testing.T) {
})
t.Run("with sidecar", func(t *testing.T) {
asr, err := newConnect(redisID, allocID, service, &structs.ConsulConnect{
asr, err := newConnect(redisID, info, service, &structs.ConsulConnect{
Native: false,
SidecarService: &structs.ConsulSidecarService{
Tags: []string{"foo", "bar"},
@ -88,7 +91,7 @@ func TestConnect_newConnect(t *testing.T) {
})
t.Run("with sidecar without TCP checks", func(t *testing.T) {
asr, err := newConnect(redisID, allocID, service, &structs.ConsulConnect{
asr, err := newConnect(redisID, info, service, &structs.ConsulConnect{
Native: false,
SidecarService: &structs.ConsulSidecarService{
Tags: []string{"foo", "bar"},
@ -123,22 +126,25 @@ func TestConnect_connectSidecarRegistration(t *testing.T) {
redisID := uuid.Generate()
allocID := uuid.Generate()
info := structs.AllocInfo{
AllocID: allocID,
}
t.Run("nil", func(t *testing.T) {
sidecarReg, err := connectSidecarRegistration(redisID, allocID, nil, testConnectNetwork, testConnectPorts)
sidecarReg, err := connectSidecarRegistration(redisID, info, nil, testConnectNetwork, testConnectPorts)
require.NoError(t, err)
require.Nil(t, sidecarReg)
})
t.Run("no service port", func(t *testing.T) {
_, err := connectSidecarRegistration("unknown-id", allocID, &structs.ConsulSidecarService{
_, err := connectSidecarRegistration("unknown-id", info, &structs.ConsulSidecarService{
Port: "unknown-label",
}, testConnectNetwork, testConnectPorts)
require.EqualError(t, err, `No port of label "unknown-label" defined`)
})
t.Run("bad proxy", func(t *testing.T) {
_, err := connectSidecarRegistration(redisID, allocID, &structs.ConsulSidecarService{
_, err := connectSidecarRegistration(redisID, info, &structs.ConsulSidecarService{
Port: "connect-proxy-redis",
Proxy: &structs.ConsulProxy{
Expose: &structs.ConsulExposeConfig{
@ -152,7 +158,7 @@ func TestConnect_connectSidecarRegistration(t *testing.T) {
})
t.Run("normal", func(t *testing.T) {
proxy, err := connectSidecarRegistration(redisID, allocID, &structs.ConsulSidecarService{
proxy, err := connectSidecarRegistration(redisID, info, &structs.ConsulSidecarService{
Tags: []string{"foo", "bar"},
Port: "connect-proxy-redis",
}, testConnectNetwork, testConnectPorts)
@ -187,11 +193,14 @@ func TestConnect_connectProxy(t *testing.T) {
ci.Parallel(t)
allocID := uuid.Generate()
info := structs.AllocInfo{
AllocID: allocID,
}
// If the input proxy is nil, we expect the output to be a proxy with its
// config set to default values.
t.Run("nil proxy", func(t *testing.T) {
proxy, err := connectSidecarProxy(allocID, nil, 2000, testConnectNetwork)
proxy, err := connectSidecarProxy(info, nil, 2000, testConnectNetwork)
require.NoError(t, err)
require.Equal(t, &api.AgentServiceConnectProxyConfig{
LocalServiceAddress: "",
@ -207,7 +216,7 @@ func TestConnect_connectProxy(t *testing.T) {
})
t.Run("bad proxy", func(t *testing.T) {
_, err := connectSidecarProxy(allocID, &structs.ConsulProxy{
_, err := connectSidecarProxy(info, &structs.ConsulProxy{
LocalServiceAddress: "0.0.0.0",
LocalServicePort: 2000,
Upstreams: nil,
@ -222,7 +231,7 @@ func TestConnect_connectProxy(t *testing.T) {
})
t.Run("normal", func(t *testing.T) {
proxy, err := connectSidecarProxy(allocID, &structs.ConsulProxy{
proxy, err := connectSidecarProxy(info, &structs.ConsulProxy{
LocalServiceAddress: "0.0.0.0",
LocalServicePort: 2000,
Upstreams: nil,
@ -388,7 +397,7 @@ func TestConnect_connectProxyConfig(t *testing.T) {
"bind_address": "0.0.0.0",
"bind_port": 42,
"envoy_stats_tags": []string{"nomad.alloc_id=test_alloc1"},
}, connectProxyConfig(nil, 42, "test_alloc1"))
}, connectProxyConfig(nil, 42, structs.AllocInfo{AllocID: "test_alloc1"}))
})
t.Run("pre-existing map", func(t *testing.T) {
@ -399,7 +408,7 @@ func TestConnect_connectProxyConfig(t *testing.T) {
"envoy_stats_tags": []string{"nomad.alloc_id=test_alloc2"},
}, connectProxyConfig(map[string]interface{}{
"foo": "bar",
}, 42, "test_alloc2"))
}, 42, structs.AllocInfo{AllocID: "test_alloc2"}))
})
}
@ -594,19 +603,34 @@ func Test_connectMeshGateway(t *testing.T) {
})
}
func Test_injectAllocID(t *testing.T) {
func Test_injectNomadInfo(t *testing.T) {
ci.Parallel(t)
id := "abc123"
info1 := func() map[string]string {
return map[string]string{
"nomad.alloc_id=": "abc123",
}
}
info2 := func() map[string]string {
return map[string]string{
"nomad.alloc_id=": "abc123",
"nomad.namespace=": "testns",
}
}
try := func(allocID string, cfg, exp map[string]interface{}) {
injectAllocID(cfg, allocID)
try := func(defaultTags map[string]string, cfg, exp map[string]interface{}) {
// TODO: defaultTags get modified over the execution
injectNomadInfo(cfg, defaultTags)
cfgTags, expTags := cfg["envoy_stats_tags"], exp["envoy_stats_tags"]
delete(cfg, "envoy_stats_tags")
delete(exp, "envoy_stats_tags")
require.Equal(t, exp, cfg)
require.ElementsMatch(t, expTags, cfgTags, "")
}
// empty
try(
id,
info1(),
make(map[string]interface{}),
map[string]interface{}{
"envoy_stats_tags": []string{"nomad.alloc_id=abc123"},
@ -615,7 +639,7 @@ func Test_injectAllocID(t *testing.T) {
// merge fresh
try(
id,
info1(),
map[string]interface{}{"foo": "bar"},
map[string]interface{}{
"foo": "bar",
@ -625,7 +649,7 @@ func Test_injectAllocID(t *testing.T) {
// merge append
try(
id,
info1(),
map[string]interface{}{
"foo": "bar",
"envoy_stats_tags": []string{"k1=v1", "k2=v2"},
@ -638,25 +662,25 @@ func Test_injectAllocID(t *testing.T) {
// merge exists
try(
id,
info2(),
map[string]interface{}{
"foo": "bar",
"envoy_stats_tags": []string{"k1=v1", "k2=v2", "nomad.alloc_id=xyz789"},
},
map[string]interface{}{
"foo": "bar",
"envoy_stats_tags": []string{"k1=v1", "k2=v2", "nomad.alloc_id=xyz789"},
"envoy_stats_tags": []string{"k1=v1", "k2=v2", "nomad.alloc_id=xyz789", "nomad.namespace=testns"},
},
)
// merge wrong type
try(
id,
info1(),
map[string]interface{}{
"envoy_stats_tags": "not a slice of string",
},
map[string]interface{}{
"envoy_stats_tags": "not a slice of string",
"envoy_stats_tags": []string{"nomad.alloc_id=abc123"},
},
)
}

View File

@ -125,7 +125,7 @@ func TestConsul_Connect(t *testing.T) {
require.Equal(t, connectService.Proxy.Config, map[string]interface{}{
"bind_address": "0.0.0.0",
"bind_port": float64(9998),
"envoy_stats_tags": []interface{}{"nomad.alloc_id=" + alloc.ID},
"envoy_stats_tags": []interface{}{"nomad.alloc_id=" + alloc.ID, "nomad.group=" + alloc.TaskGroup},
})
require.Equal(t, alloc.ID, agentService.Meta["alloc_id"])

View File

@ -965,7 +965,7 @@ func (c *ServiceClient) serviceRegs(
*serviceregistration.ServiceRegistration, error) {
// Get the services ID
id := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), service)
id := serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), service)
sreg := &serviceregistration.ServiceRegistration{
ServiceID: id,
CheckIDs: make(map[string]struct{}, len(service.Checks)),
@ -996,7 +996,7 @@ func (c *ServiceClient) serviceRegs(
}
// newConnect returns (nil, nil) if there's no Connect-enabled service.
connect, err := newConnect(id, workload.AllocID, service.Name, service.Connect, workload.Networks, workload.Ports)
connect, err := newConnect(id, workload.AllocInfo, service.Name, service.Connect, workload.Networks, workload.Ports)
if err != nil {
return nil, fmt.Errorf("invalid Consul Connect configuration for service %q: %v", service.Name, err)
}
@ -1069,7 +1069,7 @@ func (c *ServiceClient) serviceRegs(
Kind: kind,
ID: id,
Name: service.Name,
Namespace: workload.Namespace,
Namespace: workload.ProviderNamespace,
Tags: tags,
EnableTagOverride: service.EnableTagOverride,
Address: ip,
@ -1130,7 +1130,7 @@ func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service,
}
checkID := MakeCheckID(serviceID, check)
registration, err := createCheckReg(serviceID, checkID, check, ip, port, workload.Namespace)
registration, err := createCheckReg(serviceID, checkID, check, ip, port, workload.ProviderNamespace)
if err != nil {
return nil, fmt.Errorf("failed to add check %q: %v", check.Name, err)
}
@ -1167,18 +1167,18 @@ func (c *ServiceClient) RegisterWorkload(workload *serviceregistration.WorkloadS
}
// Add the workload to the allocation's registration
c.addRegistrations(workload.AllocID, workload.Name(), t)
c.addRegistrations(workload.AllocInfo.AllocID, workload.Name(), t)
c.commit(ops)
// Start watching checks. Done after service registrations are built
// since an error building them could leak watches.
for _, service := range workload.Services {
serviceID := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), service)
serviceID := serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), service)
for _, check := range service.Checks {
if check.TriggersRestarts() {
checkID := MakeCheckID(serviceID, check)
c.checkWatcher.Watch(workload.AllocID, workload.Name(), checkID, check, workload.Restarter)
c.checkWatcher.Watch(workload.AllocInfo.AllocID, workload.Name(), checkID, check, workload.Restarter)
}
}
}
@ -1196,12 +1196,12 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *serviceregistration.Wor
newIDs := make(map[string]*structs.Service, len(newWorkload.Services))
for _, s := range newWorkload.Services {
newIDs[serviceregistration.MakeAllocServiceID(newWorkload.AllocID, newWorkload.Name(), s)] = s
newIDs[serviceregistration.MakeAllocServiceID(newWorkload.AllocInfo.AllocID, newWorkload.Name(), s)] = s
}
// Loop over existing Services to see if they have been removed
for _, existingSvc := range old.Services {
existingID := serviceregistration.MakeAllocServiceID(old.AllocID, old.Name(), existingSvc)
existingID := serviceregistration.MakeAllocServiceID(old.AllocInfo.AllocID, old.Name(), existingSvc)
newSvc, ok := newIDs[existingID]
if !ok {
@ -1219,8 +1219,8 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *serviceregistration.Wor
continue
}
oldHash := existingSvc.Hash(old.AllocID, old.Name(), old.Canary)
newHash := newSvc.Hash(newWorkload.AllocID, newWorkload.Name(), newWorkload.Canary)
oldHash := existingSvc.Hash(old.AllocInfo.AllocID, old.Name(), old.Canary)
newHash := newSvc.Hash(newWorkload.AllocInfo.AllocID, newWorkload.Name(), newWorkload.Canary)
if oldHash == newHash {
// Service exists and hasn't changed, don't re-add it later
delete(newIDs, existingID)
@ -1265,7 +1265,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *serviceregistration.Wor
// Update all watched checks as CheckRestart fields aren't part of ID
if check.TriggersRestarts() {
c.checkWatcher.Watch(newWorkload.AllocID, newWorkload.Name(), checkID, check, newWorkload.Restarter)
c.checkWatcher.Watch(newWorkload.AllocInfo.AllocID, newWorkload.Name(), checkID, check, newWorkload.Restarter)
}
}
@ -1291,7 +1291,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *serviceregistration.Wor
}
// Add the task to the allocation's registration
c.addRegistrations(newWorkload.AllocID, newWorkload.Name(), regs)
c.addRegistrations(newWorkload.AllocInfo.AllocID, newWorkload.Name(), regs)
c.commit(ops)
@ -1301,7 +1301,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *serviceregistration.Wor
for _, check := range service.Checks {
if check.TriggersRestarts() {
checkID := MakeCheckID(serviceID, check)
c.checkWatcher.Watch(newWorkload.AllocID, newWorkload.Name(), checkID, check, newWorkload.Restarter)
c.checkWatcher.Watch(newWorkload.AllocInfo.AllocID, newWorkload.Name(), checkID, check, newWorkload.Restarter)
}
}
}
@ -1316,7 +1316,7 @@ func (c *ServiceClient) RemoveWorkload(workload *serviceregistration.WorkloadSer
ops := operations{}
for _, service := range workload.Services {
id := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), service)
id := serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), service)
ops.deregServices = append(ops.deregServices, id)
for _, check := range service.Checks {
@ -1330,7 +1330,7 @@ func (c *ServiceClient) RemoveWorkload(workload *serviceregistration.WorkloadSer
}
// Remove the workload from the alloc's registrations
c.removeRegistration(workload.AllocID, workload.Name())
c.removeRegistration(workload.AllocInfo.AllocID, workload.Name())
// Now add them to the deregistration fields; main Run loop will update
c.commit(&ops)

View File

@ -416,8 +416,10 @@ func TestServiceRegistration_CheckOnUpdate(t *testing.T) {
allocID := uuid.Generate()
ws := &serviceregistration.WorkloadServices{
AllocInfo: structs.AllocInfo{
AllocID: allocID,
Task: "taskname",
},
Restarter: &restartRecorder{},
Services: []*structs.Service{
{

View File

@ -17,8 +17,10 @@ func BuildAllocServices(
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
ws := &serviceregistration.WorkloadServices{
AllocInfo: structs.AllocInfo{
AllocID: alloc.ID,
Group: alloc.TaskGroup,
},
Services: taskenv.InterpolateServices(taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build(), tg.Services),
Networks: alloc.AllocatedResources.Shared.Networks,

View File

@ -28,8 +28,10 @@ const (
func testWorkload() *serviceregistration.WorkloadServices {
return &serviceregistration.WorkloadServices{
AllocInfo: structs.AllocInfo{
AllocID: uuid.Generate(),
Task: "taskname",
},
Restarter: &restartRecorder{},
Services: []*structs.Service{
{
@ -131,7 +133,7 @@ func TestConsul_ChangeTags(t *testing.T) {
r.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
// Validate the alloc registration
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID)
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocInfo.AllocID)
r.NoError(err)
r.NotNil(reg1, "Unexpected nil alloc registration")
r.Equal(1, reg1.NumServices())
@ -171,7 +173,7 @@ func TestConsul_EnableTagOverride_Syncs(t *testing.T) {
r.Equal(1, len(ctx.FakeConsul.services))
// Validate the alloc registration
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID)
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocInfo.AllocID)
r.NoError(err)
r.NotNil(reg1)
r.Equal(1, reg1.NumServices())
@ -1082,7 +1084,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
},
},
}
remainingWorkloadServiceID := serviceregistration.MakeAllocServiceID(remainingWorkload.AllocID,
remainingWorkloadServiceID := serviceregistration.MakeAllocServiceID(remainingWorkload.AllocInfo.AllocID,
remainingWorkload.Name(), remainingWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload))
@ -1105,7 +1107,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
},
},
}
explicitlyRemovedWorkloadServiceID := serviceregistration.MakeAllocServiceID(explicitlyRemovedWorkload.AllocID,
explicitlyRemovedWorkloadServiceID := serviceregistration.MakeAllocServiceID(explicitlyRemovedWorkload.AllocInfo.AllocID,
explicitlyRemovedWorkload.Name(), explicitlyRemovedWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload))
@ -1130,7 +1132,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
},
},
}
outofbandWorkloadServiceID := serviceregistration.MakeAllocServiceID(outofbandWorkload.AllocID,
outofbandWorkloadServiceID := serviceregistration.MakeAllocServiceID(outofbandWorkload.AllocInfo.AllocID,
outofbandWorkload.Name(), outofbandWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload))
@ -1192,7 +1194,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
},
},
}
remainingWorkloadServiceID := serviceregistration.MakeAllocServiceID(remainingWorkload.AllocID,
remainingWorkloadServiceID := serviceregistration.MakeAllocServiceID(remainingWorkload.AllocInfo.AllocID,
remainingWorkload.Name(), remainingWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload))
@ -1215,7 +1217,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
},
},
}
explicitlyRemovedWorkloadServiceID := serviceregistration.MakeAllocServiceID(explicitlyRemovedWorkload.AllocID,
explicitlyRemovedWorkloadServiceID := serviceregistration.MakeAllocServiceID(explicitlyRemovedWorkload.AllocInfo.AllocID,
explicitlyRemovedWorkload.Name(), explicitlyRemovedWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload))
@ -1240,7 +1242,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
},
},
}
outofbandWorkloadServiceID := serviceregistration.MakeAllocServiceID(outofbandWorkload.AllocID,
outofbandWorkloadServiceID := serviceregistration.MakeAllocServiceID(outofbandWorkload.AllocInfo.AllocID,
outofbandWorkload.Name(), outofbandWorkload.Services[0])
require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload))

View File

@ -56,3 +56,23 @@ func (a *Allocation) ServiceProviderNamespace() string {
return tg.Consul.GetNamespace()
}
type AllocInfo struct {
AllocID string
// Group in which the service belongs for a group-level service, or the
// group in which task belongs for a task-level service.
Group string
// Task in which the service belongs for task-level service. Will be empty
// for a group-level service.
Task string
// JobID provides additional context for providers regarding which job
// caused this registration.
JobID string
// NomadNamespace provides additional context for providers regarding which
// nomad namespace caused this registration.
Namespace string
}