2017-02-01 00:43:57 +00:00
package consul
import (
"fmt"
"log"
"net"
"net/url"
"strconv"
"strings"
"sync"
2017-07-24 19:12:02 +00:00
"sync/atomic"
2017-02-01 00:43:57 +00:00
"time"
2017-04-18 23:23:39 +00:00
metrics "github.com/armon/go-metrics"
2017-02-01 00:43:57 +00:00
"github.com/hashicorp/consul/api"
2017-04-12 20:26:55 +00:00
"github.com/hashicorp/nomad/client/driver"
2017-06-09 17:29:41 +00:00
cstructs "github.com/hashicorp/nomad/client/structs"
2017-02-01 00:43:57 +00:00
"github.com/hashicorp/nomad/nomad/structs"
)
const (
	// nomadServicePrefix is the leading component of every service ID Nomad
	// registers in Consul; it scopes Nomad-managed entries.
	nomadServicePrefix = "_nomad"

	// defaultRetryInterval is the base delay before retrying a failed sync of
	// services and checks to Consul. Retries back off from here up to
	// defaultMaxRetryInterval.
	defaultRetryInterval = 1 * time.Second

	// defaultMaxRetryInterval caps the retry backoff.
	defaultMaxRetryInterval = 30 * time.Second

	// ttlCheckBuffer is added to a script check's interval to form its Consul
	// TTL, giving Nomad time to report the check result.
	ttlCheckBuffer = 31 * time.Second

	// defaultShutdownWait is how long Shutdown() blocks by default waiting for
	// enqueued operations to be synced to Consul.
	defaultShutdownWait = 1 * time.Minute

	// DefaultQueryWaitDuration is the max duration the Consul Agent will
	// spend waiting for a response from a Consul Query.
	DefaultQueryWaitDuration = 2 * time.Second

	// ServiceTagHTTP is the tag assigned to HTTP services.
	ServiceTagHTTP = "http"

	// ServiceTagRPC is the tag assigned to RPC services.
	ServiceTagRPC = "rpc"

	// ServiceTagSerf is the tag assigned to Serf services.
	ServiceTagSerf = "serf"
)
// CatalogAPI is the consul/api.Catalog API used by Nomad.
type CatalogAPI interface {
Datacenters ( ) ( [ ] string , error )
Service ( service , tag string , q * api . QueryOptions ) ( [ ] * api . CatalogService , * api . QueryMeta , error )
}
// AgentAPI is the consul/api.Agent API used by Nomad.
type AgentAPI interface {
2017-04-08 00:10:26 +00:00
Services ( ) ( map [ string ] * api . AgentService , error )
Checks ( ) ( map [ string ] * api . AgentCheck , error )
2017-02-01 00:43:57 +00:00
CheckRegister ( check * api . AgentCheckRegistration ) error
CheckDeregister ( checkID string ) error
ServiceRegister ( service * api . AgentServiceRegistration ) error
ServiceDeregister ( serviceID string ) error
UpdateTTL ( id , output , status string ) error
}
2017-04-08 00:10:26 +00:00
type (
	// addrParser maps a port label to an address and port. It is usually the
	// Task.FindHostAndPortFor method.
	addrParser func(portLabel string) (string, int)

	// operations is a batch of registrations and deregistrations submitted
	// to the main loop via commit() for synchronizing with Consul.
	operations struct {
		// Entries to add or update.
		regServices []*api.AgentServiceRegistration
		regChecks   []*api.AgentCheckRegistration
		scripts     []*scriptCheck

		// IDs of entries to remove.
		deregServices []string
		deregChecks   []string
	}
)
2017-02-01 00:43:57 +00:00
// ServiceClient handles task and agent service registration with Consul.
type ServiceClient struct {
2017-04-08 00:10:26 +00:00
client AgentAPI
logger * log . Logger
retryInterval time . Duration
maxRetryInterval time . Duration
2017-02-01 00:43:57 +00:00
2017-04-19 19:18:06 +00:00
// skipVerifySupport is true if the local Consul agent suppots TLSSkipVerify
skipVerifySupport bool
2017-04-08 00:10:26 +00:00
// exitCh is closed when the main Run loop exits
exitCh chan struct { }
2017-02-01 00:43:57 +00:00
// shutdownCh is closed when the client should shutdown
shutdownCh chan struct { }
// shutdownWait is how long Shutdown() blocks waiting for the final
// sync() to finish. Defaults to defaultShutdownWait
shutdownWait time . Duration
2017-04-08 00:10:26 +00:00
opCh chan * operations
2017-02-01 00:43:57 +00:00
2017-04-08 00:10:26 +00:00
services map [ string ] * api . AgentServiceRegistration
checks map [ string ] * api . AgentCheckRegistration
scripts map [ string ] * scriptCheck
2017-02-01 00:43:57 +00:00
runningScripts map [ string ] * scriptHandle
2017-04-08 00:10:26 +00:00
// agent services and checks record entries for the agent itself which
// should be removed on shutdown
2017-02-01 00:43:57 +00:00
agentServices map [ string ] struct { }
agentChecks map [ string ] struct { }
2017-04-08 00:10:26 +00:00
agentLock sync . Mutex
2017-07-24 19:12:02 +00:00
// seen is 1 if Consul has ever been seen; otherise 0. Accessed with
// atomics.
2017-08-04 17:14:16 +00:00
seen int32
2017-02-01 00:43:57 +00:00
}
// NewServiceClient creates a new Consul ServiceClient from an existing Consul API
// Client and logger.
2017-04-19 19:18:06 +00:00
func NewServiceClient ( consulClient AgentAPI , skipVerifySupport bool , logger * log . Logger ) * ServiceClient {
2017-02-01 00:43:57 +00:00
return & ServiceClient {
2017-04-19 19:18:06 +00:00
client : consulClient ,
skipVerifySupport : skipVerifySupport ,
logger : logger ,
retryInterval : defaultRetryInterval ,
maxRetryInterval : defaultMaxRetryInterval ,
exitCh : make ( chan struct { } ) ,
shutdownCh : make ( chan struct { } ) ,
shutdownWait : defaultShutdownWait ,
opCh : make ( chan * operations , 8 ) ,
services : make ( map [ string ] * api . AgentServiceRegistration ) ,
checks : make ( map [ string ] * api . AgentCheckRegistration ) ,
scripts : make ( map [ string ] * scriptCheck ) ,
runningScripts : make ( map [ string ] * scriptHandle ) ,
agentServices : make ( map [ string ] struct { } ) ,
agentChecks : make ( map [ string ] struct { } ) ,
2017-02-01 00:43:57 +00:00
}
}
2017-07-25 19:13:05 +00:00
// seen is used by markSeen and hasSeen
2017-07-24 19:12:02 +00:00
const seen = 1
2017-07-24 23:48:40 +00:00
// markSeen marks Consul as having been seen (meaning at least one operation
2017-07-24 19:12:02 +00:00
// has succeeded).
2017-07-24 23:48:40 +00:00
func ( c * ServiceClient ) markSeen ( ) {
2017-08-04 17:14:16 +00:00
atomic . StoreInt32 ( & c . seen , seen )
2017-07-24 19:12:02 +00:00
}
2017-07-24 23:48:40 +00:00
// hasSeen returns true if any Consul operation has ever succeeded. Useful to
2017-07-24 19:12:02 +00:00
// squelch errors if Consul isn't running.
2017-07-24 23:48:40 +00:00
func ( c * ServiceClient ) hasSeen ( ) bool {
2017-08-04 17:14:16 +00:00
return atomic . LoadInt32 ( & c . seen ) == seen
2017-07-24 19:12:02 +00:00
}
2017-02-01 00:43:57 +00:00
// Run the Consul main loop which retries operations against Consul. It should
// be called exactly once.
func ( c * ServiceClient ) Run ( ) {
2017-04-08 00:10:26 +00:00
defer close ( c . exitCh )
retryTimer := time . NewTimer ( 0 )
<- retryTimer . C // disabled by default
failures := 0
for {
select {
case <- retryTimer . C :
case <- c . shutdownCh :
case ops := <- c . opCh :
c . merge ( ops )
}
2017-02-01 00:43:57 +00:00
2017-04-08 00:10:26 +00:00
if err := c . sync ( ) ; err != nil {
2017-07-24 23:48:40 +00:00
if failures == 0 {
c . logger . Printf ( "[WARN] consul.sync: failed to update services in Consul: %v" , err )
2017-04-08 00:10:26 +00:00
}
2017-07-24 22:37:53 +00:00
failures ++
2017-04-08 00:10:26 +00:00
if ! retryTimer . Stop ( ) {
2017-04-18 23:36:20 +00:00
// Timer already expired, since the timer may
// or may not have been read in the select{}
// above, conditionally receive on it
2017-04-12 19:07:10 +00:00
select {
case <- retryTimer . C :
default :
}
2017-04-08 00:10:26 +00:00
}
backoff := c . retryInterval * time . Duration ( failures )
if backoff > c . maxRetryInterval {
backoff = c . maxRetryInterval
}
retryTimer . Reset ( backoff )
} else {
if failures > 0 {
2017-04-12 19:07:10 +00:00
c . logger . Printf ( "[INFO] consul.sync: successfully updated services in Consul" )
2017-04-08 00:10:26 +00:00
failures = 0
}
}
2017-02-01 00:43:57 +00:00
select {
2017-04-08 00:10:26 +00:00
case <- c . shutdownCh :
// Exit only after sync'ing all outstanding operations
if len ( c . opCh ) > 0 {
for len ( c . opCh ) > 0 {
c . merge ( <- c . opCh )
2017-02-01 00:43:57 +00:00
}
2017-04-08 00:10:26 +00:00
continue
2017-02-01 00:43:57 +00:00
}
return
2017-04-08 00:10:26 +00:00
default :
2017-02-01 00:43:57 +00:00
}
2017-04-08 00:10:26 +00:00
2017-02-01 00:43:57 +00:00
}
}
2017-04-18 00:07:42 +00:00
// commit operations unless already shutting down.
func ( c * ServiceClient ) commit ( ops * operations ) {
2017-02-01 00:43:57 +00:00
select {
2017-04-08 00:10:26 +00:00
case c . opCh <- ops :
case <- c . shutdownCh :
}
}
// merge registrations into state map prior to sync'ing with Consul
func ( c * ServiceClient ) merge ( ops * operations ) {
for _ , s := range ops . regServices {
c . services [ s . ID ] = s
}
for _ , check := range ops . regChecks {
c . checks [ check . ID ] = check
}
for _ , s := range ops . scripts {
c . scripts [ s . id ] = s
}
for _ , sid := range ops . deregServices {
delete ( c . services , sid )
}
for _ , cid := range ops . deregChecks {
if script , ok := c . runningScripts [ cid ] ; ok {
script . cancel ( )
delete ( c . scripts , cid )
2017-04-26 18:22:01 +00:00
delete ( c . runningScripts , cid )
2017-04-08 00:10:26 +00:00
}
delete ( c . checks , cid )
2017-02-01 00:43:57 +00:00
}
2017-04-18 23:23:39 +00:00
metrics . SetGauge ( [ ] string { "client" , "consul" , "services" } , float32 ( len ( c . services ) ) )
metrics . SetGauge ( [ ] string { "client" , "consul" , "checks" } , float32 ( len ( c . checks ) ) )
metrics . SetGauge ( [ ] string { "client" , "consul" , "script_checks" } , float32 ( len ( c . runningScripts ) ) )
2017-02-01 00:43:57 +00:00
}
// sync enqueued operations.
func ( c * ServiceClient ) sync ( ) error {
2017-04-08 00:10:26 +00:00
sreg , creg , sdereg , cdereg := 0 , 0 , 0 , 0
2017-02-01 00:43:57 +00:00
2017-04-08 00:10:26 +00:00
consulServices , err := c . client . Services ( )
if err != nil {
2017-04-18 23:23:39 +00:00
metrics . IncrCounter ( [ ] string { "client" , "consul" , "sync_failure" } , 1 )
2017-04-08 00:10:26 +00:00
return fmt . Errorf ( "error querying Consul services: %v" , err )
}
2017-02-01 00:43:57 +00:00
2017-04-08 00:10:26 +00:00
consulChecks , err := c . client . Checks ( )
if err != nil {
2017-04-18 23:23:39 +00:00
metrics . IncrCounter ( [ ] string { "client" , "consul" , "sync_failure" } , 1 )
2017-04-08 00:10:26 +00:00
return fmt . Errorf ( "error querying Consul checks: %v" , err )
}
2017-02-01 00:43:57 +00:00
2017-04-08 00:10:26 +00:00
// Remove Nomad services in Consul but unknown locally
for id := range consulServices {
if _ , ok := c . services [ id ] ; ok {
// Known service, skip
continue
2017-02-01 00:43:57 +00:00
}
2017-04-08 00:10:26 +00:00
if ! isNomadService ( id ) {
// Not managed by Nomad, skip
continue
}
// Unknown Nomad managed service; kill
if err := c . client . ServiceDeregister ( id ) ; err != nil {
2017-04-18 23:23:39 +00:00
metrics . IncrCounter ( [ ] string { "client" , "consul" , "sync_failure" } , 1 )
2017-04-08 00:10:26 +00:00
return err
}
sdereg ++
2017-04-18 23:23:39 +00:00
metrics . IncrCounter ( [ ] string { "client" , "consul" , "service_deregisrations" } , 1 )
2017-02-01 00:43:57 +00:00
}
2017-04-18 04:15:13 +00:00
// Track services whose ports have changed as their checks may also
// need updating
portsChanged := make ( map [ string ] struct { } , len ( c . services ) )
2017-04-08 00:10:26 +00:00
// Add Nomad services missing from Consul
2017-04-18 04:15:13 +00:00
for id , locals := range c . services {
if remotes , ok := consulServices [ id ] ; ok {
2017-06-16 23:35:16 +00:00
// Make sure Port and Address are stable since
// PortLabel and AddressMode aren't included in the
// service ID.
if locals . Port == remotes . Port && locals . Address == remotes . Address {
2017-04-18 04:15:13 +00:00
// Already exists in Consul; skip
continue
}
// Port changed, reregister it and its checks
portsChanged [ id ] = struct { } { }
2017-02-01 00:43:57 +00:00
}
2017-04-18 04:15:13 +00:00
if err = c . client . ServiceRegister ( locals ) ; err != nil {
2017-04-18 23:23:39 +00:00
metrics . IncrCounter ( [ ] string { "client" , "consul" , "sync_failure" } , 1 )
2017-04-08 00:10:26 +00:00
return err
2017-02-01 00:43:57 +00:00
}
2017-04-08 00:10:26 +00:00
sreg ++
2017-04-18 23:23:39 +00:00
metrics . IncrCounter ( [ ] string { "client" , "consul" , "service_regisrations" } , 1 )
2017-02-01 00:43:57 +00:00
}
2017-04-08 00:10:26 +00:00
// Remove Nomad checks in Consul but unknown locally
for id , check := range consulChecks {
if _ , ok := c . checks [ id ] ; ok {
2017-04-18 04:15:13 +00:00
// Known check, leave it
2017-04-08 00:10:26 +00:00
continue
2017-02-01 00:43:57 +00:00
}
2017-04-08 00:10:26 +00:00
if ! isNomadService ( check . ServiceID ) {
2017-06-16 23:35:16 +00:00
// Service not managed by Nomad, skip
2017-04-08 00:10:26 +00:00
continue
2017-02-01 00:43:57 +00:00
}
2017-04-08 00:10:26 +00:00
// Unknown Nomad managed check; kill
if err := c . client . CheckDeregister ( id ) ; err != nil {
2017-04-18 23:23:39 +00:00
metrics . IncrCounter ( [ ] string { "client" , "consul" , "sync_failure" } , 1 )
2017-04-08 00:10:26 +00:00
return err
}
cdereg ++
2017-04-18 23:23:39 +00:00
metrics . IncrCounter ( [ ] string { "client" , "consul" , "check_deregisrations" } , 1 )
2017-02-01 00:43:57 +00:00
}
2017-04-08 00:10:26 +00:00
// Add Nomad checks missing from Consul
for id , check := range c . checks {
2017-04-18 04:15:13 +00:00
if check , ok := consulChecks [ id ] ; ok {
if _ , changed := portsChanged [ check . ServiceID ] ; ! changed {
// Already in Consul and ports didn't change; skipping
continue
}
2017-04-08 00:10:26 +00:00
}
if err := c . client . CheckRegister ( check ) ; err != nil {
2017-04-18 23:23:39 +00:00
metrics . IncrCounter ( [ ] string { "client" , "consul" , "sync_failure" } , 1 )
2017-04-08 00:10:26 +00:00
return err
}
creg ++
2017-04-18 23:23:39 +00:00
metrics . IncrCounter ( [ ] string { "client" , "consul" , "check_regisrations" } , 1 )
2017-04-08 00:10:26 +00:00
// Handle starting scripts
if script , ok := c . scripts [ id ] ; ok {
2017-04-18 04:15:13 +00:00
// If it's already running, cancel and replace
if oldScript , running := c . runningScripts [ id ] ; running {
oldScript . cancel ( )
2017-04-08 00:10:26 +00:00
}
2017-04-18 04:15:13 +00:00
// Start and store the handle
2017-04-08 00:10:26 +00:00
c . runningScripts [ id ] = script . run ( )
2017-02-01 00:43:57 +00:00
}
}
2017-07-24 23:48:40 +00:00
// A Consul operation has succeeded, mark Consul as having been seen
c . markSeen ( )
2017-04-08 00:10:26 +00:00
c . logger . Printf ( "[DEBUG] consul.sync: registered %d services, %d checks; deregistered %d services, %d checks" ,
sreg , creg , sdereg , cdereg )
2017-02-01 00:43:57 +00:00
return nil
}
2017-04-13 23:59:27 +00:00
// RegisterAgent registers Nomad agents (client or server). The
// Service.PortLabel should be a literal port to be parsed with SplitHostPort.
// Script checks are not supported and will return an error. Registration is
// asynchronous.
2017-02-01 00:43:57 +00:00
//
// Agents will be deregistered when Shutdown is called.
func ( c * ServiceClient ) RegisterAgent ( role string , services [ ] * structs . Service ) error {
2017-04-08 00:10:26 +00:00
ops := operations { }
2017-02-01 00:43:57 +00:00
2017-04-04 00:08:08 +00:00
for _ , service := range services {
2017-02-01 00:43:57 +00:00
id := makeAgentServiceID ( role , service )
2017-04-13 23:59:27 +00:00
// Unlike tasks, agents don't use port labels. Agent ports are
// stored directly in the PortLabel.
2017-02-01 00:43:57 +00:00
host , rawport , err := net . SplitHostPort ( service . PortLabel )
if err != nil {
return fmt . Errorf ( "error parsing port label %q from service %q: %v" , service . PortLabel , service . Name , err )
}
port , err := strconv . Atoi ( rawport )
if err != nil {
return fmt . Errorf ( "error parsing port %q from service %q: %v" , rawport , service . Name , err )
}
serviceReg := & api . AgentServiceRegistration {
ID : id ,
Name : service . Name ,
Tags : service . Tags ,
Address : host ,
Port : port ,
}
2017-04-08 00:10:26 +00:00
ops . regServices = append ( ops . regServices , serviceReg )
2017-02-01 00:43:57 +00:00
for _ , check := range service . Checks {
2017-06-16 23:35:16 +00:00
checkID := makeCheckID ( id , check )
2017-02-01 00:43:57 +00:00
if check . Type == structs . ServiceCheckScript {
return fmt . Errorf ( "service %q contains invalid check: agent checks do not support scripts" , service . Name )
}
checkHost , checkPort := serviceReg . Address , serviceReg . Port
if check . PortLabel != "" {
2017-04-13 23:59:27 +00:00
// Unlike tasks, agents don't use port labels. Agent ports are
// stored directly in the PortLabel.
2017-02-01 00:43:57 +00:00
host , rawport , err := net . SplitHostPort ( check . PortLabel )
if err != nil {
return fmt . Errorf ( "error parsing port label %q from check %q: %v" , service . PortLabel , check . Name , err )
}
port , err := strconv . Atoi ( rawport )
if err != nil {
return fmt . Errorf ( "error parsing port %q from check %q: %v" , rawport , check . Name , err )
}
checkHost , checkPort = host , port
}
checkReg , err := createCheckReg ( id , checkID , check , checkHost , checkPort )
if err != nil {
return fmt . Errorf ( "failed to add check %q: %v" , check . Name , err )
}
2017-04-08 00:10:26 +00:00
ops . regChecks = append ( ops . regChecks , checkReg )
2017-02-01 00:43:57 +00:00
}
}
2017-04-18 00:07:42 +00:00
// Don't bother committing agent checks if we're already shutting down
c . agentLock . Lock ( )
defer c . agentLock . Unlock ( )
select {
case <- c . shutdownCh :
2017-04-08 00:10:26 +00:00
return nil
2017-04-18 00:07:42 +00:00
default :
2017-04-08 00:10:26 +00:00
}
2017-02-01 00:43:57 +00:00
2017-04-18 00:07:42 +00:00
// Now add them to the registration queue
c . commit ( & ops )
2017-02-01 00:43:57 +00:00
// Record IDs for deregistering on shutdown
2017-04-08 00:10:26 +00:00
for _ , id := range ops . regServices {
2017-04-13 20:49:23 +00:00
c . agentServices [ id . ID ] = struct { } { }
2017-02-01 00:43:57 +00:00
}
2017-04-08 00:10:26 +00:00
for _ , id := range ops . regChecks {
2017-04-13 20:49:23 +00:00
c . agentChecks [ id . ID ] = struct { } { }
2017-02-01 00:43:57 +00:00
}
2017-04-04 00:08:08 +00:00
return nil
}
// serviceRegs creates service registrations, check registrations, and script
// checks from a service.
2017-04-08 00:10:26 +00:00
func ( c * ServiceClient ) serviceRegs ( ops * operations , allocID string , service * structs . Service ,
2017-06-09 17:29:41 +00:00
task * structs . Task , exec driver . ScriptExecutor , net * cstructs . DriverNetwork ) error {
2017-04-04 00:08:08 +00:00
id := makeTaskServiceID ( allocID , task . Name , service )
2017-06-09 17:29:41 +00:00
addrMode := service . AddressMode
if addrMode == structs . AddressModeAuto {
2017-06-13 21:02:11 +00:00
if net . Advertise ( ) {
addrMode = structs . AddressModeDriver
} else {
2017-06-09 17:29:41 +00:00
// No driver network or shouldn't default to driver's network
addrMode = structs . AddressModeHost
}
}
ip , port := task . Resources . Networks . Port ( service . PortLabel )
if addrMode == structs . AddressModeDriver {
if net == nil {
2017-06-16 23:35:16 +00:00
return fmt . Errorf ( "service %s cannot use driver's IP because driver didn't set one" , service . Name )
2017-06-09 17:29:41 +00:00
}
ip = net . IP
port = net . PortMap [ service . PortLabel ]
}
2017-04-04 00:08:08 +00:00
serviceReg := & api . AgentServiceRegistration {
ID : id ,
Name : service . Name ,
Tags : make ( [ ] string , len ( service . Tags ) ) ,
2017-06-09 17:29:41 +00:00
Address : ip ,
2017-04-04 00:08:08 +00:00
Port : port ,
}
// copy isn't strictly necessary but can avoid bugs especially
// with tests that may reuse Tasks
copy ( serviceReg . Tags , service . Tags )
2017-04-08 00:10:26 +00:00
ops . regServices = append ( ops . regServices , serviceReg )
2017-06-16 23:35:16 +00:00
return c . checkRegs ( ops , allocID , id , service , task , exec , net )
}
func ( c * ServiceClient ) checkRegs ( ops * operations , allocID , serviceID string , service * structs . Service ,
task * structs . Task , exec driver . ScriptExecutor , net * cstructs . DriverNetwork ) error {
2017-04-04 00:08:08 +00:00
for _ , check := range service . Checks {
2017-04-19 19:18:06 +00:00
if check . TLSSkipVerify && ! c . skipVerifySupport {
c . logger . Printf ( "[WARN] consul.sync: skipping check %q for task %q alloc %q because Consul doesn't support tls_skip_verify. Please upgrade to Consul >= 0.7.2." ,
check . Name , task . Name , allocID )
continue
}
2017-06-16 23:35:16 +00:00
checkID := makeCheckID ( serviceID , check )
2017-04-13 23:43:38 +00:00
if check . Type == structs . ServiceCheckScript {
if exec == nil {
return fmt . Errorf ( "driver doesn't support script checks" )
}
ops . scripts = append ( ops . scripts , newScriptCheck (
allocID , task . Name , checkID , check , exec , c . client , c . logger , c . shutdownCh ) )
}
2017-04-19 19:18:06 +00:00
2017-06-09 17:29:41 +00:00
// Checks should always use the host ip:port
portLabel := check . PortLabel
if portLabel == "" {
// Default to the service's port label
portLabel = service . PortLabel
2017-04-13 23:43:38 +00:00
}
2017-06-09 17:29:41 +00:00
ip , port := task . Resources . Networks . Port ( portLabel )
2017-06-16 23:35:16 +00:00
checkReg , err := createCheckReg ( serviceID , checkID , check , ip , port )
2017-04-04 00:08:08 +00:00
if err != nil {
2017-04-13 23:43:38 +00:00
return fmt . Errorf ( "failed to add check %q: %v" , check . Name , err )
2017-04-04 00:08:08 +00:00
}
2017-04-13 23:43:38 +00:00
ops . regChecks = append ( ops . regChecks , checkReg )
2017-04-04 00:08:08 +00:00
}
2017-02-01 00:43:57 +00:00
return nil
}
2017-08-07 21:13:05 +00:00
// RegisterTask with Consul. Adds all service entries and checks to Consul. If
2017-02-01 00:43:57 +00:00
// exec is nil and a script check exists an error is returned.
//
2017-06-09 17:29:41 +00:00
// If the service IP is set it used as the address in the service registration.
// Checks will always use the IP from the Task struct (host's IP).
//
2017-02-01 00:43:57 +00:00
// Actual communication with Consul is done asynchrously (see Run).
2017-06-09 17:29:41 +00:00
func ( c * ServiceClient ) RegisterTask ( allocID string , task * structs . Task , exec driver . ScriptExecutor , net * cstructs . DriverNetwork ) error {
2017-04-08 00:10:26 +00:00
ops := & operations { }
2017-04-04 00:08:08 +00:00
for _ , service := range task . Services {
2017-06-09 17:29:41 +00:00
if err := c . serviceRegs ( ops , allocID , service , task , exec , net ) ; err != nil {
2017-04-04 00:08:08 +00:00
return err
}
}
2017-04-08 00:10:26 +00:00
c . commit ( ops )
2017-04-04 00:08:08 +00:00
return nil
}
// UpdateTask in Consul. Does not alter the service if only checks have
// changed.
2017-06-16 23:35:16 +00:00
//
// DriverNetwork must not change between invocations for the same allocation.
func ( c * ServiceClient ) UpdateTask ( allocID string , existing , newTask * structs . Task , exec driver . ScriptExecutor , net * cstructs . DriverNetwork ) error {
2017-04-08 00:10:26 +00:00
ops := & operations { }
2017-04-04 00:08:08 +00:00
existingIDs := make ( map [ string ] * structs . Service , len ( existing . Services ) )
for _ , s := range existing . Services {
existingIDs [ makeTaskServiceID ( allocID , existing . Name , s ) ] = s
}
newIDs := make ( map [ string ] * structs . Service , len ( newTask . Services ) )
for _ , s := range newTask . Services {
newIDs [ makeTaskServiceID ( allocID , newTask . Name , s ) ] = s
}
// Loop over existing Service IDs to see if they have been removed or
// updated.
for existingID , existingSvc := range existingIDs {
newSvc , ok := newIDs [ existingID ]
if ! ok {
2017-08-07 21:13:05 +00:00
// Existing service entry removed
2017-04-08 00:10:26 +00:00
ops . deregServices = append ( ops . deregServices , existingID )
2017-04-04 00:08:08 +00:00
for _ , check := range existingSvc . Checks {
2017-06-16 23:35:16 +00:00
ops . deregChecks = append ( ops . deregChecks , makeCheckID ( existingID , check ) )
2017-04-04 00:08:08 +00:00
}
continue
2017-02-01 00:43:57 +00:00
}
2017-06-16 23:35:16 +00:00
// PortLabel and AddressMode aren't included in the ID, so we
// have to compare manually.
serviceUnchanged := newSvc . PortLabel == existingSvc . PortLabel && newSvc . AddressMode == existingSvc . AddressMode
if serviceUnchanged {
2017-04-18 04:15:13 +00:00
// Service exists and hasn't changed, don't add it later
delete ( newIDs , existingID )
}
2017-04-08 00:10:26 +00:00
// Check to see what checks were updated
existingChecks := make ( map [ string ] struct { } , len ( existingSvc . Checks ) )
2017-04-04 00:08:08 +00:00
for _ , check := range existingSvc . Checks {
2017-06-16 23:35:16 +00:00
existingChecks [ makeCheckID ( existingID , check ) ] = struct { } { }
2017-04-04 00:08:08 +00:00
}
// Register new checks
for _ , check := range newSvc . Checks {
2017-06-16 23:35:16 +00:00
checkID := makeCheckID ( existingID , check )
2017-04-08 00:10:26 +00:00
if _ , exists := existingChecks [ checkID ] ; exists {
2017-04-18 04:15:13 +00:00
// Check exists, so don't remove it
2017-04-08 00:10:26 +00:00
delete ( existingChecks , checkID )
2017-06-16 23:35:16 +00:00
} else if serviceUnchanged {
// New check on an unchanged service; add them now
err := c . checkRegs ( ops , allocID , existingID , newSvc , newTask , exec , net )
if err != nil {
return err
}
2017-02-01 00:43:57 +00:00
}
}
2017-04-08 00:10:26 +00:00
// Remove existing checks not in updated service
for cid := range existingChecks {
ops . deregChecks = append ( ops . deregChecks , cid )
}
2017-02-01 00:43:57 +00:00
}
2017-04-04 00:08:08 +00:00
// Any remaining services should just be enqueued directly
for _ , newSvc := range newIDs {
2017-06-16 23:35:16 +00:00
err := c . serviceRegs ( ops , allocID , newSvc , newTask , exec , net )
2017-04-04 00:08:08 +00:00
if err != nil {
return err
}
}
2017-04-08 00:10:26 +00:00
c . commit ( ops )
2017-02-01 00:43:57 +00:00
return nil
}
// RemoveTask from Consul. Removes all service entries and checks.
//
// Actual communication with Consul is done asynchrously (see Run).
func ( c * ServiceClient ) RemoveTask ( allocID string , task * structs . Task ) {
2017-04-08 00:10:26 +00:00
ops := operations { }
2017-02-01 00:43:57 +00:00
2017-04-04 00:08:08 +00:00
for _ , service := range task . Services {
2017-02-01 00:43:57 +00:00
id := makeTaskServiceID ( allocID , task . Name , service )
2017-04-08 00:10:26 +00:00
ops . deregServices = append ( ops . deregServices , id )
2017-02-01 00:43:57 +00:00
for _ , check := range service . Checks {
2017-06-16 23:35:16 +00:00
ops . deregChecks = append ( ops . deregChecks , makeCheckID ( id , check ) )
2017-02-01 00:43:57 +00:00
}
}
// Now add them to the deregistration fields; main Run loop will update
2017-04-08 00:10:26 +00:00
c . commit ( & ops )
2017-02-01 00:43:57 +00:00
}
2017-07-04 19:24:27 +00:00
// Checks returns the checks registered against the agent for the given
// allocation.
func ( c * ServiceClient ) Checks ( a * structs . Allocation ) ( [ ] * api . AgentCheck , error ) {
tg := a . Job . LookupTaskGroup ( a . TaskGroup )
if tg == nil {
return nil , fmt . Errorf ( "failed to find task group in alloc" )
}
// Determine the checks that are relevant
relevant := make ( map [ string ] struct { } , 4 )
for _ , task := range tg . Tasks {
for _ , service := range task . Services {
id := makeTaskServiceID ( a . ID , task . Name , service )
for _ , check := range service . Checks {
2017-07-07 19:15:09 +00:00
relevant [ makeCheckID ( id , check ) ] = struct { } { }
2017-07-04 19:24:27 +00:00
}
}
}
// Query all the checks
checks , err := c . client . Checks ( )
if err != nil {
return nil , err
}
allocChecks := make ( [ ] * api . AgentCheck , 0 , len ( relevant ) )
for checkID := range relevant {
if check , ok := checks [ checkID ] ; ok {
allocChecks = append ( allocChecks , check )
}
}
return allocChecks , nil
}
2017-02-01 00:43:57 +00:00
// Shutdown the Consul client. Update running task registations and deregister
2017-04-18 00:07:42 +00:00
// agent from Consul. On first call blocks up to shutdownWait before giving up
// on syncing operations.
2017-02-01 00:43:57 +00:00
func ( c * ServiceClient ) Shutdown ( ) error {
2017-04-18 00:07:42 +00:00
// Serialize Shutdown calls with RegisterAgent to prevent leaking agent
// entries.
c . agentLock . Lock ( )
2017-07-24 18:40:37 +00:00
defer c . agentLock . Unlock ( )
2017-02-01 00:43:57 +00:00
select {
case <- c . shutdownCh :
return nil
default :
2017-07-24 18:40:37 +00:00
close ( c . shutdownCh )
2017-02-01 00:43:57 +00:00
}
2017-04-12 19:07:10 +00:00
// Give run loop time to sync, but don't block indefinitely
deadline := time . After ( c . shutdownWait )
2017-02-01 00:43:57 +00:00
2017-04-08 00:10:26 +00:00
// Wait for Run to finish any outstanding operations and exit
2017-02-01 00:43:57 +00:00
select {
2017-04-08 00:10:26 +00:00
case <- c . exitCh :
2017-02-01 00:43:57 +00:00
case <- deadline :
// Don't wait forever though
2017-07-24 18:40:37 +00:00
}
2017-07-24 23:48:40 +00:00
// If Consul was never seen nothing could be written so exit early
if ! c . hasSeen ( ) {
2017-07-24 19:12:02 +00:00
return nil
}
2017-07-24 18:40:37 +00:00
// Always attempt to deregister Nomad agent Consul entries, even if
// deadline was reached
for id := range c . agentServices {
if err := c . client . ServiceDeregister ( id ) ; err != nil {
c . logger . Printf ( "[ERR] consul.sync: error deregistering agent service (id: %q): %v" , id , err )
}
}
for id := range c . agentChecks {
if err := c . client . CheckDeregister ( id ) ; err != nil {
c . logger . Printf ( "[ERR] consul.sync: error deregistering agent service (id: %q): %v" , id , err )
}
2017-02-01 00:43:57 +00:00
}
// Give script checks time to exit (no need to lock as Run() has exited)
for _ , h := range c . runningScripts {
select {
case <- h . wait ( ) :
case <- deadline :
2017-04-12 19:07:10 +00:00
return fmt . Errorf ( "timed out waiting for script checks to run" )
2017-02-01 00:43:57 +00:00
}
}
2017-04-12 19:07:10 +00:00
return nil
2017-02-01 00:43:57 +00:00
}
// makeAgentServiceID builds the Consul service ID for one of the Nomad agent's
// own services by joining the prefix, role, service name and tags with "-".
//
// Agent service IDs are of the form:
//
//	{nomadServicePrefix}-{ROLE}-{Service.Name}-{Service.Tags...}
//	Example Server ID: _nomad-server-nomad-serf
//	Example Client ID: _nomad-client-nomad-client-http
func makeAgentServiceID(role string, service *structs.Service) string {
	fields := append([]string{nomadServicePrefix, role, service.Name}, service.Tags...)
	return strings.Join(fields, "-")
}
// makeTaskServiceID creates a unique ID for identifying a task service in
// Consul.
//
// Task service IDs are of the form:
//
//	{nomadServicePrefix}-executor-{ALLOC_ID}-{TASK_NAME}-{Service.Name}-{Service.Tags...}
//	Example Service ID: _nomad-executor-1234-echo-http-tag1-tag2-tag3
//
// The "{nomadServicePrefix}-executor" prefix is what isNomadService matches
// to tell task services apart from agent services.
func makeTaskServiceID(allocID, taskName string, service *structs.Service) string {
	parts := make([]string, len(service.Tags)+5)
	parts[0] = nomadServicePrefix
	parts[1] = "executor"
	parts[2] = allocID
	parts[3] = taskName
	parts[4] = service.Name
	copy(parts[5:], service.Tags)
	return strings.Join(parts, "-")
}
2017-06-16 23:35:16 +00:00
// makeCheckID creates a unique ID for a check.
func makeCheckID ( serviceID string , check * structs . ServiceCheck ) string {
2017-02-01 00:43:57 +00:00
return check . Hash ( serviceID )
}
// createCheckReg creates a Check that can be registered with Consul.
//
2017-04-12 20:27:56 +00:00
// Script checks simply have a TTL set and the caller is responsible for
// running the script and heartbeating.
2017-02-01 00:43:57 +00:00
func createCheckReg ( serviceID , checkID string , check * structs . ServiceCheck , host string , port int ) ( * api . AgentCheckRegistration , error ) {
chkReg := api . AgentCheckRegistration {
ID : checkID ,
Name : check . Name ,
ServiceID : serviceID ,
}
chkReg . Status = check . InitialStatus
chkReg . Timeout = check . Timeout . String ( )
chkReg . Interval = check . Interval . String ( )
switch check . Type {
case structs . ServiceCheckHTTP :
2017-04-26 18:22:01 +00:00
proto := check . Protocol
if proto == "" {
proto = "http"
2017-02-01 00:43:57 +00:00
}
2017-04-19 04:28:25 +00:00
if check . TLSSkipVerify {
chkReg . TLSSkipVerify = true
}
2017-02-01 00:43:57 +00:00
base := url . URL {
2017-04-26 18:22:01 +00:00
Scheme : proto ,
2017-02-01 00:43:57 +00:00
Host : net . JoinHostPort ( host , strconv . Itoa ( port ) ) ,
}
relative , err := url . Parse ( check . Path )
if err != nil {
return nil , err
}
url := base . ResolveReference ( relative )
chkReg . HTTP = url . String ( )
case structs . ServiceCheckTCP :
chkReg . TCP = net . JoinHostPort ( host , strconv . Itoa ( port ) )
case structs . ServiceCheckScript :
chkReg . TTL = ( check . Interval + ttlCheckBuffer ) . String ( )
default :
return nil , fmt . Errorf ( "check type %+q not valid" , check . Type )
}
return & chkReg , nil
}
2017-04-08 00:10:26 +00:00
// isNomadService returns true if the ID matches the pattern of a Nomad managed
2017-07-18 20:23:01 +00:00
// service. Agent services return false as independent client and server agents
// may be running on the same machine. #2827
2017-04-08 00:10:26 +00:00
func isNomadService ( id string ) bool {
2017-07-18 20:23:01 +00:00
const prefix = nomadServicePrefix + "-executor"
return strings . HasPrefix ( id , prefix )
2017-04-08 00:10:26 +00:00
}