implemented reconciliation of unwanted services

This commit is contained in:
Diptanu Choudhury 2016-06-13 14:05:35 +02:00
parent b34cf1f4ef
commit d019d8ef8e
4 changed files with 95 additions and 80 deletions

View file

@ -1344,35 +1344,30 @@ func (c *Client) setupConsulSyncer() error {
// TODO this should only deregister things that the executors do not know
// about
//consulServicesSyncFn := func() error {
//const estInitialConsulServices = 8
//const serviceGroupName = "executor"
//services := make([]*structs.Service, 0, estInitialConsulServices)
//for allocID, ar := range c.getAllocRunners() {
//ar.taskStatusLock.RLock()
//taskStates := copyTaskStates(ar.taskStates)
//ar.taskStatusLock.RUnlock()
//for taskName, taskState := range taskStates {
//if taskState.State == structs.TaskStateRunning {
//if tr, ok := ar.tasks[taskName]; ok {
//for _, service := range tr.task.Services {
//if service.Name == "" {
//service.Name = fmt.Sprintf("%s-%s", tr.task.Name, allocID)
//}
//if service.ServiceID == "" {
//service.ServiceID = fmt.Sprintf("%s-%s:%s/%s", c.consulSyncer.GenerateServiceID(serviceGroupName, service), tr.task.Name, allocID)
//}
//services = append(services, service)
//}
//}
//}
//}
//}
//c.consulSyncer.SetServices(serviceGroupName, services)
//return nil
//}
//c.consulSyncer.AddPeriodicHandler("Nomad Client Services Sync Handler", consulServicesSyncFn)
consulServicesSyncFn := func() error {
const estInitialConsulServices = 8
const serviceGroupName = "executor"
servicesInRunningAllocs := make(map[string][]*structs.Service)
for allocID, ar := range c.getAllocRunners() {
services := make([]*structs.Service, 0, estInitialConsulServices)
ar.taskStatusLock.RLock()
taskStates := copyTaskStates(ar.taskStates)
ar.taskStatusLock.RUnlock()
for taskName, taskState := range taskStates {
if taskState.State == structs.TaskStateRunning {
if tr, ok := ar.tasks[taskName]; ok {
for _, service := range tr.task.Services {
services = append(services, service)
}
}
}
}
servicesInRunningAllocs[allocID] = services
}
c.consulSyncer.KeepServices(serviceGroupName, servicesInRunningAllocs)
return nil
}
c.consulSyncer.AddPeriodicHandler("Nomad Client Services Sync Handler", consulServicesSyncFn)
return nil
}

View file

@ -39,7 +39,7 @@ const (
pidScanInterval = 5 * time.Second
// serviceRegPrefix is the prefix the entire Executor should use
serviceRegPrefix = "executor"
executorServiceRegPrefix = "executor"
)
var (
@ -365,7 +365,7 @@ func (e *UniversalExecutor) UpdateTask(task *structs.Task) error {
// Re-syncing task with Consul agent
if e.consulSyncer != nil {
e.interpolateServices(e.ctx.Task)
e.consulSyncer.SetServices(e.ctx.AllocID, task.Services)
e.consulSyncer.SetServices(e.ctx.AllocID, task.Services, executorServiceRegPrefix)
}
return nil
}
@ -492,9 +492,8 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error {
}
e.interpolateServices(e.ctx.Task)
e.consulSyncer.SetDelegatedChecks(e.createCheckMap(), e.createCheck)
e.consulSyncer.SetServiceRegPrefix(serviceRegPrefix)
e.consulSyncer.SetAddrFinder(e.ctx.Task.FindHostAndPortFor)
e.consulSyncer.SetServices(e.ctx.AllocID, e.ctx.Task.Services)
e.consulSyncer.SetServices(e.ctx.AllocID, e.ctx.Task.Services, executorServiceRegPrefix)
return nil
}

View file

@ -378,7 +378,7 @@ func (a *Agent) setupServer() error {
Name: a.config.Consul.ServerServiceName,
Tags: []string{consul.ServiceTagSerf},
},
})
}, "agent")
}
return nil
@ -424,7 +424,7 @@ func (a *Agent) setupClient() error {
PortLabel: a.clientRPCAddr,
Tags: []string{consul.ServiceTagRPC},
},
})
}, "agent")
}
return nil
@ -593,7 +593,6 @@ func (a *Agent) setupConsulSyncer(shutdownCh chan struct{}) error {
if err != nil {
return err
}
a.consulSyncer.SetServiceRegPrefix("agent")
a.consulSyncer.SetAddrFinder(func(portLabel string) (string, int) {
host, port, err := net.SplitHostPort(portLabel)

View file

@ -30,7 +30,7 @@ const (
// nomadServicePrefix is the prefix used when registering a service
// with consul
nomadServicePrefix = "nomad"
nomadServicePrefix = "nomad-registered-service"
// The periodic time interval for syncing services and checks with Consul
syncInterval = 5 * time.Second
@ -71,6 +71,8 @@ type Syncer struct {
checkGroups map[string][]*consul.AgentCheckRegistration
groupsLock sync.RWMutex
groupName string
// The "Consul Registry" is a collection of Consul Services and
// Checks all guarded by the registryLock.
registryLock sync.RWMutex
@ -82,14 +84,6 @@ type Syncer struct {
trackedChecks map[string]*consul.AgentCheckRegistration
trackedServices map[string]*consul.AgentServiceRegistration
// serviceRegPrefix is used to namespace the domain of registered
// Consul Services and Checks belonging to a single Syncer. A given
// Nomad Agent may spawn multiple Syncer tasks between the Agent
// Agent and its Executors, all syncing to a single Consul Agent.
// The serviceRegPrefix allows multiple Syncers to coexist without
// each Syncer clobbering each others Services. The Syncer namespace
// protocol is fmt.Sprintf("nomad-%s-%s", serviceRegPrefix, miscID).
// serviceRegPrefix is guarded by the registryLock.
serviceRegPrefix string
addrFinder func(portLabel string) (string, int)
@ -204,47 +198,40 @@ func (c *Syncer) SetAddrFinder(addrFinder func(string) (string, int)) *Syncer {
return c
}
// SetServiceRegPrefix sets the registration prefix used by the Syncer when
// registering Services with Consul.
func (c *Syncer) SetServiceRegPrefix(servicePrefix string) *Syncer {
c.registryLock.Lock()
defer c.registryLock.Unlock()
c.serviceRegPrefix = servicePrefix
return c
}
// filterPrefix generates a unique prefix that a Syncer can later filter on.
func (c *Syncer) filterPrefix() string {
func (c *Syncer) filterPrefix(serviceRegPrefix string) string {
c.registryLock.RLock()
defer c.registryLock.RUnlock()
return fmt.Sprintf("%s-%s", nomadServicePrefix, c.serviceRegPrefix)
return fmt.Sprintf("%s-%s", nomadServicePrefix, serviceRegPrefix)
}
// GenerateServiceID creates a unique Consul ServiceID for a given
// Service.
func (c *Syncer) GenerateServiceID(groupName string, service *structs.Service) string {
func (c *Syncer) GenerateServiceID(groupName string, service *structs.Service, serviceRegPrefix string) string {
numTags := len(service.Tags)
switch numTags {
case 0:
return fmt.Sprintf("%s-%s:%s", c.filterPrefix(), groupName, service.Name)
return fmt.Sprintf("%s-%s:%s", c.filterPrefix(serviceRegPrefix), groupName, service.Name)
case 1:
return fmt.Sprintf("%s-%s:%s@%s", c.filterPrefix(), groupName, service.Tags[0], service.Name)
return fmt.Sprintf("%s-%s:%s@%s", c.filterPrefix(serviceRegPrefix), groupName, service.Tags[0], service.Name)
default:
tags := strings.Join(service.Tags, "|")
return fmt.Sprintf("%s-%s:(%s)@%s", c.filterPrefix(), groupName, tags, service.Name)
return fmt.Sprintf("%s-%s:(%s)@%s", c.filterPrefix(serviceRegPrefix), groupName, tags, service.Name)
}
}
// SetServices assigns the slice of Nomad Services to the provided services
// group name.
func (c *Syncer) SetServices(groupName string, services []*structs.Service) error {
func (c *Syncer) SetServices(groupName string, services []*structs.Service, serviceRegPrefix string) error {
c.groupName = groupName
c.serviceRegPrefix = serviceRegPrefix
var mErr multierror.Error
numServ := len(services)
registeredServices := make([]*consul.AgentServiceRegistration, 0, numServ)
registeredChecks := make([]*consul.AgentCheckRegistration, 0, numServ)
for _, service := range services {
if service.ServiceID == "" {
service.ServiceID = c.GenerateServiceID(groupName, service)
service.ServiceID = c.GenerateServiceID(groupName, service, serviceRegPrefix)
}
var serviceReg *consul.AgentServiceRegistration
var err error
@ -319,13 +306,12 @@ func (c *Syncer) flattenedServices() []*consul.AgentServiceRegistration {
const initialNumServices = 8
services := make([]*consul.AgentServiceRegistration, 0, initialNumServices)
c.groupsLock.RLock()
defer c.groupsLock.RUnlock()
for _, servicesGroup := range c.servicesGroups {
for _, service := range servicesGroup {
services = append(services, service)
}
}
c.groupsLock.RUnlock()
return services
}
@ -371,11 +357,18 @@ func (c *Syncer) Shutdown() error {
c.registryLock.RUnlock()
// De-register all the services from Consul
services, err := c.queryAgentServices()
if err != nil {
mErr.Errors = append(mErr.Errors, err)
consulServices := make(map[string]*consul.AgentService)
for trackedGroup := range c.servicesGroups {
serviceIDPrefix := fmt.Sprintf("%s-%s", c.filterPrefix(c.serviceRegPrefix), trackedGroup)
services, err := c.queryAgentServices(serviceIDPrefix)
if err != nil {
return err
}
for serviceID, agentService := range services {
consulServices[serviceID] = agentService
}
}
for _, service := range services {
for _, service := range consulServices {
if err := c.client.Agent().ServiceDeregister(service.ID); err != nil {
c.logger.Printf("[WARN] consul.syncer: failed to deregister service ID %+q: %v", service.ID, err)
mErr.Errors = append(mErr.Errors, err)
@ -396,12 +389,12 @@ func (c *Syncer) queryChecks() (map[string]*consul.AgentCheck, error) {
// queryAgentServices queries the Consul Agent for a list of Consul services that
// have been registered with this Consul Syncer.
func (c *Syncer) queryAgentServices() (map[string]*consul.AgentService, error) {
func (c *Syncer) queryAgentServices(serviceIDPrefix string) (map[string]*consul.AgentService, error) {
services, err := c.client.Agent().Services()
if err != nil {
return nil, err
}
return c.filterConsulServices(services), nil
return c.filterConsulServices(serviceIDPrefix, services), nil
}
// syncChecks synchronizes this Syncer's Consul Checks with the Consul Agent.
@ -660,9 +653,16 @@ func (c *Syncer) calcServicesDiff(consulServices map[string]*consul.AgentService
// syncServices synchronizes this Syncer's Consul Services with the Consul
// Agent.
func (c *Syncer) syncServices() error {
consulServices, err := c.queryAgentServices()
if err != nil {
return err
consulServices := make(map[string]*consul.AgentService)
for trackedGroup := range c.servicesGroups {
serviceIDPrefix := fmt.Sprintf("%s-%s", c.filterPrefix(c.serviceRegPrefix), trackedGroup)
services, err := c.queryAgentServices(serviceIDPrefix)
if err != nil {
return err
}
for serviceID, agentService := range services {
consulServices[serviceID] = agentService
}
}
// Synchronize services with Consul
@ -794,12 +794,12 @@ func (c *Syncer) Run() {
if err := c.SyncServices(); err != nil {
if c.consulAvailable {
c.logger.Printf("[DEBUG] consul.syncer: disabling checks until successful sync for %+q: %v", c.serviceRegPrefix, err)
c.logger.Printf("[DEBUG] consul.syncer: error in syncing: %v", err)
}
c.consulAvailable = false
} else {
if !c.consulAvailable {
c.logger.Printf("[DEBUG] consul.syncer: re-enabling checks for for %+q", c.serviceRegPrefix)
c.logger.Printf("[DEBUG] consul.syncer: syncs succesful")
}
c.consulAvailable = true
}
@ -809,7 +809,7 @@ func (c *Syncer) Run() {
c.Shutdown()
case <-c.notifyShutdownCh:
sync.Stop()
c.logger.Printf("[INFO] consul.syncer: shutting down sync for %+q", c.serviceRegPrefix)
c.logger.Printf("[INFO] consul.syncer: shutting down syncer ")
return
}
}
@ -850,13 +850,12 @@ func (c *Syncer) SyncServices() error {
// filterConsulServices prunes out all the service whose ids are not prefixed
// with nomad-
func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentService) map[string]*consul.AgentService {
func (c *Syncer) filterConsulServices(serviceIDPrefix string, consulServices map[string]*consul.AgentService) map[string]*consul.AgentService {
localServices := make(map[string]*consul.AgentService, len(consulServices))
c.registryLock.RLock()
defer c.registryLock.RUnlock()
filterPrefix := c.filterPrefix()
for serviceID, service := range consulServices {
if strings.HasPrefix(service.ID, filterPrefix) {
if strings.HasPrefix(service.ID, serviceIDPrefix) {
localServices[serviceID] = service
}
}
@ -867,7 +866,7 @@ func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentSer
// services with Syncer's idPrefix.
func (c *Syncer) filterConsulChecks(consulChecks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck {
localChecks := make(map[string]*consul.AgentCheck, len(consulChecks))
filterPrefix := c.filterPrefix()
filterPrefix := c.filterPrefix(c.serviceRegPrefix)
for checkID, check := range consulChecks {
if strings.HasPrefix(check.ServiceID, filterPrefix) {
localChecks[checkID] = check
@ -912,6 +911,29 @@ func (c *Syncer) runCheck(check Check) {
}
}
func (c *Syncer) KeepServices(serviceRegPrefix string, servicesInRunningAllocs map[string][]*structs.Service) {
servicesToKeep := make(map[string]struct{})
for allocID, services := range servicesInRunningAllocs {
for _, service := range services {
serviceID := c.GenerateServiceID(allocID, service, serviceRegPrefix)
servicesToKeep[serviceID] = struct{}{}
}
}
servicesInConsul, err := c.ConsulClient().Agent().Services()
if err != nil {
return
}
filterServicesWithPrefix := c.filterPrefix(serviceRegPrefix)
relevantServices := c.filterConsulServices(filterServicesWithPrefix, servicesInConsul)
for serviceID := range relevantServices {
if _, ok := servicesToKeep[serviceID]; !ok {
c.deregisterService(serviceID)
}
}
}
// AddPeriodicHandler adds a uniquely named callback. Returns true if
// successful, false if a handler with the same name already exists.
func (c *Syncer) AddPeriodicHandler(name string, fn types.PeriodicCallback) bool {