Sync checks with Consul by comparing the AgentCheckReg w/ ConsulService
The source of truth is the local Nomad Agent. Any checks that are not local but have a matching prefix are removed. Changed checks are re-registered and missing checks are re-added.
This commit is contained in:
parent
197feae679
commit
57e084e4df
|
@ -71,6 +71,7 @@ type Syncer struct {
|
|||
|
||||
checkRunners map[string]*CheckRunner
|
||||
delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul
|
||||
trackedChecks map[string]*consul.AgentCheckRegistration
|
||||
trackedServices map[string]*consul.AgentServiceRegistration
|
||||
|
||||
// serviceRegPrefix is used to namespace the domain of registered
|
||||
|
@ -327,6 +328,15 @@ func (c *Syncer) Shutdown() error {
|
|||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
// queryChecks queries the Consul Agent for a list of Consul checks that
|
||||
// have been registered with this Consul Syncer.
|
||||
func (c *Syncer) queryChecks() (map[string]*consul.AgentCheck, error) {
|
||||
checks, err := c.client.Agent().Checks()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c.filterConsulChecks(checks), nil
|
||||
}
|
||||
|
||||
// queryAgentServices queries the Consul Agent for a list of Consul services that
|
||||
// have been registered with this Consul Syncer.
|
||||
|
@ -345,14 +355,127 @@ func (c *Syncer) syncChecks() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cServices = c.filterConsulServices(cServices)
|
||||
|
||||
// Remove the services from consul which are not in any of the tasks
|
||||
for _, service := range cServices {
|
||||
if _, validService := services[service.ID]; !validService {
|
||||
if err := c.deregisterService(service.ID); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
// Synchronize checks with Consul
|
||||
missingChecks, _, changedChecks, staleChecks := c.calcChecksDiff(consulChecks)
|
||||
for _, check := range missingChecks {
|
||||
if err := c.registerCheck(check); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
c.trackedChecks[check.ID] = check
|
||||
}
|
||||
for _, check := range changedChecks {
|
||||
// NOTE(sean@): Do we need to deregister the check before
|
||||
// re-registering it? Not deregistering to avoid missing the
|
||||
// TTL but doesn't correctly reconcile any possible drift with
|
||||
// the check.
|
||||
//
|
||||
// if err := c.deregisterCheck(check.ID); err != nil {
|
||||
// mErr.Errors = append(mErr.Errors, err)
|
||||
// }
|
||||
if err := c.registerCheck(check); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
}
|
||||
for _, check := range staleChecks {
|
||||
if err := c.deregisterCheck(check.ID); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
delete(c.trackedChecks, check.ID)
|
||||
}
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
// compareConsulCheck takes a consul.AgentCheckRegistration instance and
|
||||
// compares it with a consul.AgentCheck. Returns true if they are equal
|
||||
// according to consul.AgentCheck, otherwise false.
|
||||
func compareConsulCheck(localCheck *consul.AgentCheckRegistration, consulCheck *consul.AgentCheck) bool {
|
||||
if consulCheck.CheckID != localCheck.ID ||
|
||||
consulCheck.Name != localCheck.Name ||
|
||||
consulCheck.Notes != localCheck.Notes ||
|
||||
consulCheck.ServiceID != localCheck.ServiceID {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// calcChecksDiff takes the argument (consulChecks) and calculates the delta
|
||||
// between the consul.Syncer's list of known checks (c.trackedChecks). Four
|
||||
// arrays are returned:
|
||||
//
|
||||
// 1) a slice of checks that exist only locally in the Syncer and are missing
|
||||
// from the Consul Agent (consulChecks) and therefore need to be registered.
|
||||
//
|
||||
// 2) a slice of checks that exist in both the local consul.Syncer's
|
||||
// tracked list and Consul Agent (consulChecks).
|
||||
//
|
||||
// 3) a slice of checks that exist in both the local consul.Syncer's
|
||||
// tracked list and Consul Agent (consulChecks) but have diverged state.
|
||||
//
|
||||
// 4) a slice of checks that exist only in the Consul Agent (consulChecks)
|
||||
// and should be removed because the Consul Agent has drifted from the
|
||||
// Syncer.
|
||||
func (c *Syncer) calcChecksDiff(consulChecks map[string]*consul.AgentCheck) (missingChecks []*consul.AgentCheckRegistration, equalChecks []*consul.AgentCheckRegistration, changedChecks []*consul.AgentCheckRegistration, staleChecks []*consul.AgentCheckRegistration) {
|
||||
type mergedCheck struct {
|
||||
check *consul.AgentCheckRegistration
|
||||
// 'l' == Nomad local only
|
||||
// 'e' == equal
|
||||
// 'c' == changed
|
||||
// 'a' == Consul agent only
|
||||
state byte
|
||||
}
|
||||
var (
|
||||
localChecksCount = 0
|
||||
equalChecksCount = 0
|
||||
changedChecksCount = 0
|
||||
agentChecks = 0
|
||||
)
|
||||
localChecks := make(map[string]*mergedCheck, len(c.trackedChecks)+len(consulChecks))
|
||||
for _, localCheck := range c.trackedChecks {
|
||||
localChecksCount++
|
||||
localChecks[localCheck.ID] = &mergedCheck{localCheck, 'l'}
|
||||
}
|
||||
for _, consulCheck := range consulChecks {
|
||||
if localCheck, found := localChecks[consulCheck.CheckID]; found {
|
||||
localChecksCount--
|
||||
if compareConsulCheck(localCheck.check, consulCheck) {
|
||||
equalChecksCount++
|
||||
localChecks[consulCheck.CheckID].state = 'e'
|
||||
} else {
|
||||
changedChecksCount++
|
||||
localChecks[consulCheck.CheckID].state = 'c'
|
||||
}
|
||||
} else {
|
||||
agentChecks++
|
||||
agentCheckReg := &consul.AgentCheckRegistration{
|
||||
ID: consulCheck.CheckID,
|
||||
Name: consulCheck.Name,
|
||||
Notes: consulCheck.Notes,
|
||||
ServiceID: consulCheck.ServiceID,
|
||||
}
|
||||
localChecks[consulCheck.CheckID] = &mergedCheck{agentCheckReg, 'a'}
|
||||
}
|
||||
}
|
||||
|
||||
missingChecks = make([]*consul.AgentCheckRegistration, 0, localChecksCount)
|
||||
equalChecks = make([]*consul.AgentCheckRegistration, 0, equalChecksCount)
|
||||
changedChecks = make([]*consul.AgentCheckRegistration, 0, changedChecksCount)
|
||||
staleChecks = make([]*consul.AgentCheckRegistration, 0, agentChecks)
|
||||
for _, check := range localChecks {
|
||||
switch check.state {
|
||||
case 'l':
|
||||
missingChecks = append(missingChecks, check.check)
|
||||
case 'e':
|
||||
equalChecks = append(equalChecks, check.check)
|
||||
case 'c':
|
||||
changedChecks = append(changedChecks, check.check)
|
||||
case 'a':
|
||||
staleChecks = append(staleChecks, check.check)
|
||||
}
|
||||
}
|
||||
|
||||
return missingChecks, equalChecks, changedChecks, staleChecks
|
||||
}
|
||||
|
||||
// compareConsulService takes a consul.AgentServiceRegistration instance and
|
||||
// compares it with a consul.AgentService. Returns true if they are equal
|
||||
|
@ -509,7 +632,7 @@ func (c *Syncer) registerCheck(chkReg *consul.AgentCheckRegistration) error {
|
|||
// createDelegatedCheckReg creates a Check that can be registered with
|
||||
// Nomad. It also creates a Nomad check for the check types that it can
|
||||
// handle.
|
||||
func (c *Syncer) createDelegatedCheckReg(check *structs.ServiceCheck, service *consul.AgentService) (*consul.AgentCheckRegistration, error) {
|
||||
func (c *Syncer) createDelegatedCheckReg(check *structs.ServiceCheck, service *consul.AgentServiceRegistration) (*consul.AgentCheckRegistration, error) {
|
||||
chkReg := consul.AgentCheckRegistration{
|
||||
ID: check.Hash(service.ID),
|
||||
Name: check.Name,
|
||||
|
@ -565,16 +688,25 @@ func (c *Syncer) deregisterService(serviceID string) error {
|
|||
return c.client.Agent().ServiceDeregister(serviceID)
|
||||
}
|
||||
|
||||
// deregisterCheck de-registers a check with a given ID from Consul.
|
||||
func (c *Syncer) deregisterCheck(ID string) error {
|
||||
// Deleting the nomad check
|
||||
if cr, ok := c.checkRunners[ID]; ok {
|
||||
cr.Stop()
|
||||
delete(c.checkRunners, ID)
|
||||
// deregisterCheck de-registers a check from Consul
|
||||
func (c *Syncer) deregisterCheck(checkID string) error {
|
||||
c.registryLock.Lock()
|
||||
defer c.registryLock.Unlock()
|
||||
|
||||
// Deleting from Consul Agent
|
||||
if err := c.client.Agent().CheckDeregister(checkID); err != nil {
|
||||
// CheckDeregister() will be reattempted again in a future
|
||||
// sync.
|
||||
return err
|
||||
}
|
||||
|
||||
// Deleting from consul
|
||||
return c.client.Agent().CheckDeregister(ID)
|
||||
// Remove the check from the local registry
|
||||
if cr, ok := c.checkRunners[checkID]; ok {
|
||||
cr.Stop()
|
||||
delete(c.checkRunners, checkID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run triggers periodic syncing of services and checks with Consul. This is
|
||||
|
@ -663,15 +795,16 @@ func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentSer
|
|||
}
|
||||
|
||||
// filterConsulChecks prunes out all the consul checks which do not have
|
||||
// services with id prefixed with noamd-
|
||||
func (c *Syncer) filterConsulChecks(chks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck {
|
||||
nomadChecks := make(map[string]*consul.AgentCheck)
|
||||
for _, chk := range chks {
|
||||
if strings.HasPrefix(chk.ServiceID, structs.NomadConsulPrefix) {
|
||||
nomadChecks[chk.CheckID] = chk
|
||||
// services with Syncer's idPrefix.
|
||||
func (c *Syncer) filterConsulChecks(consulChecks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck {
|
||||
localChecks := make(map[string]*consul.AgentCheck, len(consulChecks))
|
||||
filterPrefix := c.filterPrefix()
|
||||
for checkID, check := range consulChecks {
|
||||
if strings.HasPrefix(check.ServiceID, filterPrefix) {
|
||||
localChecks[checkID] = check
|
||||
}
|
||||
}
|
||||
return nomadChecks
|
||||
return localChecks
|
||||
}
|
||||
|
||||
// consulPresent indicates whether the consul agent is responding
|
||||
|
|
Loading…
Reference in a new issue