Merge pull request #1272 from hashicorp/b-consul-fixes

Fix IDs and domain scoping
This commit is contained in:
Alex Dadgar 2016-06-13 17:45:57 -07:00 committed by GitHub
commit 57931b4bf0
5 changed files with 245 additions and 181 deletions

View File

@ -1342,32 +1342,30 @@ func (c *Client) setupConsulSyncer() error {
} }
c.consulSyncer.AddPeriodicHandler("Nomad Client Fallback Server Handler", bootstrapFn) c.consulSyncer.AddPeriodicHandler("Nomad Client Fallback Server Handler", bootstrapFn)
// TODO this should only deregister things that the executors do not know consulServicesReaperFn := func() error {
// about const estInitialExecutorDomains = 8
consulServicesSyncFn := func() error {
const estInitialConsulServices = 8 // Create the domains to keep and add the server and client
const serviceGroupName = "executor" domains := make([]consul.ServiceDomain, 2, estInitialExecutorDomains)
servicesInRunningAllocs := make(map[string][]*structs.Service) domains[0] = consul.ServerDomain
domains[1] = consul.ClientDomain
for allocID, ar := range c.getAllocRunners() { for allocID, ar := range c.getAllocRunners() {
services := make([]*structs.Service, 0, estInitialConsulServices)
ar.taskStatusLock.RLock() ar.taskStatusLock.RLock()
taskStates := copyTaskStates(ar.taskStates) taskStates := copyTaskStates(ar.taskStates)
ar.taskStatusLock.RUnlock() ar.taskStatusLock.RUnlock()
for taskName, taskState := range taskStates { for taskName, taskState := range taskStates {
// Only keep running tasks
if taskState.State == structs.TaskStateRunning { if taskState.State == structs.TaskStateRunning {
if tr, ok := ar.tasks[taskName]; ok { d := consul.NewExecutorDomain(allocID, taskName)
for _, service := range tr.task.Services { domains = append(domains, d)
services = append(services, service)
}
}
} }
} }
servicesInRunningAllocs[allocID] = services
} }
c.consulSyncer.KeepServices(serviceGroupName, servicesInRunningAllocs)
return nil return c.consulSyncer.ReapUnmatched(domains)
} }
c.consulSyncer.AddPeriodicHandler("Nomad Client Services Sync Handler", consulServicesSyncFn) c.consulSyncer.AddPeriodicHandler("Nomad Client Services Sync Handler", consulServicesReaperFn)
return nil return nil
} }

View File

@ -37,9 +37,6 @@ const (
// tree for finding out the pids that the executor and it's child processes // tree for finding out the pids that the executor and it's child processes
// have forked // have forked
pidScanInterval = 5 * time.Second pidScanInterval = 5 * time.Second
// serviceRegPrefix is the prefix the entire Executor should use
executorServiceRegPrefix = "executor"
) )
var ( var (
@ -365,11 +362,24 @@ func (e *UniversalExecutor) UpdateTask(task *structs.Task) error {
// Re-syncing task with Consul agent // Re-syncing task with Consul agent
if e.consulSyncer != nil { if e.consulSyncer != nil {
e.interpolateServices(e.ctx.Task) e.interpolateServices(e.ctx.Task)
e.consulSyncer.SetServices(e.ctx.AllocID, task.Services, executorServiceRegPrefix) domain := consul.NewExecutorDomain(e.ctx.AllocID, task.Name)
serviceMap := generateServiceKeys(e.ctx.AllocID, task.Services)
e.consulSyncer.SetServices(domain, serviceMap)
} }
return nil return nil
} }
// generateServiceKeys takes a list of interpolated Nomad Services and returns a map
// of ServiceKeys to Nomad Services.
func generateServiceKeys(allocID string, services []*structs.Service) map[consul.ServiceKey]*structs.Service {
keys := make(map[consul.ServiceKey]*structs.Service, len(services))
for _, service := range services {
key := consul.GenerateServiceKey(service)
keys[key] = service
}
return keys
}
func (e *UniversalExecutor) wait() { func (e *UniversalExecutor) wait() {
defer close(e.processExited) defer close(e.processExited)
err := e.cmd.Wait() err := e.cmd.Wait()
@ -493,7 +503,9 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error {
e.interpolateServices(e.ctx.Task) e.interpolateServices(e.ctx.Task)
e.consulSyncer.SetDelegatedChecks(e.createCheckMap(), e.createCheck) e.consulSyncer.SetDelegatedChecks(e.createCheckMap(), e.createCheck)
e.consulSyncer.SetAddrFinder(e.ctx.Task.FindHostAndPortFor) e.consulSyncer.SetAddrFinder(e.ctx.Task.FindHostAndPortFor)
e.consulSyncer.SetServices(e.ctx.AllocID, e.ctx.Task.Services, executorServiceRegPrefix) domain := consul.NewExecutorDomain(e.ctx.AllocID, e.ctx.Task.Name)
serviceMap := generateServiceKeys(e.ctx.AllocID, e.ctx.Task.Services)
e.consulSyncer.SetServices(domain, serviceMap)
return nil return nil
} }

View File

@ -361,24 +361,26 @@ func (a *Agent) setupServer() error {
// Create the Nomad Server services for Consul // Create the Nomad Server services for Consul
if a.config.Consul.AutoRegister && a.config.Consul.ServerServiceName != "" { if a.config.Consul.AutoRegister && a.config.Consul.ServerServiceName != "" {
const serviceGroupName = "server" httpServ := &structs.Service{
a.consulSyncer.SetServices(serviceGroupName, []*structs.Service{ Name: a.config.Consul.ServerServiceName,
&structs.Service{ PortLabel: a.serverHTTPAddr,
Name: a.config.Consul.ServerServiceName, Tags: []string{consul.ServiceTagHTTP},
PortLabel: a.serverHTTPAddr, }
Tags: []string{consul.ServiceTagHTTP}, rpcServ := &structs.Service{
}, Name: a.config.Consul.ServerServiceName,
&structs.Service{ PortLabel: a.serverRPCAddr,
Name: a.config.Consul.ServerServiceName, Tags: []string{consul.ServiceTagRPC},
PortLabel: a.serverRPCAddr, }
Tags: []string{consul.ServiceTagRPC}, serfServ := &structs.Service{
}, PortLabel: a.serverSerfAddr,
&structs.Service{ Name: a.config.Consul.ServerServiceName,
PortLabel: a.serverSerfAddr, Tags: []string{consul.ServiceTagSerf},
Name: a.config.Consul.ServerServiceName, }
Tags: []string{consul.ServiceTagSerf}, a.consulSyncer.SetServices(consul.ServerDomain, map[consul.ServiceKey]*structs.Service{
}, consul.GenerateServiceKey(httpServ): httpServ,
}, "agent") consul.GenerateServiceKey(rpcServ): rpcServ,
consul.GenerateServiceKey(serfServ): serfServ,
})
} }
return nil return nil
@ -410,21 +412,22 @@ func (a *Agent) setupClient() error {
} }
a.client = client a.client = client
// Create the Nomad Server services for Consul // Create the Nomad Client services for Consul
if a.config.Consul.AutoRegister && a.config.Consul.ClientServiceName != "" { if a.config.Consul.AutoRegister && a.config.Consul.ClientServiceName != "" {
const serviceGroupName = "client" httpServ := &structs.Service{
a.consulSyncer.SetServices(serviceGroupName, []*structs.Service{ Name: a.config.Consul.ClientServiceName,
&structs.Service{ PortLabel: a.clientHTTPAddr,
Name: a.config.Consul.ClientServiceName, Tags: []string{consul.ServiceTagHTTP},
PortLabel: a.clientHTTPAddr, }
Tags: []string{consul.ServiceTagHTTP}, rpcServ := &structs.Service{
}, Name: a.config.Consul.ClientServiceName,
&structs.Service{ PortLabel: a.clientRPCAddr,
Name: a.config.Consul.ClientServiceName, Tags: []string{consul.ServiceTagRPC},
PortLabel: a.clientRPCAddr, }
Tags: []string{consul.ServiceTagRPC}, a.consulSyncer.SetServices(consul.ClientDomain, map[consul.ServiceKey]*structs.Service{
}, consul.GenerateServiceKey(httpServ): httpServ,
}, "agent") consul.GenerateServiceKey(rpcServ): rpcServ,
})
} }
return nil return nil

View File

@ -1,3 +1,27 @@
// Package consul is used by Nomad to register all services both static services
// and dynamic via allocations.
//
// Consul Service IDs have the following format: ${nomadServicePrefix}-${groupName}-${serviceKey}
// groupName takes on one of the following values:
// - server
// - client
// - executor-${alloc-id}-${task-name}
//
// serviceKey should be generated by service registrators.
// If the serviceKey is being generated by the executor for a Nomad Task.Services
// the following helper should be used:
// NOTE: Executor should interpolate the service prior to calling
// func GenerateTaskServiceKey(service *structs.Service) string
//
// The Nomad Client reaps services registered from dead allocations that were
// not properly cleaned up by the executor (this is not the expected case).
//
// TODO fix this comment
// The Consul ServiceIDs generated by the executor will contain the allocation
// ID. Thus the client can generate the list of Consul ServiceIDs to keep by
// calling the following method on all running allocations the client is aware
// of:
// func GenerateExecutorServiceKeyPrefixFromAlloc(allocID string) string
package consul package consul
import ( import (
@ -28,9 +52,9 @@ const (
// initialSyncDelay is the delay before an initial sync. // initialSyncDelay is the delay before an initial sync.
initialSyncDelay = 5 * time.Second initialSyncDelay = 5 * time.Second
// nomadServicePrefix is the prefix used when registering a service // nomadServicePrefix is the first prefix that scopes all Nomad registered
// with consul // services
nomadServicePrefix = "nomad-registered-service" nomadServicePrefix = "_nomad"
// The periodic time interval for syncing services and checks with Consul // The periodic time interval for syncing services and checks with Consul
syncInterval = 5 * time.Second syncInterval = 5 * time.Second
@ -57,6 +81,27 @@ const (
ServiceTagSerf = "serf" ServiceTagSerf = "serf"
) )
// consulServiceID and consulCheckID are the IDs registered with Consul
type consulServiceID string
type consulCheckID string
// ServiceKey is the generated service key that is used to build the Consul
// ServiceID
type ServiceKey string
// ServiceDomain is the domain of services registered by Nomad
type ServiceDomain string
const (
ClientDomain ServiceDomain = "client"
ServerDomain ServiceDomain = "server"
)
// NewExecutorDomain returns a domain specific to the alloc ID and task
func NewExecutorDomain(allocID, task string) ServiceDomain {
return ServiceDomain(fmt.Sprintf("executor-%s-%s", allocID, task))
}
// Syncer allows syncing of services and checks with Consul // Syncer allows syncing of services and checks with Consul
type Syncer struct { type Syncer struct {
client *consul.Client client *consul.Client
@ -67,27 +112,24 @@ type Syncer struct {
// SyncServices() is called. The key to the servicesGroups map is unique // SyncServices() is called. The key to the servicesGroups map is unique
// per handler and is used to allow the Agent's services to be maintained // per handler and is used to allow the Agent's services to be maintained
// independently of the Client or Server's services. // independently of the Client or Server's services.
servicesGroups map[string][]*consul.AgentServiceRegistration servicesGroups map[ServiceDomain]map[ServiceKey]*consul.AgentServiceRegistration
checkGroups map[string][]*consul.AgentCheckRegistration checkGroups map[ServiceDomain]map[ServiceKey][]*consul.AgentCheckRegistration
groupsLock sync.RWMutex groupsLock sync.RWMutex
groupName string
// The "Consul Registry" is a collection of Consul Services and // The "Consul Registry" is a collection of Consul Services and
// Checks all guarded by the registryLock. // Checks all guarded by the registryLock.
registryLock sync.RWMutex registryLock sync.RWMutex
checkRunners map[string]*CheckRunner
delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul
// trackedChecks and trackedServices are registered with consul // trackedChecks and trackedServices are registered with consul
trackedChecks map[string]*consul.AgentCheckRegistration trackedChecks map[consulCheckID]*consul.AgentCheckRegistration
trackedServices map[string]*consul.AgentServiceRegistration trackedServices map[consulServiceID]*consul.AgentServiceRegistration
serviceRegPrefix string // checkRunners are delegated Consul checks being ran by the Syncer
checkRunners map[consulCheckID]*CheckRunner
addrFinder func(portLabel string) (string, int) addrFinder func(portLabel string) (string, int)
createDelegatedCheck func(*structs.ServiceCheck, string) (Check, error) createDelegatedCheck func(*structs.ServiceCheck, string) (Check, error)
delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul
// End registryLock guarded attributes. // End registryLock guarded attributes.
logger *log.Logger logger *log.Logger
@ -174,13 +216,14 @@ func NewSyncer(consulConfig *config.ConsulConfig, shutdownCh chan struct{}, logg
logger: logger, logger: logger,
consulAvailable: true, consulAvailable: true,
shutdownCh: shutdownCh, shutdownCh: shutdownCh,
servicesGroups: make(map[string][]*consul.AgentServiceRegistration), servicesGroups: make(map[ServiceDomain]map[ServiceKey]*consul.AgentServiceRegistration),
checkGroups: make(map[string][]*consul.AgentCheckRegistration), checkGroups: make(map[ServiceDomain]map[ServiceKey][]*consul.AgentCheckRegistration),
trackedServices: make(map[string]*consul.AgentServiceRegistration), trackedServices: make(map[consulServiceID]*consul.AgentServiceRegistration),
trackedChecks: make(map[string]*consul.AgentCheckRegistration), trackedChecks: make(map[consulCheckID]*consul.AgentCheckRegistration),
checkRunners: make(map[string]*CheckRunner), checkRunners: make(map[consulCheckID]*CheckRunner),
periodicCallbacks: make(map[string]types.PeriodicCallback), periodicCallbacks: make(map[string]types.PeriodicCallback),
} }
return &consulSyncer, nil return &consulSyncer, nil
} }
@ -198,47 +241,35 @@ func (c *Syncer) SetAddrFinder(addrFinder func(string) (string, int)) *Syncer {
return c return c
} }
// filterPrefix generates a unique prefix that a Syncer can later filter on. // GenerateServiceKey should be called to generate a serviceKey based on the
func (c *Syncer) filterPrefix(serviceRegPrefix string) string {
c.registryLock.RLock()
defer c.registryLock.RUnlock()
return fmt.Sprintf("%s-%s", nomadServicePrefix, serviceRegPrefix)
}
// GenerateServiceID creates a unique Consul ServiceID for a given
// Service. // Service.
func (c *Syncer) GenerateServiceID(groupName string, service *structs.Service, serviceRegPrefix string) string { func GenerateServiceKey(service *structs.Service) ServiceKey {
var key string
numTags := len(service.Tags) numTags := len(service.Tags)
switch numTags { switch numTags {
case 0: case 0:
return fmt.Sprintf("%s-%s:%s", c.filterPrefix(serviceRegPrefix), groupName, service.Name) key = fmt.Sprintf("%s", service.Name)
case 1:
return fmt.Sprintf("%s-%s:%s@%s", c.filterPrefix(serviceRegPrefix), groupName, service.Tags[0], service.Name)
default: default:
tags := strings.Join(service.Tags, "|") tags := strings.Join(service.Tags, "-")
return fmt.Sprintf("%s-%s:(%s)@%s", c.filterPrefix(serviceRegPrefix), groupName, tags, service.Name) key = fmt.Sprintf("%s-%s", service.Name, tags)
} }
return ServiceKey(key)
} }
// SetServices assigns the slice of Nomad Services to the provided services // SetServices stores the map of Nomad Services to the provided service
// group name. // domain name.
func (c *Syncer) SetServices(groupName string, services []*structs.Service, serviceRegPrefix string) error { func (c *Syncer) SetServices(domain ServiceDomain, services map[ServiceKey]*structs.Service) error {
c.groupName = groupName
var mErr multierror.Error var mErr multierror.Error
numServ := len(services) numServ := len(services)
registeredServices := make([]*consul.AgentServiceRegistration, 0, numServ) registeredServices := make(map[ServiceKey]*consul.AgentServiceRegistration, numServ)
registeredChecks := make([]*consul.AgentCheckRegistration, 0, numServ) registeredChecks := make(map[ServiceKey][]*consul.AgentCheckRegistration, numServ)
for _, service := range services { for serviceKey, service := range services {
if service.ServiceID == "" { serviceReg, err := c.createService(service, domain, serviceKey)
service.ServiceID = c.GenerateServiceID(groupName, service, serviceRegPrefix) if err != nil {
}
var serviceReg *consul.AgentServiceRegistration
var err error
if serviceReg, err = c.createService(service); err != nil {
mErr.Errors = append(mErr.Errors, err) mErr.Errors = append(mErr.Errors, err)
continue continue
} }
registeredServices = append(registeredServices, serviceReg) registeredServices[serviceKey] = serviceReg
// Register the check(s) for this service // Register the check(s) for this service
for _, chk := range service.Checks { for _, chk := range service.Checks {
@ -252,7 +283,7 @@ func (c *Syncer) SetServices(groupName string, services []*structs.Service, serv
// creating a nomad check if we have to handle this particular check type // creating a nomad check if we have to handle this particular check type
c.registryLock.RLock() c.registryLock.RLock()
if _, ok := c.delegateChecks[chk.Type]; ok { if _, ok := c.delegateChecks[chk.Type]; ok {
_, ok := c.checkRunners[chkReg.ID] _, ok := c.checkRunners[consulCheckID(chkReg.ID)]
c.registryLock.RUnlock() c.registryLock.RUnlock()
if ok { if ok {
continue continue
@ -266,13 +297,14 @@ func (c *Syncer) SetServices(groupName string, services []*structs.Service, serv
cr := NewCheckRunner(nc, c.runCheck, c.logger) cr := NewCheckRunner(nc, c.runCheck, c.logger)
c.registryLock.Lock() c.registryLock.Lock()
c.checkRunners[nc.ID()] = cr // TODO type the CheckRunner
c.checkRunners[consulCheckID(nc.ID())] = cr
c.registryLock.Unlock() c.registryLock.Unlock()
} else { } else {
c.registryLock.RUnlock() c.registryLock.RUnlock()
} }
registeredChecks = append(registeredChecks, chkReg) registeredChecks[serviceKey] = append(registeredChecks[serviceKey], chkReg)
} }
} }
@ -281,8 +313,22 @@ func (c *Syncer) SetServices(groupName string, services []*structs.Service, serv
} }
c.groupsLock.Lock() c.groupsLock.Lock()
c.servicesGroups[groupName] = registeredServices for serviceKey, service := range registeredServices {
c.checkGroups[groupName] = registeredChecks serviceKeys, ok := c.servicesGroups[domain]
if !ok {
serviceKeys = make(map[ServiceKey]*consul.AgentServiceRegistration, len(registeredServices))
c.servicesGroups[domain] = serviceKeys
}
serviceKeys[serviceKey] = service
}
for serviceKey, checks := range registeredChecks {
serviceKeys, ok := c.checkGroups[domain]
if !ok {
serviceKeys = make(map[ServiceKey][]*consul.AgentCheckRegistration, len(registeredChecks))
c.checkGroups[domain] = serviceKeys
}
serviceKeys[serviceKey] = checks
}
c.groupsLock.Unlock() c.groupsLock.Unlock()
// Sync immediately // Sync immediately
@ -300,7 +346,8 @@ func (c *Syncer) SyncNow() {
} }
} }
// flattenedServices returns a flattened list of services // flattenedServices returns a flattened list of services that are registered
// locally
func (c *Syncer) flattenedServices() []*consul.AgentServiceRegistration { func (c *Syncer) flattenedServices() []*consul.AgentServiceRegistration {
const initialNumServices = 8 const initialNumServices = 8
services := make([]*consul.AgentServiceRegistration, 0, initialNumServices) services := make([]*consul.AgentServiceRegistration, 0, initialNumServices)
@ -314,18 +361,18 @@ func (c *Syncer) flattenedServices() []*consul.AgentServiceRegistration {
return services return services
} }
// flattenedChecks returns a flattened list of checks // flattenedChecks returns a flattened list of checks that are registered
// locally
func (c *Syncer) flattenedChecks() []*consul.AgentCheckRegistration { func (c *Syncer) flattenedChecks() []*consul.AgentCheckRegistration {
const initialNumChecks = 8 const initialNumChecks = 8
checks := make([]*consul.AgentCheckRegistration, 0, initialNumChecks) checks := make([]*consul.AgentCheckRegistration, 0, initialNumChecks)
c.groupsLock.RLock() c.groupsLock.RLock()
for _, checkGroup := range c.checkGroups { for _, checkGroup := range c.checkGroups {
for _, check := range checkGroup { for _, check := range checkGroup {
checks = append(checks, check) checks = append(checks, check...)
} }
} }
c.groupsLock.RUnlock() c.groupsLock.RUnlock()
return checks return checks
} }
@ -350,26 +397,16 @@ func (c *Syncer) Shutdown() error {
// Stop all the checks that nomad is running // Stop all the checks that nomad is running
c.registryLock.RLock() c.registryLock.RLock()
defer c.registryLock.RUnlock()
for _, cr := range c.checkRunners { for _, cr := range c.checkRunners {
cr.Stop() cr.Stop()
} }
c.registryLock.RUnlock()
// De-register all the services from Consul // De-register all the services from Consul
consulServices := make(map[string]*consul.AgentService) for serviceID := range c.trackedServices {
for trackedGroup := range c.servicesGroups { convertedID := string(serviceID)
serviceIDPrefix := fmt.Sprintf("%s-%s", c.filterPrefix(c.serviceRegPrefix), trackedGroup) if err := c.client.Agent().ServiceDeregister(convertedID); err != nil {
services, err := c.queryAgentServices(serviceIDPrefix) c.logger.Printf("[WARN] consul.syncer: failed to deregister service ID %+q: %v", convertedID, err)
if err != nil {
return err
}
for serviceID, agentService := range services {
consulServices[serviceID] = agentService
}
}
for _, service := range consulServices {
if err := c.client.Agent().ServiceDeregister(service.ID); err != nil {
c.logger.Printf("[WARN] consul.syncer: failed to deregister service ID %+q: %v", service.ID, err)
mErr.Errors = append(mErr.Errors, err) mErr.Errors = append(mErr.Errors, err)
} }
} }
@ -378,7 +415,7 @@ func (c *Syncer) Shutdown() error {
// queryChecks queries the Consul Agent for a list of Consul checks that // queryChecks queries the Consul Agent for a list of Consul checks that
// have been registered with this Consul Syncer. // have been registered with this Consul Syncer.
func (c *Syncer) queryChecks() (map[string]*consul.AgentCheck, error) { func (c *Syncer) queryChecks() (map[consulCheckID]*consul.AgentCheck, error) {
checks, err := c.client.Agent().Checks() checks, err := c.client.Agent().Checks()
if err != nil { if err != nil {
return nil, err return nil, err
@ -388,12 +425,12 @@ func (c *Syncer) queryChecks() (map[string]*consul.AgentCheck, error) {
// queryAgentServices queries the Consul Agent for a list of Consul services that // queryAgentServices queries the Consul Agent for a list of Consul services that
// have been registered with this Consul Syncer. // have been registered with this Consul Syncer.
func (c *Syncer) queryAgentServices(serviceIDPrefix string) (map[string]*consul.AgentService, error) { func (c *Syncer) queryAgentServices() (map[consulServiceID]*consul.AgentService, error) {
services, err := c.client.Agent().Services() services, err := c.client.Agent().Services()
if err != nil { if err != nil {
return nil, err return nil, err
} }
return c.filterConsulServices(serviceIDPrefix, services), nil return c.filterConsulServices(services), nil
} }
// syncChecks synchronizes this Syncer's Consul Checks with the Consul Agent. // syncChecks synchronizes this Syncer's Consul Checks with the Consul Agent.
@ -411,7 +448,7 @@ func (c *Syncer) syncChecks() error {
mErr.Errors = append(mErr.Errors, err) mErr.Errors = append(mErr.Errors, err)
} }
c.registryLock.Lock() c.registryLock.Lock()
c.trackedChecks[check.ID] = check c.trackedChecks[consulCheckID(check.ID)] = check
c.registryLock.Unlock() c.registryLock.Unlock()
} }
for _, check := range changedChecks { for _, check := range changedChecks {
@ -428,11 +465,11 @@ func (c *Syncer) syncChecks() error {
} }
} }
for _, check := range staleChecks { for _, check := range staleChecks {
if err := c.deregisterCheck(check.ID); err != nil { if err := c.deregisterCheck(consulCheckID(check.ID)); err != nil {
mErr.Errors = append(mErr.Errors, err) mErr.Errors = append(mErr.Errors, err)
} }
c.registryLock.Lock() c.registryLock.Lock()
delete(c.trackedChecks, check.ID) delete(c.trackedChecks, consulCheckID(check.ID))
c.registryLock.Unlock() c.registryLock.Unlock()
} }
return mErr.ErrorOrNil() return mErr.ErrorOrNil()
@ -467,7 +504,7 @@ func compareConsulCheck(localCheck *consul.AgentCheckRegistration, consulCheck *
// 4) a slice of checks that exist only in the Consul Agent (consulChecks) // 4) a slice of checks that exist only in the Consul Agent (consulChecks)
// and should be removed because the Consul Agent has drifted from the // and should be removed because the Consul Agent has drifted from the
// Syncer. // Syncer.
func (c *Syncer) calcChecksDiff(consulChecks map[string]*consul.AgentCheck) ( func (c *Syncer) calcChecksDiff(consulChecks map[consulCheckID]*consul.AgentCheck) (
missingChecks []*consul.AgentCheckRegistration, missingChecks []*consul.AgentCheckRegistration,
equalChecks []*consul.AgentCheckRegistration, equalChecks []*consul.AgentCheckRegistration,
changedChecks []*consul.AgentCheckRegistration, changedChecks []*consul.AgentCheckRegistration,
@ -584,7 +621,7 @@ func compareConsulService(localService *consul.AgentServiceRegistration, consulS
// 4) a slice of services that exist only in the Consul Agent // 4) a slice of services that exist only in the Consul Agent
// (consulServices) and should be removed because the Consul Agent has // (consulServices) and should be removed because the Consul Agent has
// drifted from the Syncer. // drifted from the Syncer.
func (c *Syncer) calcServicesDiff(consulServices map[string]*consul.AgentService) (missingServices []*consul.AgentServiceRegistration, equalServices []*consul.AgentServiceRegistration, changedServices []*consul.AgentServiceRegistration, staleServices []*consul.AgentServiceRegistration) { func (c *Syncer) calcServicesDiff(consulServices map[consulServiceID]*consul.AgentService) (missingServices []*consul.AgentServiceRegistration, equalServices []*consul.AgentServiceRegistration, changedServices []*consul.AgentServiceRegistration, staleServices []*consul.AgentServiceRegistration) {
type mergedService struct { type mergedService struct {
service *consul.AgentServiceRegistration service *consul.AgentServiceRegistration
// 'l' == Nomad local only // 'l' == Nomad local only
@ -652,16 +689,9 @@ func (c *Syncer) calcServicesDiff(consulServices map[string]*consul.AgentService
// syncServices synchronizes this Syncer's Consul Services with the Consul // syncServices synchronizes this Syncer's Consul Services with the Consul
// Agent. // Agent.
func (c *Syncer) syncServices() error { func (c *Syncer) syncServices() error {
consulServices := make(map[string]*consul.AgentService) consulServices, err := c.queryAgentServices()
for trackedGroup := range c.servicesGroups { if err != nil {
serviceIDPrefix := fmt.Sprintf("%s-%s", c.filterPrefix(c.serviceRegPrefix), trackedGroup) return err
services, err := c.queryAgentServices(serviceIDPrefix)
if err != nil {
return err
}
for serviceID, agentService := range services {
consulServices[serviceID] = agentService
}
} }
// Synchronize services with Consul // Synchronize services with Consul
@ -672,7 +702,7 @@ func (c *Syncer) syncServices() error {
mErr.Errors = append(mErr.Errors, err) mErr.Errors = append(mErr.Errors, err)
} }
c.registryLock.Lock() c.registryLock.Lock()
c.trackedServices[service.ID] = service c.trackedServices[consulServiceID(service.ID)] = service
c.registryLock.Unlock() c.registryLock.Unlock()
} }
for _, service := range changedServices { for _, service := range changedServices {
@ -686,7 +716,7 @@ func (c *Syncer) syncServices() error {
mErr.Errors = append(mErr.Errors, err) mErr.Errors = append(mErr.Errors, err)
} }
c.registryLock.Lock() c.registryLock.Lock()
delete(c.trackedServices, service.ID) delete(c.trackedServices, consulServiceID(service.ID))
c.registryLock.Unlock() c.registryLock.Unlock()
} }
return mErr.ErrorOrNil() return mErr.ErrorOrNil()
@ -695,7 +725,7 @@ func (c *Syncer) syncServices() error {
// registerCheck registers a check definition with Consul // registerCheck registers a check definition with Consul
func (c *Syncer) registerCheck(chkReg *consul.AgentCheckRegistration) error { func (c *Syncer) registerCheck(chkReg *consul.AgentCheckRegistration) error {
c.registryLock.RLock() c.registryLock.RLock()
if cr, ok := c.checkRunners[chkReg.ID]; ok { if cr, ok := c.checkRunners[consulCheckID(chkReg.ID)]; ok {
cr.Start() cr.Start()
} }
c.registryLock.RUnlock() c.registryLock.RUnlock()
@ -733,13 +763,19 @@ func (c *Syncer) createCheckReg(check *structs.ServiceCheck, service *consul.Age
return &chkReg, nil return &chkReg, nil
} }
// generateConsulServiceID takes the domain and service key and returns a Consul
// ServiceID
func generateConsulServiceID(domain ServiceDomain, key ServiceKey) consulServiceID {
return consulServiceID(fmt.Sprintf("%s-%s-%s", nomadServicePrefix, domain, key))
}
// createService creates a Consul AgentService from a Nomad ConsulService. // createService creates a Consul AgentService from a Nomad ConsulService.
func (c *Syncer) createService(service *structs.Service) (*consul.AgentServiceRegistration, error) { func (c *Syncer) createService(service *structs.Service, domain ServiceDomain, key ServiceKey) (*consul.AgentServiceRegistration, error) {
c.registryLock.RLock() c.registryLock.RLock()
defer c.registryLock.RUnlock() defer c.registryLock.RUnlock()
srv := consul.AgentServiceRegistration{ srv := consul.AgentServiceRegistration{
ID: service.ServiceID, ID: string(generateConsulServiceID(domain, key)),
Name: service.Name, Name: service.Name,
Tags: service.Tags, Tags: service.Tags,
} }
@ -761,21 +797,21 @@ func (c *Syncer) deregisterService(serviceID string) error {
} }
// deregisterCheck de-registers a check from Consul // deregisterCheck de-registers a check from Consul
func (c *Syncer) deregisterCheck(checkID string) error { func (c *Syncer) deregisterCheck(id consulCheckID) error {
c.registryLock.Lock() c.registryLock.Lock()
defer c.registryLock.Unlock() defer c.registryLock.Unlock()
// Deleting from Consul Agent // Deleting from Consul Agent
if err := c.client.Agent().CheckDeregister(checkID); err != nil { if err := c.client.Agent().CheckDeregister(string(id)); err != nil {
// CheckDeregister() will be reattempted again in a future // CheckDeregister() will be reattempted again in a future
// sync. // sync.
return err return err
} }
// Remove the check from the local registry // Remove the check from the local registry
if cr, ok := c.checkRunners[checkID]; ok { if cr, ok := c.checkRunners[id]; ok {
cr.Stop() cr.Stop()
delete(c.checkRunners, checkID) delete(c.checkRunners, id)
} }
return nil return nil
@ -847,15 +883,18 @@ func (c *Syncer) SyncServices() error {
return nil return nil
} }
// filterConsulServices prunes out all the service whose ids are not prefixed // filterConsulServices prunes out all the service who were not registered with
// with nomad- // the syncer
func (c *Syncer) filterConsulServices(serviceIDPrefix string, consulServices map[string]*consul.AgentService) map[string]*consul.AgentService { func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentService) map[consulServiceID]*consul.AgentService {
localServices := make(map[string]*consul.AgentService, len(consulServices)) localServices := make(map[consulServiceID]*consul.AgentService, len(consulServices))
c.registryLock.RLock() c.registryLock.RLock()
defer c.registryLock.RUnlock() defer c.registryLock.RUnlock()
for serviceID, service := range consulServices { for serviceID, service := range consulServices {
if strings.HasPrefix(service.ID, serviceIDPrefix) { for domain := range c.servicesGroups {
localServices[serviceID] = service if strings.HasPrefix(service.ID, fmt.Sprintf("%s-%s", nomadServicePrefix, domain)) {
localServices[consulServiceID(serviceID)] = service
break
}
} }
} }
return localServices return localServices
@ -863,12 +902,16 @@ func (c *Syncer) filterConsulServices(serviceIDPrefix string, consulServices map
// filterConsulChecks prunes out all the consul checks which do not have // filterConsulChecks prunes out all the consul checks which do not have
// services with Syncer's idPrefix. // services with Syncer's idPrefix.
func (c *Syncer) filterConsulChecks(consulChecks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck { func (c *Syncer) filterConsulChecks(consulChecks map[string]*consul.AgentCheck) map[consulCheckID]*consul.AgentCheck {
localChecks := make(map[string]*consul.AgentCheck, len(consulChecks)) localChecks := make(map[consulCheckID]*consul.AgentCheck, len(consulChecks))
filterPrefix := c.filterPrefix(c.serviceRegPrefix) c.registryLock.RLock()
defer c.registryLock.RUnlock()
for checkID, check := range consulChecks { for checkID, check := range consulChecks {
if strings.HasPrefix(check.ServiceID, filterPrefix) { for domain := range c.checkGroups {
localChecks[checkID] = check if strings.HasPrefix(check.ServiceID, fmt.Sprintf("%s-%s", nomadServicePrefix, domain)) {
localChecks[consulCheckID(checkID)] = check
break
}
} }
} }
return localChecks return localChecks
@ -910,28 +953,40 @@ func (c *Syncer) runCheck(check Check) {
} }
} }
// KeepServices prunes all services besides the ones specified by the user // ReapUnmatched prunes all services that do not exist in the passed domains
func (c *Syncer) KeepServices(serviceRegPrefix string, servicesInRunningAllocs map[string][]*structs.Service) { func (c *Syncer) ReapUnmatched(domains []ServiceDomain) error {
servicesToKeep := make(map[string]struct{})
for allocID, services := range servicesInRunningAllocs {
for _, service := range services {
serviceID := c.GenerateServiceID(allocID, service, serviceRegPrefix)
servicesToKeep[serviceID] = struct{}{}
}
}
servicesInConsul, err := c.ConsulClient().Agent().Services() servicesInConsul, err := c.ConsulClient().Agent().Services()
if err != nil { if err != nil {
return return err
} }
filterServicesWithPrefix := c.filterPrefix(serviceRegPrefix) var mErr multierror.Error
relevantServices := c.filterConsulServices(filterServicesWithPrefix, servicesInConsul) for serviceID := range servicesInConsul {
for serviceID := range relevantServices { // Skip any service that was not registered by Nomad
if _, ok := servicesToKeep[serviceID]; !ok { if !strings.HasPrefix(serviceID, nomadServicePrefix) {
c.deregisterService(serviceID) continue
}
// Filter services that do not exist in the desired domains
match := false
for _, domain := range domains {
// Include the hyphen so it is explicit to that domain otherwise it
// maybe a subset match
desired := fmt.Sprintf("%s-%s-", nomadServicePrefix, domain)
if strings.HasPrefix(serviceID, desired) {
match = true
break
}
}
if !match {
if err := c.deregisterService(serviceID); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
} }
} }
return mErr.ErrorOrNil()
} }
// AddPeriodicHandler adds a uniquely named callback. Returns true if // AddPeriodicHandler adds a uniquely named callback. Returns true if

View File

@ -1555,10 +1555,6 @@ func (sc *ServiceCheck) Hash(serviceID string) string {
// Service represents a Consul service definition in Nomad // Service represents a Consul service definition in Nomad
type Service struct { type Service struct {
// ServiceID is the calculated Consul ServiceID used for a service.
// This value is not available to be set via configuration.
ServiceID string `mapstructure:"-"`
// Name of the service registered with Consul. Consul defaults the // Name of the service registered with Consul. Consul defaults the
// Name to ServiceID if not specified. The Name if specified is used // Name to ServiceID if not specified. The Name if specified is used
// as one of the seed values when generating a Consul ServiceID. // as one of the seed values when generating a Consul ServiceID.