Fix IDs and domain scoping
This commit is contained in:
parent
091b90dc33
commit
8bbf4a55e5
|
@ -1342,32 +1342,30 @@ func (c *Client) setupConsulSyncer() error {
|
|||
}
|
||||
c.consulSyncer.AddPeriodicHandler("Nomad Client Fallback Server Handler", bootstrapFn)
|
||||
|
||||
// TODO this should only deregister things that the executors do not know
|
||||
// about
|
||||
consulServicesSyncFn := func() error {
|
||||
const estInitialConsulServices = 8
|
||||
const serviceGroupName = "executor"
|
||||
servicesInRunningAllocs := make(map[string][]*structs.Service)
|
||||
consulServicesReaperFn := func() error {
|
||||
const estInitialExecutorDomains = 8
|
||||
|
||||
// Create the domains to keep and add the server and client
|
||||
domains := make([]consul.ServiceDomain, 2, estInitialExecutorDomains)
|
||||
domains[0] = consul.ServerDomain
|
||||
domains[1] = consul.ClientDomain
|
||||
|
||||
for allocID, ar := range c.getAllocRunners() {
|
||||
services := make([]*structs.Service, 0, estInitialConsulServices)
|
||||
ar.taskStatusLock.RLock()
|
||||
taskStates := copyTaskStates(ar.taskStates)
|
||||
ar.taskStatusLock.RUnlock()
|
||||
for taskName, taskState := range taskStates {
|
||||
if taskState.State == structs.TaskStateRunning {
|
||||
if tr, ok := ar.tasks[taskName]; ok {
|
||||
for _, service := range tr.task.Services {
|
||||
services = append(services, service)
|
||||
}
|
||||
}
|
||||
}
|
||||
if ar.Alloc().TerminalStatus() {
|
||||
// Ignore non-running allocations
|
||||
continue
|
||||
}
|
||||
servicesInRunningAllocs[allocID] = services
|
||||
ar.taskLock.RLock()
|
||||
for task := range ar.tasks {
|
||||
d := consul.NewExecutorDomain(allocID, task)
|
||||
domains = append(domains, d)
|
||||
}
|
||||
ar.taskLock.RUnlock()
|
||||
}
|
||||
c.consulSyncer.KeepServices(serviceGroupName, servicesInRunningAllocs)
|
||||
return nil
|
||||
|
||||
return c.consulSyncer.KeepDomains(domains)
|
||||
}
|
||||
c.consulSyncer.AddPeriodicHandler("Nomad Client Services Sync Handler", consulServicesSyncFn)
|
||||
c.consulSyncer.AddPeriodicHandler("Nomad Client Services Sync Handler", consulServicesReaperFn)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -37,9 +37,6 @@ const (
|
|||
// tree for finding out the pids that the executor and it's child processes
|
||||
// have forked
|
||||
pidScanInterval = 5 * time.Second
|
||||
|
||||
// serviceRegPrefix is the prefix the entire Executor should use
|
||||
executorServiceRegPrefix = "executor"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -365,11 +362,25 @@ func (e *UniversalExecutor) UpdateTask(task *structs.Task) error {
|
|||
// Re-syncing task with Consul agent
|
||||
if e.consulSyncer != nil {
|
||||
e.interpolateServices(e.ctx.Task)
|
||||
e.consulSyncer.SetServices(e.ctx.AllocID, task.Services, executorServiceRegPrefix)
|
||||
domain := consul.NewExecutorDomain(e.ctx.AllocID, task.Name)
|
||||
serviceMap := servicesToServiceMap(e.ctx.AllocID, task.Services)
|
||||
e.consulSyncer.SetServices(domain, serviceMap)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// servicesToServiceMap takes a list of interpolated services and returns a map
|
||||
// of ServiceKeys to services where the service key is appropriate for the
|
||||
// executor.
|
||||
func servicesToServiceMap(allocID string, services []*structs.Service) map[consul.ServiceKey]*structs.Service {
|
||||
keys := make(map[consul.ServiceKey]*structs.Service, len(services))
|
||||
for _, service := range services {
|
||||
key := consul.GenerateServiceKey(service)
|
||||
keys[key] = service
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
func (e *UniversalExecutor) wait() {
|
||||
defer close(e.processExited)
|
||||
err := e.cmd.Wait()
|
||||
|
@ -493,7 +504,9 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error {
|
|||
e.interpolateServices(e.ctx.Task)
|
||||
e.consulSyncer.SetDelegatedChecks(e.createCheckMap(), e.createCheck)
|
||||
e.consulSyncer.SetAddrFinder(e.ctx.Task.FindHostAndPortFor)
|
||||
e.consulSyncer.SetServices(e.ctx.AllocID, e.ctx.Task.Services, executorServiceRegPrefix)
|
||||
domain := consul.NewExecutorDomain(e.ctx.AllocID, e.ctx.Task.Name)
|
||||
serviceMap := servicesToServiceMap(e.ctx.AllocID, e.ctx.Task.Services)
|
||||
e.consulSyncer.SetServices(domain, serviceMap)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -361,24 +361,26 @@ func (a *Agent) setupServer() error {
|
|||
|
||||
// Create the Nomad Server services for Consul
|
||||
if a.config.Consul.AutoRegister && a.config.Consul.ServerServiceName != "" {
|
||||
const serviceGroupName = "server"
|
||||
a.consulSyncer.SetServices(serviceGroupName, []*structs.Service{
|
||||
&structs.Service{
|
||||
Name: a.config.Consul.ServerServiceName,
|
||||
PortLabel: a.serverHTTPAddr,
|
||||
Tags: []string{consul.ServiceTagHTTP},
|
||||
},
|
||||
&structs.Service{
|
||||
Name: a.config.Consul.ServerServiceName,
|
||||
PortLabel: a.serverRPCAddr,
|
||||
Tags: []string{consul.ServiceTagRPC},
|
||||
},
|
||||
&structs.Service{
|
||||
PortLabel: a.serverSerfAddr,
|
||||
Name: a.config.Consul.ServerServiceName,
|
||||
Tags: []string{consul.ServiceTagSerf},
|
||||
},
|
||||
}, "agent")
|
||||
httpServ := &structs.Service{
|
||||
Name: a.config.Consul.ServerServiceName,
|
||||
PortLabel: a.serverHTTPAddr,
|
||||
Tags: []string{consul.ServiceTagHTTP},
|
||||
}
|
||||
rpcServ := &structs.Service{
|
||||
Name: a.config.Consul.ServerServiceName,
|
||||
PortLabel: a.serverRPCAddr,
|
||||
Tags: []string{consul.ServiceTagRPC},
|
||||
}
|
||||
serfServ := &structs.Service{
|
||||
PortLabel: a.serverSerfAddr,
|
||||
Name: a.config.Consul.ServerServiceName,
|
||||
Tags: []string{consul.ServiceTagSerf},
|
||||
}
|
||||
a.consulSyncer.SetServices(consul.ServerDomain, map[consul.ServiceKey]*structs.Service{
|
||||
consul.GenerateServiceKey(httpServ): httpServ,
|
||||
consul.GenerateServiceKey(rpcServ): rpcServ,
|
||||
consul.GenerateServiceKey(serfServ): serfServ,
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -412,19 +414,20 @@ func (a *Agent) setupClient() error {
|
|||
|
||||
// Create the Nomad Server services for Consul
|
||||
if a.config.Consul.AutoRegister && a.config.Consul.ClientServiceName != "" {
|
||||
const serviceGroupName = "client"
|
||||
a.consulSyncer.SetServices(serviceGroupName, []*structs.Service{
|
||||
&structs.Service{
|
||||
Name: a.config.Consul.ClientServiceName,
|
||||
PortLabel: a.clientHTTPAddr,
|
||||
Tags: []string{consul.ServiceTagHTTP},
|
||||
},
|
||||
&structs.Service{
|
||||
Name: a.config.Consul.ClientServiceName,
|
||||
PortLabel: a.clientRPCAddr,
|
||||
Tags: []string{consul.ServiceTagRPC},
|
||||
},
|
||||
}, "agent")
|
||||
httpServ := &structs.Service{
|
||||
Name: a.config.Consul.ClientServiceName,
|
||||
PortLabel: a.clientHTTPAddr,
|
||||
Tags: []string{consul.ServiceTagHTTP},
|
||||
}
|
||||
rpcServ := &structs.Service{
|
||||
Name: a.config.Consul.ClientServiceName,
|
||||
PortLabel: a.clientRPCAddr,
|
||||
Tags: []string{consul.ServiceTagRPC},
|
||||
}
|
||||
a.consulSyncer.SetServices(consul.ClientDomain, map[consul.ServiceKey]*structs.Service{
|
||||
consul.GenerateServiceKey(httpServ): httpServ,
|
||||
consul.GenerateServiceKey(rpcServ): rpcServ,
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -1,3 +1,27 @@
|
|||
// Package consul is used by Nomad to register all services both static services
|
||||
// and dynamic via allocations.
|
||||
//
|
||||
// Consul Service IDs have the following format: ${nomadServicePrefix}-${groupName}-${serviceKey}
|
||||
// groupName takes on one of the following values:
|
||||
// - server
|
||||
// - client
|
||||
// - executor-${alloc-id}-${task-name}
|
||||
//
|
||||
// serviceKey should be generated by service registrators.
|
||||
// If the serviceKey is being generated by the executor for a Nomad Task.Services
|
||||
// the following helper should be used:
|
||||
// NOTE: Executor should interpolate the service prior to calling
|
||||
// func GenerateTaskServiceKey(service *structs.Service) string
|
||||
//
|
||||
// The Nomad Client reaps services registered from dead allocations that were
|
||||
// not properly cleaned up by the executor (this is not the expected case).
|
||||
//
|
||||
// TODO fix this comment
|
||||
// The Consul ServiceIDs generated by the executor will contain the allocation
|
||||
// ID. Thus the client can generate the list of Consul ServiceIDs to keep by
|
||||
// calling the following method on all running allocations the client is aware
|
||||
// of:
|
||||
// func GenerateExecutorServiceKeyPrefixFromAlloc(allocID string) string
|
||||
package consul
|
||||
|
||||
import (
|
||||
|
@ -28,8 +52,8 @@ const (
|
|||
// initialSyncDelay is the delay before an initial sync.
|
||||
initialSyncDelay = 5 * time.Second
|
||||
|
||||
// nomadServicePrefix is the prefix used when registering a service
|
||||
// with consul
|
||||
// nomadServicePrefix is the first prefix that scopes all Nomad registered
|
||||
// services
|
||||
nomadServicePrefix = "nomad-registered-service"
|
||||
|
||||
// The periodic time interval for syncing services and checks with Consul
|
||||
|
@ -57,6 +81,27 @@ const (
|
|||
ServiceTagSerf = "serf"
|
||||
)
|
||||
|
||||
// consulServiceID and consulCheckID are the IDs registered with Consul
|
||||
type consulServiceID string
|
||||
type consulCheckID string
|
||||
|
||||
// ServiceKey is the generated service key that is used to build the Consul
|
||||
// ServiceID
|
||||
type ServiceKey string
|
||||
|
||||
// ServiceDomain is the domain of services registered by Nomad
|
||||
type ServiceDomain string
|
||||
|
||||
const (
|
||||
ClientDomain ServiceDomain = "client"
|
||||
ServerDomain ServiceDomain = "server"
|
||||
)
|
||||
|
||||
// NewExecutorDomain returns a domain specific to the alloc ID and task
|
||||
func NewExecutorDomain(allocID, task string) ServiceDomain {
|
||||
return ServiceDomain(fmt.Sprintf("executor-%s-%s", allocID, task))
|
||||
}
|
||||
|
||||
// Syncer allows syncing of services and checks with Consul
|
||||
type Syncer struct {
|
||||
client *consul.Client
|
||||
|
@ -67,27 +112,24 @@ type Syncer struct {
|
|||
// SyncServices() is called. The key to the servicesGroups map is unique
|
||||
// per handler and is used to allow the Agent's services to be maintained
|
||||
// independently of the Client or Server's services.
|
||||
servicesGroups map[string][]*consul.AgentServiceRegistration
|
||||
checkGroups map[string][]*consul.AgentCheckRegistration
|
||||
servicesGroups map[ServiceDomain]map[ServiceKey]*consul.AgentServiceRegistration
|
||||
checkGroups map[ServiceDomain]map[ServiceKey][]*consul.AgentCheckRegistration
|
||||
groupsLock sync.RWMutex
|
||||
|
||||
groupName string
|
||||
|
||||
// The "Consul Registry" is a collection of Consul Services and
|
||||
// Checks all guarded by the registryLock.
|
||||
registryLock sync.RWMutex
|
||||
|
||||
checkRunners map[string]*CheckRunner
|
||||
delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul
|
||||
|
||||
// trackedChecks and trackedServices are registered with consul
|
||||
trackedChecks map[string]*consul.AgentCheckRegistration
|
||||
trackedServices map[string]*consul.AgentServiceRegistration
|
||||
trackedChecks map[consulCheckID]*consul.AgentCheckRegistration
|
||||
trackedServices map[consulServiceID]*consul.AgentServiceRegistration
|
||||
|
||||
serviceRegPrefix string
|
||||
// checkRunners are delegated Consul checks being ran by the Syncer
|
||||
checkRunners map[consulCheckID]*CheckRunner
|
||||
|
||||
addrFinder func(portLabel string) (string, int)
|
||||
createDelegatedCheck func(*structs.ServiceCheck, string) (Check, error)
|
||||
delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul
|
||||
// End registryLock guarded attributes.
|
||||
|
||||
logger *log.Logger
|
||||
|
@ -174,13 +216,14 @@ func NewSyncer(consulConfig *config.ConsulConfig, shutdownCh chan struct{}, logg
|
|||
logger: logger,
|
||||
consulAvailable: true,
|
||||
shutdownCh: shutdownCh,
|
||||
servicesGroups: make(map[string][]*consul.AgentServiceRegistration),
|
||||
checkGroups: make(map[string][]*consul.AgentCheckRegistration),
|
||||
trackedServices: make(map[string]*consul.AgentServiceRegistration),
|
||||
trackedChecks: make(map[string]*consul.AgentCheckRegistration),
|
||||
checkRunners: make(map[string]*CheckRunner),
|
||||
servicesGroups: make(map[ServiceDomain]map[ServiceKey]*consul.AgentServiceRegistration),
|
||||
checkGroups: make(map[ServiceDomain]map[ServiceKey][]*consul.AgentCheckRegistration),
|
||||
trackedServices: make(map[consulServiceID]*consul.AgentServiceRegistration),
|
||||
trackedChecks: make(map[consulCheckID]*consul.AgentCheckRegistration),
|
||||
checkRunners: make(map[consulCheckID]*CheckRunner),
|
||||
periodicCallbacks: make(map[string]types.PeriodicCallback),
|
||||
}
|
||||
|
||||
return &consulSyncer, nil
|
||||
}
|
||||
|
||||
|
@ -198,47 +241,35 @@ func (c *Syncer) SetAddrFinder(addrFinder func(string) (string, int)) *Syncer {
|
|||
return c
|
||||
}
|
||||
|
||||
// filterPrefix generates a unique prefix that a Syncer can later filter on.
|
||||
func (c *Syncer) filterPrefix(serviceRegPrefix string) string {
|
||||
c.registryLock.RLock()
|
||||
defer c.registryLock.RUnlock()
|
||||
return fmt.Sprintf("%s-%s", nomadServicePrefix, serviceRegPrefix)
|
||||
}
|
||||
|
||||
// GenerateServiceID creates a unique Consul ServiceID for a given
|
||||
// GenerateServiceKey should be called to generate a serviceKey based on the
|
||||
// Service.
|
||||
func (c *Syncer) GenerateServiceID(groupName string, service *structs.Service, serviceRegPrefix string) string {
|
||||
func GenerateServiceKey(service *structs.Service) ServiceKey {
|
||||
var key string
|
||||
numTags := len(service.Tags)
|
||||
switch numTags {
|
||||
case 0:
|
||||
return fmt.Sprintf("%s-%s:%s", c.filterPrefix(serviceRegPrefix), groupName, service.Name)
|
||||
case 1:
|
||||
return fmt.Sprintf("%s-%s:%s@%s", c.filterPrefix(serviceRegPrefix), groupName, service.Tags[0], service.Name)
|
||||
key = fmt.Sprintf("%s", service.Name)
|
||||
default:
|
||||
tags := strings.Join(service.Tags, "|")
|
||||
return fmt.Sprintf("%s-%s:(%s)@%s", c.filterPrefix(serviceRegPrefix), groupName, tags, service.Name)
|
||||
tags := strings.Join(service.Tags, "-")
|
||||
key = fmt.Sprintf("%s-%s", service.Name, tags)
|
||||
}
|
||||
return ServiceKey(key)
|
||||
}
|
||||
|
||||
// SetServices assigns the slice of Nomad Services to the provided services
|
||||
// group name.
|
||||
func (c *Syncer) SetServices(groupName string, services []*structs.Service, serviceRegPrefix string) error {
|
||||
c.groupName = groupName
|
||||
// SetServices stores the map of Nomad Services to the provided service
|
||||
// domain name.
|
||||
func (c *Syncer) SetServices(domain ServiceDomain, services map[ServiceKey]*structs.Service) error {
|
||||
var mErr multierror.Error
|
||||
numServ := len(services)
|
||||
registeredServices := make([]*consul.AgentServiceRegistration, 0, numServ)
|
||||
registeredChecks := make([]*consul.AgentCheckRegistration, 0, numServ)
|
||||
for _, service := range services {
|
||||
if service.ServiceID == "" {
|
||||
service.ServiceID = c.GenerateServiceID(groupName, service, serviceRegPrefix)
|
||||
}
|
||||
var serviceReg *consul.AgentServiceRegistration
|
||||
var err error
|
||||
if serviceReg, err = c.createService(service); err != nil {
|
||||
registeredServices := make(map[ServiceKey]*consul.AgentServiceRegistration, numServ)
|
||||
registeredChecks := make(map[ServiceKey][]*consul.AgentCheckRegistration, numServ)
|
||||
for serviceKey, service := range services {
|
||||
serviceReg, err := c.createService(service, domain, serviceKey)
|
||||
if err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
continue
|
||||
}
|
||||
registeredServices = append(registeredServices, serviceReg)
|
||||
registeredServices[serviceKey] = serviceReg
|
||||
|
||||
// Register the check(s) for this service
|
||||
for _, chk := range service.Checks {
|
||||
|
@ -252,7 +283,7 @@ func (c *Syncer) SetServices(groupName string, services []*structs.Service, serv
|
|||
// creating a nomad check if we have to handle this particular check type
|
||||
c.registryLock.RLock()
|
||||
if _, ok := c.delegateChecks[chk.Type]; ok {
|
||||
_, ok := c.checkRunners[chkReg.ID]
|
||||
_, ok := c.checkRunners[consulCheckID(chkReg.ID)]
|
||||
c.registryLock.RUnlock()
|
||||
if ok {
|
||||
continue
|
||||
|
@ -266,13 +297,14 @@ func (c *Syncer) SetServices(groupName string, services []*structs.Service, serv
|
|||
|
||||
cr := NewCheckRunner(nc, c.runCheck, c.logger)
|
||||
c.registryLock.Lock()
|
||||
c.checkRunners[nc.ID()] = cr
|
||||
// TODO type the CheckRunner
|
||||
c.checkRunners[consulCheckID(nc.ID())] = cr
|
||||
c.registryLock.Unlock()
|
||||
} else {
|
||||
c.registryLock.RUnlock()
|
||||
}
|
||||
|
||||
registeredChecks = append(registeredChecks, chkReg)
|
||||
registeredChecks[serviceKey] = append(registeredChecks[serviceKey], chkReg)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -281,8 +313,22 @@ func (c *Syncer) SetServices(groupName string, services []*structs.Service, serv
|
|||
}
|
||||
|
||||
c.groupsLock.Lock()
|
||||
c.servicesGroups[groupName] = registeredServices
|
||||
c.checkGroups[groupName] = registeredChecks
|
||||
for serviceKey, service := range registeredServices {
|
||||
serviceKeys, ok := c.servicesGroups[domain]
|
||||
if !ok {
|
||||
serviceKeys = make(map[ServiceKey]*consul.AgentServiceRegistration, len(registeredServices))
|
||||
c.servicesGroups[domain] = serviceKeys
|
||||
}
|
||||
serviceKeys[serviceKey] = service
|
||||
}
|
||||
for serviceKey, checks := range registeredChecks {
|
||||
serviceKeys, ok := c.checkGroups[domain]
|
||||
if !ok {
|
||||
serviceKeys = make(map[ServiceKey][]*consul.AgentCheckRegistration, len(registeredChecks))
|
||||
c.checkGroups[domain] = serviceKeys
|
||||
}
|
||||
serviceKeys[serviceKey] = checks
|
||||
}
|
||||
c.groupsLock.Unlock()
|
||||
|
||||
// Sync immediately
|
||||
|
@ -300,7 +346,8 @@ func (c *Syncer) SyncNow() {
|
|||
}
|
||||
}
|
||||
|
||||
// flattenedServices returns a flattened list of services
|
||||
// flattenedServices returns a flattened list of services that are registered
|
||||
// locally
|
||||
func (c *Syncer) flattenedServices() []*consul.AgentServiceRegistration {
|
||||
const initialNumServices = 8
|
||||
services := make([]*consul.AgentServiceRegistration, 0, initialNumServices)
|
||||
|
@ -314,18 +361,18 @@ func (c *Syncer) flattenedServices() []*consul.AgentServiceRegistration {
|
|||
return services
|
||||
}
|
||||
|
||||
// flattenedChecks returns a flattened list of checks
|
||||
// flattenedChecks returns a flattened list of checks that are registered
|
||||
// locally
|
||||
func (c *Syncer) flattenedChecks() []*consul.AgentCheckRegistration {
|
||||
const initialNumChecks = 8
|
||||
checks := make([]*consul.AgentCheckRegistration, 0, initialNumChecks)
|
||||
c.groupsLock.RLock()
|
||||
for _, checkGroup := range c.checkGroups {
|
||||
for _, check := range checkGroup {
|
||||
checks = append(checks, check)
|
||||
checks = append(checks, check...)
|
||||
}
|
||||
}
|
||||
c.groupsLock.RUnlock()
|
||||
|
||||
return checks
|
||||
}
|
||||
|
||||
|
@ -350,26 +397,16 @@ func (c *Syncer) Shutdown() error {
|
|||
|
||||
// Stop all the checks that nomad is running
|
||||
c.registryLock.RLock()
|
||||
defer c.registryLock.RUnlock()
|
||||
for _, cr := range c.checkRunners {
|
||||
cr.Stop()
|
||||
}
|
||||
c.registryLock.RUnlock()
|
||||
|
||||
// De-register all the services from Consul
|
||||
consulServices := make(map[string]*consul.AgentService)
|
||||
for trackedGroup := range c.servicesGroups {
|
||||
serviceIDPrefix := fmt.Sprintf("%s-%s", c.filterPrefix(c.serviceRegPrefix), trackedGroup)
|
||||
services, err := c.queryAgentServices(serviceIDPrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for serviceID, agentService := range services {
|
||||
consulServices[serviceID] = agentService
|
||||
}
|
||||
}
|
||||
for _, service := range consulServices {
|
||||
if err := c.client.Agent().ServiceDeregister(service.ID); err != nil {
|
||||
c.logger.Printf("[WARN] consul.syncer: failed to deregister service ID %+q: %v", service.ID, err)
|
||||
for serviceID := range c.trackedServices {
|
||||
convertedID := string(serviceID)
|
||||
if err := c.client.Agent().ServiceDeregister(convertedID); err != nil {
|
||||
c.logger.Printf("[WARN] consul.syncer: failed to deregister service ID %+q: %v", convertedID, err)
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
}
|
||||
|
@ -378,7 +415,7 @@ func (c *Syncer) Shutdown() error {
|
|||
|
||||
// queryChecks queries the Consul Agent for a list of Consul checks that
|
||||
// have been registered with this Consul Syncer.
|
||||
func (c *Syncer) queryChecks() (map[string]*consul.AgentCheck, error) {
|
||||
func (c *Syncer) queryChecks() (map[consulCheckID]*consul.AgentCheck, error) {
|
||||
checks, err := c.client.Agent().Checks()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -388,12 +425,12 @@ func (c *Syncer) queryChecks() (map[string]*consul.AgentCheck, error) {
|
|||
|
||||
// queryAgentServices queries the Consul Agent for a list of Consul services that
|
||||
// have been registered with this Consul Syncer.
|
||||
func (c *Syncer) queryAgentServices(serviceIDPrefix string) (map[string]*consul.AgentService, error) {
|
||||
func (c *Syncer) queryAgentServices() (map[consulServiceID]*consul.AgentService, error) {
|
||||
services, err := c.client.Agent().Services()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c.filterConsulServices(serviceIDPrefix, services), nil
|
||||
return c.filterConsulServices(services), nil
|
||||
}
|
||||
|
||||
// syncChecks synchronizes this Syncer's Consul Checks with the Consul Agent.
|
||||
|
@ -411,7 +448,7 @@ func (c *Syncer) syncChecks() error {
|
|||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
c.registryLock.Lock()
|
||||
c.trackedChecks[check.ID] = check
|
||||
c.trackedChecks[consulCheckID(check.ID)] = check
|
||||
c.registryLock.Unlock()
|
||||
}
|
||||
for _, check := range changedChecks {
|
||||
|
@ -428,11 +465,11 @@ func (c *Syncer) syncChecks() error {
|
|||
}
|
||||
}
|
||||
for _, check := range staleChecks {
|
||||
if err := c.deregisterCheck(check.ID); err != nil {
|
||||
if err := c.deregisterCheck(consulCheckID(check.ID)); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
c.registryLock.Lock()
|
||||
delete(c.trackedChecks, check.ID)
|
||||
delete(c.trackedChecks, consulCheckID(check.ID))
|
||||
c.registryLock.Unlock()
|
||||
}
|
||||
return mErr.ErrorOrNil()
|
||||
|
@ -467,7 +504,7 @@ func compareConsulCheck(localCheck *consul.AgentCheckRegistration, consulCheck *
|
|||
// 4) a slice of checks that exist only in the Consul Agent (consulChecks)
|
||||
// and should be removed because the Consul Agent has drifted from the
|
||||
// Syncer.
|
||||
func (c *Syncer) calcChecksDiff(consulChecks map[string]*consul.AgentCheck) (
|
||||
func (c *Syncer) calcChecksDiff(consulChecks map[consulCheckID]*consul.AgentCheck) (
|
||||
missingChecks []*consul.AgentCheckRegistration,
|
||||
equalChecks []*consul.AgentCheckRegistration,
|
||||
changedChecks []*consul.AgentCheckRegistration,
|
||||
|
@ -584,7 +621,7 @@ func compareConsulService(localService *consul.AgentServiceRegistration, consulS
|
|||
// 4) a slice of services that exist only in the Consul Agent
|
||||
// (consulServices) and should be removed because the Consul Agent has
|
||||
// drifted from the Syncer.
|
||||
func (c *Syncer) calcServicesDiff(consulServices map[string]*consul.AgentService) (missingServices []*consul.AgentServiceRegistration, equalServices []*consul.AgentServiceRegistration, changedServices []*consul.AgentServiceRegistration, staleServices []*consul.AgentServiceRegistration) {
|
||||
func (c *Syncer) calcServicesDiff(consulServices map[consulServiceID]*consul.AgentService) (missingServices []*consul.AgentServiceRegistration, equalServices []*consul.AgentServiceRegistration, changedServices []*consul.AgentServiceRegistration, staleServices []*consul.AgentServiceRegistration) {
|
||||
type mergedService struct {
|
||||
service *consul.AgentServiceRegistration
|
||||
// 'l' == Nomad local only
|
||||
|
@ -652,16 +689,9 @@ func (c *Syncer) calcServicesDiff(consulServices map[string]*consul.AgentService
|
|||
// syncServices synchronizes this Syncer's Consul Services with the Consul
|
||||
// Agent.
|
||||
func (c *Syncer) syncServices() error {
|
||||
consulServices := make(map[string]*consul.AgentService)
|
||||
for trackedGroup := range c.servicesGroups {
|
||||
serviceIDPrefix := fmt.Sprintf("%s-%s", c.filterPrefix(c.serviceRegPrefix), trackedGroup)
|
||||
services, err := c.queryAgentServices(serviceIDPrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for serviceID, agentService := range services {
|
||||
consulServices[serviceID] = agentService
|
||||
}
|
||||
consulServices, err := c.queryAgentServices()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Synchronize services with Consul
|
||||
|
@ -672,7 +702,7 @@ func (c *Syncer) syncServices() error {
|
|||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
c.registryLock.Lock()
|
||||
c.trackedServices[service.ID] = service
|
||||
c.trackedServices[consulServiceID(service.ID)] = service
|
||||
c.registryLock.Unlock()
|
||||
}
|
||||
for _, service := range changedServices {
|
||||
|
@ -686,7 +716,7 @@ func (c *Syncer) syncServices() error {
|
|||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
c.registryLock.Lock()
|
||||
delete(c.trackedServices, service.ID)
|
||||
delete(c.trackedServices, consulServiceID(service.ID))
|
||||
c.registryLock.Unlock()
|
||||
}
|
||||
return mErr.ErrorOrNil()
|
||||
|
@ -695,7 +725,7 @@ func (c *Syncer) syncServices() error {
|
|||
// registerCheck registers a check definition with Consul
|
||||
func (c *Syncer) registerCheck(chkReg *consul.AgentCheckRegistration) error {
|
||||
c.registryLock.RLock()
|
||||
if cr, ok := c.checkRunners[chkReg.ID]; ok {
|
||||
if cr, ok := c.checkRunners[consulCheckID(chkReg.ID)]; ok {
|
||||
cr.Start()
|
||||
}
|
||||
c.registryLock.RUnlock()
|
||||
|
@ -733,13 +763,19 @@ func (c *Syncer) createCheckReg(check *structs.ServiceCheck, service *consul.Age
|
|||
return &chkReg, nil
|
||||
}
|
||||
|
||||
// generateConsulServiceID takes the domain and service key and returns a Consul
|
||||
// ServiceID
|
||||
func generateConsulServiceID(domain ServiceDomain, key ServiceKey) consulServiceID {
|
||||
return consulServiceID(fmt.Sprintf("%s-%s-%s", nomadServicePrefix, domain, key))
|
||||
}
|
||||
|
||||
// createService creates a Consul AgentService from a Nomad ConsulService.
|
||||
func (c *Syncer) createService(service *structs.Service) (*consul.AgentServiceRegistration, error) {
|
||||
func (c *Syncer) createService(service *structs.Service, domain ServiceDomain, key ServiceKey) (*consul.AgentServiceRegistration, error) {
|
||||
c.registryLock.RLock()
|
||||
defer c.registryLock.RUnlock()
|
||||
|
||||
srv := consul.AgentServiceRegistration{
|
||||
ID: service.ServiceID,
|
||||
ID: string(generateConsulServiceID(domain, key)),
|
||||
Name: service.Name,
|
||||
Tags: service.Tags,
|
||||
}
|
||||
|
@ -761,21 +797,21 @@ func (c *Syncer) deregisterService(serviceID string) error {
|
|||
}
|
||||
|
||||
// deregisterCheck de-registers a check from Consul
|
||||
func (c *Syncer) deregisterCheck(checkID string) error {
|
||||
func (c *Syncer) deregisterCheck(id consulCheckID) error {
|
||||
c.registryLock.Lock()
|
||||
defer c.registryLock.Unlock()
|
||||
|
||||
// Deleting from Consul Agent
|
||||
if err := c.client.Agent().CheckDeregister(checkID); err != nil {
|
||||
if err := c.client.Agent().CheckDeregister(string(id)); err != nil {
|
||||
// CheckDeregister() will be reattempted again in a future
|
||||
// sync.
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove the check from the local registry
|
||||
if cr, ok := c.checkRunners[checkID]; ok {
|
||||
if cr, ok := c.checkRunners[id]; ok {
|
||||
cr.Stop()
|
||||
delete(c.checkRunners, checkID)
|
||||
delete(c.checkRunners, id)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -847,15 +883,18 @@ func (c *Syncer) SyncServices() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// filterConsulServices prunes out all the service whose ids are not prefixed
|
||||
// with nomad-
|
||||
func (c *Syncer) filterConsulServices(serviceIDPrefix string, consulServices map[string]*consul.AgentService) map[string]*consul.AgentService {
|
||||
localServices := make(map[string]*consul.AgentService, len(consulServices))
|
||||
// filterConsulServices prunes out all the service who were not registered with
|
||||
// the syncer
|
||||
func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentService) map[consulServiceID]*consul.AgentService {
|
||||
localServices := make(map[consulServiceID]*consul.AgentService, len(consulServices))
|
||||
c.registryLock.RLock()
|
||||
defer c.registryLock.RUnlock()
|
||||
for serviceID, service := range consulServices {
|
||||
if strings.HasPrefix(service.ID, serviceIDPrefix) {
|
||||
localServices[serviceID] = service
|
||||
for domain := range c.servicesGroups {
|
||||
if strings.HasPrefix(service.ID, fmt.Sprintf("%s-%s", nomadServicePrefix, domain)) {
|
||||
localServices[consulServiceID(serviceID)] = service
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return localServices
|
||||
|
@ -863,12 +902,16 @@ func (c *Syncer) filterConsulServices(serviceIDPrefix string, consulServices map
|
|||
|
||||
// filterConsulChecks prunes out all the consul checks which do not have
|
||||
// services with Syncer's idPrefix.
|
||||
func (c *Syncer) filterConsulChecks(consulChecks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck {
|
||||
localChecks := make(map[string]*consul.AgentCheck, len(consulChecks))
|
||||
filterPrefix := c.filterPrefix(c.serviceRegPrefix)
|
||||
func (c *Syncer) filterConsulChecks(consulChecks map[string]*consul.AgentCheck) map[consulCheckID]*consul.AgentCheck {
|
||||
localChecks := make(map[consulCheckID]*consul.AgentCheck, len(consulChecks))
|
||||
c.registryLock.RLock()
|
||||
defer c.registryLock.RUnlock()
|
||||
for checkID, check := range consulChecks {
|
||||
if strings.HasPrefix(check.ServiceID, filterPrefix) {
|
||||
localChecks[checkID] = check
|
||||
for domain := range c.checkGroups {
|
||||
if strings.HasPrefix(check.ServiceID, fmt.Sprintf("%s-%s", nomadServicePrefix, domain)) {
|
||||
localChecks[consulCheckID(checkID)] = check
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return localChecks
|
||||
|
@ -910,28 +953,40 @@ func (c *Syncer) runCheck(check Check) {
|
|||
}
|
||||
}
|
||||
|
||||
// KeepServices prunes all services besides the ones specified by the user
|
||||
func (c *Syncer) KeepServices(serviceRegPrefix string, servicesInRunningAllocs map[string][]*structs.Service) {
|
||||
servicesToKeep := make(map[string]struct{})
|
||||
for allocID, services := range servicesInRunningAllocs {
|
||||
for _, service := range services {
|
||||
serviceID := c.GenerateServiceID(allocID, service, serviceRegPrefix)
|
||||
servicesToKeep[serviceID] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// KeepDomains prunes all services that do not exist in the passed domains
|
||||
func (c *Syncer) KeepDomains(domains []ServiceDomain) error {
|
||||
servicesInConsul, err := c.ConsulClient().Agent().Services()
|
||||
if err != nil {
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
filterServicesWithPrefix := c.filterPrefix(serviceRegPrefix)
|
||||
relevantServices := c.filterConsulServices(filterServicesWithPrefix, servicesInConsul)
|
||||
for serviceID := range relevantServices {
|
||||
if _, ok := servicesToKeep[serviceID]; !ok {
|
||||
c.deregisterService(serviceID)
|
||||
var mErr multierror.Error
|
||||
for serviceID := range servicesInConsul {
|
||||
// Skip any service that was not registered by Nomad
|
||||
if !strings.HasPrefix(serviceID, nomadServicePrefix) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Filter services that do not exist in the desired domains
|
||||
match := false
|
||||
for _, domain := range domains {
|
||||
// Include the hyphen so it is explicit to that domain otherwise it
|
||||
// maybe a subset match
|
||||
desired := fmt.Sprintf("%s-%s-", nomadServicePrefix, domain)
|
||||
if strings.HasPrefix(serviceID, desired) {
|
||||
match = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !match {
|
||||
if err := c.deregisterService(serviceID); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
// AddPeriodicHandler adds a uniquely named callback. Returns true if
|
||||
|
|
|
@ -1555,10 +1555,6 @@ func (sc *ServiceCheck) Hash(serviceID string) string {
|
|||
|
||||
// Service represents a Consul service definition in Nomad
|
||||
type Service struct {
|
||||
// ServiceID is the calculated Consul ServiceID used for a service.
|
||||
// This value is not available to be set via configuration.
|
||||
ServiceID string `mapstructure:"-"`
|
||||
|
||||
// Name of the service registered with Consul. Consul defaults the
|
||||
// Name to ServiceID if not specified. The Name if specified is used
|
||||
// as one of the seed values when generating a Consul ServiceID.
|
||||
|
|
Loading…
Reference in a new issue