2017-02-01 00:43:57 +00:00
package consul
import (
2017-08-26 05:40:18 +00:00
"context"
2017-02-01 00:43:57 +00:00
"fmt"
"log"
"net"
"net/url"
"strconv"
"strings"
"sync"
2017-07-24 19:12:02 +00:00
"sync/atomic"
2017-02-01 00:43:57 +00:00
"time"
2017-04-18 23:23:39 +00:00
metrics "github.com/armon/go-metrics"
2017-02-01 00:43:57 +00:00
"github.com/hashicorp/consul/api"
2017-04-12 20:26:55 +00:00
"github.com/hashicorp/nomad/client/driver"
2017-06-09 17:29:41 +00:00
cstructs "github.com/hashicorp/nomad/client/structs"
2017-08-07 22:54:05 +00:00
"github.com/hashicorp/nomad/helper"
2017-02-01 00:43:57 +00:00
"github.com/hashicorp/nomad/nomad/structs"
)
const (
2017-12-08 01:08:25 +00:00
// nomadServicePrefix is the prefix that scopes all Nomad registered
// services (both agent and task entries).
2017-02-01 00:43:57 +00:00
nomadServicePrefix = "_nomad"
2017-12-08 01:08:25 +00:00
// nomadTaskPrefix is the prefix that scopes Nomad registered services
// for tasks. It is built on top of nomadServicePrefix.
nomadTaskPrefix = nomadServicePrefix + "-task-"
2017-04-08 00:10:26 +00:00
// defaultRetryInterval is the base delay before retrying a sync of
// services and checks to Consul after an error. The delay backs off up
// to defaultMaxRetryInterval.
defaultRetryInterval = time . Second
// defaultMaxRetryInterval is the default upper bound on the retry
// interval.
defaultMaxRetryInterval = 30 * time . Second
2017-02-01 00:43:57 +00:00
// ttlCheckBuffer is the extra time Nomad is allowed to take when
// reporting a check result to Consul.
ttlCheckBuffer = 31 * time . Second
// defaultShutdownWait is how long Shutdown() should block waiting for
// enqueued operations to sync to Consul by default.
defaultShutdownWait = time . Minute
// DefaultQueryWaitDuration is the max duration the Consul Agent will
// spend waiting for a response from a Consul Query.
DefaultQueryWaitDuration = 2 * time . Second
// ServiceTagHTTP is the tag assigned to HTTP services.
ServiceTagHTTP = "http"
// ServiceTagRPC is the tag assigned to RPC services.
ServiceTagRPC = "rpc"
// ServiceTagSerf is the tag assigned to Serf services.
ServiceTagSerf = "serf"
)
// CatalogAPI is the consul/api.Catalog API used by Nomad. It lists only the
// catalog methods Nomad calls: enumerating datacenters and looking up the
// catalog entries for a service, optionally filtered by tag.
type CatalogAPI interface {
Datacenters ( ) ( [ ] string , error )
Service ( service , tag string , q * api . QueryOptions ) ( [ ] * api . CatalogService , * api . QueryMeta , error )
}
// AgentAPI is the consul/api.Agent API used by Nomad. ServiceClient uses it to
// read what the agent currently has registered and to add or remove service
// and check registrations.
type AgentAPI interface {
2017-04-08 00:10:26 +00:00
// Services and Checks return the agent's current registrations keyed by
// ID; sync compares them against the locally tracked state.
Services ( ) ( map [ string ] * api . AgentService , error )
Checks ( ) ( map [ string ] * api . AgentCheck , error )
2017-02-01 00:43:57 +00:00
CheckRegister ( check * api . AgentCheckRegistration ) error
CheckDeregister ( checkID string ) error
ServiceRegister ( service * api . AgentServiceRegistration ) error
ServiceDeregister ( serviceID string ) error
UpdateTTL ( id , output , status string ) error
}
2017-04-08 00:10:26 +00:00
// operations are submitted to the main loop via commit() for synchronizing
// with Consul. The main loop folds each batch into the ServiceClient's state
// maps with merge() before calling sync().
type operations struct {
// regServices and regChecks are registrations to add, stored by their ID.
regServices [ ] * api . AgentServiceRegistration
regChecks [ ] * api . AgentCheckRegistration
// scripts are script checks to track, stored by their id.
scripts [ ] * scriptCheck
// deregServices and deregChecks are IDs to drop from the local state.
deregServices [ ] string
deregChecks [ ] string
}
2017-08-07 22:54:05 +00:00
// AllocRegistration holds the status of services registered for a particular
// allocation, grouped by task.
type AllocRegistration struct {
// Tasks maps the name of a task to its registered services and checks
Tasks map [ string ] * TaskRegistration
}
2017-08-10 20:07:03 +00:00
func ( a * AllocRegistration ) copy ( ) * AllocRegistration {
2017-08-07 22:54:05 +00:00
c := & AllocRegistration {
Tasks : make ( map [ string ] * TaskRegistration , len ( a . Tasks ) ) ,
}
for k , v := range a . Tasks {
2017-08-10 20:07:03 +00:00
c . Tasks [ k ] = v . copy ( )
2017-08-07 22:54:05 +00:00
}
return c
}
// NumServices counts the service registrations, across all tasks, whose
// Service field is populated. A nil receiver yields zero.
func (a *AllocRegistration) NumServices() int {
	count := 0
	if a == nil {
		return count
	}

	for _, task := range a.Tasks {
		for _, reg := range task.Services {
			// Entries without a materialized Service are not counted.
			if reg.Service == nil {
				continue
			}
			count++
		}
	}
	return count
}
// NumChecks sums the number of checks attached to every service registration
// across all tasks. A nil receiver yields zero.
func (a *AllocRegistration) NumChecks() int {
	var count int
	if a != nil {
		for _, task := range a.Tasks {
			for _, reg := range task.Services {
				count += len(reg.Checks)
			}
		}
	}
	return count
}
// TaskRegistration holds the status of services registered for a particular
// task.
type TaskRegistration struct {
// Services maps a service ID to that service's registration.
Services map [ string ] * ServiceRegistration
}
2017-08-10 20:07:03 +00:00
func ( t * TaskRegistration ) copy ( ) * TaskRegistration {
2017-08-07 22:54:05 +00:00
c := & TaskRegistration {
Services : make ( map [ string ] * ServiceRegistration , len ( t . Services ) ) ,
}
for k , v := range t . Services {
2017-08-10 20:07:03 +00:00
c . Services [ k ] = v . copy ( )
2017-08-07 22:54:05 +00:00
}
return c
}
// ServiceRegistration holds the status of a registered Consul Service and its
// Checks.
type ServiceRegistration struct {
// serviceID and checkIDs are internal fields that track just the IDs of the
// services/checks registered in Consul. They are used to materialize the
// other fields when queried.
serviceID string
checkIDs map [ string ] struct { }
// Service is the AgentService registered in Consul.
Service * api . AgentService
// Checks is the status of the registered checks.
Checks [ ] * api . AgentCheck
}
2017-08-10 20:07:03 +00:00
func ( s * ServiceRegistration ) copy ( ) * ServiceRegistration {
// Copy does not copy the external fields but only the internal fields. This
// is so that the caller of AllocRegistrations can not access the internal
// fields and that method uses these fields to populate the external fields.
2017-08-07 22:54:05 +00:00
return & ServiceRegistration {
serviceID : s . serviceID ,
checkIDs : helper . CopyMapStringStruct ( s . checkIDs ) ,
}
}
2017-02-01 00:43:57 +00:00
// ServiceClient handles task and agent service registration with Consul.
type ServiceClient struct {
2017-04-08 00:10:26 +00:00
// client is the Consul agent API used to query and change registrations.
client AgentAPI
logger * log . Logger
// retryInterval is the base delay before retrying a failed sync; Run
// multiplies it by the number of consecutive failures.
retryInterval time . Duration
// maxRetryInterval caps the retry backoff computed in Run.
maxRetryInterval time . Duration
2017-02-01 00:43:57 +00:00
2017-04-19 19:18:06 +00:00
// skipVerifySupport is true if the local Consul agent supports TLSSkipVerify
skipVerifySupport bool
2017-04-08 00:10:26 +00:00
// exitCh is closed when the main Run loop exits
exitCh chan struct { }
2017-02-01 00:43:57 +00:00
// shutdownCh is closed when the client should shutdown
shutdownCh chan struct { }
// shutdownWait is how long Shutdown() blocks waiting for the final
// sync() to finish. Defaults to defaultShutdownWait
shutdownWait time . Duration
2017-04-08 00:10:26 +00:00
// opCh carries batches of operations from commit() to the Run loop.
opCh chan * operations
2017-02-01 00:43:57 +00:00
2017-04-08 00:10:26 +00:00
// services, checks and scripts hold the desired registrations keyed by
// ID; they are updated by merge() and read by sync().
services map [ string ] * api . AgentServiceRegistration
checks map [ string ] * api . AgentCheckRegistration
scripts map [ string ] * scriptCheck
2017-02-01 00:43:57 +00:00
// runningScripts holds the handles of started script checks keyed by
// check ID.
runningScripts map [ string ] * scriptHandle
2017-08-07 22:54:05 +00:00
// allocRegistrations stores the services and checks that are registered
// with Consul by allocation ID.
allocRegistrations map [ string ] * AllocRegistration
allocRegistrationsLock sync . RWMutex
2017-04-08 00:10:26 +00:00
// agent services and checks record entries for the agent itself which
// should be removed on shutdown
2017-02-01 00:43:57 +00:00
agentServices map [ string ] struct { }
agentChecks map [ string ] struct { }
2017-04-08 00:10:26 +00:00
agentLock sync . Mutex
2017-07-24 19:12:02 +00:00
// seen is 1 if Consul has ever been seen; otherwise 0. Accessed with
// atomics.
2017-08-04 17:14:16 +00:00
seen int32
2017-08-26 05:40:18 +00:00
// checkWatcher restarts checks that are unhealthy.
checkWatcher * checkWatcher
2017-02-01 00:43:57 +00:00
}
// NewServiceClient creates a new Consul ServiceClient from an existing Consul API
// Client and logger.
2017-04-19 19:18:06 +00:00
func NewServiceClient ( consulClient AgentAPI , skipVerifySupport bool , logger * log . Logger ) * ServiceClient {
2017-02-01 00:43:57 +00:00
return & ServiceClient {
2017-08-07 22:54:05 +00:00
client : consulClient ,
skipVerifySupport : skipVerifySupport ,
logger : logger ,
retryInterval : defaultRetryInterval ,
maxRetryInterval : defaultMaxRetryInterval ,
exitCh : make ( chan struct { } ) ,
shutdownCh : make ( chan struct { } ) ,
shutdownWait : defaultShutdownWait ,
opCh : make ( chan * operations , 8 ) ,
services : make ( map [ string ] * api . AgentServiceRegistration ) ,
checks : make ( map [ string ] * api . AgentCheckRegistration ) ,
scripts : make ( map [ string ] * scriptCheck ) ,
runningScripts : make ( map [ string ] * scriptHandle ) ,
allocRegistrations : make ( map [ string ] * AllocRegistration ) ,
agentServices : make ( map [ string ] struct { } ) ,
agentChecks : make ( map [ string ] struct { } ) ,
2017-08-26 05:40:18 +00:00
checkWatcher : newCheckWatcher ( logger , consulClient ) ,
2017-02-01 00:43:57 +00:00
}
}
2017-07-25 19:13:05 +00:00
// seen is the value markSeen stores in ServiceClient.seen and hasSeen compares
// against.
2017-07-24 19:12:02 +00:00
const seen = 1
2017-07-24 23:48:40 +00:00
// markSeen marks Consul as having been seen (meaning at least one operation
2017-07-24 19:12:02 +00:00
// has succeeded).
2017-07-24 23:48:40 +00:00
func ( c * ServiceClient ) markSeen ( ) {
2017-08-04 17:14:16 +00:00
atomic . StoreInt32 ( & c . seen , seen )
2017-07-24 19:12:02 +00:00
}
2017-07-24 23:48:40 +00:00
// hasSeen returns true if any Consul operation has ever succeeded. Useful to
2017-07-24 19:12:02 +00:00
// squelch errors if Consul isn't running.
2017-07-24 23:48:40 +00:00
func ( c * ServiceClient ) hasSeen ( ) bool {
2017-08-04 17:14:16 +00:00
return atomic . LoadInt32 ( & c . seen ) == seen
2017-07-24 19:12:02 +00:00
}
2017-02-01 00:43:57 +00:00
// Run is the Consul main loop. It starts the check watcher, then repeatedly
// waits for a retry timer tick, a shutdown signal, or a newly committed batch
// of operations, and calls sync after each wakeup. A failed sync schedules a
// retry after retryInterval multiplied by the number of consecutive failures,
// capped at maxRetryInterval. exitCh is closed when Run returns. It should be
// called exactly once.
func ( c * ServiceClient ) Run ( ) {
2017-04-08 00:10:26 +00:00
defer close ( c . exitCh )
2017-08-26 05:40:18 +00:00
// start checkWatcher; its context is cancelled on shutdown and, at the
// latest, when Run returns
ctx , cancelWatcher := context . WithCancel ( context . Background ( ) )
defer cancelWatcher ( )
go c . checkWatcher . Run ( ctx )
2017-04-08 00:10:26 +00:00
// Create the retry timer and consume its immediate tick so it stays idle
// until a sync fails and Reset is called below
retryTimer := time . NewTimer ( 0 )
<- retryTimer . C // disabled by default
failures := 0
for {
// Block until there is a reason to sync: a retry is due, shutdown was
// requested, or new operations arrived
select {
case <- retryTimer . C :
case <- c . shutdownCh :
2017-08-26 05:40:18 +00:00
cancelWatcher ( )
2017-04-08 00:10:26 +00:00
case ops := <- c . opCh :
c . merge ( ops )
}
2017-02-01 00:43:57 +00:00
2017-04-08 00:10:26 +00:00
if err := c . sync ( ) ; err != nil {
2017-07-24 23:48:40 +00:00
if failures == 0 {
2017-12-08 01:08:25 +00:00
// Log on the first failure
2017-07-24 23:48:40 +00:00
c . logger . Printf ( "[WARN] consul.sync: failed to update services in Consul: %v" , err )
2017-12-08 01:08:25 +00:00
} else if failures % 10 == 0 {
// Log every 10th consecutive failure
c . logger . Printf ( "[ERR] consul.sync: still unable to update services in Consul after %d failures; latest error: %v" , failures , err )
2017-04-08 00:10:26 +00:00
}
2017-12-08 01:08:25 +00:00
2017-07-24 22:37:53 +00:00
failures ++
2017-04-08 00:10:26 +00:00
if ! retryTimer . Stop ( ) {
2017-04-18 23:36:20 +00:00
// Timer already expired, since the timer may
// or may not have been read in the select{}
// above, conditionally receive on it
2017-04-12 19:07:10 +00:00
select {
case <- retryTimer . C :
default :
}
2017-04-08 00:10:26 +00:00
}
// Linear backoff: the delay grows with each consecutive failure
// until it reaches the configured maximum
backoff := c . retryInterval * time . Duration ( failures )
if backoff > c . maxRetryInterval {
backoff = c . maxRetryInterval
}
retryTimer . Reset ( backoff )
} else {
// Only announce recovery if there were failures to recover from
if failures > 0 {
2017-04-12 19:07:10 +00:00
c . logger . Printf ( "[INFO] consul.sync: successfully updated services in Consul" )
2017-04-08 00:10:26 +00:00
failures = 0
}
}
2017-02-01 00:43:57 +00:00
// Non-blocking shutdown check after every sync attempt
select {
2017-04-08 00:10:26 +00:00
case <- c . shutdownCh :
// Exit only after sync'ing all outstanding operations
if len ( c . opCh ) > 0 {
for len ( c . opCh ) > 0 {
c . merge ( <- c . opCh )
2017-02-01 00:43:57 +00:00
}
2017-04-08 00:10:26 +00:00
// Loop again so the merged operations get one more sync
continue
2017-02-01 00:43:57 +00:00
}
return
2017-04-08 00:10:26 +00:00
default :
2017-02-01 00:43:57 +00:00
}
2017-04-08 00:10:26 +00:00
2017-02-01 00:43:57 +00:00
}
}
2017-04-18 00:07:42 +00:00
// commit operations unless already shutting down.
func ( c * ServiceClient ) commit ( ops * operations ) {
2017-02-01 00:43:57 +00:00
select {
2017-04-08 00:10:26 +00:00
case c . opCh <- ops :
case <- c . shutdownCh :
}
}
// merge registrations into state map prior to sync'ing with Consul
func ( c * ServiceClient ) merge ( ops * operations ) {
for _ , s := range ops . regServices {
c . services [ s . ID ] = s
}
for _ , check := range ops . regChecks {
c . checks [ check . ID ] = check
}
for _ , s := range ops . scripts {
c . scripts [ s . id ] = s
}
for _ , sid := range ops . deregServices {
delete ( c . services , sid )
}
for _ , cid := range ops . deregChecks {
if script , ok := c . runningScripts [ cid ] ; ok {
script . cancel ( )
delete ( c . scripts , cid )
2017-04-26 18:22:01 +00:00
delete ( c . runningScripts , cid )
2017-04-08 00:10:26 +00:00
}
delete ( c . checks , cid )
2017-02-01 00:43:57 +00:00
}
2017-04-18 23:23:39 +00:00
metrics . SetGauge ( [ ] string { "client" , "consul" , "services" } , float32 ( len ( c . services ) ) )
metrics . SetGauge ( [ ] string { "client" , "consul" , "checks" } , float32 ( len ( c . checks ) ) )
metrics . SetGauge ( [ ] string { "client" , "consul" , "script_checks" } , float32 ( len ( c . runningScripts ) ) )
2017-02-01 00:43:57 +00:00
}
// sync enqueued operations.
func ( c * ServiceClient ) sync ( ) error {
2017-04-08 00:10:26 +00:00
sreg , creg , sdereg , cdereg := 0 , 0 , 0 , 0
2017-02-01 00:43:57 +00:00
2017-04-08 00:10:26 +00:00
consulServices , err := c . client . Services ( )
if err != nil {
2017-04-18 23:23:39 +00:00
metrics . IncrCounter ( [ ] string { "client" , "consul" , "sync_failure" } , 1 )
2017-04-08 00:10:26 +00:00
return fmt . Errorf ( "error querying Consul services: %v" , err )
}
2017-02-01 00:43:57 +00:00
2017-04-08 00:10:26 +00:00
consulChecks , err := c . client . Checks ( )
if err != nil {
2017-04-18 23:23:39 +00:00
metrics . IncrCounter ( [ ] string { "client" , "consul" , "sync_failure" } , 1 )
2017-04-08 00:10:26 +00:00
return fmt . Errorf ( "error querying Consul checks: %v" , err )
}
2017-02-01 00:43:57 +00:00
2017-04-08 00:10:26 +00:00
// Remove Nomad services in Consul but unknown locally
for id := range consulServices {
if _ , ok := c . services [ id ] ; ok {
// Known service, skip
continue
2017-02-01 00:43:57 +00:00
}
2017-04-08 00:10:26 +00:00
if ! isNomadService ( id ) {
// Not managed by Nomad, skip
continue
}
2017-12-08 01:08:25 +00:00
2017-04-08 00:10:26 +00:00
// Unknown Nomad managed service; kill
if err := c . client . ServiceDeregister ( id ) ; err != nil {
2017-12-08 01:08:25 +00:00
if isOldNomadService ( id ) {
// Don't hard-fail on old entries. See #3620
continue
}
2017-04-18 23:23:39 +00:00
metrics . IncrCounter ( [ ] string { "client" , "consul" , "sync_failure" } , 1 )
2017-04-08 00:10:26 +00:00
return err
}
sdereg ++
2017-12-01 14:24:14 +00:00
metrics . IncrCounter ( [ ] string { "client" , "consul" , "service_deregistrations" } , 1 )
2017-02-01 00:43:57 +00:00
}
2017-04-08 00:10:26 +00:00
// Add Nomad services missing from Consul
2017-04-18 04:15:13 +00:00
for id , locals := range c . services {
2017-12-08 01:08:25 +00:00
if _ , ok := consulServices [ id ] ; ! ok {
if err = c . client . ServiceRegister ( locals ) ; err != nil {
metrics . IncrCounter ( [ ] string { "client" , "consul" , "sync_failure" } , 1 )
return err
2017-04-18 04:15:13 +00:00
}
2017-12-08 01:08:25 +00:00
sreg ++
metrics . IncrCounter ( [ ] string { "client" , "consul" , "service_registrations" } , 1 )
2017-02-01 00:43:57 +00:00
}
}
2017-04-08 00:10:26 +00:00
// Remove Nomad checks in Consul but unknown locally
for id , check := range consulChecks {
if _ , ok := c . checks [ id ] ; ok {
2017-04-18 04:15:13 +00:00
// Known check, leave it
2017-04-08 00:10:26 +00:00
continue
2017-02-01 00:43:57 +00:00
}
2017-04-08 00:10:26 +00:00
if ! isNomadService ( check . ServiceID ) {
2017-06-16 23:35:16 +00:00
// Service not managed by Nomad, skip
2017-04-08 00:10:26 +00:00
continue
2017-02-01 00:43:57 +00:00
}
2017-12-08 01:08:25 +00:00
// Unknown Nomad managed check; remove
2017-04-08 00:10:26 +00:00
if err := c . client . CheckDeregister ( id ) ; err != nil {
2017-12-08 01:08:25 +00:00
if isOldNomadService ( check . ServiceID ) {
// Don't hard-fail on old entries.
continue
}
2017-04-18 23:23:39 +00:00
metrics . IncrCounter ( [ ] string { "client" , "consul" , "sync_failure" } , 1 )
2017-04-08 00:10:26 +00:00
return err
}
cdereg ++
2017-12-01 14:24:14 +00:00
metrics . IncrCounter ( [ ] string { "client" , "consul" , "check_deregistrations" } , 1 )
2017-02-01 00:43:57 +00:00
}
2017-04-08 00:10:26 +00:00
// Add Nomad checks missing from Consul
for id , check := range c . checks {
2017-12-08 01:08:25 +00:00
if _ , ok := consulChecks [ id ] ; ok {
// Already in Consul; skipping
continue
2017-04-08 00:10:26 +00:00
}
2017-12-08 01:08:25 +00:00
2017-04-08 00:10:26 +00:00
if err := c . client . CheckRegister ( check ) ; err != nil {
2017-04-18 23:23:39 +00:00
metrics . IncrCounter ( [ ] string { "client" , "consul" , "sync_failure" } , 1 )
2017-04-08 00:10:26 +00:00
return err
}
creg ++
2017-12-01 14:24:14 +00:00
metrics . IncrCounter ( [ ] string { "client" , "consul" , "check_registrations" } , 1 )
2017-04-08 00:10:26 +00:00
// Handle starting scripts
if script , ok := c . scripts [ id ] ; ok {
2017-04-18 04:15:13 +00:00
// If it's already running, cancel and replace
if oldScript , running := c . runningScripts [ id ] ; running {
oldScript . cancel ( )
2017-04-08 00:10:26 +00:00
}
2017-04-18 04:15:13 +00:00
// Start and store the handle
2017-04-08 00:10:26 +00:00
c . runningScripts [ id ] = script . run ( )
2017-02-01 00:43:57 +00:00
}
}
2017-07-24 23:48:40 +00:00
// A Consul operation has succeeded, mark Consul as having been seen
c . markSeen ( )
2017-04-08 00:10:26 +00:00
c . logger . Printf ( "[DEBUG] consul.sync: registered %d services, %d checks; deregistered %d services, %d checks" ,
sreg , creg , sdereg , cdereg )
2017-02-01 00:43:57 +00:00
return nil
}
2017-04-13 23:59:27 +00:00
// RegisterAgent registers Nomad agents (client or server). The
// Service.PortLabel should be a literal port to be parsed with SplitHostPort.
// Script checks are not supported and will return an error. Registration is
// asynchronous.
2017-02-01 00:43:57 +00:00
//
// Agents will be deregistered when Shutdown is called.
func ( c * ServiceClient ) RegisterAgent ( role string , services [ ] * structs . Service ) error {
2017-04-08 00:10:26 +00:00
ops := operations { }
2017-02-01 00:43:57 +00:00
2017-04-04 00:08:08 +00:00
for _ , service := range services {
2017-02-01 00:43:57 +00:00
id := makeAgentServiceID ( role , service )
2017-04-13 23:59:27 +00:00
// Unlike tasks, agents don't use port labels. Agent ports are
// stored directly in the PortLabel.
2017-02-01 00:43:57 +00:00
host , rawport , err := net . SplitHostPort ( service . PortLabel )
if err != nil {
return fmt . Errorf ( "error parsing port label %q from service %q: %v" , service . PortLabel , service . Name , err )
}
port , err := strconv . Atoi ( rawport )
if err != nil {
return fmt . Errorf ( "error parsing port %q from service %q: %v" , rawport , service . Name , err )
}
serviceReg := & api . AgentServiceRegistration {
ID : id ,
Name : service . Name ,
Tags : service . Tags ,
Address : host ,
Port : port ,
}
2017-04-08 00:10:26 +00:00
ops . regServices = append ( ops . regServices , serviceReg )
2017-02-01 00:43:57 +00:00
for _ , check := range service . Checks {
2017-06-16 23:35:16 +00:00
checkID := makeCheckID ( id , check )
2017-02-01 00:43:57 +00:00
if check . Type == structs . ServiceCheckScript {
return fmt . Errorf ( "service %q contains invalid check: agent checks do not support scripts" , service . Name )
}
checkHost , checkPort := serviceReg . Address , serviceReg . Port
if check . PortLabel != "" {
2017-04-13 23:59:27 +00:00
// Unlike tasks, agents don't use port labels. Agent ports are
// stored directly in the PortLabel.
2017-02-01 00:43:57 +00:00
host , rawport , err := net . SplitHostPort ( check . PortLabel )
if err != nil {
return fmt . Errorf ( "error parsing port label %q from check %q: %v" , service . PortLabel , check . Name , err )
}
port , err := strconv . Atoi ( rawport )
if err != nil {
return fmt . Errorf ( "error parsing port %q from check %q: %v" , rawport , check . Name , err )
}
checkHost , checkPort = host , port
}
checkReg , err := createCheckReg ( id , checkID , check , checkHost , checkPort )
if err != nil {
return fmt . Errorf ( "failed to add check %q: %v" , check . Name , err )
}
2017-04-08 00:10:26 +00:00
ops . regChecks = append ( ops . regChecks , checkReg )
2017-02-01 00:43:57 +00:00
}
}
2017-04-18 00:07:42 +00:00
// Don't bother committing agent checks if we're already shutting down
c . agentLock . Lock ( )
defer c . agentLock . Unlock ( )
select {
case <- c . shutdownCh :
2017-04-08 00:10:26 +00:00
return nil
2017-04-18 00:07:42 +00:00
default :
2017-04-08 00:10:26 +00:00
}
2017-02-01 00:43:57 +00:00
2017-04-18 00:07:42 +00:00
// Now add them to the registration queue
c . commit ( & ops )
2017-02-01 00:43:57 +00:00
// Record IDs for deregistering on shutdown
2017-04-08 00:10:26 +00:00
for _ , id := range ops . regServices {
2017-04-13 20:49:23 +00:00
c . agentServices [ id . ID ] = struct { } { }
2017-02-01 00:43:57 +00:00
}
2017-04-08 00:10:26 +00:00
for _ , id := range ops . regChecks {
2017-04-13 20:49:23 +00:00
c . agentChecks [ id . ID ] = struct { } { }
2017-02-01 00:43:57 +00:00
}
2017-04-04 00:08:08 +00:00
return nil
}
// serviceRegs creates service registrations, check registrations, and script
2017-08-07 22:54:05 +00:00
// checks from a service. It returns a service registration object with the
// service and check IDs populated.
2017-04-08 00:10:26 +00:00
func ( c * ServiceClient ) serviceRegs ( ops * operations , allocID string , service * structs . Service ,
2017-08-07 22:54:05 +00:00
task * structs . Task , exec driver . ScriptExecutor , net * cstructs . DriverNetwork ) ( * ServiceRegistration , error ) {
2017-04-04 00:08:08 +00:00
2017-08-07 22:54:05 +00:00
// Get the services ID
2017-04-04 00:08:08 +00:00
id := makeTaskServiceID ( allocID , task . Name , service )
2017-08-07 22:54:05 +00:00
sreg := & ServiceRegistration {
serviceID : id ,
checkIDs : make ( map [ string ] struct { } , len ( service . Checks ) ) ,
}
2017-12-05 19:39:42 +00:00
// Service address modes default to auto
2017-06-09 17:29:41 +00:00
addrMode := service . AddressMode
2017-12-05 19:39:42 +00:00
if addrMode == "" {
addrMode = structs . AddressModeAuto
2017-06-09 17:29:41 +00:00
}
2017-12-05 19:39:42 +00:00
// Determine the address to advertise based on the mode
ip , port , err := getAddress ( addrMode , service . PortLabel , task . Resources . Networks , net )
if err != nil {
return nil , fmt . Errorf ( "unable to get address for service %q: %v" , service . Name , err )
2017-06-09 17:29:41 +00:00
}
2017-08-07 22:54:05 +00:00
// Build the Consul Service registration request
2017-04-04 00:08:08 +00:00
serviceReg := & api . AgentServiceRegistration {
ID : id ,
Name : service . Name ,
Tags : make ( [ ] string , len ( service . Tags ) ) ,
2017-06-09 17:29:41 +00:00
Address : ip ,
2017-04-04 00:08:08 +00:00
Port : port ,
}
// copy isn't strictly necessary but can avoid bugs especially
// with tests that may reuse Tasks
copy ( serviceReg . Tags , service . Tags )
2017-04-08 00:10:26 +00:00
ops . regServices = append ( ops . regServices , serviceReg )
2017-08-07 22:54:05 +00:00
// Build the check registrations
checkIDs , err := c . checkRegs ( ops , allocID , id , service , task , exec , net )
if err != nil {
return nil , err
}
for _ , cid := range checkIDs {
sreg . checkIDs [ cid ] = struct { } { }
}
return sreg , nil
2017-06-16 23:35:16 +00:00
}
2017-08-07 22:54:05 +00:00
// checkRegs registers the checks for the given service and returns the
// registered check ids.
2017-06-16 23:35:16 +00:00
func ( c * ServiceClient ) checkRegs ( ops * operations , allocID , serviceID string , service * structs . Service ,
2017-08-07 22:54:05 +00:00
task * structs . Task , exec driver . ScriptExecutor , net * cstructs . DriverNetwork ) ( [ ] string , error ) {
2017-04-04 00:08:08 +00:00
2017-08-07 22:54:05 +00:00
// Fast path
numChecks := len ( service . Checks )
if numChecks == 0 {
return nil , nil
}
checkIDs := make ( [ ] string , 0 , numChecks )
2017-04-04 00:08:08 +00:00
for _ , check := range service . Checks {
2017-04-19 19:18:06 +00:00
if check . TLSSkipVerify && ! c . skipVerifySupport {
c . logger . Printf ( "[WARN] consul.sync: skipping check %q for task %q alloc %q because Consul doesn't support tls_skip_verify. Please upgrade to Consul >= 0.7.2." ,
check . Name , task . Name , allocID )
continue
}
2017-06-16 23:35:16 +00:00
checkID := makeCheckID ( serviceID , check )
2017-08-07 22:54:05 +00:00
checkIDs = append ( checkIDs , checkID )
2017-04-13 23:43:38 +00:00
if check . Type == structs . ServiceCheckScript {
if exec == nil {
2017-08-07 22:54:05 +00:00
return nil , fmt . Errorf ( "driver doesn't support script checks" )
2017-04-13 23:43:38 +00:00
}
ops . scripts = append ( ops . scripts , newScriptCheck (
allocID , task . Name , checkID , check , exec , c . client , c . logger , c . shutdownCh ) )
2017-12-08 05:58:15 +00:00
// Skip getAddress for script checks
checkReg , err := createCheckReg ( serviceID , checkID , check , "" , 0 )
if err != nil {
return nil , fmt . Errorf ( "failed to add script check %q: %v" , check . Name , err )
}
ops . regChecks = append ( ops . regChecks , checkReg )
continue
2017-04-13 23:43:38 +00:00
}
2017-04-19 19:18:06 +00:00
2017-12-05 19:39:42 +00:00
// Default to the service's port but allow check to override
2017-06-09 17:29:41 +00:00
portLabel := check . PortLabel
if portLabel == "" {
// Default to the service's port label
portLabel = service . PortLabel
2017-04-13 23:43:38 +00:00
}
2017-12-05 19:39:42 +00:00
// Checks address mode defaults to host for pre-#3380 backward compat
addrMode := check . AddressMode
if addrMode == "" {
addrMode = structs . AddressModeHost
}
ip , port , err := getAddress ( addrMode , portLabel , task . Resources . Networks , net )
if err != nil {
2017-12-19 00:18:42 +00:00
return nil , fmt . Errorf ( "error getting address for check %q: %v" , check . Name , err )
2017-12-05 19:39:42 +00:00
}
2017-06-16 23:35:16 +00:00
checkReg , err := createCheckReg ( serviceID , checkID , check , ip , port )
2017-04-04 00:08:08 +00:00
if err != nil {
2017-08-07 22:54:05 +00:00
return nil , fmt . Errorf ( "failed to add check %q: %v" , check . Name , err )
2017-04-04 00:08:08 +00:00
}
2017-04-13 23:43:38 +00:00
ops . regChecks = append ( ops . regChecks , checkReg )
2017-04-04 00:08:08 +00:00
}
2017-08-07 22:54:05 +00:00
return checkIDs , nil
2017-02-01 00:43:57 +00:00
}
2017-08-07 21:13:05 +00:00
// RegisterTask with Consul. Adds all service entries and checks to Consul. If
2017-02-01 00:43:57 +00:00
// exec is nil and a script check exists an error is returned.
//
2017-06-09 17:29:41 +00:00
// If the service IP is set it used as the address in the service registration.
// Checks will always use the IP from the Task struct (host's IP).
//
2017-02-01 00:43:57 +00:00
// Actual communication with Consul is done asynchrously (see Run).
2017-08-26 05:40:18 +00:00
func ( c * ServiceClient ) RegisterTask ( allocID string , task * structs . Task , restarter TaskRestarter , exec driver . ScriptExecutor , net * cstructs . DriverNetwork ) error {
2017-08-07 22:54:05 +00:00
// Fast path
numServices := len ( task . Services )
if numServices == 0 {
return nil
}
t := new ( TaskRegistration )
t . Services = make ( map [ string ] * ServiceRegistration , numServices )
2017-04-08 00:10:26 +00:00
ops := & operations { }
2017-04-04 00:08:08 +00:00
for _ , service := range task . Services {
2017-08-07 22:54:05 +00:00
sreg , err := c . serviceRegs ( ops , allocID , service , task , exec , net )
if err != nil {
2017-04-04 00:08:08 +00:00
return err
}
2017-08-07 22:54:05 +00:00
t . Services [ sreg . serviceID ] = sreg
2017-04-04 00:08:08 +00:00
}
2017-08-07 22:54:05 +00:00
// Add the task to the allocation's registration
c . addTaskRegistration ( allocID , task . Name , t )
2017-04-08 00:10:26 +00:00
c . commit ( ops )
2017-08-26 05:40:18 +00:00
// Start watching checks. Done after service registrations are built
// since an error building them could leak watches.
for _ , service := range task . Services {
serviceID := makeTaskServiceID ( allocID , task . Name , service )
for _ , check := range service . Checks {
2017-09-14 16:58:35 +00:00
if check . TriggersRestarts ( ) {
2017-08-26 05:40:18 +00:00
checkID := makeCheckID ( serviceID , check )
c . checkWatcher . Watch ( allocID , task . Name , checkID , check , restarter )
}
}
}
2017-04-04 00:08:08 +00:00
return nil
}
// UpdateTask in Consul. Does not alter the service if only checks have
// changed.
2017-06-16 23:35:16 +00:00
//
// DriverNetwork must not change between invocations for the same allocation.
2017-08-26 05:40:18 +00:00
func ( c * ServiceClient ) UpdateTask ( allocID string , existing , newTask * structs . Task , restarter TaskRestarter , exec driver . ScriptExecutor , net * cstructs . DriverNetwork ) error {
2017-04-08 00:10:26 +00:00
ops := & operations { }
2017-04-04 00:08:08 +00:00
2017-12-05 19:39:42 +00:00
taskReg := new ( TaskRegistration )
taskReg . Services = make ( map [ string ] * ServiceRegistration , len ( newTask . Services ) )
2017-08-07 22:54:05 +00:00
2017-04-04 00:08:08 +00:00
existingIDs := make ( map [ string ] * structs . Service , len ( existing . Services ) )
for _ , s := range existing . Services {
existingIDs [ makeTaskServiceID ( allocID , existing . Name , s ) ] = s
}
newIDs := make ( map [ string ] * structs . Service , len ( newTask . Services ) )
for _ , s := range newTask . Services {
newIDs [ makeTaskServiceID ( allocID , newTask . Name , s ) ] = s
}
// Loop over existing Service IDs to see if they have been removed or
// updated.
for existingID , existingSvc := range existingIDs {
newSvc , ok := newIDs [ existingID ]
if ! ok {
2017-08-07 21:13:05 +00:00
// Existing service entry removed
2017-04-08 00:10:26 +00:00
ops . deregServices = append ( ops . deregServices , existingID )
2017-04-04 00:08:08 +00:00
for _ , check := range existingSvc . Checks {
2017-08-26 05:40:18 +00:00
cid := makeCheckID ( existingID , check )
ops . deregChecks = append ( ops . deregChecks , cid )
// Unwatch watched checks
2017-09-14 16:58:35 +00:00
if check . TriggersRestarts ( ) {
2017-08-26 05:40:18 +00:00
c . checkWatcher . Unwatch ( cid )
}
2017-04-04 00:08:08 +00:00
}
continue
2017-02-01 00:43:57 +00:00
}
2017-12-08 01:08:25 +00:00
// Service exists and hasn't changed, don't re-add it later
delete ( newIDs , existingID )
2017-08-07 22:54:05 +00:00
// Service still exists so add it to the task's registration
sreg := & ServiceRegistration {
serviceID : existingID ,
checkIDs : make ( map [ string ] struct { } , len ( newSvc . Checks ) ) ,
}
2017-12-05 19:39:42 +00:00
taskReg . Services [ existingID ] = sreg
2017-08-07 22:54:05 +00:00
2017-12-08 01:08:25 +00:00
// See if any checks were updated
2017-08-26 05:40:18 +00:00
existingChecks := make ( map [ string ] * structs . ServiceCheck , len ( existingSvc . Checks ) )
2017-04-04 00:08:08 +00:00
for _ , check := range existingSvc . Checks {
2017-08-26 05:40:18 +00:00
existingChecks [ makeCheckID ( existingID , check ) ] = check
2017-04-04 00:08:08 +00:00
}
// Register new checks
for _ , check := range newSvc . Checks {
2017-06-16 23:35:16 +00:00
checkID := makeCheckID ( existingID , check )
2017-04-08 00:10:26 +00:00
if _ , exists := existingChecks [ checkID ] ; exists {
2017-04-18 04:15:13 +00:00
// Check exists, so don't remove it
2017-04-08 00:10:26 +00:00
delete ( existingChecks , checkID )
2017-08-07 22:54:05 +00:00
sreg . checkIDs [ checkID ] = struct { } { }
2017-12-08 01:08:25 +00:00
}
2017-08-26 05:40:18 +00:00
2017-12-08 01:08:25 +00:00
// New check on an unchanged service; add them now
newCheckIDs , err := c . checkRegs ( ops , allocID , existingID , newSvc , newTask , exec , net )
if err != nil {
return err
}
2017-08-26 05:40:18 +00:00
2017-12-08 01:08:25 +00:00
for _ , checkID := range newCheckIDs {
sreg . checkIDs [ checkID ] = struct { } { }
2017-08-26 05:40:18 +00:00
}
// Update all watched checks as CheckRestart fields aren't part of ID
2017-09-14 16:58:35 +00:00
if check . TriggersRestarts ( ) {
2017-08-26 05:40:18 +00:00
c . checkWatcher . Watch ( allocID , newTask . Name , checkID , check , restarter )
2017-02-01 00:43:57 +00:00
}
}
2017-04-08 00:10:26 +00:00
// Remove existing checks not in updated service
2017-08-26 05:40:18 +00:00
for cid , check := range existingChecks {
2017-04-08 00:10:26 +00:00
ops . deregChecks = append ( ops . deregChecks , cid )
2017-08-26 05:40:18 +00:00
// Unwatch checks
2017-09-14 16:58:35 +00:00
if check . TriggersRestarts ( ) {
2017-08-26 05:40:18 +00:00
c . checkWatcher . Unwatch ( cid )
}
2017-04-08 00:10:26 +00:00
}
2017-02-01 00:43:57 +00:00
}
2017-04-04 00:08:08 +00:00
// Any remaining services should just be enqueued directly
for _ , newSvc := range newIDs {
2017-08-07 22:54:05 +00:00
sreg , err := c . serviceRegs ( ops , allocID , newSvc , newTask , exec , net )
2017-04-04 00:08:08 +00:00
if err != nil {
return err
}
2017-08-07 22:54:05 +00:00
2017-12-05 19:39:42 +00:00
taskReg . Services [ sreg . serviceID ] = sreg
2017-04-04 00:08:08 +00:00
}
2017-08-07 22:54:05 +00:00
// Add the task to the allocation's registration
2017-12-05 19:39:42 +00:00
c . addTaskRegistration ( allocID , newTask . Name , taskReg )
2017-08-07 22:54:05 +00:00
2017-04-08 00:10:26 +00:00
c . commit ( ops )
2017-08-26 05:40:18 +00:00
// Start watching checks. Done after service registrations are built
// since an error building them could leak watches.
for _ , service := range newIDs {
serviceID := makeTaskServiceID ( allocID , newTask . Name , service )
for _ , check := range service . Checks {
2017-09-14 16:58:35 +00:00
if check . TriggersRestarts ( ) {
2017-08-26 05:40:18 +00:00
checkID := makeCheckID ( serviceID , check )
c . checkWatcher . Watch ( allocID , newTask . Name , checkID , check , restarter )
}
}
}
2017-02-01 00:43:57 +00:00
return nil
}
// RemoveTask from Consul. Removes all service entries and checks.
//
// Actual communication with Consul is done asynchrously (see Run).
func ( c * ServiceClient ) RemoveTask ( allocID string , task * structs . Task ) {
2017-04-08 00:10:26 +00:00
ops := operations { }
2017-02-01 00:43:57 +00:00
2017-04-04 00:08:08 +00:00
for _ , service := range task . Services {
2017-02-01 00:43:57 +00:00
id := makeTaskServiceID ( allocID , task . Name , service )
2017-04-08 00:10:26 +00:00
ops . deregServices = append ( ops . deregServices , id )
2017-02-01 00:43:57 +00:00
for _ , check := range service . Checks {
2017-08-26 05:40:18 +00:00
cid := makeCheckID ( id , check )
ops . deregChecks = append ( ops . deregChecks , cid )
2017-09-14 16:58:35 +00:00
if check . TriggersRestarts ( ) {
2017-08-26 05:40:18 +00:00
c . checkWatcher . Unwatch ( cid )
}
2017-02-01 00:43:57 +00:00
}
}
2017-08-07 22:54:05 +00:00
// Remove the task from the alloc's registrations
c . removeTaskRegistration ( allocID , task . Name )
2017-02-01 00:43:57 +00:00
// Now add them to the deregistration fields; main Run loop will update
2017-04-08 00:10:26 +00:00
c . commit ( & ops )
2017-02-01 00:43:57 +00:00
}
2017-08-07 22:54:05 +00:00
// AllocRegistrations returns the registrations for the given allocation. If the
// allocation has no reservations, the response is a nil object.
func ( c * ServiceClient ) AllocRegistrations ( allocID string ) ( * AllocRegistration , error ) {
// Get the internal struct using the lock
c . allocRegistrationsLock . RLock ( )
regInternal , ok := c . allocRegistrations [ allocID ]
if ! ok {
c . allocRegistrationsLock . RUnlock ( )
return nil , nil
}
// Copy so we don't expose internal structs
2017-08-10 20:07:03 +00:00
reg := regInternal . copy ( )
2017-08-07 22:54:05 +00:00
c . allocRegistrationsLock . RUnlock ( )
// Query the services and checks to populate the allocation registrations.
services , err := c . client . Services ( )
if err != nil {
return nil , err
2017-07-04 19:24:27 +00:00
}
checks , err := c . client . Checks ( )
if err != nil {
return nil , err
}
2017-08-07 22:54:05 +00:00
// Populate the object
for _ , treg := range reg . Tasks {
for serviceID , sreg := range treg . Services {
sreg . Service = services [ serviceID ]
for checkID := range sreg . checkIDs {
if check , ok := checks [ checkID ] ; ok {
sreg . Checks = append ( sreg . Checks , check )
}
}
2017-07-04 19:24:27 +00:00
}
}
2017-08-07 22:54:05 +00:00
return reg , nil
2017-07-04 19:24:27 +00:00
}
2017-02-01 00:43:57 +00:00
// Shutdown the Consul client. Update running task registations and deregister
2017-04-18 00:07:42 +00:00
// agent from Consul. On first call blocks up to shutdownWait before giving up
// on syncing operations.
2017-02-01 00:43:57 +00:00
func ( c * ServiceClient ) Shutdown ( ) error {
2017-04-18 00:07:42 +00:00
// Serialize Shutdown calls with RegisterAgent to prevent leaking agent
// entries.
c . agentLock . Lock ( )
2017-07-24 18:40:37 +00:00
defer c . agentLock . Unlock ( )
2017-02-01 00:43:57 +00:00
select {
case <- c . shutdownCh :
return nil
default :
2017-07-24 18:40:37 +00:00
close ( c . shutdownCh )
2017-02-01 00:43:57 +00:00
}
2017-04-12 19:07:10 +00:00
// Give run loop time to sync, but don't block indefinitely
deadline := time . After ( c . shutdownWait )
2017-02-01 00:43:57 +00:00
2017-04-08 00:10:26 +00:00
// Wait for Run to finish any outstanding operations and exit
2017-02-01 00:43:57 +00:00
select {
2017-04-08 00:10:26 +00:00
case <- c . exitCh :
2017-02-01 00:43:57 +00:00
case <- deadline :
// Don't wait forever though
2017-07-24 18:40:37 +00:00
}
2017-07-24 23:48:40 +00:00
// If Consul was never seen nothing could be written so exit early
if ! c . hasSeen ( ) {
2017-07-24 19:12:02 +00:00
return nil
}
2017-07-24 18:40:37 +00:00
// Always attempt to deregister Nomad agent Consul entries, even if
// deadline was reached
for id := range c . agentServices {
if err := c . client . ServiceDeregister ( id ) ; err != nil {
c . logger . Printf ( "[ERR] consul.sync: error deregistering agent service (id: %q): %v" , id , err )
}
}
for id := range c . agentChecks {
if err := c . client . CheckDeregister ( id ) ; err != nil {
c . logger . Printf ( "[ERR] consul.sync: error deregistering agent service (id: %q): %v" , id , err )
}
2017-02-01 00:43:57 +00:00
}
// Give script checks time to exit (no need to lock as Run() has exited)
for _ , h := range c . runningScripts {
select {
case <- h . wait ( ) :
case <- deadline :
2017-04-12 19:07:10 +00:00
return fmt . Errorf ( "timed out waiting for script checks to run" )
2017-02-01 00:43:57 +00:00
}
}
2017-04-12 19:07:10 +00:00
return nil
2017-02-01 00:43:57 +00:00
}
2017-08-07 22:54:05 +00:00
// addTaskRegistration stores reg as the registration of taskName within the
// given allocation, creating the allocation's entry first when it is not yet
// tracked. An existing registration for the same task is replaced.
func (c *ServiceClient) addTaskRegistration(allocID, taskName string, reg *TaskRegistration) {
	c.allocRegistrationsLock.Lock()
	defer c.allocRegistrationsLock.Unlock()

	// Lazily create the per-allocation container
	if _, tracked := c.allocRegistrations[allocID]; !tracked {
		c.allocRegistrations[allocID] = &AllocRegistration{
			Tasks: make(map[string]*TaskRegistration),
		}
	}

	c.allocRegistrations[allocID].Tasks[taskName] = reg
}
// removeTaskRegistration drops the registration of taskName from the given
// allocation. It is a no-op when the allocation is not tracked. Once the last
// task of an allocation is removed, the allocation's entry is removed as well.
func (c *ServiceClient) removeTaskRegistration(allocID, taskName string) {
	c.allocRegistrationsLock.Lock()
	defer c.allocRegistrationsLock.Unlock()

	allocReg, tracked := c.allocRegistrations[allocID]
	if tracked {
		delete(allocReg.Tasks, taskName)

		// Nothing left for this alloc, so stop tracking it entirely
		if len(allocReg.Tasks) == 0 {
			delete(c.allocRegistrations, allocID)
		}
	}
}
2017-02-01 00:43:57 +00:00
// makeAgentServiceID creates a unique ID for identifying an agent service in
// Consul.
//
// Agent service IDs are of the form:
//
2017-12-08 01:08:25 +00:00
// {nomadServicePrefix}-{ROLE}-b32(sha1({Service.Name}-{Service.Tags...})
2017-12-12 00:50:15 +00:00
// Example Server ID: _nomad-server-fbbk265qn4tmt25nd4ep42tjvmyj3hr4
// Example Client ID: _nomad-client-ggnjpgl7yn7rgmvxzilmpvrzzvrszc7l
2017-02-01 00:43:57 +00:00
//
func makeAgentServiceID ( role string , service * structs . Service ) string {
2017-12-12 00:50:15 +00:00
return fmt . Sprintf ( "%s-%s-%s" , nomadServicePrefix , role , service . Hash ( role , "" ) )
2017-02-01 00:43:57 +00:00
}
// makeTaskServiceID creates a unique ID for identifying a task service in
2017-12-08 01:08:25 +00:00
// Consul. All structs.Service fields are included in the ID's hash except
// Checks. This allows updates to merely compare IDs.
2017-02-01 00:43:57 +00:00
//
2017-12-08 01:08:25 +00:00
// Example Service ID: _nomad-task-TNM333JKJPM5AK4FAS3VXQLXFDWOF4VH
2017-02-01 00:43:57 +00:00
func makeTaskServiceID ( allocID , taskName string , service * structs . Service ) string {
2017-12-08 19:50:14 +00:00
return nomadTaskPrefix + service . Hash ( allocID , taskName )
2017-02-01 00:43:57 +00:00
}
2017-06-16 23:35:16 +00:00
// makeCheckID creates a unique ID for a check.
func makeCheckID ( serviceID string , check * structs . ServiceCheck ) string {
2017-02-01 00:43:57 +00:00
return check . Hash ( serviceID )
}
// createCheckReg creates a Check that can be registered with Consul.
//
2017-04-12 20:27:56 +00:00
// Script checks simply have a TTL set and the caller is responsible for
// running the script and heartbeating.
2017-02-01 00:43:57 +00:00
func createCheckReg ( serviceID , checkID string , check * structs . ServiceCheck , host string , port int ) ( * api . AgentCheckRegistration , error ) {
chkReg := api . AgentCheckRegistration {
ID : checkID ,
Name : check . Name ,
ServiceID : serviceID ,
}
chkReg . Status = check . InitialStatus
chkReg . Timeout = check . Timeout . String ( )
chkReg . Interval = check . Interval . String ( )
2017-12-19 00:18:42 +00:00
// Require an address for http or tcp checks
if port == 0 && check . RequiresPort ( ) {
return nil , fmt . Errorf ( "%s checks require an address" , check . Type )
}
2017-02-01 00:43:57 +00:00
switch check . Type {
case structs . ServiceCheckHTTP :
2017-04-26 18:22:01 +00:00
proto := check . Protocol
if proto == "" {
proto = "http"
2017-02-01 00:43:57 +00:00
}
2017-04-19 04:28:25 +00:00
if check . TLSSkipVerify {
chkReg . TLSSkipVerify = true
}
2017-02-01 00:43:57 +00:00
base := url . URL {
2017-04-26 18:22:01 +00:00
Scheme : proto ,
2017-02-01 00:43:57 +00:00
Host : net . JoinHostPort ( host , strconv . Itoa ( port ) ) ,
}
relative , err := url . Parse ( check . Path )
if err != nil {
return nil , err
}
url := base . ResolveReference ( relative )
chkReg . HTTP = url . String ( )
2017-08-15 23:13:05 +00:00
chkReg . Method = check . Method
chkReg . Header = check . Header
2017-02-01 00:43:57 +00:00
case structs . ServiceCheckTCP :
chkReg . TCP = net . JoinHostPort ( host , strconv . Itoa ( port ) )
case structs . ServiceCheckScript :
chkReg . TTL = ( check . Interval + ttlCheckBuffer ) . String ( )
2017-10-17 00:35:47 +00:00
// As of Consul 1.0.0 setting TTL and Interval is a 400
chkReg . Interval = ""
2017-02-01 00:43:57 +00:00
default :
return nil , fmt . Errorf ( "check type %+q not valid" , check . Type )
}
return & chkReg , nil
}
2017-04-08 00:10:26 +00:00
// isNomadService returns true if the ID matches the pattern of a Nomad managed
2017-12-08 01:08:25 +00:00
// service (new or old formats). Agent services return false as independent
// client and server agents may be running on the same machine. #2827
2017-04-08 00:10:26 +00:00
func isNomadService ( id string ) bool {
2017-12-08 01:08:25 +00:00
return strings . HasPrefix ( id , nomadTaskPrefix ) || isOldNomadService ( id )
}
// isOldNomadService returns true if the ID matches an old pattern managed by
// Nomad.
//
// Pre-0.7.1 task service IDs are of the form:
//
// {nomadServicePrefix}-executor-{ALLOC_ID}-{Service.Name}-{Service.Tags...}
// Example Service ID: _nomad-executor-1234-echo-http-tag1-tag2-tag3
//
func isOldNomadService ( id string ) bool {
2017-07-18 20:23:01 +00:00
const prefix = nomadServicePrefix + "-executor"
return strings . HasPrefix ( id , prefix )
2017-04-08 00:10:26 +00:00
}
2017-12-05 19:39:42 +00:00
2017-12-19 00:18:42 +00:00
// getAddress returns the IP and port to use for a service or check. If no port
// label is specified (an empty value), zero values are returned because no
// address could be resolved.
2017-12-05 19:39:42 +00:00
func getAddress ( addrMode , portLabel string , networks structs . Networks , driverNet * cstructs . DriverNetwork ) ( string , int , error ) {
2017-12-19 00:18:42 +00:00
// No port label specified, no address can be assembled
if portLabel == "" {
return "" , 0 , nil
}
2017-12-05 19:39:42 +00:00
switch addrMode {
case structs . AddressModeAuto :
if driverNet . Advertise ( ) {
addrMode = structs . AddressModeDriver
} else {
addrMode = structs . AddressModeHost
}
return getAddress ( addrMode , portLabel , networks , driverNet )
case structs . AddressModeHost :
// Default path: use host ip:port
ip , port := networks . Port ( portLabel )
2017-12-08 20:27:57 +00:00
if ip == "" && port <= 0 {
2017-12-08 05:58:15 +00:00
return "" , 0 , fmt . Errorf ( "invalid port %q: port label not found" , portLabel )
}
2017-12-05 19:39:42 +00:00
return ip , port , nil
case structs . AddressModeDriver :
// Require a driver network if driver address mode is used
if driverNet == nil {
return "" , 0 , fmt . Errorf ( ` cannot use address_mode="driver": no driver network exists ` )
}
// If the port is a label, use the driver's port (not the host's)
if port , ok := driverNet . PortMap [ portLabel ] ; ok {
return driverNet . IP , port , nil
}
// If port isn't a label, try to parse it as a literal port number
port , err := strconv . Atoi ( portLabel )
if err != nil {
return "" , 0 , fmt . Errorf ( "invalid port %q: %v" , portLabel , err )
}
2017-12-08 20:27:57 +00:00
if port <= 0 {
2017-12-08 06:04:22 +00:00
return "" , 0 , fmt . Errorf ( "invalid port: %q: port 0 is invalid" , portLabel )
}
2017-12-05 19:39:42 +00:00
return driverNet . IP , port , nil
default :
// Shouldn't happen due to validation, but enforce invariants
return "" , 0 , fmt . Errorf ( "invalid address mode %q" , addrMode )
}
}