2016-03-23 07:50:41 +00:00
|
|
|
package consul
|
|
|
|
|
|
|
|
import (
|
|
|
|
"crypto/tls"
|
|
|
|
"fmt"
|
|
|
|
"log"
|
|
|
|
"net/http"
|
2016-03-23 17:43:35 +00:00
|
|
|
"net/url"
|
2016-03-23 07:50:41 +00:00
|
|
|
"strings"
|
2016-03-24 01:03:02 +00:00
|
|
|
"sync"
|
2016-03-23 07:50:41 +00:00
|
|
|
"time"
|
|
|
|
|
|
|
|
consul "github.com/hashicorp/consul/api"
|
2016-05-24 06:23:57 +00:00
|
|
|
"github.com/hashicorp/consul/lib"
|
2016-03-23 17:43:35 +00:00
|
|
|
"github.com/hashicorp/go-multierror"
|
2016-03-23 07:50:41 +00:00
|
|
|
|
2016-06-10 15:29:57 +00:00
|
|
|
cconfig "github.com/hashicorp/nomad/client/config"
|
2016-03-23 07:50:41 +00:00
|
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
2016-05-24 06:23:57 +00:00
|
|
|
"github.com/hashicorp/nomad/nomad/structs/config"
|
|
|
|
"github.com/hashicorp/nomad/nomad/types"
|
2016-03-23 07:50:41 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
2016-05-24 06:23:57 +00:00
|
|
|
// initialSyncBuffer is the max time an initial sync will sleep
|
|
|
|
// before syncing.
|
|
|
|
initialSyncBuffer = 30 * time.Second
|
|
|
|
|
|
|
|
// initialSyncDelay is the delay before an initial sync.
|
|
|
|
initialSyncDelay = 5 * time.Second
|
|
|
|
|
2016-06-10 01:45:51 +00:00
|
|
|
// nomadServicePrefix is the prefix used when registering a service
|
|
|
|
// with consul
|
|
|
|
nomadServicePrefix = "nomad"
|
|
|
|
|
2016-03-23 21:34:43 +00:00
|
|
|
// The periodic time interval for syncing services and checks with Consul
|
2016-03-23 07:50:41 +00:00
|
|
|
syncInterval = 5 * time.Second
|
2016-03-25 02:30:02 +00:00
|
|
|
|
2016-05-24 06:23:57 +00:00
|
|
|
// syncJitter provides a little variance in the frequency at which
|
|
|
|
// Syncer polls Consul.
|
|
|
|
syncJitter = 8
|
|
|
|
|
2016-03-25 02:30:02 +00:00
|
|
|
// ttlCheckBuffer is the time interval that Nomad can take to report Consul
|
|
|
|
// the check result
|
|
|
|
ttlCheckBuffer = 31 * time.Second
|
2016-05-27 08:35:10 +00:00
|
|
|
|
|
|
|
// ServiceTagHttp is the tag assigned to HTTP services
|
|
|
|
ServiceTagHttp = "http"
|
|
|
|
|
|
|
|
// ServiceTagRpc is the tag assigned to RPC services
|
|
|
|
ServiceTagRpc = "rpc"
|
2016-05-27 22:58:28 +00:00
|
|
|
|
|
|
|
// ServiceTagSerf is the tag assigned to Serf services
|
|
|
|
ServiceTagSerf = "serf"
|
2016-03-23 07:50:41 +00:00
|
|
|
)
|
|
|
|
|
2016-06-02 07:56:55 +00:00
|
|
|
// Syncer allows syncing of services and checks with Consul
|
|
|
|
type Syncer struct {
|
2016-06-10 01:32:04 +00:00
|
|
|
client *consul.Client
|
|
|
|
consulAvailable bool
|
2016-06-02 07:56:55 +00:00
|
|
|
|
2016-06-10 01:21:22 +00:00
|
|
|
// servicesGroups is a named group of services that will be flattened
|
|
|
|
// and reconciled with Consul when SyncServices() is called. The key
|
|
|
|
// to the servicesGroups map is unique per handler and is used to
|
|
|
|
// allow the Agent's services to be maintained independently of the
|
|
|
|
// Client or Server's services.
|
|
|
|
servicesGroups map[string][]*consul.AgentServiceRegistration
|
|
|
|
servicesGroupsLock sync.RWMutex
|
|
|
|
|
2016-06-07 16:37:39 +00:00
|
|
|
// The "Consul Registry" is a collection of Consul Services and
|
|
|
|
// Checks all guarded by the registryLock.
|
|
|
|
registryLock sync.RWMutex
|
2016-06-02 07:56:55 +00:00
|
|
|
|
|
|
|
checkRunners map[string]*CheckRunner
|
2016-06-07 14:01:13 +00:00
|
|
|
delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul
|
2016-06-10 02:58:33 +00:00
|
|
|
trackedChecks map[string]*consul.AgentCheckRegistration
|
2016-06-10 02:55:01 +00:00
|
|
|
trackedServices map[string]*consul.AgentServiceRegistration
|
2016-06-07 16:37:39 +00:00
|
|
|
|
|
|
|
// serviceRegPrefix is used to namespace the domain of registered
|
|
|
|
// Consul Services and Checks belonging to a single Syncer. A given
|
|
|
|
// Nomad Agent may spawn multiple Syncer tasks between the Agent
|
|
|
|
// Agent and its Executors, all syncing to a single Consul Agent.
|
|
|
|
// The serviceRegPrefix allows multiple Syncers to coexist without
|
|
|
|
// each Syncer clobbering each others Services. The Syncer namespace
|
|
|
|
// protocol is fmt.Sprintf("nomad-%s-%s", serviceRegPrefix, miscID).
|
|
|
|
// serviceRegPrefix is guarded by the registryLock.
|
|
|
|
serviceRegPrefix string
|
|
|
|
|
|
|
|
addrFinder func(portLabel string) (string, int)
|
2016-06-07 14:01:13 +00:00
|
|
|
createDelegatedCheck func(*structs.ServiceCheck, string) (Check, error)
|
2016-06-07 16:37:39 +00:00
|
|
|
// End registryLock guarded attributes.
|
2016-06-02 07:56:55 +00:00
|
|
|
|
|
|
|
logger *log.Logger
|
|
|
|
|
|
|
|
shutdownCh chan struct{}
|
|
|
|
shutdown bool
|
|
|
|
shutdownLock sync.Mutex
|
|
|
|
|
2016-06-02 16:15:30 +00:00
|
|
|
// notifyShutdownCh is used to notify a Syncer it needs to shutdown.
|
|
|
|
// This can happen because there was an explicit call to the Syncer's
|
|
|
|
// Shutdown() method, or because the calling task signaled the
|
|
|
|
// program is going to exit by closing its shutdownCh.
|
|
|
|
notifyShutdownCh chan struct{}
|
|
|
|
|
2016-06-02 07:56:55 +00:00
|
|
|
// periodicCallbacks is walked sequentially when the timer in Run
|
|
|
|
// fires.
|
|
|
|
periodicCallbacks map[string]types.PeriodicCallback
|
|
|
|
notifySyncCh chan struct{}
|
|
|
|
periodicLock sync.RWMutex
|
|
|
|
}
|
|
|
|
|
2016-05-23 14:29:08 +00:00
|
|
|
// NewSyncer returns a new consul.Syncer
|
2016-06-10 15:29:57 +00:00
|
|
|
func NewSyncer(consulConfig *config.ConsulConfig, shutdownCh chan struct{}, logger *log.Logger) (*Syncer, error) {
|
2016-03-23 07:50:41 +00:00
|
|
|
var err error
|
|
|
|
var c *consul.Client
|
2016-06-10 15:19:02 +00:00
|
|
|
|
2016-03-23 07:50:41 +00:00
|
|
|
cfg := consul.DefaultConfig()
|
2016-06-10 15:19:02 +00:00
|
|
|
|
|
|
|
// If a nil config was provided, fall back to the default config
|
2016-06-10 15:29:57 +00:00
|
|
|
if consulConfig == nil {
|
|
|
|
consulConfig = cconfig.DefaultConfig().ConsulConfig
|
2016-06-10 15:19:02 +00:00
|
|
|
}
|
|
|
|
|
2016-06-10 15:29:57 +00:00
|
|
|
if consulConfig.Addr != "" {
|
|
|
|
cfg.Address = consulConfig.Addr
|
2016-03-23 07:50:41 +00:00
|
|
|
}
|
2016-06-10 15:29:57 +00:00
|
|
|
if consulConfig.Token != "" {
|
|
|
|
cfg.Token = consulConfig.Token
|
2016-03-23 07:50:41 +00:00
|
|
|
}
|
2016-06-10 15:29:57 +00:00
|
|
|
if consulConfig.Auth != "" {
|
2016-03-23 07:50:41 +00:00
|
|
|
var username, password string
|
2016-06-10 15:29:57 +00:00
|
|
|
if strings.Contains(consulConfig.Auth, ":") {
|
|
|
|
split := strings.SplitN(consulConfig.Auth, ":", 2)
|
2016-03-23 07:50:41 +00:00
|
|
|
username = split[0]
|
|
|
|
password = split[1]
|
|
|
|
} else {
|
2016-06-10 15:29:57 +00:00
|
|
|
username = consulConfig.Auth
|
2016-03-23 07:50:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
cfg.HttpAuth = &consul.HttpBasicAuth{
|
|
|
|
Username: username,
|
|
|
|
Password: password,
|
|
|
|
}
|
|
|
|
}
|
2016-06-10 15:29:57 +00:00
|
|
|
if consulConfig.EnableSSL {
|
2016-03-23 07:50:41 +00:00
|
|
|
cfg.Scheme = "https"
|
2016-03-28 06:09:31 +00:00
|
|
|
tlsCfg := consul.TLSConfig{
|
|
|
|
Address: cfg.Address,
|
2016-06-10 15:29:57 +00:00
|
|
|
CAFile: consulConfig.CAFile,
|
|
|
|
CertFile: consulConfig.CertFile,
|
|
|
|
KeyFile: consulConfig.KeyFile,
|
|
|
|
InsecureSkipVerify: !consulConfig.VerifySSL,
|
2016-03-28 06:09:31 +00:00
|
|
|
}
|
|
|
|
tlsClientCfg, err := consul.SetupTLSConfig(&tlsCfg)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("error creating tls client config for consul: %v", err)
|
|
|
|
}
|
|
|
|
cfg.HttpClient.Transport = &http.Transport{
|
|
|
|
TLSClientConfig: tlsClientCfg,
|
|
|
|
}
|
2016-03-23 07:50:41 +00:00
|
|
|
}
|
2016-06-10 15:29:57 +00:00
|
|
|
if consulConfig.EnableSSL && !consulConfig.VerifySSL {
|
2016-03-23 07:50:41 +00:00
|
|
|
cfg.HttpClient.Transport = &http.Transport{
|
|
|
|
TLSClientConfig: &tls.Config{
|
|
|
|
InsecureSkipVerify: true,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if c, err = consul.NewClient(cfg); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2016-05-23 18:09:31 +00:00
|
|
|
consulSyncer := Syncer{
|
2016-05-24 06:23:57 +00:00
|
|
|
client: c,
|
|
|
|
logger: logger,
|
2016-06-10 01:32:04 +00:00
|
|
|
consulAvailable: true,
|
2016-06-02 16:15:30 +00:00
|
|
|
shutdownCh: shutdownCh,
|
2016-06-10 01:21:22 +00:00
|
|
|
servicesGroups: make(map[string][]*consul.AgentServiceRegistration),
|
2016-06-10 02:55:01 +00:00
|
|
|
trackedServices: make(map[string]*consul.AgentServiceRegistration),
|
2016-05-24 06:23:57 +00:00
|
|
|
trackedChecks: make(map[string]*consul.AgentCheckRegistration),
|
|
|
|
checkRunners: make(map[string]*CheckRunner),
|
|
|
|
periodicCallbacks: make(map[string]types.PeriodicCallback),
|
2016-03-23 07:50:41 +00:00
|
|
|
}
|
2016-05-23 18:09:31 +00:00
|
|
|
return &consulSyncer, nil
|
2016-03-23 07:50:41 +00:00
|
|
|
}
|
|
|
|
|
2016-03-25 02:30:02 +00:00
|
|
|
// SetDelegatedChecks sets the checks that nomad is going to run and report the
|
|
|
|
// result back to consul
|
2016-06-07 14:01:13 +00:00
|
|
|
func (c *Syncer) SetDelegatedChecks(delegateChecks map[string]struct{}, createDelegatedCheckFn func(*structs.ServiceCheck, string) (Check, error)) *Syncer {
|
2016-03-24 17:06:40 +00:00
|
|
|
c.delegateChecks = delegateChecks
|
2016-06-07 14:01:13 +00:00
|
|
|
c.createDelegatedCheck = createDelegatedCheckFn
|
2016-03-24 17:06:40 +00:00
|
|
|
return c
|
|
|
|
}
|
|
|
|
|
2016-05-14 07:36:26 +00:00
|
|
|
// SetAddrFinder sets a function to find the host and port for a Service given its port label
|
2016-05-22 15:24:54 +00:00
|
|
|
func (c *Syncer) SetAddrFinder(addrFinder func(string) (string, int)) *Syncer {
|
2016-04-12 05:55:19 +00:00
|
|
|
c.addrFinder = addrFinder
|
|
|
|
return c
|
|
|
|
}
|
|
|
|
|
2016-06-07 16:37:39 +00:00
|
|
|
// SetServiceRegPrefix sets the registration prefix used by the Syncer when
|
|
|
|
// registering Services with Consul.
|
|
|
|
func (c *Syncer) SetServiceRegPrefix(servicePrefix string) *Syncer {
|
|
|
|
c.registryLock.Lock()
|
|
|
|
defer c.registryLock.Unlock()
|
|
|
|
c.serviceRegPrefix = servicePrefix
|
2016-04-12 05:55:19 +00:00
|
|
|
return c
|
|
|
|
}
|
|
|
|
|
2016-06-10 01:45:51 +00:00
|
|
|
// filterPrefix generates a unique prefix that a Syncer can later filter on.
|
|
|
|
func (c *Syncer) filterPrefix() string {
|
|
|
|
c.registryLock.RLock()
|
|
|
|
defer c.registryLock.RUnlock()
|
|
|
|
return fmt.Sprintf("%s-%s", nomadServicePrefix, c.serviceRegPrefix)
|
|
|
|
}
|
|
|
|
|
|
|
|
// GenerateServiceID creates a unique Consul ServiceID for a given
|
|
|
|
// ConsulService.
|
|
|
|
func (c *Syncer) GenerateServiceID(groupName string, service *structs.ConsulService) string {
|
|
|
|
numTags := len(service.Tags)
|
|
|
|
switch numTags {
|
|
|
|
case 0:
|
|
|
|
return fmt.Sprintf("%s-%s:%s", c.filterPrefix(), groupName, service.Name)
|
|
|
|
case 1:
|
|
|
|
return fmt.Sprintf("%s-%s:%s@%s", c.filterPrefix(), groupName, service.Tags[0], service.Name)
|
2016-05-24 06:23:57 +00:00
|
|
|
default:
|
2016-06-10 01:45:51 +00:00
|
|
|
tags := strings.Join(service.Tags, "|")
|
|
|
|
return fmt.Sprintf("%s-%s:(%s)@%s", c.filterPrefix(), groupName, tags, service.Name)
|
2016-05-24 06:23:57 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-06-10 01:21:22 +00:00
|
|
|
// SetServices assigns the slice of Nomad Services to the provided services
|
|
|
|
// group name.
|
|
|
|
func (c *Syncer) SetServices(groupName string, services []*structs.ConsulService) error {
|
2016-03-23 17:43:35 +00:00
|
|
|
var mErr multierror.Error
|
2016-06-10 01:21:22 +00:00
|
|
|
registeredServices := make([]*consul.AgentServiceRegistration, 0, len(services))
|
2016-04-12 05:55:19 +00:00
|
|
|
for _, service := range services {
|
2016-06-10 01:21:22 +00:00
|
|
|
if service.ServiceID == "" {
|
|
|
|
service.ServiceID = c.GenerateServiceID(groupName, service)
|
|
|
|
}
|
|
|
|
var serviceReg *consul.AgentServiceRegistration
|
|
|
|
var err error
|
|
|
|
if serviceReg, err = c.createService(service); err != nil {
|
2016-03-23 17:43:35 +00:00
|
|
|
mErr.Errors = append(mErr.Errors, err)
|
|
|
|
continue
|
|
|
|
}
|
2016-06-10 01:21:22 +00:00
|
|
|
registeredServices = append(registeredServices, serviceReg)
|
2016-03-23 17:43:35 +00:00
|
|
|
|
2016-06-10 01:21:22 +00:00
|
|
|
// Register the check(s) for this service
|
2016-03-23 17:43:35 +00:00
|
|
|
for _, chk := range service.Checks {
|
2016-06-10 01:21:22 +00:00
|
|
|
// Create a Consul check registration
|
|
|
|
chkReg, err := c.createDelegatedCheckReg(chk, serviceReg)
|
2016-03-24 22:09:50 +00:00
|
|
|
if err != nil {
|
|
|
|
mErr.Errors = append(mErr.Errors, err)
|
|
|
|
continue
|
|
|
|
}
|
2016-03-25 17:36:31 +00:00
|
|
|
// creating a nomad check if we have to handle this particular check type
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.RLock()
|
2016-03-25 17:36:31 +00:00
|
|
|
if _, ok := c.delegateChecks[chk.Type]; ok {
|
2016-06-10 03:31:14 +00:00
|
|
|
_, ok := c.checkRunners[chkReg.ID]
|
|
|
|
c.registryLock.RUnlock()
|
|
|
|
if ok {
|
2016-06-10 01:21:22 +00:00
|
|
|
continue
|
|
|
|
}
|
2016-06-07 14:01:13 +00:00
|
|
|
nc, err := c.createDelegatedCheck(chk, chkReg.ID)
|
2016-03-25 17:36:31 +00:00
|
|
|
if err != nil {
|
|
|
|
mErr.Errors = append(mErr.Errors, err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
cr := NewCheckRunner(nc, c.runCheck, c.logger)
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.Lock()
|
2016-03-25 17:36:31 +00:00
|
|
|
c.checkRunners[nc.ID()] = cr
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.Unlock()
|
|
|
|
} else {
|
|
|
|
c.registryLock.RUnlock()
|
2016-03-25 17:36:31 +00:00
|
|
|
}
|
2016-03-23 17:43:35 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-06-10 01:21:22 +00:00
|
|
|
if len(mErr.Errors) > 0 {
|
|
|
|
return mErr.ErrorOrNil()
|
2016-03-23 17:43:35 +00:00
|
|
|
}
|
|
|
|
|
2016-06-10 01:21:22 +00:00
|
|
|
c.servicesGroupsLock.Lock()
|
|
|
|
c.servicesGroups[groupName] = registeredServices
|
|
|
|
c.servicesGroupsLock.Unlock()
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// SyncNow expires the current timer forcing the list of periodic callbacks
|
|
|
|
// to be synced immediately.
|
|
|
|
func (c *Syncer) SyncNow() {
|
|
|
|
select {
|
|
|
|
case c.notifySyncCh <- struct{}{}:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// flattenedServices returns a flattened list of services
|
|
|
|
func (c *Syncer) flattenedServices() []*consul.AgentServiceRegistration {
|
|
|
|
const initialNumServices = 8
|
|
|
|
services := make([]*consul.AgentServiceRegistration, 0, initialNumServices)
|
|
|
|
c.servicesGroupsLock.RLock()
|
|
|
|
for _, servicesGroup := range c.servicesGroups {
|
|
|
|
for _, service := range servicesGroup {
|
|
|
|
services = append(services, service)
|
2016-03-23 17:43:35 +00:00
|
|
|
}
|
|
|
|
}
|
2016-06-10 01:21:22 +00:00
|
|
|
c.servicesGroupsLock.RUnlock()
|
|
|
|
|
|
|
|
return services
|
2016-03-23 07:50:41 +00:00
|
|
|
}
|
|
|
|
|
2016-06-02 16:15:30 +00:00
|
|
|
func (c *Syncer) signalShutdown() {
|
|
|
|
select {
|
|
|
|
case c.notifyShutdownCh <- struct{}{}:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-03-23 21:34:43 +00:00
|
|
|
// Shutdown de-registers the services and checks and shuts down periodic syncing
|
2016-05-22 15:24:54 +00:00
|
|
|
func (c *Syncer) Shutdown() error {
|
2016-03-23 17:43:35 +00:00
|
|
|
var mErr multierror.Error
|
2016-03-24 01:03:02 +00:00
|
|
|
|
|
|
|
c.shutdownLock.Lock()
|
|
|
|
if !c.shutdown {
|
|
|
|
c.shutdown = true
|
2016-03-23 21:23:32 +00:00
|
|
|
}
|
2016-03-24 01:03:02 +00:00
|
|
|
c.shutdownLock.Unlock()
|
2016-03-25 02:19:13 +00:00
|
|
|
|
2016-06-02 16:15:30 +00:00
|
|
|
c.signalShutdown()
|
|
|
|
|
2016-03-25 02:19:13 +00:00
|
|
|
// Stop all the checks that nomad is running
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.RLock()
|
2016-03-25 04:17:33 +00:00
|
|
|
for _, cr := range c.checkRunners {
|
|
|
|
cr.Stop()
|
2016-03-25 02:19:13 +00:00
|
|
|
}
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.RUnlock()
|
2016-03-25 02:19:13 +00:00
|
|
|
|
2016-06-10 03:01:55 +00:00
|
|
|
// De-register all the services from Consul
|
|
|
|
services, err := c.queryAgentServices()
|
|
|
|
if err != nil {
|
|
|
|
mErr.Errors = append(mErr.Errors, err)
|
|
|
|
}
|
|
|
|
for _, service := range services {
|
2016-03-23 17:43:35 +00:00
|
|
|
if err := c.client.Agent().ServiceDeregister(service.ID); err != nil {
|
2016-06-10 03:01:55 +00:00
|
|
|
c.logger.Printf("[WARN] consul.syncer: failed to deregister service ID %q: %v", service.ID, err)
|
2016-03-23 17:43:35 +00:00
|
|
|
mErr.Errors = append(mErr.Errors, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return mErr.ErrorOrNil()
|
2016-03-23 07:50:41 +00:00
|
|
|
}
|
|
|
|
|
2016-06-10 02:58:33 +00:00
|
|
|
// queryChecks queries the Consul Agent for a list of Consul checks that
|
|
|
|
// have been registered with this Consul Syncer.
|
|
|
|
func (c *Syncer) queryChecks() (map[string]*consul.AgentCheck, error) {
|
|
|
|
checks, err := c.client.Agent().Checks()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return c.filterConsulChecks(checks), nil
|
|
|
|
}
|
2016-03-23 22:28:55 +00:00
|
|
|
|
2016-06-10 02:55:01 +00:00
|
|
|
// queryAgentServices queries the Consul Agent for a list of Consul services that
|
|
|
|
// have been registered with this Consul Syncer.
|
|
|
|
func (c *Syncer) queryAgentServices() (map[string]*consul.AgentService, error) {
|
|
|
|
services, err := c.client.Agent().Services()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return c.filterConsulServices(services), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// syncChecks synchronizes this Syncer's Consul Checks with the Consul Agent.
|
|
|
|
func (c *Syncer) syncChecks() error {
|
|
|
|
var mErr multierror.Error
|
|
|
|
consulChecks, err := c.queryChecks()
|
2016-03-23 22:28:55 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2016-06-10 02:58:33 +00:00
|
|
|
// Synchronize checks with Consul
|
|
|
|
missingChecks, _, changedChecks, staleChecks := c.calcChecksDiff(consulChecks)
|
|
|
|
for _, check := range missingChecks {
|
|
|
|
if err := c.registerCheck(check); err != nil {
|
|
|
|
mErr.Errors = append(mErr.Errors, err)
|
|
|
|
}
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.Lock()
|
2016-06-10 02:58:33 +00:00
|
|
|
c.trackedChecks[check.ID] = check
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.Unlock()
|
2016-06-10 02:58:33 +00:00
|
|
|
}
|
|
|
|
for _, check := range changedChecks {
|
|
|
|
// NOTE(sean@): Do we need to deregister the check before
|
|
|
|
// re-registering it? Not deregistering to avoid missing the
|
|
|
|
// TTL but doesn't correct reconcile any possible drift with
|
|
|
|
// the check.
|
|
|
|
//
|
|
|
|
// if err := c.deregisterCheck(check.ID); err != nil {
|
|
|
|
// mErr.Errors = append(mErr.Errors, err)
|
|
|
|
// }
|
|
|
|
if err := c.registerCheck(check); err != nil {
|
|
|
|
mErr.Errors = append(mErr.Errors, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for _, check := range staleChecks {
|
|
|
|
if err := c.deregisterCheck(check.ID); err != nil {
|
|
|
|
mErr.Errors = append(mErr.Errors, err)
|
|
|
|
}
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.Lock()
|
2016-06-10 02:58:33 +00:00
|
|
|
delete(c.trackedChecks, check.ID)
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.Unlock()
|
2016-06-10 02:58:33 +00:00
|
|
|
}
|
|
|
|
return mErr.ErrorOrNil()
|
|
|
|
}
|
|
|
|
|
|
|
|
// compareConsulCheck takes a consul.AgentCheckRegistration instance and
|
|
|
|
// compares it with a consul.AgentCheck. Returns true if they are equal
|
|
|
|
// according to consul.AgentCheck, otherwise false.
|
|
|
|
func compareConsulCheck(localCheck *consul.AgentCheckRegistration, consulCheck *consul.AgentCheck) bool {
|
|
|
|
if consulCheck.CheckID != localCheck.ID ||
|
|
|
|
consulCheck.Name != localCheck.Name ||
|
|
|
|
consulCheck.Notes != localCheck.Notes ||
|
|
|
|
consulCheck.ServiceID != localCheck.ServiceID {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
// calcChecksDiff takes the argument (consulChecks) and calculates the delta
|
|
|
|
// between the consul.Syncer's list of known checks (c.trackedChecks). Three
|
|
|
|
// arrays are returned:
|
|
|
|
//
|
|
|
|
// 1) a slice of checks that exist only locally in the Syncer and are missing
|
|
|
|
// from the Consul Agent (consulChecks) and therefore need to be registered.
|
|
|
|
//
|
|
|
|
// 2) a slice of checks that exist in both the local consul.Syncer's
|
|
|
|
// tracked list and Consul Agent (consulChecks).
|
|
|
|
//
|
|
|
|
// 3) a slice of checks that exist in both the local consul.Syncer's
|
|
|
|
// tracked list and Consul Agent (consulServices) but have diverged state.
|
|
|
|
//
|
|
|
|
// 4) a slice of checks that exist only in the Consul Agent (consulChecks)
|
|
|
|
// and should be removed because the Consul Agent has drifted from the
|
|
|
|
// Syncer.
|
|
|
|
func (c *Syncer) calcChecksDiff(consulChecks map[string]*consul.AgentCheck) (missingChecks []*consul.AgentCheckRegistration, equalChecks []*consul.AgentCheckRegistration, changedChecks []*consul.AgentCheckRegistration, staleChecks []*consul.AgentCheckRegistration) {
|
|
|
|
type mergedCheck struct {
|
|
|
|
check *consul.AgentCheckRegistration
|
|
|
|
// 'l' == Nomad local only
|
|
|
|
// 'e' == equal
|
|
|
|
// 'c' == changed
|
|
|
|
// 'a' == Consul agent only
|
|
|
|
state byte
|
|
|
|
}
|
|
|
|
var (
|
|
|
|
localChecksCount = 0
|
|
|
|
equalChecksCount = 0
|
|
|
|
changedChecksCount = 0
|
|
|
|
agentChecks = 0
|
|
|
|
)
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.RLock()
|
2016-06-10 02:58:33 +00:00
|
|
|
localChecks := make(map[string]*mergedCheck, len(c.trackedChecks)+len(consulChecks))
|
|
|
|
for _, localCheck := range c.trackedChecks {
|
|
|
|
localChecksCount++
|
|
|
|
localChecks[localCheck.ID] = &mergedCheck{localCheck, 'l'}
|
|
|
|
}
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.RUnlock()
|
2016-06-10 02:58:33 +00:00
|
|
|
for _, consulCheck := range consulChecks {
|
|
|
|
if localCheck, found := localChecks[consulCheck.CheckID]; found {
|
|
|
|
localChecksCount--
|
|
|
|
if compareConsulCheck(localCheck.check, consulCheck) {
|
|
|
|
equalChecksCount++
|
|
|
|
localChecks[consulCheck.CheckID].state = 'e'
|
|
|
|
} else {
|
|
|
|
changedChecksCount++
|
|
|
|
localChecks[consulCheck.CheckID].state = 'c'
|
2016-03-23 22:28:55 +00:00
|
|
|
}
|
2016-06-10 02:58:33 +00:00
|
|
|
} else {
|
|
|
|
agentChecks++
|
|
|
|
agentCheckReg := &consul.AgentCheckRegistration{
|
|
|
|
ID: consulCheck.CheckID,
|
|
|
|
Name: consulCheck.Name,
|
|
|
|
Notes: consulCheck.Notes,
|
|
|
|
ServiceID: consulCheck.ServiceID,
|
|
|
|
}
|
|
|
|
localChecks[consulCheck.CheckID] = &mergedCheck{agentCheckReg, 'a'}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
missingChecks = make([]*consul.AgentCheckRegistration, 0, localChecksCount)
|
|
|
|
equalChecks = make([]*consul.AgentCheckRegistration, 0, equalChecksCount)
|
|
|
|
changedChecks = make([]*consul.AgentCheckRegistration, 0, changedChecksCount)
|
|
|
|
staleChecks = make([]*consul.AgentCheckRegistration, 0, agentChecks)
|
|
|
|
for _, check := range localChecks {
|
|
|
|
switch check.state {
|
|
|
|
case 'l':
|
|
|
|
missingChecks = append(missingChecks, check.check)
|
|
|
|
case 'e':
|
|
|
|
equalChecks = append(equalChecks, check.check)
|
|
|
|
case 'c':
|
|
|
|
changedChecks = append(changedChecks, check.check)
|
|
|
|
case 'a':
|
|
|
|
staleChecks = append(staleChecks, check.check)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return missingChecks, equalChecks, changedChecks, staleChecks
|
|
|
|
}
|
2016-06-10 02:55:01 +00:00
|
|
|
|
|
|
|
// compareConsulService takes a consul.AgentServiceRegistration instance and
|
|
|
|
// compares it with a consul.AgentService. Returns true if they are equal
|
|
|
|
// according to consul.AgentService, otherwise false.
|
|
|
|
func compareConsulService(localService *consul.AgentServiceRegistration, consulService *consul.AgentService) bool {
|
|
|
|
if consulService.ID != localService.ID ||
|
|
|
|
consulService.Service != localService.Name ||
|
|
|
|
consulService.Port != localService.Port ||
|
|
|
|
consulService.Address != localService.Address ||
|
|
|
|
consulService.Address != localService.Address ||
|
|
|
|
consulService.EnableTagOverride != localService.EnableTagOverride {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
serviceTags := make(map[string]byte, len(localService.Tags))
|
|
|
|
for _, tag := range localService.Tags {
|
|
|
|
serviceTags[tag] = 'l'
|
|
|
|
}
|
|
|
|
for _, tag := range consulService.Tags {
|
|
|
|
if _, found := serviceTags[tag]; !found {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
serviceTags[tag] = 'b'
|
|
|
|
}
|
|
|
|
for _, state := range serviceTags {
|
|
|
|
if state == 'l' {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
// calcServicesDiff takes the argument (consulServices) and calculates the
|
|
|
|
// delta between the consul.Syncer's list of known services
|
|
|
|
// (c.trackedServices). Three arrays are returned:
|
|
|
|
//
|
|
|
|
// 1) a slice of services that exist only locally in the Syncer and are
|
|
|
|
// missing from the Consul Agent (consulServices) and therefore need to be
|
|
|
|
// registered.
|
|
|
|
//
|
|
|
|
// 2) a slice of services that exist in both the local consul.Syncer's
|
|
|
|
// tracked list and Consul Agent (consulServices) *AND* are identical.
|
|
|
|
//
|
|
|
|
// 3) a slice of services that exist in both the local consul.Syncer's
|
|
|
|
// tracked list and Consul Agent (consulServices) but have diverged state.
|
|
|
|
//
|
|
|
|
// 4) a slice of services that exist only in the Consul Agent
|
|
|
|
// (consulServices) and should be removed because the Consul Agent has
|
|
|
|
// drifted from the Syncer.
|
|
|
|
func (c *Syncer) calcServicesDiff(consulServices map[string]*consul.AgentService) (missingServices []*consul.AgentServiceRegistration, equalServices []*consul.AgentServiceRegistration, changedServices []*consul.AgentServiceRegistration, staleServices []*consul.AgentServiceRegistration) {
|
|
|
|
type mergedService struct {
|
|
|
|
service *consul.AgentServiceRegistration
|
|
|
|
// 'l' == Nomad local only
|
|
|
|
// 'e' == equal
|
|
|
|
// 'c' == changed
|
|
|
|
// 'a' == Consul agent only
|
|
|
|
state byte
|
|
|
|
}
|
|
|
|
var (
|
|
|
|
localServicesCount = 0
|
|
|
|
equalServicesCount = 0
|
|
|
|
changedServicesCount = 0
|
|
|
|
agentServices = 0
|
|
|
|
)
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.RLock()
|
2016-06-10 02:55:01 +00:00
|
|
|
localServices := make(map[string]*mergedService, len(c.trackedServices)+len(consulServices))
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.RUnlock()
|
2016-06-10 02:55:01 +00:00
|
|
|
for _, localService := range c.flattenedServices() {
|
|
|
|
localServicesCount++
|
|
|
|
localServices[localService.ID] = &mergedService{localService, 'l'}
|
|
|
|
}
|
|
|
|
for _, consulService := range consulServices {
|
|
|
|
if localService, found := localServices[consulService.ID]; found {
|
|
|
|
localServicesCount--
|
|
|
|
if compareConsulService(localService.service, consulService) {
|
|
|
|
equalServicesCount++
|
|
|
|
localServices[consulService.ID].state = 'e'
|
|
|
|
} else {
|
|
|
|
changedServicesCount++
|
|
|
|
localServices[consulService.ID].state = 'c'
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
agentServices++
|
|
|
|
agentServiceReg := &consul.AgentServiceRegistration{
|
|
|
|
ID: consulService.ID,
|
|
|
|
Name: consulService.Service,
|
|
|
|
Tags: consulService.Tags,
|
|
|
|
Port: consulService.Port,
|
|
|
|
Address: consulService.Address,
|
|
|
|
}
|
|
|
|
localServices[consulService.ID] = &mergedService{agentServiceReg, 'a'}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
missingServices = make([]*consul.AgentServiceRegistration, 0, localServicesCount)
|
|
|
|
equalServices = make([]*consul.AgentServiceRegistration, 0, equalServicesCount)
|
|
|
|
changedServices = make([]*consul.AgentServiceRegistration, 0, changedServicesCount)
|
|
|
|
staleServices = make([]*consul.AgentServiceRegistration, 0, agentServices)
|
|
|
|
for _, service := range localServices {
|
|
|
|
switch service.state {
|
|
|
|
case 'l':
|
|
|
|
missingServices = append(missingServices, service.service)
|
|
|
|
case 'e':
|
|
|
|
equalServices = append(equalServices, service.service)
|
|
|
|
case 'c':
|
|
|
|
changedServices = append(changedServices, service.service)
|
|
|
|
case 'a':
|
|
|
|
staleServices = append(staleServices, service.service)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return missingServices, equalServices, changedServices, staleServices
|
|
|
|
}
|
|
|
|
|
|
|
|
// syncServices synchronizes this Syncer's Consul Services with the Consul
|
|
|
|
// Agent.
|
|
|
|
func (c *Syncer) syncServices() error {
|
|
|
|
consulServices, err := c.queryAgentServices()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Synchronize services with Consul
|
|
|
|
var mErr multierror.Error
|
|
|
|
missingServices, _, changedServices, removedServices := c.calcServicesDiff(consulServices)
|
|
|
|
for _, service := range missingServices {
|
|
|
|
if err := c.client.Agent().ServiceRegister(service); err != nil {
|
|
|
|
mErr.Errors = append(mErr.Errors, err)
|
|
|
|
}
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.Lock()
|
2016-06-10 02:55:01 +00:00
|
|
|
c.trackedServices[service.ID] = service
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.Unlock()
|
2016-06-10 02:55:01 +00:00
|
|
|
}
|
|
|
|
for _, service := range changedServices {
|
|
|
|
// Re-register the local service
|
|
|
|
if err := c.client.Agent().ServiceRegister(service); err != nil {
|
|
|
|
mErr.Errors = append(mErr.Errors, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for _, service := range removedServices {
|
|
|
|
if err := c.deregisterService(service.ID); err != nil {
|
|
|
|
mErr.Errors = append(mErr.Errors, err)
|
2016-03-23 22:28:55 +00:00
|
|
|
}
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.Lock()
|
2016-06-10 02:55:01 +00:00
|
|
|
delete(c.trackedServices, service.ID)
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.Unlock()
|
2016-03-23 22:28:55 +00:00
|
|
|
}
|
|
|
|
return mErr.ErrorOrNil()
|
|
|
|
}
|
|
|
|
|
2016-03-23 21:34:43 +00:00
|
|
|
// registerCheck registers a check definition with Consul
|
2016-05-22 15:24:54 +00:00
|
|
|
func (c *Syncer) registerCheck(chkReg *consul.AgentCheckRegistration) error {
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.RLock()
|
2016-03-25 04:17:33 +00:00
|
|
|
if cr, ok := c.checkRunners[chkReg.ID]; ok {
|
|
|
|
cr.Start()
|
2016-03-25 02:00:24 +00:00
|
|
|
}
|
2016-06-10 03:31:14 +00:00
|
|
|
c.registryLock.RUnlock()
|
2016-03-24 21:12:09 +00:00
|
|
|
return c.client.Agent().CheckRegister(chkReg)
|
|
|
|
}
|
|
|
|
|
2016-06-07 14:01:13 +00:00
|
|
|
// createDelegatedCheckReg creates a Check that can be registered with
|
|
|
|
// Nomad. It also creates a Nomad check for the check types that it can
|
|
|
|
// handle.
|
2016-06-10 02:58:33 +00:00
|
|
|
func (c *Syncer) createDelegatedCheckReg(check *structs.ServiceCheck, service *consul.AgentServiceRegistration) (*consul.AgentCheckRegistration, error) {
|
2016-03-23 17:43:35 +00:00
|
|
|
chkReg := consul.AgentCheckRegistration{
|
2016-03-24 00:12:53 +00:00
|
|
|
ID: check.Hash(service.ID),
|
2016-03-23 17:43:35 +00:00
|
|
|
Name: check.Name,
|
|
|
|
ServiceID: service.ID,
|
|
|
|
}
|
|
|
|
chkReg.Timeout = check.Timeout.String()
|
2016-03-23 21:23:32 +00:00
|
|
|
chkReg.Interval = check.Interval.String()
|
2016-03-23 17:43:35 +00:00
|
|
|
switch check.Type {
|
|
|
|
case structs.ServiceCheckHTTP:
|
|
|
|
if check.Protocol == "" {
|
|
|
|
check.Protocol = "http"
|
|
|
|
}
|
|
|
|
url := url.URL{
|
|
|
|
Scheme: check.Protocol,
|
|
|
|
Host: fmt.Sprintf("%s:%d", service.Address, service.Port),
|
|
|
|
Path: check.Path,
|
|
|
|
}
|
|
|
|
chkReg.HTTP = url.String()
|
|
|
|
case structs.ServiceCheckTCP:
|
|
|
|
chkReg.TCP = fmt.Sprintf("%s:%d", service.Address, service.Port)
|
|
|
|
case structs.ServiceCheckScript:
|
2016-03-25 02:30:02 +00:00
|
|
|
chkReg.TTL = (check.Interval + ttlCheckBuffer).String()
|
2016-03-24 22:09:50 +00:00
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("check type %q not valid", check.Type)
|
2016-03-24 20:05:08 +00:00
|
|
|
}
|
2016-03-24 22:09:50 +00:00
|
|
|
return &chkReg, nil
|
2016-03-23 07:50:41 +00:00
|
|
|
}
|
|
|
|
|
2016-06-10 02:55:01 +00:00
|
|
|
// createService creates a Consul AgentService from a Nomad ConsulService.
|
2016-06-07 15:54:03 +00:00
|
|
|
func (c *Syncer) createService(service *structs.ConsulService) (*consul.AgentServiceRegistration, error) {
|
2016-06-07 16:37:39 +00:00
|
|
|
c.registryLock.RLock()
|
|
|
|
defer c.registryLock.RUnlock()
|
|
|
|
|
|
|
|
srv := consul.AgentServiceRegistration{
|
2016-06-10 01:45:51 +00:00
|
|
|
ID: service.ServiceID,
|
2016-06-07 16:37:39 +00:00
|
|
|
Name: service.Name,
|
|
|
|
Tags: service.Tags,
|
2016-03-23 07:50:41 +00:00
|
|
|
}
|
2016-04-12 05:55:19 +00:00
|
|
|
host, port := c.addrFinder(service.PortLabel)
|
2016-04-14 07:56:39 +00:00
|
|
|
if host != "" {
|
|
|
|
srv.Address = host
|
|
|
|
}
|
|
|
|
|
|
|
|
if port != 0 {
|
|
|
|
srv.Port = port
|
|
|
|
}
|
|
|
|
|
2016-03-23 07:50:41 +00:00
|
|
|
return &srv, nil
|
|
|
|
}
|
|
|
|
|
2016-03-23 17:43:35 +00:00
|
|
|
// deregisterService de-registers a service with the given ID from consul
|
2016-06-10 02:55:01 +00:00
|
|
|
func (c *Syncer) deregisterService(serviceID string) error {
|
|
|
|
return c.client.Agent().ServiceDeregister(serviceID)
|
2016-03-23 17:43:35 +00:00
|
|
|
}
|
|
|
|
|
2016-06-10 02:58:33 +00:00
|
|
|
// deregisterCheck de-registers a check from Consul
|
|
|
|
func (c *Syncer) deregisterCheck(checkID string) error {
|
|
|
|
c.registryLock.Lock()
|
|
|
|
defer c.registryLock.Unlock()
|
|
|
|
|
|
|
|
// Deleting from Consul Agent
|
|
|
|
if err := c.client.Agent().CheckDeregister(checkID); err != nil {
|
|
|
|
// CheckDeregister() will be reattempted again in a future
|
|
|
|
// sync.
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Remove the check from the local registry
|
|
|
|
if cr, ok := c.checkRunners[checkID]; ok {
|
2016-03-25 04:17:33 +00:00
|
|
|
cr.Stop()
|
2016-06-10 02:58:33 +00:00
|
|
|
delete(c.checkRunners, checkID)
|
2016-03-24 20:05:08 +00:00
|
|
|
}
|
2016-03-25 02:00:24 +00:00
|
|
|
|
2016-06-10 02:58:33 +00:00
|
|
|
return nil
|
2016-03-23 17:43:35 +00:00
|
|
|
}
|
|
|
|
|
2016-05-23 14:24:00 +00:00
|
|
|
// Run triggers periodic syncing of services and checks with Consul. This is
|
|
|
|
// a long lived go-routine which is stopped during shutdown.
|
|
|
|
func (c *Syncer) Run() {
|
2016-05-24 06:23:57 +00:00
|
|
|
d := initialSyncDelay + lib.RandomStagger(initialSyncBuffer-initialSyncDelay)
|
|
|
|
sync := time.NewTimer(d)
|
2016-06-10 01:29:10 +00:00
|
|
|
c.logger.Printf("[DEBUG] consul.syncer: sleeping %v before first sync", d)
|
2016-05-24 06:23:57 +00:00
|
|
|
|
2016-03-23 07:50:41 +00:00
|
|
|
for {
|
|
|
|
select {
|
2016-03-25 21:18:04 +00:00
|
|
|
case <-sync.C:
|
2016-05-24 06:23:57 +00:00
|
|
|
d = syncInterval - lib.RandomStagger(syncInterval/syncJitter)
|
|
|
|
sync.Reset(d)
|
|
|
|
|
2016-06-10 01:29:10 +00:00
|
|
|
if err := c.SyncServices(); err != nil {
|
|
|
|
if c.consulAvailable {
|
|
|
|
c.logger.Printf("[DEBUG] consul.syncer: disabling checks until successful sync for %q: %v", c.serviceRegPrefix, err)
|
2016-04-02 21:48:10 +00:00
|
|
|
}
|
2016-06-10 03:42:54 +00:00
|
|
|
c.consulAvailable = false
|
2016-04-02 21:48:10 +00:00
|
|
|
} else {
|
2016-06-10 01:29:10 +00:00
|
|
|
if !c.consulAvailable {
|
|
|
|
c.logger.Printf("[DEBUG] consul.syncer: re-enabling checks for for %q", c.serviceRegPrefix)
|
|
|
|
}
|
|
|
|
c.consulAvailable = true
|
2016-03-23 07:50:41 +00:00
|
|
|
}
|
2016-05-24 06:23:57 +00:00
|
|
|
case <-c.notifySyncCh:
|
|
|
|
sync.Reset(syncInterval)
|
2016-03-23 07:50:41 +00:00
|
|
|
case <-c.shutdownCh:
|
2016-06-02 16:15:30 +00:00
|
|
|
c.Shutdown()
|
|
|
|
case <-c.notifyShutdownCh:
|
2016-03-25 21:18:04 +00:00
|
|
|
sync.Stop()
|
2016-06-08 17:38:00 +00:00
|
|
|
c.logger.Printf("[INFO] consul.syncer: shutting down sync for %q", c.serviceRegPrefix)
|
2016-03-23 07:50:41 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-05-24 06:23:57 +00:00
|
|
|
// RunHandlers executes each handler (randomly)
|
2016-06-07 15:59:17 +00:00
|
|
|
func (c *Syncer) RunHandlers() error {
|
2016-05-24 06:23:57 +00:00
|
|
|
c.periodicLock.RLock()
|
|
|
|
handlers := make(map[string]types.PeriodicCallback, len(c.periodicCallbacks))
|
|
|
|
for name, fn := range c.periodicCallbacks {
|
|
|
|
handlers[name] = fn
|
|
|
|
}
|
|
|
|
c.periodicLock.RUnlock()
|
2016-06-07 15:59:17 +00:00
|
|
|
|
|
|
|
var mErr multierror.Error
|
2016-05-24 06:28:42 +00:00
|
|
|
for _, fn := range handlers {
|
2016-06-07 15:59:17 +00:00
|
|
|
if err := fn(); err != nil {
|
|
|
|
mErr.Errors = append(mErr.Errors, err)
|
|
|
|
}
|
2016-05-24 06:23:57 +00:00
|
|
|
}
|
2016-06-07 15:59:17 +00:00
|
|
|
return mErr.ErrorOrNil()
|
2016-05-24 06:23:57 +00:00
|
|
|
}
|
|
|
|
|
2016-06-10 01:29:10 +00:00
|
|
|
// SyncServices sync the services with the Consul Agent
|
|
|
|
func (c *Syncer) SyncServices() error {
|
2016-06-07 15:59:17 +00:00
|
|
|
if err := c.RunHandlers(); err != nil {
|
2016-06-10 01:29:10 +00:00
|
|
|
return err
|
2016-03-23 17:43:35 +00:00
|
|
|
}
|
2016-06-07 15:59:17 +00:00
|
|
|
if err := c.syncServices(); err != nil {
|
2016-06-10 01:29:10 +00:00
|
|
|
return err
|
2016-03-23 17:43:35 +00:00
|
|
|
}
|
2016-06-07 15:59:17 +00:00
|
|
|
if err := c.syncChecks(); err != nil {
|
2016-06-10 01:29:10 +00:00
|
|
|
return err
|
2016-03-23 17:43:35 +00:00
|
|
|
}
|
2016-03-23 18:31:04 +00:00
|
|
|
|
2016-06-10 01:29:10 +00:00
|
|
|
return nil
|
2016-03-23 17:43:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// filterConsulServices prunes out all the service whose ids are not prefixed
|
|
|
|
// with nomad-
|
2016-06-07 16:37:39 +00:00
|
|
|
func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentService) map[string]*consul.AgentService {
|
|
|
|
localServices := make(map[string]*consul.AgentService, len(consulServices))
|
|
|
|
c.registryLock.RLock()
|
|
|
|
defer c.registryLock.RUnlock()
|
2016-06-10 02:55:01 +00:00
|
|
|
filterPrefix := c.filterPrefix()
|
2016-06-07 16:37:39 +00:00
|
|
|
for serviceID, service := range consulServices {
|
2016-06-10 02:55:01 +00:00
|
|
|
if strings.HasPrefix(service.ID, filterPrefix) {
|
2016-06-07 16:37:39 +00:00
|
|
|
localServices[serviceID] = service
|
2016-03-23 17:43:35 +00:00
|
|
|
}
|
|
|
|
}
|
2016-06-07 16:37:39 +00:00
|
|
|
return localServices
|
2016-03-23 17:43:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// filterConsulChecks prunes out all the consul checks which do not have
|
2016-06-10 02:58:33 +00:00
|
|
|
// services with Syncer's idPrefix.
|
|
|
|
func (c *Syncer) filterConsulChecks(consulChecks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck {
|
|
|
|
localChecks := make(map[string]*consul.AgentCheck, len(consulChecks))
|
|
|
|
filterPrefix := c.filterPrefix()
|
|
|
|
for checkID, check := range consulChecks {
|
|
|
|
if strings.HasPrefix(check.ServiceID, filterPrefix) {
|
|
|
|
localChecks[checkID] = check
|
2016-03-23 17:43:35 +00:00
|
|
|
}
|
|
|
|
}
|
2016-06-10 02:58:33 +00:00
|
|
|
return localChecks
|
2016-03-23 07:50:41 +00:00
|
|
|
}
|
2016-03-23 22:36:46 +00:00
|
|
|
|
2016-06-10 03:01:55 +00:00
|
|
|
// consulPresent indicates whether the Consul Agent is responding
|
2016-05-28 01:17:37 +00:00
|
|
|
func (c *Syncer) consulPresent() bool {
|
|
|
|
_, err := c.client.Agent().Self()
|
|
|
|
return err == nil
|
|
|
|
}
|
|
|
|
|
2016-03-24 20:05:08 +00:00
|
|
|
// runCheck runs a check and updates the corresponding ttl check in consul
|
2016-05-22 15:24:54 +00:00
|
|
|
func (c *Syncer) runCheck(check Check) {
|
2016-03-24 20:05:08 +00:00
|
|
|
res := check.Run()
|
2016-05-05 17:01:38 +00:00
|
|
|
if res.Duration >= check.Timeout() {
|
2016-06-08 17:38:00 +00:00
|
|
|
c.logger.Printf("[DEBUG] consul.syncer: check took time: %v, timeout: %v", res.Duration, check.Timeout())
|
2016-05-05 17:01:38 +00:00
|
|
|
}
|
2016-03-25 02:00:24 +00:00
|
|
|
state := consul.HealthCritical
|
|
|
|
output := res.Output
|
2016-03-25 21:26:56 +00:00
|
|
|
switch res.ExitCode {
|
|
|
|
case 0:
|
2016-03-25 02:00:24 +00:00
|
|
|
state = consul.HealthPassing
|
2016-03-25 21:26:56 +00:00
|
|
|
case 1:
|
2016-03-25 02:00:24 +00:00
|
|
|
state = consul.HealthWarning
|
2016-03-25 21:26:56 +00:00
|
|
|
default:
|
|
|
|
state = consul.HealthCritical
|
2016-03-25 02:00:24 +00:00
|
|
|
}
|
2016-04-05 17:15:38 +00:00
|
|
|
if res.Err != nil {
|
|
|
|
state = consul.HealthCritical
|
|
|
|
output = res.Err.Error()
|
|
|
|
}
|
2016-03-25 02:00:24 +00:00
|
|
|
if err := c.client.Agent().UpdateTTL(check.ID(), output, state); err != nil {
|
2016-06-10 01:32:04 +00:00
|
|
|
if c.consulAvailable {
|
2016-06-08 17:38:00 +00:00
|
|
|
c.logger.Printf("[DEBUG] consul.syncer: check %q failed, disabling Consul checks until until next successful sync: %v", check.ID(), err)
|
2016-06-10 01:32:04 +00:00
|
|
|
c.consulAvailable = false
|
2016-04-02 21:48:10 +00:00
|
|
|
} else {
|
2016-06-10 01:32:04 +00:00
|
|
|
c.consulAvailable = true
|
2016-04-02 21:48:10 +00:00
|
|
|
}
|
2016-03-24 20:05:08 +00:00
|
|
|
}
|
|
|
|
}
|
2016-05-14 07:36:26 +00:00
|
|
|
|
2016-05-24 06:23:57 +00:00
|
|
|
// AddPeriodicHandler adds a uniquely named callback. Returns true if
|
|
|
|
// successful, false if a handler with the same name already exists.
|
|
|
|
func (c *Syncer) AddPeriodicHandler(name string, fn types.PeriodicCallback) bool {
|
|
|
|
c.periodicLock.Lock()
|
|
|
|
defer c.periodicLock.Unlock()
|
|
|
|
if _, found := c.periodicCallbacks[name]; found {
|
2016-06-08 17:38:00 +00:00
|
|
|
c.logger.Printf("[ERROR] consul.syncer: failed adding handler %q", name)
|
2016-05-24 06:23:57 +00:00
|
|
|
return false
|
|
|
|
}
|
|
|
|
c.periodicCallbacks[name] = fn
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Syncer) NumHandlers() int {
|
|
|
|
c.periodicLock.RLock()
|
|
|
|
defer c.periodicLock.RUnlock()
|
|
|
|
return len(c.periodicCallbacks)
|
|
|
|
}
|
|
|
|
|
|
|
|
// RemovePeriodicHandler removes a handler with a given name.
|
|
|
|
func (c *Syncer) RemovePeriodicHandler(name string) {
|
|
|
|
c.periodicLock.Lock()
|
|
|
|
defer c.periodicLock.Unlock()
|
|
|
|
delete(c.periodicCallbacks, name)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Syncer) ConsulClient() *consul.Client {
|
|
|
|
return c.client
|
|
|
|
}
|