Add "Service Groups" to the Syncer.

Now the right way to register services with the Syncer is to call
`SetServices(groupName, []*services)`.  This was required to allow
the Syncer to sync either the Client, Server, or Both using a
single Syncer.
This commit is contained in:
Sean Chittenden 2016-06-09 21:21:22 -04:00
parent c54539b860
commit 83775c5c8b
No known key found for this signature in database
GPG Key ID: 4EBC9DC16C2E5E16
1 changed files with 55 additions and 44 deletions

View File

@ -54,6 +54,14 @@ type Syncer struct {
client *consul.Client
runChecks bool
// servicesGroups is a named group of services that will be flattened
// and reconciled with Consul when SyncServices() is called. The key
// to the servicesGroups map is unique per handler and is used to
// allow the Agent's services to be maintained independently of the
// Client or Server's services.
servicesGroups map[string][]*consul.AgentServiceRegistration
servicesGroupsLock sync.RWMutex
// The "Consul Registry" is a collection of Consul Services and
// Checks all guarded by the registryLock.
registryLock sync.RWMutex
@ -152,6 +160,7 @@ func NewSyncer(config *config.ConsulConfig, shutdownCh chan struct{}, logger *lo
logger: logger,
shutdownCh: shutdownCh,
trackedServices: make(map[string]*consul.AgentService),
servicesGroups: make(map[string][]*consul.AgentServiceRegistration),
trackedChecks: make(map[string]*consul.AgentCheckRegistration),
checkRunners: make(map[string]*CheckRunner),
periodicCallbacks: make(map[string]types.PeriodicCallback),
@ -191,39 +200,36 @@ func (c *Syncer) SyncNow() {
}
}
// SyncServices sync the services with the Consul Agent
func (c *Syncer) SyncServices() error {
services := c.flattenedServices()
// SetServices assigns the slice of Nomad Services to the provided services
// group name.
func (c *Syncer) SetServices(groupName string, services []*structs.ConsulService) error {
var mErr multierror.Error
taskServices := make(map[string]*consul.AgentServiceRegistration)
taskChecks := make(map[string]*consul.AgentCheckRegistration)
// Register Services and Checks that we don't know about or has changed
registeredServices := make([]*consul.AgentServiceRegistration, 0, len(services))
for _, service := range services {
srv, err := c.createService(service)
if err != nil {
if service.ServiceID == "" {
service.ServiceID = c.GenerateServiceID(groupName, service)
}
var serviceReg *consul.AgentServiceRegistration
var err error
if serviceReg, err = c.createService(service); err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
trackedService, ok := c.trackedServices[srv.ID]
if (ok && !reflect.DeepEqual(trackedService, srv)) || !ok {
if err := c.client.Agent().ServiceRegister(srv); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
c.trackedServices[srv.ID] = srv
taskServices[srv.ID] = srv
registeredServices = append(registeredServices, serviceReg)
// Register the check(s) for this service
for _, chk := range service.Checks {
// Create a consul check registration
chkReg, err := c.createDelegatedCheckReg(chk, srv)
// Create a Consul check registration
chkReg, err := c.createDelegatedCheckReg(chk, serviceReg)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
// creating a nomad check if we have to handle this particular check type
if _, ok := c.delegateChecks[chk.Type]; ok {
if _, ok := c.checkRunners[chkReg.ID]; ok {
continue
}
nc, err := c.createDelegatedCheck(chk, chkReg.ID)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
@ -232,37 +238,42 @@ func (c *Syncer) SyncServices() error {
cr := NewCheckRunner(nc, c.runCheck, c.logger)
c.checkRunners[nc.ID()] = cr
}
if _, ok := c.trackedChecks[chkReg.ID]; !ok {
if err := c.registerCheck(chkReg); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
c.trackedChecks[chkReg.ID] = chkReg
taskChecks[chkReg.ID] = chkReg
}
}
// Remove services that are not present anymore
for _, service := range c.trackedServices {
if _, ok := taskServices[service.ID]; !ok {
if err := c.deregisterService(service.ID); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
delete(c.trackedServices, service.ID)
}
if len(mErr.Errors) > 0 {
return mErr.ErrorOrNil()
}
// Remove the checks that are not present anymore
for checkID, _ := range c.trackedChecks {
if _, ok := taskChecks[checkID]; !ok {
if err := c.deregisterCheck(checkID); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
delete(c.trackedChecks, checkID)
c.servicesGroupsLock.Lock()
c.servicesGroups[groupName] = registeredServices
c.servicesGroupsLock.Unlock()
return nil
}
// SyncNow expires the current timer forcing the list of periodic callbacks
// to be synced immediately.
func (c *Syncer) SyncNow() {
select {
case c.notifySyncCh <- struct{}{}:
default:
}
}
// flattenedServices returns a flattened list of services
func (c *Syncer) flattenedServices() []*consul.AgentServiceRegistration {
const initialNumServices = 8
services := make([]*consul.AgentServiceRegistration, 0, initialNumServices)
c.servicesGroupsLock.RLock()
for _, servicesGroup := range c.servicesGroups {
for _, service := range servicesGroup {
services = append(services, service)
}
}
return mErr.ErrorOrNil()
c.servicesGroupsLock.RUnlock()
return services
}
func (c *Syncer) signalShutdown() {