From 8bbf4a55e5aa71a870f75cac34aaaeedbece9853 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 13 Jun 2016 16:29:07 -0700 Subject: [PATCH 1/2] Fix IDs and domain scoping --- client/client.go | 42 ++-- client/driver/executor/executor.go | 23 ++- command/agent/agent.go | 65 ++++--- command/agent/consul/syncer.go | 301 +++++++++++++++++------------ nomad/structs/structs.go | 4 - 5 files changed, 250 insertions(+), 185 deletions(-) diff --git a/client/client.go b/client/client.go index 6983991a3..dc6f9804b 100644 --- a/client/client.go +++ b/client/client.go @@ -1342,32 +1342,30 @@ func (c *Client) setupConsulSyncer() error { } c.consulSyncer.AddPeriodicHandler("Nomad Client Fallback Server Handler", bootstrapFn) - // TODO this should only deregister things that the executors do not know - // about - consulServicesSyncFn := func() error { - const estInitialConsulServices = 8 - const serviceGroupName = "executor" - servicesInRunningAllocs := make(map[string][]*structs.Service) + consulServicesReaperFn := func() error { + const estInitialExecutorDomains = 8 + + // Create the domains to keep and add the server and client + domains := make([]consul.ServiceDomain, 2, estInitialExecutorDomains) + domains[0] = consul.ServerDomain + domains[1] = consul.ClientDomain + for allocID, ar := range c.getAllocRunners() { - services := make([]*structs.Service, 0, estInitialConsulServices) - ar.taskStatusLock.RLock() - taskStates := copyTaskStates(ar.taskStates) - ar.taskStatusLock.RUnlock() - for taskName, taskState := range taskStates { - if taskState.State == structs.TaskStateRunning { - if tr, ok := ar.tasks[taskName]; ok { - for _, service := range tr.task.Services { - services = append(services, service) - } - } - } + if ar.Alloc().TerminalStatus() { + // Ignore non-running allocations + continue } - servicesInRunningAllocs[allocID] = services + ar.taskLock.RLock() + for task := range ar.tasks { + d := consul.NewExecutorDomain(allocID, task) + domains = append(domains, d) + } 
+ ar.taskLock.RUnlock() } - c.consulSyncer.KeepServices(serviceGroupName, servicesInRunningAllocs) - return nil + + return c.consulSyncer.KeepDomains(domains) } - c.consulSyncer.AddPeriodicHandler("Nomad Client Services Sync Handler", consulServicesSyncFn) + c.consulSyncer.AddPeriodicHandler("Nomad Client Services Sync Handler", consulServicesReaperFn) return nil } diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 9088728b5..b04057dd9 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -37,9 +37,6 @@ const ( // tree for finding out the pids that the executor and it's child processes // have forked pidScanInterval = 5 * time.Second - - // serviceRegPrefix is the prefix the entire Executor should use - executorServiceRegPrefix = "executor" ) var ( @@ -365,11 +362,25 @@ func (e *UniversalExecutor) UpdateTask(task *structs.Task) error { // Re-syncing task with Consul agent if e.consulSyncer != nil { e.interpolateServices(e.ctx.Task) - e.consulSyncer.SetServices(e.ctx.AllocID, task.Services, executorServiceRegPrefix) + domain := consul.NewExecutorDomain(e.ctx.AllocID, task.Name) + serviceMap := servicesToServiceMap(e.ctx.AllocID, task.Services) + e.consulSyncer.SetServices(domain, serviceMap) } return nil } +// servicesToServiceMap takes a list of interpolated services and returns a map +// of ServiceKeys to services where the service key is appropriate for the +// executor. 
+func servicesToServiceMap(allocID string, services []*structs.Service) map[consul.ServiceKey]*structs.Service { + keys := make(map[consul.ServiceKey]*structs.Service, len(services)) + for _, service := range services { + key := consul.GenerateServiceKey(service) + keys[key] = service + } + return keys +} + func (e *UniversalExecutor) wait() { defer close(e.processExited) err := e.cmd.Wait() @@ -493,7 +504,9 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error { e.interpolateServices(e.ctx.Task) e.consulSyncer.SetDelegatedChecks(e.createCheckMap(), e.createCheck) e.consulSyncer.SetAddrFinder(e.ctx.Task.FindHostAndPortFor) - e.consulSyncer.SetServices(e.ctx.AllocID, e.ctx.Task.Services, executorServiceRegPrefix) + domain := consul.NewExecutorDomain(e.ctx.AllocID, e.ctx.Task.Name) + serviceMap := servicesToServiceMap(e.ctx.AllocID, e.ctx.Task.Services) + e.consulSyncer.SetServices(domain, serviceMap) return nil } diff --git a/command/agent/agent.go b/command/agent/agent.go index 6309e76d6..50b0e2313 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -361,24 +361,26 @@ func (a *Agent) setupServer() error { // Create the Nomad Server services for Consul if a.config.Consul.AutoRegister && a.config.Consul.ServerServiceName != "" { - const serviceGroupName = "server" - a.consulSyncer.SetServices(serviceGroupName, []*structs.Service{ - &structs.Service{ - Name: a.config.Consul.ServerServiceName, - PortLabel: a.serverHTTPAddr, - Tags: []string{consul.ServiceTagHTTP}, - }, - &structs.Service{ - Name: a.config.Consul.ServerServiceName, - PortLabel: a.serverRPCAddr, - Tags: []string{consul.ServiceTagRPC}, - }, - &structs.Service{ - PortLabel: a.serverSerfAddr, - Name: a.config.Consul.ServerServiceName, - Tags: []string{consul.ServiceTagSerf}, - }, - }, "agent") + httpServ := &structs.Service{ + Name: a.config.Consul.ServerServiceName, + PortLabel: a.serverHTTPAddr, + Tags: []string{consul.ServiceTagHTTP}, + } + rpcServ := &structs.Service{ + 
Name: a.config.Consul.ServerServiceName, + PortLabel: a.serverRPCAddr, + Tags: []string{consul.ServiceTagRPC}, + } + serfServ := &structs.Service{ + PortLabel: a.serverSerfAddr, + Name: a.config.Consul.ServerServiceName, + Tags: []string{consul.ServiceTagSerf}, + } + a.consulSyncer.SetServices(consul.ServerDomain, map[consul.ServiceKey]*structs.Service{ + consul.GenerateServiceKey(httpServ): httpServ, + consul.GenerateServiceKey(rpcServ): rpcServ, + consul.GenerateServiceKey(serfServ): serfServ, + }) } return nil @@ -412,19 +414,20 @@ func (a *Agent) setupClient() error { // Create the Nomad Server services for Consul if a.config.Consul.AutoRegister && a.config.Consul.ClientServiceName != "" { - const serviceGroupName = "client" - a.consulSyncer.SetServices(serviceGroupName, []*structs.Service{ - &structs.Service{ - Name: a.config.Consul.ClientServiceName, - PortLabel: a.clientHTTPAddr, - Tags: []string{consul.ServiceTagHTTP}, - }, - &structs.Service{ - Name: a.config.Consul.ClientServiceName, - PortLabel: a.clientRPCAddr, - Tags: []string{consul.ServiceTagRPC}, - }, - }, "agent") + httpServ := &structs.Service{ + Name: a.config.Consul.ClientServiceName, + PortLabel: a.clientHTTPAddr, + Tags: []string{consul.ServiceTagHTTP}, + } + rpcServ := &structs.Service{ + Name: a.config.Consul.ClientServiceName, + PortLabel: a.clientRPCAddr, + Tags: []string{consul.ServiceTagRPC}, + } + a.consulSyncer.SetServices(consul.ClientDomain, map[consul.ServiceKey]*structs.Service{ + consul.GenerateServiceKey(httpServ): httpServ, + consul.GenerateServiceKey(rpcServ): rpcServ, + }) } return nil diff --git a/command/agent/consul/syncer.go b/command/agent/consul/syncer.go index 39a38b78c..99c305862 100644 --- a/command/agent/consul/syncer.go +++ b/command/agent/consul/syncer.go @@ -1,3 +1,27 @@ +// Package consul is used by Nomad to register all services both static services +// and dynamic via allocations. 
+// +// Consul Service IDs have the following format: ${nomadServicePrefix}-${groupName}-${serviceKey} +// groupName takes on one of the following values: +// - server +// - client +// - executor-${alloc-id}-${task-name} +// +// serviceKey should be generated by service registrators. +// If the serviceKey is being generated by the executor for a Nomad Task.Services +// the following helper should be used: +// NOTE: Executor should interpolate the service prior to calling +// func GenerateServiceKey(service *structs.Service) ServiceKey +// +// The Nomad Client reaps services registered from dead allocations that were +// not properly cleaned up by the executor (this is not the expected case). +// +// The Consul ServiceIDs generated by the executor contain the allocation ID +// and the task name. Thus the client can generate the list of service +// domains to keep by calling the following function for each task whose +// services should be kept; Nomad services outside those domains are +// reaped: +// func NewExecutorDomain(allocID, task string) ServiceDomain package consul import ( @@ -28,8 +52,8 @@ const ( // initialSyncDelay is the delay before an initial sync. 
initialSyncDelay = 5 * time.Second - // nomadServicePrefix is the prefix used when registering a service - // with consul + // nomadServicePrefix is the first prefix that scopes all Nomad registered + // services nomadServicePrefix = "nomad-registered-service" // The periodic time interval for syncing services and checks with Consul @@ -57,6 +81,27 @@ const ( ServiceTagSerf = "serf" ) +// consulServiceID and consulCheckID are the IDs registered with Consul +type consulServiceID string +type consulCheckID string + +// ServiceKey is the generated service key that is used to build the Consul +// ServiceID +type ServiceKey string + +// ServiceDomain is the domain of services registered by Nomad +type ServiceDomain string + +const ( + ClientDomain ServiceDomain = "client" + ServerDomain ServiceDomain = "server" +) + +// NewExecutorDomain returns a domain specific to the alloc ID and task +func NewExecutorDomain(allocID, task string) ServiceDomain { + return ServiceDomain(fmt.Sprintf("executor-%s-%s", allocID, task)) +} + // Syncer allows syncing of services and checks with Consul type Syncer struct { client *consul.Client @@ -67,27 +112,24 @@ type Syncer struct { // SyncServices() is called. The key to the servicesGroups map is unique // per handler and is used to allow the Agent's services to be maintained // independently of the Client or Server's services. - servicesGroups map[string][]*consul.AgentServiceRegistration - checkGroups map[string][]*consul.AgentCheckRegistration + servicesGroups map[ServiceDomain]map[ServiceKey]*consul.AgentServiceRegistration + checkGroups map[ServiceDomain]map[ServiceKey][]*consul.AgentCheckRegistration groupsLock sync.RWMutex - groupName string - // The "Consul Registry" is a collection of Consul Services and // Checks all guarded by the registryLock. 
registryLock sync.RWMutex - checkRunners map[string]*CheckRunner - delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul - // trackedChecks and trackedServices are registered with consul - trackedChecks map[string]*consul.AgentCheckRegistration - trackedServices map[string]*consul.AgentServiceRegistration + trackedChecks map[consulCheckID]*consul.AgentCheckRegistration + trackedServices map[consulServiceID]*consul.AgentServiceRegistration - serviceRegPrefix string + // checkRunners are delegated Consul checks being run by the Syncer + checkRunners map[consulCheckID]*CheckRunner addrFinder func(portLabel string) (string, int) createDelegatedCheck func(*structs.ServiceCheck, string) (Check, error) + delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul // End registryLock guarded attributes. logger *log.Logger @@ -174,13 +216,14 @@ func NewSyncer(consulConfig *config.ConsulConfig, shutdownCh chan struct{}, logg logger: logger, consulAvailable: true, shutdownCh: shutdownCh, - servicesGroups: make(map[string][]*consul.AgentServiceRegistration), - checkGroups: make(map[string][]*consul.AgentCheckRegistration), - trackedServices: make(map[string]*consul.AgentServiceRegistration), - trackedChecks: make(map[string]*consul.AgentCheckRegistration), - checkRunners: make(map[string]*CheckRunner), + servicesGroups: make(map[ServiceDomain]map[ServiceKey]*consul.AgentServiceRegistration), + checkGroups: make(map[ServiceDomain]map[ServiceKey][]*consul.AgentCheckRegistration), + trackedServices: make(map[consulServiceID]*consul.AgentServiceRegistration), + trackedChecks: make(map[consulCheckID]*consul.AgentCheckRegistration), + checkRunners: make(map[consulCheckID]*CheckRunner), periodicCallbacks: make(map[string]types.PeriodicCallback), } + return &consulSyncer, nil } @@ -198,47 +241,35 @@ func (c *Syncer) SetAddrFinder(addrFinder func(string) (string, 
int)) *Syncer { return c } -// filterPrefix generates a unique prefix that a Syncer can later filter on. -func (c *Syncer) filterPrefix(serviceRegPrefix string) string { - c.registryLock.RLock() - defer c.registryLock.RUnlock() - return fmt.Sprintf("%s-%s", nomadServicePrefix, serviceRegPrefix) -} - -// GenerateServiceID creates a unique Consul ServiceID for a given +// GenerateServiceKey should be called to generate a serviceKey based on the // Service. -func (c *Syncer) GenerateServiceID(groupName string, service *structs.Service, serviceRegPrefix string) string { +func GenerateServiceKey(service *structs.Service) ServiceKey { + var key string numTags := len(service.Tags) switch numTags { case 0: - return fmt.Sprintf("%s-%s:%s", c.filterPrefix(serviceRegPrefix), groupName, service.Name) - case 1: - return fmt.Sprintf("%s-%s:%s@%s", c.filterPrefix(serviceRegPrefix), groupName, service.Tags[0], service.Name) + key = fmt.Sprintf("%s", service.Name) default: - tags := strings.Join(service.Tags, "|") - return fmt.Sprintf("%s-%s:(%s)@%s", c.filterPrefix(serviceRegPrefix), groupName, tags, service.Name) + tags := strings.Join(service.Tags, "-") + key = fmt.Sprintf("%s-%s", service.Name, tags) } + return ServiceKey(key) } -// SetServices assigns the slice of Nomad Services to the provided services -// group name. -func (c *Syncer) SetServices(groupName string, services []*structs.Service, serviceRegPrefix string) error { - c.groupName = groupName +// SetServices stores the map of Nomad Services to the provided service +// domain name. 
+func (c *Syncer) SetServices(domain ServiceDomain, services map[ServiceKey]*structs.Service) error { var mErr multierror.Error numServ := len(services) - registeredServices := make([]*consul.AgentServiceRegistration, 0, numServ) - registeredChecks := make([]*consul.AgentCheckRegistration, 0, numServ) - for _, service := range services { - if service.ServiceID == "" { - service.ServiceID = c.GenerateServiceID(groupName, service, serviceRegPrefix) - } - var serviceReg *consul.AgentServiceRegistration - var err error - if serviceReg, err = c.createService(service); err != nil { + registeredServices := make(map[ServiceKey]*consul.AgentServiceRegistration, numServ) + registeredChecks := make(map[ServiceKey][]*consul.AgentCheckRegistration, numServ) + for serviceKey, service := range services { + serviceReg, err := c.createService(service, domain, serviceKey) + if err != nil { mErr.Errors = append(mErr.Errors, err) continue } - registeredServices = append(registeredServices, serviceReg) + registeredServices[serviceKey] = serviceReg // Register the check(s) for this service for _, chk := range service.Checks { @@ -252,7 +283,7 @@ func (c *Syncer) SetServices(groupName string, services []*structs.Service, serv // creating a nomad check if we have to handle this particular check type c.registryLock.RLock() if _, ok := c.delegateChecks[chk.Type]; ok { - _, ok := c.checkRunners[chkReg.ID] + _, ok := c.checkRunners[consulCheckID(chkReg.ID)] c.registryLock.RUnlock() if ok { continue @@ -266,13 +297,14 @@ func (c *Syncer) SetServices(groupName string, services []*structs.Service, serv cr := NewCheckRunner(nc, c.runCheck, c.logger) c.registryLock.Lock() - c.checkRunners[nc.ID()] = cr + // TODO type the CheckRunner + c.checkRunners[consulCheckID(nc.ID())] = cr c.registryLock.Unlock() } else { c.registryLock.RUnlock() } - registeredChecks = append(registeredChecks, chkReg) + registeredChecks[serviceKey] = append(registeredChecks[serviceKey], chkReg) } } @@ -281,8 +313,22 @@ func 
(c *Syncer) SetServices(groupName string, services []*structs.Service, serv } c.groupsLock.Lock() - c.servicesGroups[groupName] = registeredServices - c.checkGroups[groupName] = registeredChecks + for serviceKey, service := range registeredServices { + serviceKeys, ok := c.servicesGroups[domain] + if !ok { + serviceKeys = make(map[ServiceKey]*consul.AgentServiceRegistration, len(registeredServices)) + c.servicesGroups[domain] = serviceKeys + } + serviceKeys[serviceKey] = service + } + for serviceKey, checks := range registeredChecks { + serviceKeys, ok := c.checkGroups[domain] + if !ok { + serviceKeys = make(map[ServiceKey][]*consul.AgentCheckRegistration, len(registeredChecks)) + c.checkGroups[domain] = serviceKeys + } + serviceKeys[serviceKey] = checks + } c.groupsLock.Unlock() // Sync immediately @@ -300,7 +346,8 @@ func (c *Syncer) SyncNow() { } } -// flattenedServices returns a flattened list of services +// flattenedServices returns a flattened list of services that are registered +// locally func (c *Syncer) flattenedServices() []*consul.AgentServiceRegistration { const initialNumServices = 8 services := make([]*consul.AgentServiceRegistration, 0, initialNumServices) @@ -314,18 +361,18 @@ func (c *Syncer) flattenedServices() []*consul.AgentServiceRegistration { return services } -// flattenedChecks returns a flattened list of checks +// flattenedChecks returns a flattened list of checks that are registered +// locally func (c *Syncer) flattenedChecks() []*consul.AgentCheckRegistration { const initialNumChecks = 8 checks := make([]*consul.AgentCheckRegistration, 0, initialNumChecks) c.groupsLock.RLock() for _, checkGroup := range c.checkGroups { for _, check := range checkGroup { - checks = append(checks, check) + checks = append(checks, check...) 
} } c.groupsLock.RUnlock() - return checks } @@ -350,26 +397,16 @@ func (c *Syncer) Shutdown() error { // Stop all the checks that nomad is running c.registryLock.RLock() + defer c.registryLock.RUnlock() for _, cr := range c.checkRunners { cr.Stop() } - c.registryLock.RUnlock() // De-register all the services from Consul - consulServices := make(map[string]*consul.AgentService) - for trackedGroup := range c.servicesGroups { - serviceIDPrefix := fmt.Sprintf("%s-%s", c.filterPrefix(c.serviceRegPrefix), trackedGroup) - services, err := c.queryAgentServices(serviceIDPrefix) - if err != nil { - return err - } - for serviceID, agentService := range services { - consulServices[serviceID] = agentService - } - } - for _, service := range consulServices { - if err := c.client.Agent().ServiceDeregister(service.ID); err != nil { - c.logger.Printf("[WARN] consul.syncer: failed to deregister service ID %+q: %v", service.ID, err) + for serviceID := range c.trackedServices { + convertedID := string(serviceID) + if err := c.client.Agent().ServiceDeregister(convertedID); err != nil { + c.logger.Printf("[WARN] consul.syncer: failed to deregister service ID %+q: %v", convertedID, err) mErr.Errors = append(mErr.Errors, err) } } @@ -378,7 +415,7 @@ func (c *Syncer) Shutdown() error { // queryChecks queries the Consul Agent for a list of Consul checks that // have been registered with this Consul Syncer. -func (c *Syncer) queryChecks() (map[string]*consul.AgentCheck, error) { +func (c *Syncer) queryChecks() (map[consulCheckID]*consul.AgentCheck, error) { checks, err := c.client.Agent().Checks() if err != nil { return nil, err @@ -388,12 +425,12 @@ func (c *Syncer) queryChecks() (map[string]*consul.AgentCheck, error) { // queryAgentServices queries the Consul Agent for a list of Consul services that // have been registered with this Consul Syncer. 
-func (c *Syncer) queryAgentServices(serviceIDPrefix string) (map[string]*consul.AgentService, error) { +func (c *Syncer) queryAgentServices() (map[consulServiceID]*consul.AgentService, error) { services, err := c.client.Agent().Services() if err != nil { return nil, err } - return c.filterConsulServices(serviceIDPrefix, services), nil + return c.filterConsulServices(services), nil } // syncChecks synchronizes this Syncer's Consul Checks with the Consul Agent. @@ -411,7 +448,7 @@ func (c *Syncer) syncChecks() error { mErr.Errors = append(mErr.Errors, err) } c.registryLock.Lock() - c.trackedChecks[check.ID] = check + c.trackedChecks[consulCheckID(check.ID)] = check c.registryLock.Unlock() } for _, check := range changedChecks { @@ -428,11 +465,11 @@ func (c *Syncer) syncChecks() error { } } for _, check := range staleChecks { - if err := c.deregisterCheck(check.ID); err != nil { + if err := c.deregisterCheck(consulCheckID(check.ID)); err != nil { mErr.Errors = append(mErr.Errors, err) } c.registryLock.Lock() - delete(c.trackedChecks, check.ID) + delete(c.trackedChecks, consulCheckID(check.ID)) c.registryLock.Unlock() } return mErr.ErrorOrNil() @@ -467,7 +504,7 @@ func compareConsulCheck(localCheck *consul.AgentCheckRegistration, consulCheck * // 4) a slice of checks that exist only in the Consul Agent (consulChecks) // and should be removed because the Consul Agent has drifted from the // Syncer. -func (c *Syncer) calcChecksDiff(consulChecks map[string]*consul.AgentCheck) ( +func (c *Syncer) calcChecksDiff(consulChecks map[consulCheckID]*consul.AgentCheck) ( missingChecks []*consul.AgentCheckRegistration, equalChecks []*consul.AgentCheckRegistration, changedChecks []*consul.AgentCheckRegistration, @@ -584,7 +621,7 @@ func compareConsulService(localService *consul.AgentServiceRegistration, consulS // 4) a slice of services that exist only in the Consul Agent // (consulServices) and should be removed because the Consul Agent has // drifted from the Syncer. 
-func (c *Syncer) calcServicesDiff(consulServices map[string]*consul.AgentService) (missingServices []*consul.AgentServiceRegistration, equalServices []*consul.AgentServiceRegistration, changedServices []*consul.AgentServiceRegistration, staleServices []*consul.AgentServiceRegistration) { +func (c *Syncer) calcServicesDiff(consulServices map[consulServiceID]*consul.AgentService) (missingServices []*consul.AgentServiceRegistration, equalServices []*consul.AgentServiceRegistration, changedServices []*consul.AgentServiceRegistration, staleServices []*consul.AgentServiceRegistration) { type mergedService struct { service *consul.AgentServiceRegistration // 'l' == Nomad local only @@ -652,16 +689,9 @@ func (c *Syncer) calcServicesDiff(consulServices map[string]*consul.AgentService // syncServices synchronizes this Syncer's Consul Services with the Consul // Agent. func (c *Syncer) syncServices() error { - consulServices := make(map[string]*consul.AgentService) - for trackedGroup := range c.servicesGroups { - serviceIDPrefix := fmt.Sprintf("%s-%s", c.filterPrefix(c.serviceRegPrefix), trackedGroup) - services, err := c.queryAgentServices(serviceIDPrefix) - if err != nil { - return err - } - for serviceID, agentService := range services { - consulServices[serviceID] = agentService - } + consulServices, err := c.queryAgentServices() + if err != nil { + return err } // Synchronize services with Consul @@ -672,7 +702,7 @@ func (c *Syncer) syncServices() error { mErr.Errors = append(mErr.Errors, err) } c.registryLock.Lock() - c.trackedServices[service.ID] = service + c.trackedServices[consulServiceID(service.ID)] = service c.registryLock.Unlock() } for _, service := range changedServices { @@ -686,7 +716,7 @@ func (c *Syncer) syncServices() error { mErr.Errors = append(mErr.Errors, err) } c.registryLock.Lock() - delete(c.trackedServices, service.ID) + delete(c.trackedServices, consulServiceID(service.ID)) c.registryLock.Unlock() } return mErr.ErrorOrNil() @@ -695,7 +725,7 @@ 
func (c *Syncer) syncServices() error { // registerCheck registers a check definition with Consul func (c *Syncer) registerCheck(chkReg *consul.AgentCheckRegistration) error { c.registryLock.RLock() - if cr, ok := c.checkRunners[chkReg.ID]; ok { + if cr, ok := c.checkRunners[consulCheckID(chkReg.ID)]; ok { cr.Start() } c.registryLock.RUnlock() @@ -733,13 +763,19 @@ func (c *Syncer) createCheckReg(check *structs.ServiceCheck, service *consul.Age return &chkReg, nil } +// generateConsulServiceID takes the domain and service key and returns a Consul +// ServiceID +func generateConsulServiceID(domain ServiceDomain, key ServiceKey) consulServiceID { + return consulServiceID(fmt.Sprintf("%s-%s-%s", nomadServicePrefix, domain, key)) +} + // createService creates a Consul AgentService from a Nomad ConsulService. -func (c *Syncer) createService(service *structs.Service) (*consul.AgentServiceRegistration, error) { +func (c *Syncer) createService(service *structs.Service, domain ServiceDomain, key ServiceKey) (*consul.AgentServiceRegistration, error) { c.registryLock.RLock() defer c.registryLock.RUnlock() srv := consul.AgentServiceRegistration{ - ID: service.ServiceID, + ID: string(generateConsulServiceID(domain, key)), Name: service.Name, Tags: service.Tags, } @@ -761,21 +797,21 @@ func (c *Syncer) deregisterService(serviceID string) error { } // deregisterCheck de-registers a check from Consul -func (c *Syncer) deregisterCheck(checkID string) error { +func (c *Syncer) deregisterCheck(id consulCheckID) error { c.registryLock.Lock() defer c.registryLock.Unlock() // Deleting from Consul Agent - if err := c.client.Agent().CheckDeregister(checkID); err != nil { + if err := c.client.Agent().CheckDeregister(string(id)); err != nil { // CheckDeregister() will be reattempted again in a future // sync. 
return err } // Remove the check from the local registry - if cr, ok := c.checkRunners[checkID]; ok { + if cr, ok := c.checkRunners[id]; ok { cr.Stop() - delete(c.checkRunners, checkID) + delete(c.checkRunners, id) } return nil @@ -847,15 +883,18 @@ func (c *Syncer) SyncServices() error { return nil } -// filterConsulServices prunes out all the service whose ids are not prefixed -// with nomad- -func (c *Syncer) filterConsulServices(serviceIDPrefix string, consulServices map[string]*consul.AgentService) map[string]*consul.AgentService { - localServices := make(map[string]*consul.AgentService, len(consulServices)) +// filterConsulServices prunes out all the services that were not registered with +// the syncer +func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentService) map[consulServiceID]*consul.AgentService { + localServices := make(map[consulServiceID]*consul.AgentService, len(consulServices)) c.registryLock.RLock() defer c.registryLock.RUnlock() for serviceID, service := range consulServices { - if strings.HasPrefix(service.ID, serviceIDPrefix) { - localServices[serviceID] = service + for domain := range c.servicesGroups { + if strings.HasPrefix(service.ID, fmt.Sprintf("%s-%s", nomadServicePrefix, domain)) { + localServices[consulServiceID(serviceID)] = service + break + } } } return localServices @@ -863,12 +902,16 @@ func (c *Syncer) filterConsulServices(serviceIDPrefix string, consulServices map // filterConsulChecks prunes out all the consul checks which do not have // services with Syncer's idPrefix. 
-func (c *Syncer) filterConsulChecks(consulChecks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck { - localChecks := make(map[string]*consul.AgentCheck, len(consulChecks)) - filterPrefix := c.filterPrefix(c.serviceRegPrefix) +func (c *Syncer) filterConsulChecks(consulChecks map[string]*consul.AgentCheck) map[consulCheckID]*consul.AgentCheck { + localChecks := make(map[consulCheckID]*consul.AgentCheck, len(consulChecks)) + c.registryLock.RLock() + defer c.registryLock.RUnlock() for checkID, check := range consulChecks { - if strings.HasPrefix(check.ServiceID, filterPrefix) { - localChecks[checkID] = check + for domain := range c.checkGroups { + if strings.HasPrefix(check.ServiceID, fmt.Sprintf("%s-%s", nomadServicePrefix, domain)) { + localChecks[consulCheckID(checkID)] = check + break + } } } return localChecks @@ -910,28 +953,40 @@ func (c *Syncer) runCheck(check Check) { } } -// KeepServices prunes all services besides the ones specified by the user -func (c *Syncer) KeepServices(serviceRegPrefix string, servicesInRunningAllocs map[string][]*structs.Service) { - servicesToKeep := make(map[string]struct{}) - for allocID, services := range servicesInRunningAllocs { - for _, service := range services { - serviceID := c.GenerateServiceID(allocID, service, serviceRegPrefix) - servicesToKeep[serviceID] = struct{}{} - } - } - +// KeepDomains prunes all services that do not exist in the passed domains +func (c *Syncer) KeepDomains(domains []ServiceDomain) error { servicesInConsul, err := c.ConsulClient().Agent().Services() if err != nil { - return + return err } - filterServicesWithPrefix := c.filterPrefix(serviceRegPrefix) - relevantServices := c.filterConsulServices(filterServicesWithPrefix, servicesInConsul) - for serviceID := range relevantServices { - if _, ok := servicesToKeep[serviceID]; !ok { - c.deregisterService(serviceID) + var mErr multierror.Error + for serviceID := range servicesInConsul { + // Skip any service that was not registered by Nomad 
+ if !strings.HasPrefix(serviceID, nomadServicePrefix) { + continue + } + + // Filter services that do not exist in the desired domains + match := false + for _, domain := range domains { + // Include the hyphen so the match is explicit to that domain; otherwise + // it may be a subset match + desired := fmt.Sprintf("%s-%s-", nomadServicePrefix, domain) + if strings.HasPrefix(serviceID, desired) { + match = true + break + } + } + + if !match { + if err := c.deregisterService(serviceID); err != nil { + mErr.Errors = append(mErr.Errors, err) + } } } + + return mErr.ErrorOrNil() } // AddPeriodicHandler adds a uniquely named callback. Returns true if diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7729723f8..68c4c5d2f 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1555,10 +1555,6 @@ func (sc *ServiceCheck) Hash(serviceID string) string { // Service represents a Consul service definition in Nomad type Service struct { - // ServiceID is the calculated Consul ServiceID used for a service. - // This value is not available to be set via configuration. - ServiceID string `mapstructure:"-"` - // Name of the service registered with Consul. Consul defaults the // Name to ServiceID if not specified. The Name if specified is used // as one of the seed values when generating a Consul ServiceID. 
From 4b04e503f327939450ffcaaef91ddce8b3232fb0 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 13 Jun 2016 17:32:18 -0700 Subject: [PATCH 2/2] address comments --- client/client.go | 20 ++++++++++---------- client/driver/executor/executor.go | 11 +++++------ command/agent/agent.go | 2 +- command/agent/consul/syncer.go | 6 +++--- 4 files changed, 19 insertions(+), 20 deletions(-) diff --git a/client/client.go b/client/client.go index dc6f9804b..de7f10a31 100644 --- a/client/client.go +++ b/client/client.go @@ -1351,19 +1351,19 @@ func (c *Client) setupConsulSyncer() error { domains[1] = consul.ClientDomain for allocID, ar := range c.getAllocRunners() { - if ar.Alloc().TerminalStatus() { - // Ignore non-running allocations - continue + ar.taskStatusLock.RLock() + taskStates := copyTaskStates(ar.taskStates) + ar.taskStatusLock.RUnlock() + for taskName, taskState := range taskStates { + // Only keep running tasks + if taskState.State == structs.TaskStateRunning { + d := consul.NewExecutorDomain(allocID, taskName) + domains = append(domains, d) + } } - ar.taskLock.RLock() - for task := range ar.tasks { - d := consul.NewExecutorDomain(allocID, task) - domains = append(domains, d) - } - ar.taskLock.RUnlock() } - return c.consulSyncer.KeepDomains(domains) + return c.consulSyncer.ReapUnmatched(domains) } c.consulSyncer.AddPeriodicHandler("Nomad Client Services Sync Handler", consulServicesReaperFn) diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index b04057dd9..dbf3bffec 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -363,16 +363,15 @@ func (e *UniversalExecutor) UpdateTask(task *structs.Task) error { if e.consulSyncer != nil { e.interpolateServices(e.ctx.Task) domain := consul.NewExecutorDomain(e.ctx.AllocID, task.Name) - serviceMap := servicesToServiceMap(e.ctx.AllocID, task.Services) + serviceMap := generateServiceKeys(e.ctx.AllocID, task.Services) e.consulSyncer.SetServices(domain, 
serviceMap) } return nil } -// servicesToServiceMap takes a list of interpolated services and returns a map -// of ServiceKeys to services where the service key is appropriate for the -// executor. -func servicesToServiceMap(allocID string, services []*structs.Service) map[consul.ServiceKey]*structs.Service { +// generateServiceKeys takes a list of interpolated Nomad Services and returns a map +// of ServiceKeys to Nomad Services. +func generateServiceKeys(allocID string, services []*structs.Service) map[consul.ServiceKey]*structs.Service { keys := make(map[consul.ServiceKey]*structs.Service, len(services)) for _, service := range services { key := consul.GenerateServiceKey(service) @@ -505,7 +504,7 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error { e.consulSyncer.SetDelegatedChecks(e.createCheckMap(), e.createCheck) e.consulSyncer.SetAddrFinder(e.ctx.Task.FindHostAndPortFor) domain := consul.NewExecutorDomain(e.ctx.AllocID, e.ctx.Task.Name) - serviceMap := servicesToServiceMap(e.ctx.AllocID, e.ctx.Task.Services) + serviceMap := generateServiceKeys(e.ctx.AllocID, e.ctx.Task.Services) e.consulSyncer.SetServices(domain, serviceMap) return nil } diff --git a/command/agent/agent.go b/command/agent/agent.go index 50b0e2313..d2359fedb 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -412,7 +412,7 @@ func (a *Agent) setupClient() error { } a.client = client - // Create the Nomad Server services for Consul + // Create the Nomad Client services for Consul if a.config.Consul.AutoRegister && a.config.Consul.ClientServiceName != "" { httpServ := &structs.Service{ Name: a.config.Consul.ClientServiceName, diff --git a/command/agent/consul/syncer.go b/command/agent/consul/syncer.go index 99c305862..76bcfcd8d 100644 --- a/command/agent/consul/syncer.go +++ b/command/agent/consul/syncer.go @@ -54,7 +54,7 @@ const ( // nomadServicePrefix is the first prefix that scopes all Nomad registered // services - nomadServicePrefix = 
"nomad-registered-service" + nomadServicePrefix = "_nomad" // The periodic time interval for syncing services and checks with Consul syncInterval = 5 * time.Second @@ -953,8 +953,8 @@ func (c *Syncer) runCheck(check Check) { } } -// KeepDomains prunes all services that do not exist in the passed domains -func (c *Syncer) KeepDomains(domains []ServiceDomain) error { +// ReapUnmatched prunes all services that do not exist in the passed domains +func (c *Syncer) ReapUnmatched(domains []ServiceDomain) error { servicesInConsul, err := c.ConsulClient().Agent().Services() if err != nil { return err