Merge pull request #1267 from hashicorp/b-consul-register-checks

Consul.Syncer register checks and starts delegated checks
This commit is contained in:
Alex Dadgar 2016-06-12 22:38:02 -07:00 committed by GitHub
commit b34cf1f4ef
3 changed files with 57 additions and 36 deletions

View file

@ -366,7 +366,6 @@ func (e *UniversalExecutor) UpdateTask(task *structs.Task) error {
if e.consulSyncer != nil {
e.interpolateServices(e.ctx.Task)
e.consulSyncer.SetServices(e.ctx.AllocID, task.Services)
e.consulSyncer.SyncNow()
}
return nil
}
@ -496,7 +495,6 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error {
e.consulSyncer.SetServiceRegPrefix(serviceRegPrefix)
e.consulSyncer.SetAddrFinder(e.ctx.Task.FindHostAndPortFor)
e.consulSyncer.SetServices(e.ctx.AllocID, e.ctx.Task.Services)
e.consulSyncer.SyncNow() // Attempt to register immediately
return nil
}

View file

@ -74,19 +74,13 @@ func NewAgent(config *Config, logOutput io.Writer) (*Agent, error) {
return nil, fmt.Errorf("must have at least client or server mode enabled")
}
// The Nomad Agent runs the consul.Syncer regardless of whether or
// not the Agent is running in Client or Server mode (or both), and
// regardless of the consul.auto_register parameter. The Client and
// Server both reuse the same consul.Syncer instance. This Syncer
// task periodically executes callbacks that update Consul. The
// reason the Syncer is always running is because one of the
// callbacks is attempts to self-bootstrap Nomad using information
// found in Consul. The Syncer's handlers automatically deactivate
// when the Consul Fingerprinter has detected the local Consul Agent
// is missing.
if err := a.consulSyncer.SyncServices(); err != nil {
a.logger.Printf("[WARN] agent.consul: Initial sync of Consul failed: %v", err)
}
// The Nomad Agent runs the consul.Syncer regardless of whether or not the
// Agent is running in Client or Server mode (or both), and regardless of
// the consul.auto_register parameter. The Client and Server both reuse the
// same consul.Syncer instance. This Syncer task periodically executes
// callbacks that update Consul. The reason the Syncer is always running is
// because one of the callbacks is attempts to self-bootstrap Nomad using
// information found in Consul.
go a.consulSyncer.Run()
return a, nil

View file

@ -62,20 +62,23 @@ type Syncer struct {
client *consul.Client
consulAvailable bool
// servicesGroups is a named group of services that will be flattened
// and reconciled with Consul when SyncServices() is called. The key
// to the servicesGroups map is unique per handler and is used to
// allow the Agent's services to be maintained independently of the
// Client or Server's services.
servicesGroups map[string][]*consul.AgentServiceRegistration
servicesGroupsLock sync.RWMutex
// servicesGroups and checkGroups are named groups of services and checks
// respectively that will be flattened and reconciled with Consul when
// SyncServices() is called. The key to the servicesGroups map is unique
// per handler and is used to allow the Agent's services to be maintained
// independently of the Client or Server's services.
servicesGroups map[string][]*consul.AgentServiceRegistration
checkGroups map[string][]*consul.AgentCheckRegistration
groupsLock sync.RWMutex
// The "Consul Registry" is a collection of Consul Services and
// Checks all guarded by the registryLock.
registryLock sync.RWMutex
checkRunners map[string]*CheckRunner
delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul
checkRunners map[string]*CheckRunner
delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul
// trackedChecks and trackedServices are registered with consul
trackedChecks map[string]*consul.AgentCheckRegistration
trackedServices map[string]*consul.AgentServiceRegistration
@ -178,6 +181,7 @@ func NewSyncer(consulConfig *config.ConsulConfig, shutdownCh chan struct{}, logg
consulAvailable: true,
shutdownCh: shutdownCh,
servicesGroups: make(map[string][]*consul.AgentServiceRegistration),
checkGroups: make(map[string][]*consul.AgentCheckRegistration),
trackedServices: make(map[string]*consul.AgentServiceRegistration),
trackedChecks: make(map[string]*consul.AgentCheckRegistration),
checkRunners: make(map[string]*CheckRunner),
@ -235,7 +239,9 @@ func (c *Syncer) GenerateServiceID(groupName string, service *structs.Service) s
// group name.
func (c *Syncer) SetServices(groupName string, services []*structs.Service) error {
var mErr multierror.Error
registeredServices := make([]*consul.AgentServiceRegistration, 0, len(services))
numServ := len(services)
registeredServices := make([]*consul.AgentServiceRegistration, 0, numServ)
registeredChecks := make([]*consul.AgentCheckRegistration, 0, numServ)
for _, service := range services {
if service.ServiceID == "" {
service.ServiceID = c.GenerateServiceID(groupName, service)
@ -251,11 +257,12 @@ func (c *Syncer) SetServices(groupName string, services []*structs.Service) erro
// Register the check(s) for this service
for _, chk := range service.Checks {
// Create a Consul check registration
chkReg, err := c.createDelegatedCheckReg(chk, serviceReg)
chkReg, err := c.createCheckReg(chk, serviceReg)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
// creating a nomad check if we have to handle this particular check type
c.registryLock.RLock()
if _, ok := c.delegateChecks[chk.Type]; ok {
@ -264,11 +271,13 @@ func (c *Syncer) SetServices(groupName string, services []*structs.Service) erro
if ok {
continue
}
nc, err := c.createDelegatedCheck(chk, chkReg.ID)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
cr := NewCheckRunner(nc, c.runCheck, c.logger)
c.registryLock.Lock()
c.checkRunners[nc.ID()] = cr
@ -276,6 +285,8 @@ func (c *Syncer) SetServices(groupName string, services []*structs.Service) erro
} else {
c.registryLock.RUnlock()
}
registeredChecks = append(registeredChecks, chkReg)
}
}
@ -283,9 +294,13 @@ func (c *Syncer) SetServices(groupName string, services []*structs.Service) erro
return mErr.ErrorOrNil()
}
c.servicesGroupsLock.Lock()
c.groupsLock.Lock()
c.servicesGroups[groupName] = registeredServices
c.servicesGroupsLock.Unlock()
c.checkGroups[groupName] = registeredChecks
c.groupsLock.Unlock()
// Sync immediately
c.SyncNow()
return nil
}
@ -303,17 +318,32 @@ func (c *Syncer) SyncNow() {
func (c *Syncer) flattenedServices() []*consul.AgentServiceRegistration {
const initialNumServices = 8
services := make([]*consul.AgentServiceRegistration, 0, initialNumServices)
c.servicesGroupsLock.RLock()
c.groupsLock.RLock()
for _, servicesGroup := range c.servicesGroups {
for _, service := range servicesGroup {
services = append(services, service)
}
}
c.servicesGroupsLock.RUnlock()
c.groupsLock.RUnlock()
return services
}
// flattenedChecks returns a flattened list of checks
func (c *Syncer) flattenedChecks() []*consul.AgentCheckRegistration {
const initialNumChecks = 8
checks := make([]*consul.AgentCheckRegistration, 0, initialNumChecks)
c.groupsLock.RLock()
for _, checkGroup := range c.checkGroups {
for _, check := range checkGroup {
checks = append(checks, check)
}
}
c.groupsLock.RUnlock()
return checks
}
func (c *Syncer) signalShutdown() {
select {
case c.notifyShutdownCh <- struct{}{}:
@ -467,7 +497,7 @@ func (c *Syncer) calcChecksDiff(consulChecks map[string]*consul.AgentCheck) (
)
c.registryLock.RLock()
localChecks := make(map[string]*mergedCheck, len(c.trackedChecks)+len(consulChecks))
for _, localCheck := range c.trackedChecks {
for _, localCheck := range c.flattenedChecks() {
localChecksCount++
localChecks[localCheck.ID] = &mergedCheck{localCheck, 'l'}
}
@ -547,7 +577,7 @@ func compareConsulService(localService *consul.AgentServiceRegistration, consulS
// calcServicesDiff takes the argument (consulServices) and calculates the
// delta between the consul.Syncer's list of known services
// (c.trackedServices). Three arrays are returned:
// (c.trackedServices). Four arrays are returned:
//
// 1) a slice of services that exist only locally in the Syncer and are
// missing from the Consul Agent (consulServices) and therefore need to be
@ -673,10 +703,9 @@ func (c *Syncer) registerCheck(chkReg *consul.AgentCheckRegistration) error {
return c.client.Agent().CheckRegister(chkReg)
}
// createDelegatedCheckReg creates a Check that can be registered with
// Nomad. It also creates a Nomad check for the check types that it can
// handle.
func (c *Syncer) createDelegatedCheckReg(check *structs.ServiceCheck, service *consul.AgentServiceRegistration) (*consul.AgentCheckRegistration, error) {
// createCheckReg creates a Check that can be registered with Nomad. It also
// creates a Nomad check for the check types that it can handle.
func (c *Syncer) createCheckReg(check *structs.ServiceCheck, service *consul.AgentServiceRegistration) (*consul.AgentCheckRegistration, error) {
chkReg := consul.AgentCheckRegistration{
ID: check.Hash(service.ID),
Name: check.Name,