2017-02-01 00:43:57 +00:00
|
|
|
package consul
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"log"
|
|
|
|
"net"
|
|
|
|
"net/url"
|
|
|
|
"strconv"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/hashicorp/consul/api"
|
2017-04-12 20:26:55 +00:00
|
|
|
"github.com/hashicorp/nomad/client/driver"
|
2017-02-01 00:43:57 +00:00
|
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
|
|
// nomadServicePrefix is the first prefix that scopes all Nomad registered
|
|
|
|
// services
|
|
|
|
nomadServicePrefix = "_nomad"
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// defaultRetryInterval is how quickly to retry syncing services and
|
|
|
|
// checks to Consul when an error occurs. Will backoff up to a max.
|
|
|
|
defaultRetryInterval = time.Second
|
|
|
|
|
|
|
|
// defaultMaxRetryInterval is the default max retry interval.
|
|
|
|
defaultMaxRetryInterval = 30 * time.Second
|
2017-02-01 00:43:57 +00:00
|
|
|
|
|
|
|
// ttlCheckBuffer is the time interval that Nomad can take to report Consul
|
|
|
|
// the check result
|
|
|
|
ttlCheckBuffer = 31 * time.Second
|
|
|
|
|
|
|
|
// defaultShutdownWait is how long Shutdown() should block waiting for
|
|
|
|
// enqueued operations to sync to Consul by default.
|
|
|
|
defaultShutdownWait = time.Minute
|
|
|
|
|
|
|
|
// DefaultQueryWaitDuration is the max duration the Consul Agent will
|
|
|
|
// spend waiting for a response from a Consul Query.
|
|
|
|
DefaultQueryWaitDuration = 2 * time.Second
|
|
|
|
|
|
|
|
// ServiceTagHTTP is the tag assigned to HTTP services
|
|
|
|
ServiceTagHTTP = "http"
|
|
|
|
|
|
|
|
// ServiceTagRPC is the tag assigned to RPC services
|
|
|
|
ServiceTagRPC = "rpc"
|
|
|
|
|
|
|
|
// ServiceTagSerf is the tag assigned to Serf services
|
|
|
|
ServiceTagSerf = "serf"
|
|
|
|
)
|
|
|
|
|
|
|
|
// CatalogAPI is the consul/api.Catalog API used by Nomad.
|
|
|
|
type CatalogAPI interface {
|
|
|
|
Datacenters() ([]string, error)
|
|
|
|
Service(service, tag string, q *api.QueryOptions) ([]*api.CatalogService, *api.QueryMeta, error)
|
|
|
|
}
|
|
|
|
|
|
|
|
// AgentAPI is the consul/api.Agent API used by Nomad.
|
|
|
|
type AgentAPI interface {
|
2017-04-08 00:10:26 +00:00
|
|
|
Services() (map[string]*api.AgentService, error)
|
|
|
|
Checks() (map[string]*api.AgentCheck, error)
|
2017-02-01 00:43:57 +00:00
|
|
|
CheckRegister(check *api.AgentCheckRegistration) error
|
|
|
|
CheckDeregister(checkID string) error
|
|
|
|
ServiceRegister(service *api.AgentServiceRegistration) error
|
|
|
|
ServiceDeregister(serviceID string) error
|
|
|
|
UpdateTTL(id, output, status string) error
|
|
|
|
}
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// addrParser is usually the Task.FindHostAndPortFor method for turning a
|
|
|
|
// portLabel into an address and port.
|
|
|
|
type addrParser func(portLabel string) (string, int)
|
|
|
|
|
|
|
|
// operations are submitted to the main loop via commit() for synchronizing
|
|
|
|
// with Consul.
|
|
|
|
type operations struct {
|
|
|
|
regServices []*api.AgentServiceRegistration
|
|
|
|
regChecks []*api.AgentCheckRegistration
|
|
|
|
scripts []*scriptCheck
|
|
|
|
|
|
|
|
deregServices []string
|
|
|
|
deregChecks []string
|
|
|
|
}
|
|
|
|
|
2017-02-01 00:43:57 +00:00
|
|
|
// ServiceClient handles task and agent service registration with Consul.
|
|
|
|
type ServiceClient struct {
|
2017-04-08 00:10:26 +00:00
|
|
|
client AgentAPI
|
|
|
|
logger *log.Logger
|
|
|
|
retryInterval time.Duration
|
|
|
|
maxRetryInterval time.Duration
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// exitCh is closed when the main Run loop exits
|
|
|
|
exitCh chan struct{}
|
2017-02-01 00:43:57 +00:00
|
|
|
|
|
|
|
// shutdownCh is closed when the client should shutdown
|
|
|
|
shutdownCh chan struct{}
|
|
|
|
|
|
|
|
// shutdownWait is how long Shutdown() blocks waiting for the final
|
|
|
|
// sync() to finish. Defaults to defaultShutdownWait
|
|
|
|
shutdownWait time.Duration
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
opCh chan *operations
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
services map[string]*api.AgentServiceRegistration
|
|
|
|
checks map[string]*api.AgentCheckRegistration
|
|
|
|
scripts map[string]*scriptCheck
|
2017-02-01 00:43:57 +00:00
|
|
|
runningScripts map[string]*scriptHandle
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// agent services and checks record entries for the agent itself which
|
|
|
|
// should be removed on shutdown
|
2017-02-01 00:43:57 +00:00
|
|
|
agentServices map[string]struct{}
|
|
|
|
agentChecks map[string]struct{}
|
2017-04-08 00:10:26 +00:00
|
|
|
agentLock sync.Mutex
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewServiceClient creates a new Consul ServiceClient from an existing Consul API
|
|
|
|
// Client and logger.
|
|
|
|
func NewServiceClient(consulClient AgentAPI, logger *log.Logger) *ServiceClient {
|
|
|
|
return &ServiceClient{
|
2017-04-08 00:10:26 +00:00
|
|
|
client: consulClient,
|
|
|
|
logger: logger,
|
|
|
|
retryInterval: defaultRetryInterval,
|
|
|
|
maxRetryInterval: defaultMaxRetryInterval,
|
|
|
|
exitCh: make(chan struct{}),
|
|
|
|
shutdownCh: make(chan struct{}),
|
|
|
|
shutdownWait: defaultShutdownWait,
|
|
|
|
opCh: make(chan *operations, 8),
|
|
|
|
services: make(map[string]*api.AgentServiceRegistration),
|
|
|
|
checks: make(map[string]*api.AgentCheckRegistration),
|
|
|
|
scripts: make(map[string]*scriptCheck),
|
|
|
|
runningScripts: make(map[string]*scriptHandle),
|
|
|
|
agentServices: make(map[string]struct{}),
|
|
|
|
agentChecks: make(map[string]struct{}),
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Run the Consul main loop which retries operations against Consul. It should
|
|
|
|
// be called exactly once.
|
|
|
|
func (c *ServiceClient) Run() {
|
2017-04-08 00:10:26 +00:00
|
|
|
defer close(c.exitCh)
|
|
|
|
retryTimer := time.NewTimer(0)
|
|
|
|
<-retryTimer.C // disabled by default
|
|
|
|
failures := 0
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-retryTimer.C:
|
|
|
|
case <-c.shutdownCh:
|
|
|
|
case ops := <-c.opCh:
|
|
|
|
c.merge(ops)
|
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
if err := c.sync(); err != nil {
|
|
|
|
if failures == 0 {
|
2017-04-12 19:07:10 +00:00
|
|
|
c.logger.Printf("[WARN] consul.sync: failed to update services in Consul: %v", err)
|
2017-04-08 00:10:26 +00:00
|
|
|
}
|
|
|
|
failures++
|
|
|
|
if !retryTimer.Stop() {
|
2017-04-12 19:07:10 +00:00
|
|
|
select {
|
|
|
|
case <-retryTimer.C:
|
|
|
|
default:
|
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
}
|
|
|
|
backoff := c.retryInterval * time.Duration(failures)
|
|
|
|
if backoff > c.maxRetryInterval {
|
|
|
|
backoff = c.maxRetryInterval
|
|
|
|
}
|
|
|
|
retryTimer.Reset(backoff)
|
|
|
|
} else {
|
|
|
|
if failures > 0 {
|
2017-04-12 19:07:10 +00:00
|
|
|
c.logger.Printf("[INFO] consul.sync: successfully updated services in Consul")
|
2017-04-08 00:10:26 +00:00
|
|
|
failures = 0
|
|
|
|
}
|
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
|
|
|
|
select {
|
2017-04-08 00:10:26 +00:00
|
|
|
case <-c.shutdownCh:
|
|
|
|
// Exit only after sync'ing all outstanding operations
|
|
|
|
if len(c.opCh) > 0 {
|
|
|
|
for len(c.opCh) > 0 {
|
|
|
|
c.merge(<-c.opCh)
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
continue
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
return
|
2017-04-08 00:10:26 +00:00
|
|
|
default:
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-18 00:07:42 +00:00
|
|
|
// commit operations unless already shutting down.
|
|
|
|
func (c *ServiceClient) commit(ops *operations) {
|
2017-02-01 00:43:57 +00:00
|
|
|
select {
|
2017-04-08 00:10:26 +00:00
|
|
|
case c.opCh <- ops:
|
|
|
|
case <-c.shutdownCh:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// merge registrations into state map prior to sync'ing with Consul
|
|
|
|
func (c *ServiceClient) merge(ops *operations) {
|
|
|
|
for _, s := range ops.regServices {
|
|
|
|
c.services[s.ID] = s
|
|
|
|
}
|
|
|
|
for _, check := range ops.regChecks {
|
|
|
|
c.checks[check.ID] = check
|
|
|
|
}
|
|
|
|
for _, s := range ops.scripts {
|
|
|
|
c.scripts[s.id] = s
|
|
|
|
}
|
|
|
|
for _, sid := range ops.deregServices {
|
|
|
|
delete(c.services, sid)
|
|
|
|
}
|
|
|
|
for _, cid := range ops.deregChecks {
|
|
|
|
if script, ok := c.runningScripts[cid]; ok {
|
|
|
|
script.cancel()
|
|
|
|
delete(c.scripts, cid)
|
|
|
|
}
|
|
|
|
delete(c.checks, cid)
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// sync enqueued operations.
|
|
|
|
func (c *ServiceClient) sync() error {
|
2017-04-08 00:10:26 +00:00
|
|
|
sreg, creg, sdereg, cdereg := 0, 0, 0, 0
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
consulServices, err := c.client.Services()
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("error querying Consul services: %v", err)
|
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
consulChecks, err := c.client.Checks()
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("error querying Consul checks: %v", err)
|
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// Remove Nomad services in Consul but unknown locally
|
|
|
|
for id := range consulServices {
|
|
|
|
if _, ok := c.services[id]; ok {
|
|
|
|
// Known service, skip
|
|
|
|
continue
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
if !isNomadService(id) {
|
|
|
|
// Not managed by Nomad, skip
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
// Unknown Nomad managed service; kill
|
|
|
|
if err := c.client.ServiceDeregister(id); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
sdereg++
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
2017-04-18 04:15:13 +00:00
|
|
|
// Track services whose ports have changed as their checks may also
|
|
|
|
// need updating
|
|
|
|
portsChanged := make(map[string]struct{}, len(c.services))
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// Add Nomad services missing from Consul
|
2017-04-18 04:15:13 +00:00
|
|
|
for id, locals := range c.services {
|
|
|
|
if remotes, ok := consulServices[id]; ok {
|
|
|
|
if locals.Port == remotes.Port {
|
|
|
|
// Already exists in Consul; skip
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
// Port changed, reregister it and its checks
|
|
|
|
portsChanged[id] = struct{}{}
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2017-04-18 04:15:13 +00:00
|
|
|
if err = c.client.ServiceRegister(locals); err != nil {
|
2017-04-08 00:10:26 +00:00
|
|
|
return err
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
sreg++
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// Remove Nomad checks in Consul but unknown locally
|
|
|
|
for id, check := range consulChecks {
|
|
|
|
if _, ok := c.checks[id]; ok {
|
2017-04-18 04:15:13 +00:00
|
|
|
// Known check, leave it
|
2017-04-08 00:10:26 +00:00
|
|
|
continue
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
if !isNomadService(check.ServiceID) {
|
|
|
|
// Not managed by Nomad, skip
|
|
|
|
continue
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
// Unknown Nomad managed check; kill
|
|
|
|
if err := c.client.CheckDeregister(id); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
cdereg++
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// Add Nomad checks missing from Consul
|
|
|
|
for id, check := range c.checks {
|
2017-04-18 04:15:13 +00:00
|
|
|
if check, ok := consulChecks[id]; ok {
|
|
|
|
if _, changed := portsChanged[check.ServiceID]; !changed {
|
|
|
|
// Already in Consul and ports didn't change; skipping
|
|
|
|
continue
|
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
}
|
|
|
|
if err := c.client.CheckRegister(check); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
creg++
|
|
|
|
|
|
|
|
// Handle starting scripts
|
|
|
|
if script, ok := c.scripts[id]; ok {
|
2017-04-18 04:15:13 +00:00
|
|
|
// If it's already running, cancel and replace
|
|
|
|
if oldScript, running := c.runningScripts[id]; running {
|
|
|
|
oldScript.cancel()
|
2017-04-08 00:10:26 +00:00
|
|
|
}
|
2017-04-18 04:15:13 +00:00
|
|
|
// Start and store the handle
|
2017-04-08 00:10:26 +00:00
|
|
|
c.runningScripts[id] = script.run()
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
c.logger.Printf("[DEBUG] consul.sync: registered %d services, %d checks; deregistered %d services, %d checks",
|
|
|
|
sreg, creg, sdereg, cdereg)
|
2017-02-01 00:43:57 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-04-13 23:59:27 +00:00
|
|
|
// RegisterAgent registers Nomad agents (client or server). The
|
|
|
|
// Service.PortLabel should be a literal port to be parsed with SplitHostPort.
|
|
|
|
// Script checks are not supported and will return an error. Registration is
|
|
|
|
// asynchronous.
|
2017-02-01 00:43:57 +00:00
|
|
|
//
|
|
|
|
// Agents will be deregistered when Shutdown is called.
|
|
|
|
func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) error {
|
2017-04-08 00:10:26 +00:00
|
|
|
ops := operations{}
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2017-04-04 00:08:08 +00:00
|
|
|
for _, service := range services {
|
2017-02-01 00:43:57 +00:00
|
|
|
id := makeAgentServiceID(role, service)
|
2017-04-13 23:59:27 +00:00
|
|
|
|
|
|
|
// Unlike tasks, agents don't use port labels. Agent ports are
|
|
|
|
// stored directly in the PortLabel.
|
2017-02-01 00:43:57 +00:00
|
|
|
host, rawport, err := net.SplitHostPort(service.PortLabel)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("error parsing port label %q from service %q: %v", service.PortLabel, service.Name, err)
|
|
|
|
}
|
|
|
|
port, err := strconv.Atoi(rawport)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("error parsing port %q from service %q: %v", rawport, service.Name, err)
|
|
|
|
}
|
|
|
|
serviceReg := &api.AgentServiceRegistration{
|
|
|
|
ID: id,
|
|
|
|
Name: service.Name,
|
|
|
|
Tags: service.Tags,
|
|
|
|
Address: host,
|
|
|
|
Port: port,
|
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
ops.regServices = append(ops.regServices, serviceReg)
|
2017-02-01 00:43:57 +00:00
|
|
|
|
|
|
|
for _, check := range service.Checks {
|
|
|
|
checkID := createCheckID(id, check)
|
|
|
|
if check.Type == structs.ServiceCheckScript {
|
|
|
|
return fmt.Errorf("service %q contains invalid check: agent checks do not support scripts", service.Name)
|
|
|
|
}
|
|
|
|
checkHost, checkPort := serviceReg.Address, serviceReg.Port
|
|
|
|
if check.PortLabel != "" {
|
2017-04-13 23:59:27 +00:00
|
|
|
// Unlike tasks, agents don't use port labels. Agent ports are
|
|
|
|
// stored directly in the PortLabel.
|
2017-02-01 00:43:57 +00:00
|
|
|
host, rawport, err := net.SplitHostPort(check.PortLabel)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("error parsing port label %q from check %q: %v", service.PortLabel, check.Name, err)
|
|
|
|
}
|
|
|
|
port, err := strconv.Atoi(rawport)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("error parsing port %q from check %q: %v", rawport, check.Name, err)
|
|
|
|
}
|
|
|
|
checkHost, checkPort = host, port
|
|
|
|
}
|
|
|
|
checkReg, err := createCheckReg(id, checkID, check, checkHost, checkPort)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("failed to add check %q: %v", check.Name, err)
|
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
ops.regChecks = append(ops.regChecks, checkReg)
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-18 00:07:42 +00:00
|
|
|
// Don't bother committing agent checks if we're already shutting down
|
|
|
|
c.agentLock.Lock()
|
|
|
|
defer c.agentLock.Unlock()
|
|
|
|
select {
|
|
|
|
case <-c.shutdownCh:
|
2017-04-08 00:10:26 +00:00
|
|
|
return nil
|
2017-04-18 00:07:42 +00:00
|
|
|
default:
|
2017-04-08 00:10:26 +00:00
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2017-04-18 00:07:42 +00:00
|
|
|
// Now add them to the registration queue
|
|
|
|
c.commit(&ops)
|
|
|
|
|
2017-02-01 00:43:57 +00:00
|
|
|
// Record IDs for deregistering on shutdown
|
2017-04-08 00:10:26 +00:00
|
|
|
for _, id := range ops.regServices {
|
2017-04-13 20:49:23 +00:00
|
|
|
c.agentServices[id.ID] = struct{}{}
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
for _, id := range ops.regChecks {
|
2017-04-13 20:49:23 +00:00
|
|
|
c.agentChecks[id.ID] = struct{}{}
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2017-04-04 00:08:08 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// serviceRegs creates service registrations, check registrations, and script
|
|
|
|
// checks from a service.
|
2017-04-08 00:10:26 +00:00
|
|
|
func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *structs.Service,
|
2017-04-12 20:26:55 +00:00
|
|
|
exec driver.ScriptExecutor, task *structs.Task) error {
|
2017-04-04 00:08:08 +00:00
|
|
|
|
|
|
|
id := makeTaskServiceID(allocID, task.Name, service)
|
|
|
|
host, port := task.FindHostAndPortFor(service.PortLabel)
|
|
|
|
serviceReg := &api.AgentServiceRegistration{
|
|
|
|
ID: id,
|
|
|
|
Name: service.Name,
|
|
|
|
Tags: make([]string, len(service.Tags)),
|
|
|
|
Address: host,
|
|
|
|
Port: port,
|
|
|
|
}
|
|
|
|
// copy isn't strictly necessary but can avoid bugs especially
|
|
|
|
// with tests that may reuse Tasks
|
|
|
|
copy(serviceReg.Tags, service.Tags)
|
2017-04-08 00:10:26 +00:00
|
|
|
ops.regServices = append(ops.regServices, serviceReg)
|
2017-04-04 00:08:08 +00:00
|
|
|
|
|
|
|
for _, check := range service.Checks {
|
2017-04-13 23:43:38 +00:00
|
|
|
checkID := createCheckID(id, check)
|
|
|
|
if check.Type == structs.ServiceCheckScript {
|
|
|
|
if exec == nil {
|
|
|
|
return fmt.Errorf("driver doesn't support script checks")
|
|
|
|
}
|
|
|
|
ops.scripts = append(ops.scripts, newScriptCheck(
|
|
|
|
allocID, task.Name, checkID, check, exec, c.client, c.logger, c.shutdownCh))
|
|
|
|
|
|
|
|
}
|
|
|
|
host, port := serviceReg.Address, serviceReg.Port
|
|
|
|
if check.PortLabel != "" {
|
|
|
|
host, port = task.FindHostAndPortFor(check.PortLabel)
|
|
|
|
}
|
|
|
|
checkReg, err := createCheckReg(id, checkID, check, host, port)
|
2017-04-04 00:08:08 +00:00
|
|
|
if err != nil {
|
2017-04-13 23:43:38 +00:00
|
|
|
return fmt.Errorf("failed to add check %q: %v", check.Name, err)
|
2017-04-04 00:08:08 +00:00
|
|
|
}
|
2017-04-13 23:43:38 +00:00
|
|
|
ops.regChecks = append(ops.regChecks, checkReg)
|
2017-04-04 00:08:08 +00:00
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// RegisterTask with Consul. Adds all sevice entries and checks to Consul. If
|
|
|
|
// exec is nil and a script check exists an error is returned.
|
|
|
|
//
|
|
|
|
// Actual communication with Consul is done asynchrously (see Run).
|
2017-04-12 20:26:55 +00:00
|
|
|
func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor) error {
|
2017-04-08 00:10:26 +00:00
|
|
|
ops := &operations{}
|
2017-04-04 00:08:08 +00:00
|
|
|
for _, service := range task.Services {
|
|
|
|
if err := c.serviceRegs(ops, allocID, service, exec, task); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
c.commit(ops)
|
2017-04-04 00:08:08 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// UpdateTask in Consul. Does not alter the service if only checks have
|
|
|
|
// changed.
|
2017-04-12 20:26:55 +00:00
|
|
|
func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor) error {
|
2017-04-08 00:10:26 +00:00
|
|
|
ops := &operations{}
|
2017-04-04 00:08:08 +00:00
|
|
|
|
|
|
|
existingIDs := make(map[string]*structs.Service, len(existing.Services))
|
|
|
|
for _, s := range existing.Services {
|
|
|
|
existingIDs[makeTaskServiceID(allocID, existing.Name, s)] = s
|
|
|
|
}
|
|
|
|
newIDs := make(map[string]*structs.Service, len(newTask.Services))
|
|
|
|
for _, s := range newTask.Services {
|
|
|
|
newIDs[makeTaskServiceID(allocID, newTask.Name, s)] = s
|
|
|
|
}
|
|
|
|
|
|
|
|
// Loop over existing Service IDs to see if they have been removed or
|
|
|
|
// updated.
|
|
|
|
for existingID, existingSvc := range existingIDs {
|
|
|
|
newSvc, ok := newIDs[existingID]
|
|
|
|
if !ok {
|
|
|
|
// Existing sevice entry removed
|
2017-04-08 00:10:26 +00:00
|
|
|
ops.deregServices = append(ops.deregServices, existingID)
|
2017-04-04 00:08:08 +00:00
|
|
|
for _, check := range existingSvc.Checks {
|
2017-04-08 00:10:26 +00:00
|
|
|
ops.deregChecks = append(ops.deregChecks, createCheckID(existingID, check))
|
2017-04-04 00:08:08 +00:00
|
|
|
}
|
|
|
|
continue
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
2017-04-18 04:15:13 +00:00
|
|
|
if newSvc.PortLabel == existingSvc.PortLabel {
|
|
|
|
// Service exists and hasn't changed, don't add it later
|
|
|
|
delete(newIDs, existingID)
|
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
|
|
|
|
// Check to see what checks were updated
|
|
|
|
existingChecks := make(map[string]struct{}, len(existingSvc.Checks))
|
2017-04-04 00:08:08 +00:00
|
|
|
for _, check := range existingSvc.Checks {
|
2017-04-13 20:49:23 +00:00
|
|
|
existingChecks[createCheckID(existingID, check)] = struct{}{}
|
2017-04-04 00:08:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Register new checks
|
|
|
|
for _, check := range newSvc.Checks {
|
|
|
|
checkID := createCheckID(existingID, check)
|
2017-04-08 00:10:26 +00:00
|
|
|
if _, exists := existingChecks[checkID]; exists {
|
2017-04-18 04:15:13 +00:00
|
|
|
// Check exists, so don't remove it
|
2017-04-08 00:10:26 +00:00
|
|
|
delete(existingChecks, checkID)
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// Remove existing checks not in updated service
|
|
|
|
for cid := range existingChecks {
|
|
|
|
ops.deregChecks = append(ops.deregChecks, cid)
|
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
2017-04-04 00:08:08 +00:00
|
|
|
// Any remaining services should just be enqueued directly
|
|
|
|
for _, newSvc := range newIDs {
|
|
|
|
err := c.serviceRegs(ops, allocID, newSvc, exec, newTask)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
c.commit(ops)
|
2017-02-01 00:43:57 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// RemoveTask from Consul. Removes all service entries and checks.
|
|
|
|
//
|
|
|
|
// Actual communication with Consul is done asynchrously (see Run).
|
|
|
|
func (c *ServiceClient) RemoveTask(allocID string, task *structs.Task) {
|
2017-04-08 00:10:26 +00:00
|
|
|
ops := operations{}
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2017-04-04 00:08:08 +00:00
|
|
|
for _, service := range task.Services {
|
2017-02-01 00:43:57 +00:00
|
|
|
id := makeTaskServiceID(allocID, task.Name, service)
|
2017-04-08 00:10:26 +00:00
|
|
|
ops.deregServices = append(ops.deregServices, id)
|
2017-02-01 00:43:57 +00:00
|
|
|
|
|
|
|
for _, check := range service.Checks {
|
2017-04-08 00:10:26 +00:00
|
|
|
ops.deregChecks = append(ops.deregChecks, createCheckID(id, check))
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Now add them to the deregistration fields; main Run loop will update
|
2017-04-08 00:10:26 +00:00
|
|
|
c.commit(&ops)
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Shutdown the Consul client. Update running task registations and deregister
|
2017-04-18 00:07:42 +00:00
|
|
|
// agent from Consul. On first call blocks up to shutdownWait before giving up
|
|
|
|
// on syncing operations.
|
2017-02-01 00:43:57 +00:00
|
|
|
func (c *ServiceClient) Shutdown() error {
|
2017-04-18 00:07:42 +00:00
|
|
|
// Serialize Shutdown calls with RegisterAgent to prevent leaking agent
|
|
|
|
// entries.
|
|
|
|
c.agentLock.Lock()
|
2017-02-01 00:43:57 +00:00
|
|
|
select {
|
|
|
|
case <-c.shutdownCh:
|
|
|
|
return nil
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
|
2017-04-18 00:07:42 +00:00
|
|
|
// Deregister Nomad agent Consul entries before closing shutdown.
|
2017-04-12 19:07:10 +00:00
|
|
|
ops := operations{}
|
2017-02-01 00:43:57 +00:00
|
|
|
for id := range c.agentServices {
|
2017-04-12 19:07:10 +00:00
|
|
|
ops.deregServices = append(ops.deregServices, id)
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
for id := range c.agentChecks {
|
2017-04-12 19:07:10 +00:00
|
|
|
ops.deregChecks = append(ops.deregChecks, id)
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2017-04-12 19:07:10 +00:00
|
|
|
c.commit(&ops)
|
|
|
|
|
|
|
|
// Then signal shutdown
|
|
|
|
close(c.shutdownCh)
|
|
|
|
|
2017-04-18 00:07:42 +00:00
|
|
|
// Safe to unlock after shutdownCh closed as RegisterAgent will check
|
|
|
|
// shutdownCh before committing.
|
|
|
|
c.agentLock.Unlock()
|
|
|
|
|
2017-04-12 19:07:10 +00:00
|
|
|
// Give run loop time to sync, but don't block indefinitely
|
|
|
|
deadline := time.After(c.shutdownWait)
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// Wait for Run to finish any outstanding operations and exit
|
2017-02-01 00:43:57 +00:00
|
|
|
select {
|
2017-04-08 00:10:26 +00:00
|
|
|
case <-c.exitCh:
|
2017-02-01 00:43:57 +00:00
|
|
|
case <-deadline:
|
|
|
|
// Don't wait forever though
|
2017-04-12 19:07:10 +00:00
|
|
|
return fmt.Errorf("timed out waiting for Consul operations to complete")
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Give script checks time to exit (no need to lock as Run() has exited)
|
|
|
|
for _, h := range c.runningScripts {
|
|
|
|
select {
|
|
|
|
case <-h.wait():
|
|
|
|
case <-deadline:
|
2017-04-12 19:07:10 +00:00
|
|
|
return fmt.Errorf("timed out waiting for script checks to run")
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
}
|
2017-04-12 19:07:10 +00:00
|
|
|
return nil
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// makeAgentServiceID creates a unique ID for identifying an agent service in
|
|
|
|
// Consul.
|
|
|
|
//
|
|
|
|
// Agent service IDs are of the form:
|
|
|
|
//
|
|
|
|
// {nomadServicePrefix}-{ROLE}-{Service.Name}-{Service.Tags...}
|
|
|
|
// Example Server ID: _nomad-server-nomad-serf
|
|
|
|
// Example Client ID: _nomad-client-nomad-client-http
|
|
|
|
//
|
|
|
|
func makeAgentServiceID(role string, service *structs.Service) string {
|
|
|
|
parts := make([]string, len(service.Tags)+3)
|
|
|
|
parts[0] = nomadServicePrefix
|
|
|
|
parts[1] = role
|
|
|
|
parts[2] = service.Name
|
|
|
|
copy(parts[3:], service.Tags)
|
|
|
|
return strings.Join(parts, "-")
|
|
|
|
}
|
|
|
|
|
|
|
|
// makeTaskServiceID creates a unique ID for identifying a task service in
|
|
|
|
// Consul.
|
|
|
|
//
|
|
|
|
// Task service IDs are of the form:
|
|
|
|
//
|
|
|
|
// {nomadServicePrefix}-executor-{ALLOC_ID}-{Service.Name}-{Service.Tags...}
|
|
|
|
// Example Service ID: _nomad-executor-1234-echo-http-tag1-tag2-tag3
|
|
|
|
//
|
|
|
|
func makeTaskServiceID(allocID, taskName string, service *structs.Service) string {
|
|
|
|
parts := make([]string, len(service.Tags)+5)
|
|
|
|
parts[0] = nomadServicePrefix
|
|
|
|
parts[1] = "executor"
|
|
|
|
parts[2] = allocID
|
|
|
|
parts[3] = taskName
|
|
|
|
parts[4] = service.Name
|
|
|
|
copy(parts[5:], service.Tags)
|
|
|
|
return strings.Join(parts, "-")
|
|
|
|
}
|
|
|
|
|
|
|
|
// createCheckID creates a unique ID for a check.
|
|
|
|
func createCheckID(serviceID string, check *structs.ServiceCheck) string {
|
|
|
|
return check.Hash(serviceID)
|
|
|
|
}
|
|
|
|
|
|
|
|
// createCheckReg creates a Check that can be registered with Consul.
|
|
|
|
//
|
2017-04-12 20:27:56 +00:00
|
|
|
// Script checks simply have a TTL set and the caller is responsible for
|
|
|
|
// running the script and heartbeating.
|
2017-02-01 00:43:57 +00:00
|
|
|
func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host string, port int) (*api.AgentCheckRegistration, error) {
|
|
|
|
chkReg := api.AgentCheckRegistration{
|
|
|
|
ID: checkID,
|
|
|
|
Name: check.Name,
|
|
|
|
ServiceID: serviceID,
|
|
|
|
}
|
|
|
|
chkReg.Status = check.InitialStatus
|
|
|
|
chkReg.Timeout = check.Timeout.String()
|
|
|
|
chkReg.Interval = check.Interval.String()
|
|
|
|
|
|
|
|
switch check.Type {
|
|
|
|
case structs.ServiceCheckHTTP:
|
|
|
|
if check.Protocol == "" {
|
|
|
|
check.Protocol = "http"
|
|
|
|
}
|
|
|
|
base := url.URL{
|
|
|
|
Scheme: check.Protocol,
|
|
|
|
Host: net.JoinHostPort(host, strconv.Itoa(port)),
|
|
|
|
}
|
|
|
|
relative, err := url.Parse(check.Path)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
url := base.ResolveReference(relative)
|
|
|
|
chkReg.HTTP = url.String()
|
|
|
|
case structs.ServiceCheckTCP:
|
|
|
|
chkReg.TCP = net.JoinHostPort(host, strconv.Itoa(port))
|
|
|
|
case structs.ServiceCheckScript:
|
|
|
|
chkReg.TTL = (check.Interval + ttlCheckBuffer).String()
|
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("check type %+q not valid", check.Type)
|
|
|
|
}
|
|
|
|
return &chkReg, nil
|
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
|
|
|
|
// isNomadService returns true if the ID matches the pattern of a Nomad managed
|
|
|
|
// service.
|
|
|
|
func isNomadService(id string) bool {
|
|
|
|
return strings.HasPrefix(id, nomadServicePrefix)
|
|
|
|
}
|