diff --git a/.changelog/14311.txt b/.changelog/14311.txt new file mode 100644 index 000000000..3fa43640f --- /dev/null +++ b/.changelog/14311.txt @@ -0,0 +1,3 @@ +```release-note:improvement +connect: add namespace, job, and group to Envoy stats +``` diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index 0069bb559..37d9b4a49 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -153,7 +153,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv), newGroupServiceHook(groupServiceHookConfig{ alloc: alloc, - namespace: alloc.ServiceProviderNamespace(), + providerNamespace: alloc.ServiceProviderNamespace(), serviceRegWrapper: ar.serviceRegWrapper, restarter: ar, taskEnvBuilder: envBuilder, diff --git a/client/allocrunner/group_service_hook.go b/client/allocrunner/group_service_hook.go index 083ae38ee..7ce96bf6b 100644 --- a/client/allocrunner/group_service_hook.go +++ b/client/allocrunner/group_service_hook.go @@ -24,15 +24,16 @@ type groupServiceHook struct { allocID string jobID string group string + namespace string restarter serviceregistration.WorkloadRestarter prerun bool deregistered bool networkStatus structs.NetworkStatus shutdownDelayCtx context.Context - // namespace is the Nomad or Consul namespace in which service + // providerNamespace is the Nomad or Consul namespace in which service // registrations will be made. This field may be updated. - namespace string + providerNamespace string // serviceRegWrapper is the handler wrapper that is used to perform service // and check registration and deregistration. @@ -61,9 +62,9 @@ type groupServiceHookConfig struct { shutdownDelayCtx context.Context logger log.Logger - // namespace is the Nomad or Consul namespace in which service + // providerNamespace is the Nomad or Consul namespace in which service // registrations will be made. - namespace string + providerNamespace string // serviceRegWrapper is the handler wrapper that is used to perform service // and check registration and deregistration. @@ -82,8 +83,9 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook { allocID: cfg.alloc.ID, jobID: cfg.alloc.JobID, group: cfg.alloc.TaskGroup, + namespace: cfg.alloc.Namespace, restarter: cfg.restarter, - namespace: cfg.namespace, + providerNamespace: cfg.providerNamespace, taskEnvBuilder: cfg.taskEnvBuilder, delay: shutdownDelay, networkStatus: cfg.networkStatus, @@ -161,7 +163,7 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error { // An update may change the service provider, therefore we need to account // for how namespaces work across providers also. 
- h.namespace = req.Alloc.ServiceProviderNamespace() + h.providerNamespace = req.Alloc.ServiceProviderNamespace() // Create new task services struct with those new values newWorkloadServices := h.getWorkloadServices() @@ -244,17 +246,22 @@ func (h *groupServiceHook) getWorkloadServices() *serviceregistration.WorkloadSe netStatus = h.networkStatus.NetworkStatus() } + info := structs.AllocInfo{ + AllocID: h.allocID, + JobID: h.jobID, + Group: h.group, + Namespace: h.namespace, + } + // Create task services struct with request's driver metadata return &serviceregistration.WorkloadServices{ - AllocID: h.allocID, - JobID: h.jobID, - Group: h.group, - Namespace: h.namespace, - Restarter: h.restarter, - Services: interpolatedServices, - Networks: h.networks, - NetworkStatus: netStatus, - Ports: h.ports, - Canary: h.canary, + AllocInfo: info, + ProviderNamespace: h.providerNamespace, + Restarter: h.restarter, + Services: interpolatedServices, + Networks: h.networks, + NetworkStatus: netStatus, + Ports: h.ports, + Canary: h.canary, } } diff --git a/client/allocrunner/taskrunner/service_hook.go b/client/allocrunner/taskrunner/service_hook.go index cac8978de..d9dfe59bc 100644 --- a/client/allocrunner/taskrunner/service_hook.go +++ b/client/allocrunner/taskrunner/service_hook.go @@ -31,7 +31,7 @@ type serviceHookConfig struct { // namespace is the Nomad or Consul namespace in which service // registrations will be made. - namespace string + providerNamespace string // serviceRegWrapper is the handler wrapper that is used to perform service // and check registration and deregistration. @@ -48,6 +48,7 @@ type serviceHook struct { jobID string groupName string taskName string + namespace string restarter serviceregistration.WorkloadRestarter logger log.Logger @@ -60,9 +61,9 @@ type serviceHook struct { ports structs.AllocatedPorts taskEnv *taskenv.TaskEnv - // namespace is the Nomad or Consul namespace in which service + // providerNamespace is the Nomad or Consul namespace in which service // registrations will be made. This field may be updated. - namespace string + providerNamespace string // serviceRegWrapper is the handler wrapper that is used to perform service // and check registration and deregistration. @@ -87,7 +88,8 @@ func newServiceHook(c serviceHookConfig) *serviceHook { jobID: c.alloc.JobID, groupName: c.alloc.TaskGroup, taskName: c.task.Name, - namespace: c.namespace, + namespace: c.alloc.Namespace, + providerNamespace: c.providerNamespace, serviceRegWrapper: c.serviceRegWrapper, services: c.task.Services, restarter: c.restarter, @@ -176,7 +178,7 @@ func (h *serviceHook) updateHookFields(req *interfaces.TaskUpdateRequest) error // An update may change the service provider, therefore we need to account // for how namespaces work across providers also. 
- h.namespace = req.Alloc.ServiceProviderNamespace() + h.providerNamespace = req.Alloc.ServiceProviderNamespace() return nil } @@ -219,19 +221,23 @@ func (h *serviceHook) getWorkloadServices() *serviceregistration.WorkloadService // Interpolate with the task's environment interpolatedServices := taskenv.InterpolateServices(h.taskEnv, h.services) + info := structs.AllocInfo{ + AllocID: h.allocID, + JobID: h.jobID, + Task: h.taskName, + Namespace: h.namespace, + } + // Create task services struct with request's driver metadata return &serviceregistration.WorkloadServices{ - AllocID: h.allocID, - JobID: h.jobID, - Group: h.groupName, - Task: h.taskName, - Namespace: h.namespace, - Restarter: h.restarter, - Services: interpolatedServices, - DriverExec: h.driverExec, - DriverNetwork: h.driverNet, - Networks: h.networks, - Canary: h.canary, - Ports: h.ports, + AllocInfo: info, + ProviderNamespace: h.providerNamespace, + Restarter: h.restarter, + Services: interpolatedServices, + DriverExec: h.driverExec, + DriverNetwork: h.driverNet, + Networks: h.networks, + Canary: h.canary, + Ports: h.ports, } } diff --git a/client/allocrunner/taskrunner/service_hook_test.go b/client/allocrunner/taskrunner/service_hook_test.go index ae59fb649..6125f65b8 100644 --- a/client/allocrunner/taskrunner/service_hook_test.go +++ b/client/allocrunner/taskrunner/service_hook_test.go @@ -177,7 +177,7 @@ func Test_serviceHook_Nomad(t *testing.T) { h := newServiceHook(serviceHookConfig{ alloc: alloc, task: alloc.LookupTask("web"), - namespace: "default", + providerNamespace: "default", serviceRegWrapper: regWrapper, restarter: agentconsul.NoopRestarter(), logger: logger, diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 5c15e3a79..089e834d7 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -123,7 +123,7 @@ func (tr *TaskRunner) initHooks() { tr.runnerHooks = append(tr.runnerHooks, newServiceHook(serviceHookConfig{ alloc: tr.Alloc(), task: tr.Task(), - namespace: serviceProviderNamespace, + providerNamespace: serviceProviderNamespace, serviceRegWrapper: tr.serviceRegWrapper, restarter: tr, logger: hookLogger, diff --git a/client/serviceregistration/mock/mock.go b/client/serviceregistration/mock/mock.go index 899c4f824..da7e885ad 100644 --- a/client/serviceregistration/mock/mock.go +++ b/client/serviceregistration/mock/mock.go @@ -42,10 +42,10 @@ func (h *ServiceRegistrationHandler) RegisterWorkload(services *serviceregistrat h.mu.Lock() defer h.mu.Unlock() - h.log.Trace("RegisterWorkload", "alloc_id", services.AllocID, + h.log.Trace("RegisterWorkload", "alloc_id", services.AllocInfo.AllocID, "name", services.Name(), "services", len(services.Services)) - h.ops = append(h.ops, newOperation("add", services.AllocID, services.Name())) + h.ops = append(h.ops, newOperation("add", services.AllocInfo.AllocID, services.Name())) return nil } @@ -53,20 +53,20 @@ func (h *ServiceRegistrationHandler) RemoveWorkload(services *serviceregistratio h.mu.Lock() defer h.mu.Unlock() - h.log.Trace("RemoveWorkload", "alloc_id", services.AllocID, + h.log.Trace("RemoveWorkload", "alloc_id", services.AllocInfo.AllocID, "name", services.Name(), "services", len(services.Services)) - h.ops = append(h.ops, newOperation("remove", services.AllocID, services.Name())) + h.ops = append(h.ops, newOperation("remove", services.AllocInfo.AllocID, services.Name())) } func (h *ServiceRegistrationHandler) 
UpdateWorkload(old, newServices *serviceregistration.WorkloadServices) error { h.mu.Lock() defer h.mu.Unlock() - h.log.Trace("UpdateWorkload", "alloc_id", newServices.AllocID, "name", newServices.Name(), + h.log.Trace("UpdateWorkload", "alloc_id", newServices.AllocInfo.AllocID, "name", newServices.Name(), "old_services", len(old.Services), "new_services", len(newServices.Services)) - h.ops = append(h.ops, newOperation("update", newServices.AllocID, newServices.Name())) + h.ops = append(h.ops, newOperation("update", newServices.AllocInfo.AllocID, newServices.Name())) return nil } diff --git a/client/serviceregistration/nsd/nsd.go b/client/serviceregistration/nsd/nsd.go index 2be68dd1c..1138c4458 100644 --- a/client/serviceregistration/nsd/nsd.go +++ b/client/serviceregistration/nsd/nsd.go @@ -110,8 +110,8 @@ func (s *ServiceRegistrationHandler) RegisterWorkload(workload *serviceregistrat for _, service := range workload.Services { for _, check := range service.Checks { if check.TriggersRestarts() { - checkID := string(structs.NomadCheckID(workload.AllocID, workload.Group, check)) - s.checkWatcher.Watch(workload.AllocID, workload.Name(), checkID, check, workload.Restarter) + checkID := string(structs.NomadCheckID(workload.AllocInfo.AllocID, workload.AllocInfo.Group, check)) + s.checkWatcher.Watch(workload.AllocInfo.AllocID, workload.Name(), checkID, check, workload.Restarter) } } } @@ -147,19 +147,19 @@ func (s *ServiceRegistrationHandler) removeWorkload( // Stop check watcher for _, service := range workload.Services { for _, check := range service.Checks { - checkID := string(structs.NomadCheckID(workload.AllocID, workload.Group, check)) + checkID := string(structs.NomadCheckID(workload.AllocInfo.AllocID, workload.AllocInfo.Group, check)) s.checkWatcher.Unwatch(checkID) } } // Generate the consistent ID for this service, so we know what to remove. - id := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), serviceSpec) + id := serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), serviceSpec) deleteArgs := structs.ServiceRegistrationDeleteByIDRequest{ ID: id, WriteRequest: structs.WriteRequest{ Region: s.cfg.Region, - Namespace: workload.Namespace, + Namespace: workload.ProviderNamespace, AuthToken: s.cfg.NodeSecret, }, } @@ -178,14 +178,14 @@ func (s *ServiceRegistrationHandler) removeWorkload( // while ensuring the operator can see. if strings.Contains(err.Error(), "service registration not found") { s.log.Info("attempted to delete non-existent service registration", - "service_id", id, "namespace", workload.Namespace) + "service_id", id, "namespace", workload.ProviderNamespace) return } // Log the error as there is nothing left to do, so the operator can see it // and identify any problems. 
s.log.Error("failed to delete service registration", - "error", err, "service_id", id, "namespace", workload.Namespace) + "error", err, "service_id", id, "namespace", workload.ProviderNamespace) } func (s *ServiceRegistrationHandler) UpdateWorkload(old, new *serviceregistration.WorkloadServices) error { @@ -231,7 +231,7 @@ func (s *ServiceRegistrationHandler) dedupUpdatedWorkload( newIDs := make(map[string]*structs.Service, len(newWork.Services)) for _, s := range newWork.Services { - newIDs[serviceregistration.MakeAllocServiceID(newWork.AllocID, newWork.Name(), s)] = s + newIDs[serviceregistration.MakeAllocServiceID(newWork.AllocInfo.AllocID, newWork.Name(), s)] = s } // Iterate through the old services in order to identify whether they can @@ -240,7 +240,7 @@ func (s *ServiceRegistrationHandler) dedupUpdatedWorkload( // Generate the service ID of the old service. If this is not found // within the new mapping then we need to remove it. - oldID := serviceregistration.MakeAllocServiceID(oldWork.AllocID, oldWork.Name(), oldService) + oldID := serviceregistration.MakeAllocServiceID(oldWork.AllocInfo.AllocID, oldWork.Name(), oldService) newSvc, ok := newIDs[oldID] if !ok { oldCopy.Services = append(oldCopy.Services, oldService) @@ -319,12 +319,12 @@ func (s *ServiceRegistrationHandler) generateNomadServiceRegistration( } return &structs.ServiceRegistration{ - ID: serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), serviceSpec), + ID: serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), serviceSpec), ServiceName: serviceSpec.Name, NodeID: s.cfg.NodeID, - JobID: workload.JobID, - AllocID: workload.AllocID, - Namespace: workload.Namespace, + JobID: workload.AllocInfo.JobID, + AllocID: workload.AllocInfo.AllocID, + Namespace: workload.ProviderNamespace, Datacenter: s.cfg.Datacenter, Tags: tags, Address: ip, diff --git a/client/serviceregistration/nsd/nsd_test.go b/client/serviceregistration/nsd/nsd_test.go index abc561e41..c8681fd37 100644 --- a/client/serviceregistration/nsd/nsd_test.go +++ b/client/serviceregistration/nsd/nsd_test.go @@ -178,12 +178,14 @@ func TestServiceRegistrationHandler_UpdateWorkload(t *testing.T) { }, inputOldWorkload: mockWorkload(), inputNewWorkload: &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", Services: []*structs.Service{ { Name: "changed-redis-db", @@ -232,12 +234,14 @@ func TestServiceRegistrationHandler_UpdateWorkload(t *testing.T) { }, inputOldWorkload: mockWorkload(), inputNewWorkload: &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", Services: []*structs.Service{ { Name: "redis-db", @@ -329,12 +333,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) { { inputOldWorkload: mockWorkload(), inputNewWorkload: &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: 
"example", - Canary: false, - Namespace: "default", + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", Services: []*structs.Service{ { Name: "changed-redis-db", @@ -362,12 +368,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) { }, expectedOldOutput: mockWorkload(), expectedNewOutput: &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", Services: []*structs.Service{ { Name: "changed-redis-db", @@ -398,12 +406,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) { { inputOldWorkload: mockWorkload(), inputNewWorkload: &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", Services: []*structs.Service{ { Name: "redis-db", @@ -432,13 +442,15 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) { }, }, expectedOldOutput: &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", - Services: []*structs.Service{}, + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", + Services: []*structs.Service{}, Ports: []structs.AllocatedPortMapping{ { Label: "db", @@ -453,12 +465,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) { }, }, expectedNewOutput: &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", Services: []*structs.Service{ { Name: "redis-db", @@ -491,12 +505,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) { { inputOldWorkload: mockWorkload(), inputNewWorkload: &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", Services: []*structs.Service{ { Name: "redis-db", @@ -524,12 +540,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) { }, expectedOldOutput: mockWorkload(), expectedNewOutput: &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", + 
AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", Services: []*structs.Service{ { Name: "redis-db", @@ -572,12 +590,14 @@ func TestServiceRegistrationHandler_dedupUpdatedWorkload(t *testing.T) { func mockWorkload() *serviceregistration.WorkloadServices { return &serviceregistration.WorkloadServices{ - AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", - Task: "redis", - Group: "cache", - JobID: "example", - Canary: false, - Namespace: "default", + AllocInfo: structs.AllocInfo{ + AllocID: "98ea220b-7ebe-4662-6d74-9868e797717c", + Task: "redis", + Group: "cache", + JobID: "example", + }, + Canary: false, + ProviderNamespace: "default", Services: []*structs.Service{ { Name: "redis-db", diff --git a/client/serviceregistration/workload.go b/client/serviceregistration/workload.go index 064f4fa06..7123b7e4e 100644 --- a/client/serviceregistration/workload.go +++ b/client/serviceregistration/workload.go @@ -9,27 +9,15 @@ import ( // WorkloadServices describes services defined in either a Task or TaskGroup // that need to be syncronized with a service registration provider. type WorkloadServices struct { - AllocID string - - // Group in which the service belongs for a group-level service, or the - // group in which task belongs for a task-level service. - Group string - - // Task in which the service belongs for task-level service. Will be empty - // for a group-level service. - Task string - - // JobID provides additional context for providers regarding which job - // caused this registration. - JobID string + AllocInfo structs.AllocInfo // Canary indicates whether, or not the allocation is a canary. This is // used to build the correct tags mapping. Canary bool - // Namespace is the provider namespace in which services will be + // ProviderNamespace is the provider namespace in which services will be // registered, if the provider supports this functionality. - Namespace string + ProviderNamespace string // Restarter allows restarting the task or task group depending on the // check_restart stanzas. @@ -88,8 +76,8 @@ func (ws *WorkloadServices) Copy() *WorkloadServices { } func (ws *WorkloadServices) Name() string { - if ws.Task != "" { - return ws.Task + if ws.AllocInfo.Task != "" { + return ws.AllocInfo.Task } - return "group-" + ws.Group + return "group-" + ws.AllocInfo.Group } diff --git a/command/agent/consul/connect.go b/command/agent/consul/connect.go index e30475bcb..bed3c2c0a 100644 --- a/command/agent/consul/connect.go +++ b/command/agent/consul/connect.go @@ -3,6 +3,7 @@ package consul import ( "fmt" "net" + "sort" "strconv" "strings" @@ -14,7 +15,7 @@ import ( // newConnect creates a new Consul AgentServiceConnect struct based on a Nomad // Connect struct. If the nomad Connect struct is nil, nil will be returned to // disable Connect for this service. 
-func newConnect(serviceID, allocID string, serviceName string, nc *structs.ConsulConnect, networks structs.Networks, ports structs.AllocatedPorts) (*api.AgentServiceConnect, error) { +func newConnect(serviceID string, info structs.AllocInfo, serviceName string, nc *structs.ConsulConnect, networks structs.Networks, ports structs.AllocatedPorts) (*api.AgentServiceConnect, error) { switch { case nc == nil: // no connect stanza means there is no connect service to register @@ -33,7 +34,7 @@ func newConnect(serviceID, allocID string, serviceName string, nc *structs.Consu if nc.SidecarService.Port == "" { nc.SidecarService.Port = fmt.Sprintf("%s-%s", structs.ConnectProxyPrefix, serviceName) } - sidecarReg, err := connectSidecarRegistration(serviceID, allocID, nc.SidecarService, networks, ports) + sidecarReg, err := connectSidecarRegistration(serviceID, info, nc.SidecarService, networks, ports) if err != nil { return nil, err } @@ -90,7 +91,7 @@ func newConnectGateway(connect *structs.ConsulConnect) *api.AgentServiceConnectP return &api.AgentServiceConnectProxyConfig{Config: envoyConfig} } -func connectSidecarRegistration(serviceID, allocID string, css *structs.ConsulSidecarService, networks structs.Networks, ports structs.AllocatedPorts) (*api.AgentServiceRegistration, error) { +func connectSidecarRegistration(serviceID string, info structs.AllocInfo, css *structs.ConsulSidecarService, networks structs.Networks, ports structs.AllocatedPorts) (*api.AgentServiceRegistration, error) { if css == nil { // no sidecar stanza means there is no sidecar service to register return nil, nil @@ -101,7 +102,7 @@ func connectSidecarRegistration(serviceID, allocID string, css *structs.ConsulSi return nil, err } - proxy, err := connectSidecarProxy(allocID, css.Proxy, cMapping.To, networks) + proxy, err := connectSidecarProxy(info, css.Proxy, cMapping.To, networks) if err != nil { return nil, err } @@ -130,7 +131,7 @@ func connectSidecarRegistration(serviceID, allocID string, css *structs.ConsulSi }, nil } -func connectSidecarProxy(allocID string, proxy *structs.ConsulProxy, cPort int, networks structs.Networks) (*api.AgentServiceConnectProxyConfig, error) { +func connectSidecarProxy(info structs.AllocInfo, proxy *structs.ConsulProxy, cPort int, networks structs.Networks) (*api.AgentServiceConnectProxyConfig, error) { if proxy == nil { proxy = new(structs.ConsulProxy) } @@ -143,7 +144,7 @@ func connectSidecarProxy(allocID string, proxy *structs.ConsulProxy, cPort int, return &api.AgentServiceConnectProxyConfig{ LocalServiceAddress: proxy.LocalServiceAddress, LocalServicePort: proxy.LocalServicePort, - Config: connectProxyConfig(proxy.Config, cPort, allocID), + Config: connectProxyConfig(proxy.Config, cPort, info), Upstreams: connectUpstreams(proxy.Upstreams), Expose: expose, }, nil @@ -226,40 +227,59 @@ func connectMeshGateway(in structs.ConsulMeshGateway) api.MeshGatewayConfig { return gw } -func connectProxyConfig(cfg map[string]interface{}, port int, allocID string) map[string]interface{} { +func connectProxyConfig(cfg map[string]interface{}, port int, info structs.AllocInfo) map[string]interface{} { if cfg == nil { cfg = make(map[string]interface{}) } cfg["bind_address"] = "0.0.0.0" cfg["bind_port"] = port - injectAllocID(cfg, allocID) + + tags := map[string]string{ + "nomad.group=": info.Group, + "nomad.job=": info.JobID, + "nomad.namespace=": info.Namespace, + "nomad.alloc_id=": info.AllocID, + } + injectNomadInfo(cfg, tags) return cfg } -// injectAllocID merges allocID into cfg=>envoy_stats_tags +// 
injectNomadInfo merges nomad information into cfg=>envoy_stats_tags // // cfg must not be nil -func injectAllocID(cfg map[string]interface{}, allocID string) { - const key = "envoy_stats_tags" - const prefix = "nomad.alloc_id=" - pair := prefix + allocID - tags, exists := cfg[key] - if !exists { - cfg[key] = []string{pair} +func injectNomadInfo(cfg map[string]interface{}, defaultTags map[string]string) { + const configKey = "envoy_stats_tags" + + existingTagsI := cfg[configKey] + switch existingTags := existingTagsI.(type) { + case []string: + if len(existingTags) == 0 { + break + } + OUTER: + for key, value := range defaultTags { + for _, tag := range existingTags { + if strings.HasPrefix(tag, key) { + continue OUTER + } + } + existingTags = append(existingTags, key+value) + } + cfg[configKey] = existingTags return } - switch v := tags.(type) { - case []string: - // scan the existing tags to see if alloc_id= is already set - for _, s := range v { - if strings.HasPrefix(s, prefix) { - return - } + // common case. + var tags []string + for key, value := range defaultTags { + if value == "" { + continue } - v = append(v, pair) - cfg[key] = v + tag := key + value + tags = append(tags, tag) } + sort.Strings(tags) // mostly for test stability + cfg[configKey] = tags } func connectNetworkInvariants(networks structs.Networks) error { diff --git a/command/agent/consul/connect_test.go b/command/agent/consul/connect_test.go index 356587c6d..f08860b74 100644 --- a/command/agent/consul/connect_test.go +++ b/command/agent/consul/connect_test.go @@ -37,15 +37,18 @@ func TestConnect_newConnect(t *testing.T) { service := "redis" redisID := uuid.Generate() allocID := uuid.Generate() + info := structs.AllocInfo{ + AllocID: allocID, + } t.Run("nil", func(t *testing.T) { - asr, err := newConnect("", "", "", nil, nil, nil) + asr, err := newConnect("", structs.AllocInfo{}, "", nil, nil, nil) require.NoError(t, err) require.Nil(t, asr) }) t.Run("native", func(t *testing.T) { - asr, err := newConnect(redisID, allocID, service, &structs.ConsulConnect{ + asr, err := newConnect(redisID, info, service, &structs.ConsulConnect{ Native: true, }, nil, nil) require.NoError(t, err) @@ -54,7 +57,7 @@ func TestConnect_newConnect(t *testing.T) { }) t.Run("with sidecar", func(t *testing.T) { - asr, err := newConnect(redisID, allocID, service, &structs.ConsulConnect{ + asr, err := newConnect(redisID, info, service, &structs.ConsulConnect{ Native: false, SidecarService: &structs.ConsulSidecarService{ Tags: []string{"foo", "bar"}, @@ -88,7 +91,7 @@ func TestConnect_newConnect(t *testing.T) { }) t.Run("with sidecar without TCP checks", func(t *testing.T) { - asr, err := newConnect(redisID, allocID, service, &structs.ConsulConnect{ + asr, err := newConnect(redisID, info, service, &structs.ConsulConnect{ Native: false, SidecarService: &structs.ConsulSidecarService{ Tags: []string{"foo", "bar"}, @@ -123,22 +126,25 @@ func TestConnect_connectSidecarRegistration(t *testing.T) { redisID := uuid.Generate() allocID := uuid.Generate() + info := structs.AllocInfo{ + AllocID: allocID, + } t.Run("nil", func(t *testing.T) { - sidecarReg, err := connectSidecarRegistration(redisID, allocID, nil, testConnectNetwork, testConnectPorts) + sidecarReg, err := connectSidecarRegistration(redisID, info, nil, testConnectNetwork, testConnectPorts) require.NoError(t, err) require.Nil(t, sidecarReg) }) t.Run("no service port", func(t *testing.T) { - _, err := connectSidecarRegistration("unknown-id", allocID, &structs.ConsulSidecarService{ + _, err := 
connectSidecarRegistration("unknown-id", info, &structs.ConsulSidecarService{ Port: "unknown-label", }, testConnectNetwork, testConnectPorts) require.EqualError(t, err, `No port of label "unknown-label" defined`) }) t.Run("bad proxy", func(t *testing.T) { - _, err := connectSidecarRegistration(redisID, allocID, &structs.ConsulSidecarService{ + _, err := connectSidecarRegistration(redisID, info, &structs.ConsulSidecarService{ Port: "connect-proxy-redis", Proxy: &structs.ConsulProxy{ Expose: &structs.ConsulExposeConfig{ @@ -152,7 +158,7 @@ func TestConnect_connectSidecarRegistration(t *testing.T) { }) t.Run("normal", func(t *testing.T) { - proxy, err := connectSidecarRegistration(redisID, allocID, &structs.ConsulSidecarService{ + proxy, err := connectSidecarRegistration(redisID, info, &structs.ConsulSidecarService{ Tags: []string{"foo", "bar"}, Port: "connect-proxy-redis", }, testConnectNetwork, testConnectPorts) @@ -187,11 +193,14 @@ func TestConnect_connectProxy(t *testing.T) { ci.Parallel(t) allocID := uuid.Generate() + info := structs.AllocInfo{ + AllocID: allocID, + } // If the input proxy is nil, we expect the output to be a proxy with its // config set to default values. t.Run("nil proxy", func(t *testing.T) { - proxy, err := connectSidecarProxy(allocID, nil, 2000, testConnectNetwork) + proxy, err := connectSidecarProxy(info, nil, 2000, testConnectNetwork) require.NoError(t, err) require.Equal(t, &api.AgentServiceConnectProxyConfig{ LocalServiceAddress: "", @@ -207,7 +216,7 @@ func TestConnect_connectProxy(t *testing.T) { }) t.Run("bad proxy", func(t *testing.T) { - _, err := connectSidecarProxy(allocID, &structs.ConsulProxy{ + _, err := connectSidecarProxy(info, &structs.ConsulProxy{ LocalServiceAddress: "0.0.0.0", LocalServicePort: 2000, Upstreams: nil, @@ -222,7 +231,7 @@ func TestConnect_connectProxy(t *testing.T) { }) t.Run("normal", func(t *testing.T) { - proxy, err := connectSidecarProxy(allocID, &structs.ConsulProxy{ + proxy, err := connectSidecarProxy(info, &structs.ConsulProxy{ LocalServiceAddress: "0.0.0.0", LocalServicePort: 2000, Upstreams: nil, @@ -388,7 +397,7 @@ func TestConnect_connectProxyConfig(t *testing.T) { "bind_address": "0.0.0.0", "bind_port": 42, "envoy_stats_tags": []string{"nomad.alloc_id=test_alloc1"}, - }, connectProxyConfig(nil, 42, "test_alloc1")) + }, connectProxyConfig(nil, 42, structs.AllocInfo{AllocID: "test_alloc1"})) }) t.Run("pre-existing map", func(t *testing.T) { @@ -399,7 +408,7 @@ func TestConnect_connectProxyConfig(t *testing.T) { "envoy_stats_tags": []string{"nomad.alloc_id=test_alloc2"}, }, connectProxyConfig(map[string]interface{}{ "foo": "bar", - }, 42, "test_alloc2")) + }, 42, structs.AllocInfo{AllocID: "test_alloc2"})) }) } @@ -594,19 +603,34 @@ func Test_connectMeshGateway(t *testing.T) { }) } -func Test_injectAllocID(t *testing.T) { +func Test_injectNomadInfo(t *testing.T) { ci.Parallel(t) - id := "abc123" + info1 := func() map[string]string { + return map[string]string{ + "nomad.alloc_id=": "abc123", + } + } + info2 := func() map[string]string { + return map[string]string{ + "nomad.alloc_id=": "abc123", + "nomad.namespace=": "testns", + } + } - try := func(allocID string, cfg, exp map[string]interface{}) { - injectAllocID(cfg, allocID) + try := func(defaultTags map[string]string, cfg, exp map[string]interface{}) { + // TODO: defaultTags get modified over the execution + injectNomadInfo(cfg, defaultTags) + cfgTags, expTags := cfg["envoy_stats_tags"], exp["envoy_stats_tags"] + delete(cfg, "envoy_stats_tags") + delete(exp, 
"envoy_stats_tags") require.Equal(t, exp, cfg) + require.ElementsMatch(t, expTags, cfgTags, "") } // empty try( - id, + info1(), make(map[string]interface{}), map[string]interface{}{ "envoy_stats_tags": []string{"nomad.alloc_id=abc123"}, @@ -615,7 +639,7 @@ func Test_injectAllocID(t *testing.T) { // merge fresh try( - id, + info1(), map[string]interface{}{"foo": "bar"}, map[string]interface{}{ "foo": "bar", @@ -625,7 +649,7 @@ func Test_injectAllocID(t *testing.T) { // merge append try( - id, + info1(), map[string]interface{}{ "foo": "bar", "envoy_stats_tags": []string{"k1=v1", "k2=v2"}, @@ -638,25 +662,25 @@ func Test_injectAllocID(t *testing.T) { // merge exists try( - id, + info2(), map[string]interface{}{ "foo": "bar", "envoy_stats_tags": []string{"k1=v1", "k2=v2", "nomad.alloc_id=xyz789"}, }, map[string]interface{}{ "foo": "bar", - "envoy_stats_tags": []string{"k1=v1", "k2=v2", "nomad.alloc_id=xyz789"}, + "envoy_stats_tags": []string{"k1=v1", "k2=v2", "nomad.alloc_id=xyz789", "nomad.namespace=testns"}, }, ) // merge wrong type try( - id, + info1(), map[string]interface{}{ "envoy_stats_tags": "not a slice of string", }, map[string]interface{}{ - "envoy_stats_tags": "not a slice of string", + "envoy_stats_tags": []string{"nomad.alloc_id=abc123"}, }, ) } diff --git a/command/agent/consul/group_test.go b/command/agent/consul/group_test.go index b601ae67f..ab399075e 100644 --- a/command/agent/consul/group_test.go +++ b/command/agent/consul/group_test.go @@ -125,7 +125,7 @@ func TestConsul_Connect(t *testing.T) { require.Equal(t, connectService.Proxy.Config, map[string]interface{}{ "bind_address": "0.0.0.0", "bind_port": float64(9998), - "envoy_stats_tags": []interface{}{"nomad.alloc_id=" + alloc.ID}, + "envoy_stats_tags": []interface{}{"nomad.alloc_id=" + alloc.ID, "nomad.group=" + alloc.TaskGroup}, }) require.Equal(t, alloc.ID, agentService.Meta["alloc_id"]) diff --git a/command/agent/consul/service_client.go b/command/agent/consul/service_client.go index bea559594..8d157d0d4 100644 --- a/command/agent/consul/service_client.go +++ b/command/agent/consul/service_client.go @@ -965,7 +965,7 @@ func (c *ServiceClient) serviceRegs( *serviceregistration.ServiceRegistration, error) { // Get the services ID - id := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), service) + id := serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), service) sreg := &serviceregistration.ServiceRegistration{ ServiceID: id, CheckIDs: make(map[string]struct{}, len(service.Checks)), @@ -996,7 +996,7 @@ func (c *ServiceClient) serviceRegs( } // newConnect returns (nil, nil) if there's no Connect-enabled service. 
- connect, err := newConnect(id, workload.AllocID, service.Name, service.Connect, workload.Networks, workload.Ports) + connect, err := newConnect(id, workload.AllocInfo, service.Name, service.Connect, workload.Networks, workload.Ports) if err != nil { return nil, fmt.Errorf("invalid Consul Connect configuration for service %q: %v", service.Name, err) } @@ -1069,7 +1069,7 @@ func (c *ServiceClient) serviceRegs( Kind: kind, ID: id, Name: service.Name, - Namespace: workload.Namespace, + Namespace: workload.ProviderNamespace, Tags: tags, EnableTagOverride: service.EnableTagOverride, Address: ip, @@ -1130,7 +1130,7 @@ func (c *ServiceClient) checkRegs(serviceID string, service *structs.Service, } checkID := MakeCheckID(serviceID, check) - registration, err := createCheckReg(serviceID, checkID, check, ip, port, workload.Namespace) + registration, err := createCheckReg(serviceID, checkID, check, ip, port, workload.ProviderNamespace) if err != nil { return nil, fmt.Errorf("failed to add check %q: %v", check.Name, err) } @@ -1167,18 +1167,18 @@ func (c *ServiceClient) RegisterWorkload(workload *serviceregistration.WorkloadS } // Add the workload to the allocation's registration - c.addRegistrations(workload.AllocID, workload.Name(), t) + c.addRegistrations(workload.AllocInfo.AllocID, workload.Name(), t) c.commit(ops) // Start watching checks. Done after service registrations are built // since an error building them could leak watches. for _, service := range workload.Services { - serviceID := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), service) + serviceID := serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), service) for _, check := range service.Checks { if check.TriggersRestarts() { checkID := MakeCheckID(serviceID, check) - c.checkWatcher.Watch(workload.AllocID, workload.Name(), checkID, check, workload.Restarter) + c.checkWatcher.Watch(workload.AllocInfo.AllocID, workload.Name(), checkID, check, workload.Restarter) } } } @@ -1196,12 +1196,12 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *serviceregistration.Wor newIDs := make(map[string]*structs.Service, len(newWorkload.Services)) for _, s := range newWorkload.Services { - newIDs[serviceregistration.MakeAllocServiceID(newWorkload.AllocID, newWorkload.Name(), s)] = s + newIDs[serviceregistration.MakeAllocServiceID(newWorkload.AllocInfo.AllocID, newWorkload.Name(), s)] = s } // Loop over existing Services to see if they have been removed for _, existingSvc := range old.Services { - existingID := serviceregistration.MakeAllocServiceID(old.AllocID, old.Name(), existingSvc) + existingID := serviceregistration.MakeAllocServiceID(old.AllocInfo.AllocID, old.Name(), existingSvc) newSvc, ok := newIDs[existingID] if !ok { @@ -1219,8 +1219,8 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *serviceregistration.Wor continue } - oldHash := existingSvc.Hash(old.AllocID, old.Name(), old.Canary) - newHash := newSvc.Hash(newWorkload.AllocID, newWorkload.Name(), newWorkload.Canary) + oldHash := existingSvc.Hash(old.AllocInfo.AllocID, old.Name(), old.Canary) + newHash := newSvc.Hash(newWorkload.AllocInfo.AllocID, newWorkload.Name(), newWorkload.Canary) if oldHash == newHash { // Service exists and hasn't changed, don't re-add it later delete(newIDs, existingID) @@ -1265,7 +1265,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *serviceregistration.Wor // Update all watched checks as CheckRestart fields aren't part of ID if check.TriggersRestarts() { - 
c.checkWatcher.Watch(newWorkload.AllocID, newWorkload.Name(), checkID, check, newWorkload.Restarter) + c.checkWatcher.Watch(newWorkload.AllocInfo.AllocID, newWorkload.Name(), checkID, check, newWorkload.Restarter) } } @@ -1291,7 +1291,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *serviceregistration.Wor } // Add the task to the allocation's registration - c.addRegistrations(newWorkload.AllocID, newWorkload.Name(), regs) + c.addRegistrations(newWorkload.AllocInfo.AllocID, newWorkload.Name(), regs) c.commit(ops) @@ -1301,7 +1301,7 @@ func (c *ServiceClient) UpdateWorkload(old, newWorkload *serviceregistration.Wor for _, check := range service.Checks { if check.TriggersRestarts() { checkID := MakeCheckID(serviceID, check) - c.checkWatcher.Watch(newWorkload.AllocID, newWorkload.Name(), checkID, check, newWorkload.Restarter) + c.checkWatcher.Watch(newWorkload.AllocInfo.AllocID, newWorkload.Name(), checkID, check, newWorkload.Restarter) } } } @@ -1316,7 +1316,7 @@ func (c *ServiceClient) RemoveWorkload(workload *serviceregistration.WorkloadSer ops := operations{} for _, service := range workload.Services { - id := serviceregistration.MakeAllocServiceID(workload.AllocID, workload.Name(), service) + id := serviceregistration.MakeAllocServiceID(workload.AllocInfo.AllocID, workload.Name(), service) ops.deregServices = append(ops.deregServices, id) for _, check := range service.Checks { @@ -1330,7 +1330,7 @@ func (c *ServiceClient) RemoveWorkload(workload *serviceregistration.WorkloadSer } // Remove the workload from the alloc's registrations - c.removeRegistration(workload.AllocID, workload.Name()) + c.removeRegistration(workload.AllocInfo.AllocID, workload.Name()) // Now add them to the deregistration fields; main Run loop will update c.commit(&ops) diff --git a/command/agent/consul/service_client_test.go b/command/agent/consul/service_client_test.go index e269a9679..a89ed1b0b 100644 --- a/command/agent/consul/service_client_test.go +++ b/command/agent/consul/service_client_test.go @@ -416,8 +416,10 @@ func TestServiceRegistration_CheckOnUpdate(t *testing.T) { allocID := uuid.Generate() ws := &serviceregistration.WorkloadServices{ - AllocID: allocID, - Task: "taskname", + AllocInfo: structs.AllocInfo{ + AllocID: allocID, + Task: "taskname", + }, Restarter: &restartRecorder{}, Services: []*structs.Service{ { diff --git a/command/agent/consul/structs.go b/command/agent/consul/structs.go index 52e3e3629..8b31d471e 100644 --- a/command/agent/consul/structs.go +++ b/command/agent/consul/structs.go @@ -17,8 +17,10 @@ func BuildAllocServices( tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) ws := &serviceregistration.WorkloadServices{ - AllocID: alloc.ID, - Group: alloc.TaskGroup, + AllocInfo: structs.AllocInfo{ + AllocID: alloc.ID, + Group: alloc.TaskGroup, + }, Services: taskenv.InterpolateServices(taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region).Build(), tg.Services), Networks: alloc.AllocatedResources.Shared.Networks, diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index 8f1c00e9b..d6bdc0c53 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -28,8 +28,10 @@ const ( func testWorkload() *serviceregistration.WorkloadServices { return &serviceregistration.WorkloadServices{ - AllocID: uuid.Generate(), - Task: "taskname", + AllocInfo: structs.AllocInfo{ + AllocID: uuid.Generate(), + Task: "taskname", + }, Restarter: &restartRecorder{}, Services: []*structs.Service{ { @@ -131,7 +133,7 @@ func 
TestConsul_ChangeTags(t *testing.T) { r.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") // Validate the alloc registration - reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID) + reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocInfo.AllocID) r.NoError(err) r.NotNil(reg1, "Unexpected nil alloc registration") r.Equal(1, reg1.NumServices()) @@ -171,7 +173,7 @@ func TestConsul_EnableTagOverride_Syncs(t *testing.T) { r.Equal(1, len(ctx.FakeConsul.services)) // Validate the alloc registration - reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocID) + reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Workload.AllocInfo.AllocID) r.NoError(err) r.NotNil(reg1) r.Equal(1, reg1.NumServices()) @@ -1082,7 +1084,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) { }, }, } - remainingWorkloadServiceID := serviceregistration.MakeAllocServiceID(remainingWorkload.AllocID, + remainingWorkloadServiceID := serviceregistration.MakeAllocServiceID(remainingWorkload.AllocInfo.AllocID, remainingWorkload.Name(), remainingWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload)) @@ -1105,7 +1107,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) { }, }, } - explicitlyRemovedWorkloadServiceID := serviceregistration.MakeAllocServiceID(explicitlyRemovedWorkload.AllocID, + explicitlyRemovedWorkloadServiceID := serviceregistration.MakeAllocServiceID(explicitlyRemovedWorkload.AllocInfo.AllocID, explicitlyRemovedWorkload.Name(), explicitlyRemovedWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload)) @@ -1130,7 +1132,7 @@ func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) { }, }, } - outofbandWorkloadServiceID := serviceregistration.MakeAllocServiceID(outofbandWorkload.AllocID, + outofbandWorkloadServiceID := serviceregistration.MakeAllocServiceID(outofbandWorkload.AllocInfo.AllocID, outofbandWorkload.Name(), outofbandWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload)) @@ -1192,7 +1194,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { }, }, } - remainingWorkloadServiceID := serviceregistration.MakeAllocServiceID(remainingWorkload.AllocID, + remainingWorkloadServiceID := serviceregistration.MakeAllocServiceID(remainingWorkload.AllocInfo.AllocID, remainingWorkload.Name(), remainingWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(remainingWorkload)) @@ -1215,7 +1217,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { }, }, } - explicitlyRemovedWorkloadServiceID := serviceregistration.MakeAllocServiceID(explicitlyRemovedWorkload.AllocID, + explicitlyRemovedWorkloadServiceID := serviceregistration.MakeAllocServiceID(explicitlyRemovedWorkload.AllocInfo.AllocID, explicitlyRemovedWorkload.Name(), explicitlyRemovedWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(explicitlyRemovedWorkload)) @@ -1240,7 +1242,7 @@ func TestConsul_ServiceDeregistration_InProbation(t *testing.T) { }, }, } - outofbandWorkloadServiceID := serviceregistration.MakeAllocServiceID(outofbandWorkload.AllocID, + outofbandWorkloadServiceID := serviceregistration.MakeAllocServiceID(outofbandWorkload.AllocInfo.AllocID, outofbandWorkload.Name(), outofbandWorkload.Services[0]) require.NoError(ctx.ServiceClient.RegisterWorkload(outofbandWorkload)) diff --git a/nomad/structs/alloc.go b/nomad/structs/alloc.go index 
2f5c0cfa9..57ef361be 100644
--- a/nomad/structs/alloc.go
+++ b/nomad/structs/alloc.go
@@ -56,3 +56,23 @@ func (a *Allocation) ServiceProviderNamespace() string {
 
 	return tg.Consul.GetNamespace()
 }
+
+type AllocInfo struct {
+	AllocID string
+
+	// Group in which the service belongs for a group-level service, or the
+	// group in which task belongs for a task-level service.
+	Group string
+
+	// Task in which the service belongs for task-level service. Will be empty
+	// for a group-level service.
+	Task string
+
+	// JobID provides additional context for providers regarding which job
+	// caused this registration.
+	JobID string
+
+	// Namespace provides additional context for providers regarding which
+	// Nomad namespace caused this registration.
+	Namespace string
+}
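
The net effect of the connect.go change above is easiest to see outside the test suite. The sketch below is not part of the change set; it simply mirrors the merge rule that injectNomadInfo implements, using made-up allocation values: default `nomad.*` tags are added to `envoy_stats_tags`, while any tag whose key prefix the operator already set in the proxy config is left alone.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// mergeStatsTags mirrors the merge rule of injectNomadInfo in
// command/agent/consul/connect.go (sketch only, hypothetical values):
// defaults are appended unless a tag with the same key prefix already exists.
func mergeStatsTags(existing []string, defaults map[string]string) []string {
	if len(existing) == 0 {
		// Common case: no operator-supplied tags, emit all non-empty defaults.
		tags := make([]string, 0, len(defaults))
		for key, value := range defaults {
			if value == "" {
				continue
			}
			tags = append(tags, key+value)
		}
		sort.Strings(tags) // stable output, as in the real helper
		return tags
	}

OUTER:
	for key, value := range defaults {
		for _, tag := range existing {
			if strings.HasPrefix(tag, key) {
				continue OUTER // operator override wins
			}
		}
		existing = append(existing, key+value)
	}
	return existing
}

func main() {
	// Hypothetical allocation; in Nomad these values come from structs.AllocInfo.
	defaults := map[string]string{
		"nomad.alloc_id=":  "0b62d4a2-hypothetical-alloc",
		"nomad.job=":       "example",
		"nomad.group=":     "cache",
		"nomad.namespace=": "default",
	}

	// No pre-existing envoy_stats_tags: every default is emitted.
	fmt.Println(mergeStatsTags(nil, defaults))

	// nomad.job= already set by the operator: that tag is preserved and the
	// remaining defaults are appended.
	fmt.Println(mergeStatsTags([]string{"nomad.job=my-override"}, defaults))
}
```

The first call prints the four nomad.* tags in sorted order; the second keeps `nomad.job=my-override` and appends only the missing alloc_id, group, and namespace tags, which matches the "merge exists" case exercised in Test_injectNomadInfo.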