Sync services with Consul by comparing the AgentServiceReg w/ ConsulService

The source of truth is the local Nomad Agent.  Any services not local that
have a matching prefix are removed.  Changed services are re-registered
and missing services are re-added.
This commit is contained in:
Sean Chittenden 2016-06-09 22:55:01 -04:00
parent ed29946f5e
commit 197feae679
No known key found for this signature in database
GPG Key ID: 4EBC9DC16C2E5E16
2 changed files with 162 additions and 9 deletions

View File

@ -6,7 +6,6 @@ import (
"log"
"net/http"
"net/url"
"reflect"
"strings"
"sync"
"time"
@ -72,6 +71,7 @@ type Syncer struct {
checkRunners map[string]*CheckRunner
delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul
trackedServices map[string]*consul.AgentServiceRegistration
// serviceRegPrefix is used to namespace the domain of registered
// Consul Services and Checks belonging to a single Syncer. A given
@ -164,8 +164,8 @@ func NewSyncer(config *config.ConsulConfig, shutdownCh chan struct{}, logger *lo
logger: logger,
consulAvailable: true,
shutdownCh: shutdownCh,
trackedServices: make(map[string]*consul.AgentService),
servicesGroups: make(map[string][]*consul.AgentServiceRegistration),
trackedServices: make(map[string]*consul.AgentServiceRegistration),
trackedChecks: make(map[string]*consul.AgentCheckRegistration),
checkRunners: make(map[string]*CheckRunner),
periodicCallbacks: make(map[string]types.PeriodicCallback),
@ -328,8 +328,20 @@ func (c *Syncer) Shutdown() error {
}
// Get the services from Consul
cServices, err := c.client.Agent().Services()
// queryAgentServices queries the Consul Agent for a list of Consul services that
// have been registered with this Consul Syncer.
func (c *Syncer) queryAgentServices() (map[string]*consul.AgentService, error) {
services, err := c.client.Agent().Services()
if err != nil {
return nil, err
}
return c.filterConsulServices(services), nil
}
// syncChecks synchronizes this Syncer's Consul Checks with the Consul Agent.
func (c *Syncer) syncChecks() error {
var mErr multierror.Error
consulChecks, err := c.queryChecks()
if err != nil {
return err
}
@ -341,7 +353,147 @@ func (c *Syncer) Shutdown() error {
if err := c.deregisterService(service.ID); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
// compareConsulService takes a consul.AgentServiceRegistration instance and
// compares it with a consul.AgentService. Returns true if they are equal
// according to consul.AgentService, otherwise false.
func compareConsulService(localService *consul.AgentServiceRegistration, consulService *consul.AgentService) bool {
if consulService.ID != localService.ID ||
consulService.Service != localService.Name ||
consulService.Port != localService.Port ||
consulService.Address != localService.Address ||
consulService.Address != localService.Address ||
consulService.EnableTagOverride != localService.EnableTagOverride {
return false
}
serviceTags := make(map[string]byte, len(localService.Tags))
for _, tag := range localService.Tags {
serviceTags[tag] = 'l'
}
for _, tag := range consulService.Tags {
if _, found := serviceTags[tag]; !found {
return false
}
serviceTags[tag] = 'b'
}
for _, state := range serviceTags {
if state == 'l' {
return false
}
}
return true
}
// calcServicesDiff takes the argument (consulServices) and calculates the
// delta between the consul.Syncer's list of known services
// (c.trackedServices). Three arrays are returned:
//
// 1) a slice of services that exist only locally in the Syncer and are
// missing from the Consul Agent (consulServices) and therefore need to be
// registered.
//
// 2) a slice of services that exist in both the local consul.Syncer's
// tracked list and Consul Agent (consulServices) *AND* are identical.
//
// 3) a slice of services that exist in both the local consul.Syncer's
// tracked list and Consul Agent (consulServices) but have diverged state.
//
// 4) a slice of services that exist only in the Consul Agent
// (consulServices) and should be removed because the Consul Agent has
// drifted from the Syncer.
func (c *Syncer) calcServicesDiff(consulServices map[string]*consul.AgentService) (missingServices []*consul.AgentServiceRegistration, equalServices []*consul.AgentServiceRegistration, changedServices []*consul.AgentServiceRegistration, staleServices []*consul.AgentServiceRegistration) {
type mergedService struct {
service *consul.AgentServiceRegistration
// 'l' == Nomad local only
// 'e' == equal
// 'c' == changed
// 'a' == Consul agent only
state byte
}
var (
localServicesCount = 0
equalServicesCount = 0
changedServicesCount = 0
agentServices = 0
)
localServices := make(map[string]*mergedService, len(c.trackedServices)+len(consulServices))
for _, localService := range c.flattenedServices() {
localServicesCount++
localServices[localService.ID] = &mergedService{localService, 'l'}
}
for _, consulService := range consulServices {
if localService, found := localServices[consulService.ID]; found {
localServicesCount--
if compareConsulService(localService.service, consulService) {
equalServicesCount++
localServices[consulService.ID].state = 'e'
} else {
changedServicesCount++
localServices[consulService.ID].state = 'c'
}
} else {
agentServices++
agentServiceReg := &consul.AgentServiceRegistration{
ID: consulService.ID,
Name: consulService.Service,
Tags: consulService.Tags,
Port: consulService.Port,
Address: consulService.Address,
}
localServices[consulService.ID] = &mergedService{agentServiceReg, 'a'}
}
}
missingServices = make([]*consul.AgentServiceRegistration, 0, localServicesCount)
equalServices = make([]*consul.AgentServiceRegistration, 0, equalServicesCount)
changedServices = make([]*consul.AgentServiceRegistration, 0, changedServicesCount)
staleServices = make([]*consul.AgentServiceRegistration, 0, agentServices)
for _, service := range localServices {
switch service.state {
case 'l':
missingServices = append(missingServices, service.service)
case 'e':
equalServices = append(equalServices, service.service)
case 'c':
changedServices = append(changedServices, service.service)
case 'a':
staleServices = append(staleServices, service.service)
}
}
return missingServices, equalServices, changedServices, staleServices
}
// syncServices synchronizes this Syncer's Consul Services with the Consul
// Agent.
func (c *Syncer) syncServices() error {
consulServices, err := c.queryAgentServices()
if err != nil {
return err
}
// Synchronize services with Consul
var mErr multierror.Error
missingServices, _, changedServices, removedServices := c.calcServicesDiff(consulServices)
for _, service := range missingServices {
if err := c.client.Agent().ServiceRegister(service); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
c.trackedServices[service.ID] = service
}
for _, service := range changedServices {
// Re-register the local service
if err := c.client.Agent().ServiceRegister(service); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
for _, service := range removedServices {
if err := c.deregisterService(service.ID); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
delete(c.trackedServices, service.ID)
}
return mErr.ErrorOrNil()
}
@ -386,7 +538,7 @@ func (c *Syncer) createDelegatedCheckReg(check *structs.ServiceCheck, service *c
return &chkReg, nil
}
// createService creates a Consul AgentService from a Nomad Service
// createService creates a Consul AgentService from a Nomad ConsulService.
func (c *Syncer) createService(service *structs.ConsulService) (*consul.AgentServiceRegistration, error) {
c.registryLock.RLock()
defer c.registryLock.RUnlock()
@ -409,8 +561,8 @@ func (c *Syncer) createService(service *structs.ConsulService) (*consul.AgentSer
}
// deregisterService de-registers a service with the given ID from consul
func (c *Syncer) deregisterService(ID string) error {
return c.client.Agent().ServiceDeregister(ID)
func (c *Syncer) deregisterService(serviceID string) error {
return c.client.Agent().ServiceDeregister(serviceID)
}
// deregisterCheck de-registers a check with a given ID from Consul.
@ -501,8 +653,9 @@ func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentSer
localServices := make(map[string]*consul.AgentService, len(consulServices))
c.registryLock.RLock()
defer c.registryLock.RUnlock()
filterPrefix := c.filterPrefix()
for serviceID, service := range consulServices {
if strings.HasPrefix(service.ID, c.serviceRegPrefix) {
if strings.HasPrefix(service.ID, filterPrefix) {
localServices[serviceID] = service
}
}

View File

@ -9,7 +9,7 @@ import (
// ConsulConfig contains the configuration information necessary to
// communicate with a Consul Agent in order to:
//
// - Register services and checks with Consul
// - Register services and their checks with Consul
//
// - Bootstrap this Nomad Client with the list of Nomad Servers registered
// with Consul