open-nomad/client/consul/sync.go

551 lines
16 KiB
Go

package consul
import (
"crypto/tls"
"fmt"
"log"
"net/http"
"net/url"
"reflect"
"strings"
"sync"
"time"
consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/nomad/types"
)
type notifyEvent struct{}
type notifyChannel chan notifyEvent
// Syncer allows syncing of services and checks with Consul
type Syncer struct {
client *consul.Client
runChecks bool
serviceIdentifier string // serviceIdentifier is a token which identifies which task/alloc the service belongs to
delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul
createCheck func(*structs.ServiceCheck, string) (Check, error)
addrFinder func(portLabel string) (string, int)
trackedServices map[string]*consul.AgentService
trackedChecks map[string]*consul.AgentCheckRegistration
checkRunners map[string]*CheckRunner
logger *log.Logger
shutdownCh chan struct{}
shutdown bool
shutdownLock sync.Mutex
// periodicCallbacks is walked sequentially when the timer in Run
// fires.
periodicCallbacks map[string]types.PeriodicCallback
notifySyncCh notifyChannel
periodicLock sync.RWMutex
}
const (
// initialSyncBuffer is the max time an initial sync will sleep
// before syncing.
initialSyncBuffer = 30 * time.Second
// initialSyncDelay is the delay before an initial sync.
initialSyncDelay = 5 * time.Second
// The periodic time interval for syncing services and checks with Consul
syncInterval = 5 * time.Second
// syncJitter provides a little variance in the frequency at which
// Syncer polls Consul.
syncJitter = 8
// ttlCheckBuffer is the time interval that Nomad can take to report Consul
// the check result
ttlCheckBuffer = 31 * time.Second
// ServiceTagHttp is the tag assigned to HTTP services
ServiceTagHttp = "http"
// ServiceTagRpc is the tag assigned to RPC services
ServiceTagRpc = "rpc"
// ServiceTagSerf is the tag assigned to Serf services
ServiceTagSerf = "serf"
)
// NewSyncer returns a new consul.Syncer
func NewSyncer(config *config.ConsulConfig, logger *log.Logger) (*Syncer, error) {
var err error
var c *consul.Client
cfg := consul.DefaultConfig()
if config.Addr != "" {
cfg.Address = config.Addr
}
if config.Token != "" {
cfg.Token = config.Token
}
if config.Auth != "" {
var username, password string
if strings.Contains(config.Auth, ":") {
split := strings.SplitN(config.Auth, ":", 2)
username = split[0]
password = split[1]
} else {
username = config.Auth
}
cfg.HttpAuth = &consul.HttpBasicAuth{
Username: username,
Password: password,
}
}
if config.EnableSSL {
cfg.Scheme = "https"
tlsCfg := consul.TLSConfig{
Address: cfg.Address,
CAFile: config.CAFile,
CertFile: config.CertFile,
KeyFile: config.KeyFile,
InsecureSkipVerify: !config.VerifySSL,
}
tlsClientCfg, err := consul.SetupTLSConfig(&tlsCfg)
if err != nil {
return nil, fmt.Errorf("error creating tls client config for consul: %v", err)
}
cfg.HttpClient.Transport = &http.Transport{
TLSClientConfig: tlsClientCfg,
}
}
if config.EnableSSL && !config.VerifySSL {
cfg.HttpClient.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
}
}
if c, err = consul.NewClient(cfg); err != nil {
return nil, err
}
consulSyncer := Syncer{
client: c,
logger: logger,
trackedServices: make(map[string]*consul.AgentService),
trackedChecks: make(map[string]*consul.AgentCheckRegistration),
checkRunners: make(map[string]*CheckRunner),
shutdownCh: make(chan struct{}),
periodicCallbacks: make(map[string]types.PeriodicCallback),
}
return &consulSyncer, nil
}
// SetDelegatedChecks sets the checks that nomad is going to run and report the
// result back to consul
func (c *Syncer) SetDelegatedChecks(delegateChecks map[string]struct{}, createCheck func(*structs.ServiceCheck, string) (Check, error)) *Syncer {
c.delegateChecks = delegateChecks
c.createCheck = createCheck
return c
}
// SetAddrFinder sets a function to find the host and port for a Service given its port label
func (c *Syncer) SetAddrFinder(addrFinder func(string) (string, int)) *Syncer {
c.addrFinder = addrFinder
return c
}
// SetServiceIdentifier sets the identifier of the services we are syncing with Consul
func (c *Syncer) SetServiceIdentifier(serviceIdentifier string) *Syncer {
c.serviceIdentifier = serviceIdentifier
return c
}
// SyncNow expires the current timer forcing the list of periodic callbacks
// to be synced immediately.
func (c *Syncer) SyncNow() {
select {
case c.notifySyncCh <- notifyEvent{}:
default:
}
}
// SyncServices sync the services with the Consul Agent
func (c *Syncer) SyncServices(services []*structs.Service) error {
var mErr multierror.Error
taskServices := make(map[string]*consul.AgentService)
taskChecks := make(map[string]*consul.AgentCheckRegistration)
// Register Services and Checks that we don't know about or has changed
for _, service := range services {
srv, err := c.createService(service)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
trackedService, ok := c.trackedServices[srv.ID]
if (ok && !reflect.DeepEqual(trackedService, srv)) || !ok {
if err := c.registerService(srv); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
c.trackedServices[srv.ID] = srv
taskServices[srv.ID] = srv
for _, chk := range service.Checks {
// Create a consul check registration
chkReg, err := c.createCheckReg(chk, srv)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
// creating a nomad check if we have to handle this particular check type
if _, ok := c.delegateChecks[chk.Type]; ok {
nc, err := c.createCheck(chk, chkReg.ID)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
cr := NewCheckRunner(nc, c.runCheck, c.logger)
c.checkRunners[nc.ID()] = cr
}
if _, ok := c.trackedChecks[chkReg.ID]; !ok {
if err := c.registerCheck(chkReg); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
c.trackedChecks[chkReg.ID] = chkReg
taskChecks[chkReg.ID] = chkReg
}
}
// Remove services that are not present anymore
for _, service := range c.trackedServices {
if _, ok := taskServices[service.ID]; !ok {
if err := c.deregisterService(service.ID); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
delete(c.trackedServices, service.ID)
}
}
// Remove the checks that are not present anymore
for checkID, _ := range c.trackedChecks {
if _, ok := taskChecks[checkID]; !ok {
if err := c.deregisterCheck(checkID); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
delete(c.trackedChecks, checkID)
}
}
return mErr.ErrorOrNil()
}
// Shutdown de-registers the services and checks and shuts down periodic syncing
func (c *Syncer) Shutdown() error {
var mErr multierror.Error
c.shutdownLock.Lock()
if !c.shutdown {
close(c.shutdownCh)
c.shutdown = true
}
c.shutdownLock.Unlock()
// Stop all the checks that nomad is running
for _, cr := range c.checkRunners {
cr.Stop()
}
// De-register all the services from consul
for _, service := range c.trackedServices {
if err := c.client.Agent().ServiceDeregister(service.ID); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil()
}
// KeepServices removes services from consul which are not present in the list
// of tasks passed to it
func (c *Syncer) KeepServices(services map[string]struct{}) error {
var mErr multierror.Error
// Get the services from Consul
cServices, err := c.client.Agent().Services()
if err != nil {
return err
}
cServices = c.filterConsulServices(cServices)
// Remove the services from consul which are not in any of the tasks
for _, service := range cServices {
if _, validService := services[service.ID]; !validService {
if err := c.deregisterService(service.ID); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
}
return mErr.ErrorOrNil()
}
// registerCheck registers a check definition with Consul
func (c *Syncer) registerCheck(chkReg *consul.AgentCheckRegistration) error {
if cr, ok := c.checkRunners[chkReg.ID]; ok {
cr.Start()
}
return c.client.Agent().CheckRegister(chkReg)
}
// createCheckReg creates a Check that can be registered with Nomad. It also
// creates a Nomad check for the check types that it can handle.
func (c *Syncer) createCheckReg(check *structs.ServiceCheck, service *consul.AgentService) (*consul.AgentCheckRegistration, error) {
chkReg := consul.AgentCheckRegistration{
ID: check.Hash(service.ID),
Name: check.Name,
ServiceID: service.ID,
}
chkReg.Timeout = check.Timeout.String()
chkReg.Interval = check.Interval.String()
switch check.Type {
case structs.ServiceCheckHTTP:
if check.Protocol == "" {
check.Protocol = "http"
}
url := url.URL{
Scheme: check.Protocol,
Host: fmt.Sprintf("%s:%d", service.Address, service.Port),
Path: check.Path,
}
chkReg.HTTP = url.String()
case structs.ServiceCheckTCP:
chkReg.TCP = fmt.Sprintf("%s:%d", service.Address, service.Port)
case structs.ServiceCheckScript:
chkReg.TTL = (check.Interval + ttlCheckBuffer).String()
default:
return nil, fmt.Errorf("check type %q not valid", check.Type)
}
return &chkReg, nil
}
// createService creates a Consul AgentService from a Nomad Service
func (c *Syncer) createService(service *structs.Service) (*consul.AgentService, error) {
srv := consul.AgentService{
ID: service.ID(c.serviceIdentifier),
Service: service.Name,
Tags: service.Tags,
}
host, port := c.addrFinder(service.PortLabel)
if host != "" {
srv.Address = host
}
if port != 0 {
srv.Port = port
}
return &srv, nil
}
// registerService registers a service with Consul
func (c *Syncer) registerService(service *consul.AgentService) error {
srvReg := consul.AgentServiceRegistration{
ID: service.ID,
Name: service.Service,
Tags: service.Tags,
Port: service.Port,
Address: service.Address,
}
return c.client.Agent().ServiceRegister(&srvReg)
}
// deregisterService de-registers a service with the given ID from consul
func (c *Syncer) deregisterService(ID string) error {
return c.client.Agent().ServiceDeregister(ID)
}
// deregisterCheck de-registers a check with a given ID from Consul.
func (c *Syncer) deregisterCheck(ID string) error {
// Deleting the nomad check
if cr, ok := c.checkRunners[ID]; ok {
cr.Stop()
delete(c.checkRunners, ID)
}
// Deleting from consul
return c.client.Agent().CheckDeregister(ID)
}
// Run triggers periodic syncing of services and checks with Consul. This is
// a long lived go-routine which is stopped during shutdown.
func (c *Syncer) Run() {
d := initialSyncDelay + lib.RandomStagger(initialSyncBuffer-initialSyncDelay)
sync := time.NewTimer(d)
c.logger.Printf("[DEBUG] consul.sync: sleeping %v before first sync", d)
for {
select {
case <-sync.C:
d = syncInterval - lib.RandomStagger(syncInterval/syncJitter)
sync.Reset(d)
if err := c.performSync(); err != nil {
if c.runChecks {
c.logger.Printf("[DEBUG] consul.sync: disabling checks until Consul sync completes for %q: %v", c.serviceIdentifier, err)
}
c.runChecks = false
} else {
c.runChecks = true
}
case <-c.notifySyncCh:
sync.Reset(syncInterval)
case <-c.shutdownCh:
sync.Stop()
c.logger.Printf("[INFO] consul.sync: shutting down sync for %q", c.serviceIdentifier)
return
}
}
}
// RunHandlers executes each handler (randomly)
func (c *Syncer) RunHandlers() {
c.periodicLock.RLock()
handlers := make(map[string]types.PeriodicCallback, len(c.periodicCallbacks))
for name, fn := range c.periodicCallbacks {
handlers[name] = fn
}
c.periodicLock.RUnlock()
for _, fn := range handlers {
fn()
}
}
// performSync sync the services and checks we are tracking with Consul.
func (c *Syncer) performSync() error {
c.RunHandlers()
var mErr multierror.Error
cServices, err := c.client.Agent().Services()
if err != nil {
return err
}
cChecks, err := c.client.Agent().Checks()
if err != nil {
return err
}
// Add services and checks that consul doesn't have but we do
for serviceID, service := range c.trackedServices {
if _, ok := cServices[serviceID]; !ok {
if err := c.registerService(service); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
}
for checkID, check := range c.trackedChecks {
if _, ok := cChecks[checkID]; !ok {
if err := c.registerCheck(check); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
}
return mErr.ErrorOrNil()
}
// filterConsulServices prunes out all the service whose ids are not prefixed
// with nomad-
func (c *Syncer) filterConsulServices(srvcs map[string]*consul.AgentService) map[string]*consul.AgentService {
nomadServices := make(map[string]*consul.AgentService)
for _, srv := range srvcs {
if strings.HasPrefix(srv.ID, structs.NomadConsulPrefix) &&
!strings.HasPrefix(srv.ID, structs.AgentServicePrefix) {
nomadServices[srv.ID] = srv
}
}
return nomadServices
}
// filterConsulChecks prunes out all the consul checks which do not have
// services with id prefixed with noamd-
func (c *Syncer) filterConsulChecks(chks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck {
nomadChecks := make(map[string]*consul.AgentCheck)
for _, chk := range chks {
if strings.HasPrefix(chk.ServiceID, structs.NomadConsulPrefix) {
nomadChecks[chk.CheckID] = chk
}
}
return nomadChecks
}
// runCheck runs a check and updates the corresponding ttl check in consul
func (c *Syncer) runCheck(check Check) {
res := check.Run()
if res.Duration >= check.Timeout() {
c.logger.Printf("[DEBUG] consul.sync: check took time: %v, timeout: %v", res.Duration, check.Timeout())
}
state := consul.HealthCritical
output := res.Output
switch res.ExitCode {
case 0:
state = consul.HealthPassing
case 1:
state = consul.HealthWarning
default:
state = consul.HealthCritical
}
if res.Err != nil {
state = consul.HealthCritical
output = res.Err.Error()
}
if err := c.client.Agent().UpdateTTL(check.ID(), output, state); err != nil {
if c.runChecks {
c.logger.Printf("[DEBUG] consul.sync: check %q failed, disabling Consul checks until until next successful sync: %v", check.ID(), err)
c.runChecks = false
} else {
c.runChecks = true
}
}
}
// GenerateServiceIdentifier returns a service identifier based on an allocation
// id and task name
func GenerateServiceIdentifier(allocID string, taskName string) string {
return fmt.Sprintf("%s-%s", taskName, allocID)
}
// AddPeriodicHandler adds a uniquely named callback. Returns true if
// successful, false if a handler with the same name already exists.
func (c *Syncer) AddPeriodicHandler(name string, fn types.PeriodicCallback) bool {
c.periodicLock.Lock()
defer c.periodicLock.Unlock()
if _, found := c.periodicCallbacks[name]; found {
c.logger.Printf("[ERROR] consul.sync: failed adding handler %q", name)
return false
}
c.periodicCallbacks[name] = fn
return true
}
func (c *Syncer) NumHandlers() int {
c.periodicLock.RLock()
defer c.periodicLock.RUnlock()
return len(c.periodicCallbacks)
}
// RemovePeriodicHandler removes a handler with a given name.
func (c *Syncer) RemovePeriodicHandler(name string) {
c.periodicLock.Lock()
defer c.periodicLock.Unlock()
delete(c.periodicCallbacks, name)
}
func (c *Syncer) ConsulClient() *consul.Client {
return c.client
}