open-nomad/command/agent/consul/syncer.go

1009 lines
31 KiB
Go

// Package consul is used by Nomad to register all services both static services
// and dynamic via allocations.
//
// Consul Service IDs have the following format: ${nomadServicePrefix}-${groupName}-${serviceKey}
// groupName takes on one of the following values:
// - server
// - client
// - executor-${alloc-id}-${task-name}
//
// serviceKey should be generated by service registrators.
// If the serviceKey is being generated by the executor for a Nomad Task.Services
// the following helper should be used:
// NOTE: Executor should interpolate the service prior to calling
// func GenerateTaskServiceKey(service *structs.Service) string
//
// The Nomad Client reaps services registered from dead allocations that were
// not properly cleaned up by the executor (this is not the expected case).
//
// TODO fix this comment
// The Consul ServiceIDs generated by the executor will contain the allocation
// ID. Thus the client can generate the list of Consul ServiceIDs to keep by
// calling the following method on all running allocations the client is aware
// of:
// func GenerateExecutorServiceKeyPrefixFromAlloc(allocID string) string
package consul
import (
"fmt"
"log"
"net"
"net/url"
"strconv"
"strings"
"sync"
"time"
consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/nomad/types"
)
const (
// initialSyncBuffer is the max time an initial sync will sleep
// before syncing.
initialSyncBuffer = 30 * time.Second
// initialSyncDelay is the delay before an initial sync.
initialSyncDelay = 5 * time.Second
// nomadServicePrefix is the first prefix that scopes all Nomad registered
// services
nomadServicePrefix = "_nomad"
// The periodic time interval for syncing services and checks with Consul
syncInterval = 5 * time.Second
// syncJitter provides a little variance in the frequency at which
// Syncer polls Consul.
syncJitter = 8
// ttlCheckBuffer is the time interval that Nomad can take to report Consul
// the check result
ttlCheckBuffer = 31 * time.Second
// DefaultQueryWaitDuration is the max duration the Consul Agent will
// spend waiting for a response from a Consul Query.
DefaultQueryWaitDuration = 2 * time.Second
// ServiceTagHTTP is the tag assigned to HTTP services
ServiceTagHTTP = "http"
// ServiceTagRPC is the tag assigned to RPC services
ServiceTagRPC = "rpc"
// ServiceTagSerf is the tag assigned to Serf services
ServiceTagSerf = "serf"
)
// consulServiceID and consulCheckID are the IDs registered with Consul
type consulServiceID string
type consulCheckID string
// ServiceKey is the generated service key that is used to build the Consul
// ServiceID
type ServiceKey string
// ServiceDomain is the domain of services registered by Nomad
type ServiceDomain string
const (
ClientDomain ServiceDomain = "client"
ServerDomain ServiceDomain = "server"
)
// NewExecutorDomain returns a domain specific to the alloc ID and task
func NewExecutorDomain(allocID, task string) ServiceDomain {
return ServiceDomain(fmt.Sprintf("executor-%s-%s", allocID, task))
}
// Syncer allows syncing of services and checks with Consul
type Syncer struct {
client *consul.Client
consulAvailable bool
// servicesGroups and checkGroups are named groups of services and checks
// respectively that will be flattened and reconciled with Consul when
// SyncServices() is called. The key to the servicesGroups map is unique
// per handler and is used to allow the Agent's services to be maintained
// independently of the Client or Server's services.
servicesGroups map[ServiceDomain]map[ServiceKey]*consul.AgentServiceRegistration
checkGroups map[ServiceDomain]map[ServiceKey][]*consul.AgentCheckRegistration
groupsLock sync.RWMutex
// The "Consul Registry" is a collection of Consul Services and
// Checks all guarded by the registryLock.
registryLock sync.RWMutex
// checkRunners are delegated Consul checks being ran by the Syncer
checkRunners map[consulCheckID]*CheckRunner
addrFinder func(portLabel string) (string, int)
createDelegatedCheck func(*structs.ServiceCheck, string) (Check, error)
delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul
// End registryLock guarded attributes.
logger *log.Logger
shutdownCh chan struct{}
shutdown bool
shutdownLock sync.Mutex
// notifyShutdownCh is used to notify a Syncer it needs to shutdown.
// This can happen because there was an explicit call to the Syncer's
// Shutdown() method, or because the calling task signaled the
// program is going to exit by closing its shutdownCh.
notifyShutdownCh chan struct{}
// periodicCallbacks is walked sequentially when the timer in Run
// fires.
periodicCallbacks map[string]types.PeriodicCallback
notifySyncCh chan struct{}
periodicLock sync.RWMutex
}
// NewSyncer returns a new consul.Syncer
func NewSyncer(consulConfig *config.ConsulConfig, shutdownCh chan struct{}, logger *log.Logger) (*Syncer, error) {
var consulClientConfig *consul.Config
var err error
consulClientConfig, err = consulConfig.ApiConfig()
if err != nil {
return nil, err
}
var consulClient *consul.Client
if consulClient, err = consul.NewClient(consulClientConfig); err != nil {
return nil, err
}
consulSyncer := Syncer{
client: consulClient,
logger: logger,
consulAvailable: true,
shutdownCh: shutdownCh,
servicesGroups: make(map[ServiceDomain]map[ServiceKey]*consul.AgentServiceRegistration),
checkGroups: make(map[ServiceDomain]map[ServiceKey][]*consul.AgentCheckRegistration),
checkRunners: make(map[consulCheckID]*CheckRunner),
periodicCallbacks: make(map[string]types.PeriodicCallback),
// default noop implementation of addrFinder
addrFinder: func(string) (string, int) { return "", 0 },
}
return &consulSyncer, nil
}
// SetDelegatedChecks sets the checks that nomad is going to run and report the
// result back to consul
func (c *Syncer) SetDelegatedChecks(delegateChecks map[string]struct{}, createDelegatedCheckFn func(*structs.ServiceCheck, string) (Check, error)) *Syncer {
c.delegateChecks = delegateChecks
c.createDelegatedCheck = createDelegatedCheckFn
return c
}
// SetAddrFinder sets a function to find the host and port for a Service given its port label
func (c *Syncer) SetAddrFinder(addrFinder func(string) (string, int)) *Syncer {
c.addrFinder = addrFinder
return c
}
// GenerateServiceKey should be called to generate a serviceKey based on the
// Service.
func GenerateServiceKey(service *structs.Service) ServiceKey {
var key string
numTags := len(service.Tags)
switch numTags {
case 0:
key = fmt.Sprintf("%s", service.Name)
default:
tags := strings.Join(service.Tags, "-")
key = fmt.Sprintf("%s-%s", service.Name, tags)
}
return ServiceKey(key)
}
// SetServices stores the map of Nomad Services to the provided service
// domain name.
func (c *Syncer) SetServices(domain ServiceDomain, services map[ServiceKey]*structs.Service) error {
var mErr multierror.Error
numServ := len(services)
registeredServices := make(map[ServiceKey]*consul.AgentServiceRegistration, numServ)
registeredChecks := make(map[ServiceKey][]*consul.AgentCheckRegistration, numServ)
for serviceKey, service := range services {
serviceReg, err := c.createService(service, domain, serviceKey)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
registeredServices[serviceKey] = serviceReg
// Register the check(s) for this service
for _, chk := range service.Checks {
// Create a Consul check registration
chkReg, err := c.createCheckReg(chk, serviceReg)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
// creating a nomad check if we have to handle this particular check type
c.registryLock.RLock()
if _, ok := c.delegateChecks[chk.Type]; ok {
_, ok := c.checkRunners[consulCheckID(chkReg.ID)]
c.registryLock.RUnlock()
if ok {
continue
}
nc, err := c.createDelegatedCheck(chk, chkReg.ID)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
cr := NewCheckRunner(nc, c.runCheck, c.logger)
c.registryLock.Lock()
// TODO type the CheckRunner
c.checkRunners[consulCheckID(nc.ID())] = cr
c.registryLock.Unlock()
} else {
c.registryLock.RUnlock()
}
registeredChecks[serviceKey] = append(registeredChecks[serviceKey], chkReg)
}
}
if len(mErr.Errors) > 0 {
return mErr.ErrorOrNil()
}
// Update the services and checks groups for this domain
c.groupsLock.Lock()
// Create map for service group if it doesn't exist
serviceKeys, ok := c.servicesGroups[domain]
if !ok {
serviceKeys = make(map[ServiceKey]*consul.AgentServiceRegistration, len(registeredServices))
c.servicesGroups[domain] = serviceKeys
}
// Remove stale services
for existingServiceKey := range serviceKeys {
if _, ok := registeredServices[existingServiceKey]; !ok {
// Exisitng service needs to be removed
delete(serviceKeys, existingServiceKey)
}
}
// Add registered services
for serviceKey, service := range registeredServices {
serviceKeys[serviceKey] = service
}
// Create map for check group if it doesn't exist
checkKeys, ok := c.checkGroups[domain]
if !ok {
checkKeys = make(map[ServiceKey][]*consul.AgentCheckRegistration, len(registeredChecks))
c.checkGroups[domain] = checkKeys
}
// Remove stale checks
for existingCheckKey := range checkKeys {
if _, ok := registeredChecks[existingCheckKey]; !ok {
// Exisitng check needs to be removed
delete(checkKeys, existingCheckKey)
}
}
// Add registered checks
for checkKey, checks := range registeredChecks {
checkKeys[checkKey] = checks
}
c.groupsLock.Unlock()
// Sync immediately
c.SyncNow()
return nil
}
// SyncNow expires the current timer forcing the list of periodic callbacks
// to be synced immediately.
func (c *Syncer) SyncNow() {
select {
case c.notifySyncCh <- struct{}{}:
default:
}
}
// flattenedServices returns a flattened list of services that are registered
// locally
func (c *Syncer) flattenedServices() []*consul.AgentServiceRegistration {
const initialNumServices = 8
services := make([]*consul.AgentServiceRegistration, 0, initialNumServices)
c.groupsLock.RLock()
defer c.groupsLock.RUnlock()
for _, servicesGroup := range c.servicesGroups {
for _, service := range servicesGroup {
services = append(services, service)
}
}
return services
}
// flattenedChecks returns a flattened list of checks that are registered
// locally
func (c *Syncer) flattenedChecks() []*consul.AgentCheckRegistration {
const initialNumChecks = 8
checks := make([]*consul.AgentCheckRegistration, 0, initialNumChecks)
c.groupsLock.RLock()
for _, checkGroup := range c.checkGroups {
for _, check := range checkGroup {
checks = append(checks, check...)
}
}
c.groupsLock.RUnlock()
return checks
}
func (c *Syncer) signalShutdown() {
select {
case c.notifyShutdownCh <- struct{}{}:
default:
}
}
// Shutdown de-registers the services and checks and shuts down periodic syncing
func (c *Syncer) Shutdown() error {
var mErr multierror.Error
c.shutdownLock.Lock()
if !c.shutdown {
c.shutdown = true
}
c.shutdownLock.Unlock()
c.signalShutdown()
// Stop all the checks that nomad is running
c.registryLock.RLock()
defer c.registryLock.RUnlock()
for _, cr := range c.checkRunners {
cr.Stop()
}
// De-register all the services from Consul
services, err := c.queryAgentServices()
if err != nil {
c.logger.Printf("[WARN] consul.syncer: failed to fetch services for deregistering due to error: %v", err)
mErr.Errors = append(mErr.Errors, err)
}
for serviceID := range services {
convertedID := string(serviceID)
if err := c.client.Agent().ServiceDeregister(convertedID); err != nil {
c.logger.Printf("[WARN] consul.syncer: failed to deregister service ID %+q: %v", convertedID, err)
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil()
}
// queryChecks queries the Consul Agent for a list of Consul checks that
// have been registered with this Consul Syncer.
func (c *Syncer) queryChecks() (map[consulCheckID]*consul.AgentCheck, error) {
checks, err := c.client.Agent().Checks()
if err != nil {
return nil, err
}
return c.filterConsulChecks(checks), nil
}
// queryAgentServices queries the Consul Agent for a list of Consul services that
// have been registered with this Consul Syncer.
func (c *Syncer) queryAgentServices() (map[consulServiceID]*consul.AgentService, error) {
services, err := c.client.Agent().Services()
if err != nil {
return nil, err
}
return c.filterConsulServices(services), nil
}
// syncChecks synchronizes this Syncer's Consul Checks with the Consul Agent.
func (c *Syncer) syncChecks() error {
var mErr multierror.Error
consulChecks, err := c.queryChecks()
if err != nil {
return err
}
// Synchronize checks with Consul
missingChecks, existingChecks, changedChecks, staleChecks := c.calcChecksDiff(consulChecks)
for _, check := range missingChecks {
if err := c.registerCheck(check); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
for _, check := range existingChecks {
c.ensureCheckRunning(check)
}
for _, check := range changedChecks {
// NOTE(sean@): Do we need to deregister the check before
// re-registering it? Not deregistering to avoid missing the
// TTL but doesn't correct reconcile any possible drift with
// the check.
//
// if err := c.deregisterCheck(check.ID); err != nil {
// mErr.Errors = append(mErr.Errors, err)
// }
if err := c.registerCheck(check); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
for _, check := range staleChecks {
if err := c.deregisterCheck(consulCheckID(check.ID)); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil()
}
// compareConsulCheck takes a consul.AgentCheckRegistration instance and
// compares it with a consul.AgentCheck. Returns true if they are equal
// according to consul.AgentCheck, otherwise false.
func compareConsulCheck(localCheck *consul.AgentCheckRegistration, consulCheck *consul.AgentCheck) bool {
if consulCheck.CheckID != localCheck.ID ||
consulCheck.Name != localCheck.Name ||
consulCheck.Notes != localCheck.Notes ||
consulCheck.ServiceID != localCheck.ServiceID {
return false
}
return true
}
// calcChecksDiff takes the argument (consulChecks) and calculates the delta
// between the consul.Syncer's list of known checks (c.flattenedChecks()).
// Four arrays are returned:
//
// 1) a slice of checks that exist only locally in the Syncer and are missing
// from the Consul Agent (consulChecks) and therefore need to be registered.
//
// 2) a slice of checks that exist in both the local consul.Syncer's
// tracked list and Consul Agent (consulChecks).
//
// 3) a slice of checks that exist in both the local consul.Syncer's
// tracked list and Consul Agent (consulServices) but have diverged state.
//
// 4) a slice of checks that exist only in the Consul Agent (consulChecks)
// and should be removed because the Consul Agent has drifted from the
// Syncer.
func (c *Syncer) calcChecksDiff(consulChecks map[consulCheckID]*consul.AgentCheck) (
missingChecks []*consul.AgentCheckRegistration,
equalChecks []*consul.AgentCheckRegistration,
changedChecks []*consul.AgentCheckRegistration,
staleChecks []*consul.AgentCheckRegistration) {
type mergedCheck struct {
check *consul.AgentCheckRegistration
// 'l' == Nomad local only
// 'e' == equal
// 'c' == changed
// 'a' == Consul agent only
state byte
}
var (
localChecksCount = 0
equalChecksCount = 0
changedChecksCount = 0
agentChecks = 0
)
flattenedChecks := c.flattenedChecks()
localChecks := make(map[string]*mergedCheck, len(flattenedChecks)+len(consulChecks))
for _, localCheck := range flattenedChecks {
localChecksCount++
localChecks[localCheck.ID] = &mergedCheck{localCheck, 'l'}
}
for _, consulCheck := range consulChecks {
if localCheck, found := localChecks[consulCheck.CheckID]; found {
localChecksCount--
if compareConsulCheck(localCheck.check, consulCheck) {
equalChecksCount++
localChecks[consulCheck.CheckID].state = 'e'
} else {
changedChecksCount++
localChecks[consulCheck.CheckID].state = 'c'
}
} else {
agentChecks++
agentCheckReg := &consul.AgentCheckRegistration{
ID: consulCheck.CheckID,
Name: consulCheck.Name,
Notes: consulCheck.Notes,
ServiceID: consulCheck.ServiceID,
}
localChecks[consulCheck.CheckID] = &mergedCheck{agentCheckReg, 'a'}
}
}
missingChecks = make([]*consul.AgentCheckRegistration, 0, localChecksCount)
equalChecks = make([]*consul.AgentCheckRegistration, 0, equalChecksCount)
changedChecks = make([]*consul.AgentCheckRegistration, 0, changedChecksCount)
staleChecks = make([]*consul.AgentCheckRegistration, 0, agentChecks)
for _, check := range localChecks {
switch check.state {
case 'l':
missingChecks = append(missingChecks, check.check)
case 'e':
equalChecks = append(equalChecks, check.check)
case 'c':
changedChecks = append(changedChecks, check.check)
case 'a':
staleChecks = append(staleChecks, check.check)
}
}
return missingChecks, equalChecks, changedChecks, staleChecks
}
// compareConsulService takes a consul.AgentServiceRegistration instance and
// compares it with a consul.AgentService. Returns true if they are equal
// according to consul.AgentService, otherwise false.
func compareConsulService(localService *consul.AgentServiceRegistration, consulService *consul.AgentService) bool {
if consulService.ID != localService.ID ||
consulService.Service != localService.Name ||
consulService.Port != localService.Port ||
consulService.Address != localService.Address ||
consulService.EnableTagOverride != localService.EnableTagOverride {
return false
}
serviceTags := make(map[string]byte, len(localService.Tags))
for _, tag := range localService.Tags {
serviceTags[tag] = 'l'
}
for _, tag := range consulService.Tags {
if _, found := serviceTags[tag]; !found {
return false
}
serviceTags[tag] = 'b'
}
for _, state := range serviceTags {
if state == 'l' {
return false
}
}
return true
}
// calcServicesDiff takes the argument (consulServices) and calculates the
// delta between the consul.Syncer's list of known services
// (c.flattenedServices()). Four arrays are returned:
//
// 1) a slice of services that exist only locally in the Syncer and are
// missing from the Consul Agent (consulServices) and therefore need to be
// registered.
//
// 2) a slice of services that exist in both the local consul.Syncer's
// tracked list and Consul Agent (consulServices) *AND* are identical.
//
// 3) a slice of services that exist in both the local consul.Syncer's
// tracked list and Consul Agent (consulServices) but have diverged state.
//
// 4) a slice of services that exist only in the Consul Agent
// (consulServices) and should be removed because the Consul Agent has
// drifted from the Syncer.
func (c *Syncer) calcServicesDiff(consulServices map[consulServiceID]*consul.AgentService) (missingServices []*consul.AgentServiceRegistration, equalServices []*consul.AgentServiceRegistration, changedServices []*consul.AgentServiceRegistration, staleServices []*consul.AgentServiceRegistration) {
type mergedService struct {
service *consul.AgentServiceRegistration
// 'l' == Nomad local only
// 'e' == equal
// 'c' == changed
// 'a' == Consul agent only
state byte
}
var (
localServicesCount = 0
equalServicesCount = 0
changedServicesCount = 0
agentServices = 0
)
flattenedServices := c.flattenedServices()
localServices := make(map[string]*mergedService, len(flattenedServices)+len(consulServices))
for _, localService := range flattenedServices {
localServicesCount++
localServices[localService.ID] = &mergedService{localService, 'l'}
}
for _, consulService := range consulServices {
if localService, found := localServices[consulService.ID]; found {
localServicesCount--
if compareConsulService(localService.service, consulService) {
equalServicesCount++
localServices[consulService.ID].state = 'e'
} else {
changedServicesCount++
localServices[consulService.ID].state = 'c'
}
} else {
agentServices++
agentServiceReg := &consul.AgentServiceRegistration{
ID: consulService.ID,
Name: consulService.Service,
Tags: consulService.Tags,
Port: consulService.Port,
Address: consulService.Address,
}
localServices[consulService.ID] = &mergedService{agentServiceReg, 'a'}
}
}
missingServices = make([]*consul.AgentServiceRegistration, 0, localServicesCount)
equalServices = make([]*consul.AgentServiceRegistration, 0, equalServicesCount)
changedServices = make([]*consul.AgentServiceRegistration, 0, changedServicesCount)
staleServices = make([]*consul.AgentServiceRegistration, 0, agentServices)
for _, service := range localServices {
switch service.state {
case 'l':
missingServices = append(missingServices, service.service)
case 'e':
equalServices = append(equalServices, service.service)
case 'c':
changedServices = append(changedServices, service.service)
case 'a':
staleServices = append(staleServices, service.service)
}
}
return missingServices, equalServices, changedServices, staleServices
}
// syncServices synchronizes this Syncer's Consul Services with the Consul
// Agent.
func (c *Syncer) syncServices() error {
consulServices, err := c.queryAgentServices()
if err != nil {
return err
}
// Synchronize services with Consul
var mErr multierror.Error
missingServices, _, changedServices, removedServices := c.calcServicesDiff(consulServices)
for _, service := range missingServices {
if err := c.client.Agent().ServiceRegister(service); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
for _, service := range changedServices {
// Re-register the local service
if err := c.client.Agent().ServiceRegister(service); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
for _, service := range removedServices {
if err := c.deregisterService(service.ID); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil()
}
// registerCheck registers a check definition with Consul
func (c *Syncer) registerCheck(chkReg *consul.AgentCheckRegistration) error {
c.registryLock.RLock()
if cr, ok := c.checkRunners[consulCheckID(chkReg.ID)]; ok {
cr.Start()
}
c.registryLock.RUnlock()
return c.client.Agent().CheckRegister(chkReg)
}
// ensureCheckRunning starts the check runner for a check if it's not already running
func (c *Syncer) ensureCheckRunning(chk *consul.AgentCheckRegistration) {
c.registryLock.RLock()
defer c.registryLock.RUnlock()
if cr, ok := c.checkRunners[consulCheckID(chk.ID)]; ok && !cr.Started() {
c.logger.Printf("[DEBUG] consul.syncer: starting runner for existing check. %v", chk.ID)
cr.Start()
}
}
// createCheckReg creates a Check that can be registered with Nomad. It also
// creates a Nomad check for the check types that it can handle.
func (c *Syncer) createCheckReg(check *structs.ServiceCheck, serviceReg *consul.AgentServiceRegistration) (*consul.AgentCheckRegistration, error) {
chkReg := consul.AgentCheckRegistration{
ID: check.Hash(serviceReg.ID),
Name: check.Name,
ServiceID: serviceReg.ID,
}
chkReg.Timeout = check.Timeout.String()
chkReg.Interval = check.Interval.String()
host, port := serviceReg.Address, serviceReg.Port
if check.PortLabel != "" {
host, port = c.addrFinder(check.PortLabel)
}
switch check.Type {
case structs.ServiceCheckHTTP:
if check.Protocol == "" {
check.Protocol = "http"
}
base := url.URL{
Scheme: check.Protocol,
Host: net.JoinHostPort(host, strconv.Itoa(port)),
}
relative, err := url.Parse(check.Path)
if err != nil {
return nil, err
}
url := base.ResolveReference(relative)
chkReg.HTTP = url.String()
case structs.ServiceCheckTCP:
chkReg.TCP = net.JoinHostPort(host, strconv.Itoa(port))
case structs.ServiceCheckScript:
chkReg.TTL = (check.Interval + ttlCheckBuffer).String()
default:
return nil, fmt.Errorf("check type %+q not valid", check.Type)
}
chkReg.Status = check.InitialStatus
return &chkReg, nil
}
// generateConsulServiceID takes the domain and service key and returns a Consul
// ServiceID
func generateConsulServiceID(domain ServiceDomain, key ServiceKey) consulServiceID {
return consulServiceID(fmt.Sprintf("%s-%s-%s", nomadServicePrefix, domain, key))
}
// createService creates a Consul AgentService from a Nomad ConsulService.
func (c *Syncer) createService(service *structs.Service, domain ServiceDomain, key ServiceKey) (*consul.AgentServiceRegistration, error) {
c.registryLock.RLock()
defer c.registryLock.RUnlock()
srv := consul.AgentServiceRegistration{
ID: string(generateConsulServiceID(domain, key)),
Name: service.Name,
Tags: service.Tags,
}
host, port := c.addrFinder(service.PortLabel)
if host != "" {
srv.Address = host
}
if port != 0 {
srv.Port = port
}
return &srv, nil
}
// deregisterService de-registers a service with the given ID from consul
func (c *Syncer) deregisterService(serviceID string) error {
return c.client.Agent().ServiceDeregister(serviceID)
}
// deregisterCheck de-registers a check from Consul
func (c *Syncer) deregisterCheck(id consulCheckID) error {
c.registryLock.Lock()
defer c.registryLock.Unlock()
// Deleting from Consul Agent
if err := c.client.Agent().CheckDeregister(string(id)); err != nil {
// CheckDeregister() will be reattempted again in a future
// sync.
return err
}
// Remove the check from the local registry
if cr, ok := c.checkRunners[id]; ok {
cr.Stop()
delete(c.checkRunners, id)
}
return nil
}
// Run triggers periodic syncing of services and checks with Consul. This is
// a long lived go-routine which is stopped during shutdown.
func (c *Syncer) Run() {
sync := time.NewTimer(0)
for {
select {
case <-sync.C:
d := syncInterval - lib.RandomStagger(syncInterval/syncJitter)
sync.Reset(d)
if err := c.SyncServices(); err != nil {
if c.consulAvailable {
c.logger.Printf("[DEBUG] consul.syncer: error in syncing: %v", err)
}
c.consulAvailable = false
} else {
if !c.consulAvailable {
c.logger.Printf("[DEBUG] consul.syncer: syncs succesful")
}
c.consulAvailable = true
}
case <-c.notifySyncCh:
sync.Reset(syncInterval)
case <-c.shutdownCh:
c.Shutdown()
case <-c.notifyShutdownCh:
sync.Stop()
c.logger.Printf("[INFO] consul.syncer: shutting down syncer ")
return
}
}
}
// RunHandlers executes each handler (randomly)
func (c *Syncer) RunHandlers() error {
c.periodicLock.RLock()
handlers := make(map[string]types.PeriodicCallback, len(c.periodicCallbacks))
for name, fn := range c.periodicCallbacks {
handlers[name] = fn
}
c.periodicLock.RUnlock()
var mErr multierror.Error
for _, fn := range handlers {
if err := fn(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil()
}
// SyncServices sync the services with the Consul Agent
func (c *Syncer) SyncServices() error {
var mErr multierror.Error
if err := c.syncServices(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
if err := c.syncChecks(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
if err := c.RunHandlers(); err != nil {
return err
}
return mErr.ErrorOrNil()
}
// filterConsulServices prunes out all the service who were not registered with
// the syncer
func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentService) map[consulServiceID]*consul.AgentService {
localServices := make(map[consulServiceID]*consul.AgentService, len(consulServices))
c.registryLock.RLock()
defer c.registryLock.RUnlock()
for serviceID, service := range consulServices {
for domain := range c.servicesGroups {
if strings.HasPrefix(service.ID, fmt.Sprintf("%s-%s", nomadServicePrefix, domain)) {
localServices[consulServiceID(serviceID)] = service
break
}
}
}
return localServices
}
// filterConsulChecks prunes out all the consul checks which do not have
// services with Syncer's idPrefix.
func (c *Syncer) filterConsulChecks(consulChecks map[string]*consul.AgentCheck) map[consulCheckID]*consul.AgentCheck {
localChecks := make(map[consulCheckID]*consul.AgentCheck, len(consulChecks))
c.registryLock.RLock()
defer c.registryLock.RUnlock()
for checkID, check := range consulChecks {
for domain := range c.checkGroups {
if strings.HasPrefix(check.ServiceID, fmt.Sprintf("%s-%s", nomadServicePrefix, domain)) {
localChecks[consulCheckID(checkID)] = check
break
}
}
}
return localChecks
}
// consulPresent indicates whether the Consul Agent is responding
func (c *Syncer) consulPresent() bool {
_, err := c.client.Agent().Self()
return err == nil
}
// runCheck runs a check and updates the corresponding ttl check in consul
func (c *Syncer) runCheck(check Check) {
res := check.Run()
if res.Duration >= check.Timeout() {
c.logger.Printf("[DEBUG] consul.syncer: check took time: %v, timeout: %v", res.Duration, check.Timeout())
}
state := consul.HealthCritical
output := res.Output
switch res.ExitCode {
case 0:
state = consul.HealthPassing
case 1:
state = consul.HealthWarning
default:
state = consul.HealthCritical
}
if res.Err != nil {
state = consul.HealthCritical
output = res.Err.Error()
}
if err := c.client.Agent().UpdateTTL(check.ID(), output, state); err != nil {
if c.consulAvailable {
c.logger.Printf("[DEBUG] consul.syncer: check %+q failed, disabling Consul checks until until next successful sync: %v", check.ID(), err)
c.consulAvailable = false
} else {
c.consulAvailable = true
}
}
}
// ReapUnmatched prunes all services that do not exist in the passed domains
func (c *Syncer) ReapUnmatched(domains []ServiceDomain) error {
servicesInConsul, err := c.ConsulClient().Agent().Services()
if err != nil {
return err
}
var mErr multierror.Error
for serviceID := range servicesInConsul {
// Skip any service that was not registered by Nomad
if !strings.HasPrefix(serviceID, nomadServicePrefix) {
continue
}
// Filter services that do not exist in the desired domains
match := false
for _, domain := range domains {
// Include the hyphen so it is explicit to that domain otherwise it
// maybe a subset match
desired := fmt.Sprintf("%s-%s-", nomadServicePrefix, domain)
if strings.HasPrefix(serviceID, desired) {
match = true
break
}
}
if !match {
if err := c.deregisterService(serviceID); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
}
return mErr.ErrorOrNil()
}
// AddPeriodicHandler adds a uniquely named callback. Returns true if
// successful, false if a handler with the same name already exists.
func (c *Syncer) AddPeriodicHandler(name string, fn types.PeriodicCallback) bool {
c.periodicLock.Lock()
defer c.periodicLock.Unlock()
if _, found := c.periodicCallbacks[name]; found {
c.logger.Printf("[ERROR] consul.syncer: failed adding handler %+q", name)
return false
}
c.periodicCallbacks[name] = fn
return true
}
// NumHandlers returns the number of callbacks registered with the syncer
func (c *Syncer) NumHandlers() int {
c.periodicLock.RLock()
defer c.periodicLock.RUnlock()
return len(c.periodicCallbacks)
}
// RemovePeriodicHandler removes a handler with a given name.
func (c *Syncer) RemovePeriodicHandler(name string) {
c.periodicLock.Lock()
defer c.periodicLock.Unlock()
delete(c.periodicCallbacks, name)
}
// ConsulClient returns the Consul client used by the Syncer.
func (c *Syncer) ConsulClient() *consul.Client {
return c.client
}