2017-02-01 00:43:57 +00:00
|
|
|
package consul
|
|
|
|
|
|
|
|
import (
|
2017-08-26 05:40:18 +00:00
|
|
|
"context"
|
2017-02-01 00:43:57 +00:00
|
|
|
"fmt"
|
|
|
|
"net"
|
|
|
|
"net/url"
|
consul: Use a stable identifier for services
The current implementation of Service Registration uses a hash of the
nomad-internal state of a service to register it with Consul, this means that
any update to the service invalidates this name and we then deregister, and
recreate the service in Consul.
While this behaviour slightly simplifies reasoning about service registration,
this becomes problematic when we add consul health checks to a service. When
the service is re-registered, so are the checks, which default to failing for
at least one check period.
This commit migrates us to using a stable identifier based on the
allocation, task, and service identifiers, and uses the difference
between the remote and local state to decide when to push updates.
It uses the existing hashing mechanic to decide when UpdateTask should
regenerate service registrations for providing to Sync, but this should
be removable as part of a future refactor.
It additionally introduces the _nomad-check- prefix for check
definitions, to allow for future allowing of consul features like
maintenance mode.
2019-04-10 08:39:24 +00:00
|
|
|
"reflect"
|
2017-02-01 00:43:57 +00:00
|
|
|
"strconv"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
2017-07-24 19:12:02 +00:00
|
|
|
"sync/atomic"
|
2017-02-01 00:43:57 +00:00
|
|
|
"time"
|
|
|
|
|
2017-04-18 23:23:39 +00:00
|
|
|
metrics "github.com/armon/go-metrics"
|
2018-09-13 17:43:40 +00:00
|
|
|
log "github.com/hashicorp/go-hclog"
|
|
|
|
|
|
|
|
"github.com/hashicorp/consul/api"
|
2017-08-07 22:54:05 +00:00
|
|
|
"github.com/hashicorp/nomad/helper"
|
2017-02-01 00:43:57 +00:00
|
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
2019-01-04 23:01:35 +00:00
|
|
|
"github.com/hashicorp/nomad/plugins/drivers"
|
2017-02-01 00:43:57 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
const (
	// nomadServicePrefix is the prefix that scopes all Nomad registered
	// services (both agent and task entries).
	nomadServicePrefix = "_nomad"

	// nomadTaskPrefix is the prefix that scopes Nomad registered services
	// for tasks.
	nomadTaskPrefix = nomadServicePrefix + "-task-"

	// nomadCheckPrefix is the prefix that scopes Nomad registered checks for
	// services.
	nomadCheckPrefix = nomadServicePrefix + "-check-"

	// defaultRetryInterval is how quickly to retry syncing services and
	// checks to Consul when an error occurs. Will backoff up to a max.
	defaultRetryInterval = time.Second

	// defaultMaxRetryInterval is the default max retry interval.
	defaultMaxRetryInterval = 30 * time.Second

	// defaultPeriodicInterval is the interval at which the service
	// client reconciles state between the desired services and checks and
	// what's actually registered in Consul. This is done at an interval,
	// rather than being purely edge triggered, to handle the case that the
	// Consul agent's state may change underneath us.
	defaultPeriodicInterval = 30 * time.Second

	// ttlCheckBuffer is the time interval that Nomad can take to report the
	// check result to Consul.
	ttlCheckBuffer = 31 * time.Second

	// defaultShutdownWait is how long Shutdown() should block waiting for
	// enqueued operations to sync to Consul by default.
	defaultShutdownWait = time.Minute

	// DefaultQueryWaitDuration is the max duration the Consul Agent will
	// spend waiting for a response from a Consul Query.
	DefaultQueryWaitDuration = 2 * time.Second

	// ServiceTagHTTP is the tag assigned to HTTP services.
	ServiceTagHTTP = "http"

	// ServiceTagRPC is the tag assigned to RPC services.
	ServiceTagRPC = "rpc"

	// ServiceTagSerf is the tag assigned to Serf services.
	ServiceTagSerf = "serf"

	// deregisterProbationPeriod is the initialization period where
	// services registered in Consul but not in Nomad don't get deregistered,
	// to allow for nomad restoring tasks.
	deregisterProbationPeriod = time.Minute
)
|
|
|
|
|
|
|
|
// CatalogAPI is the consul/api.Catalog API used by Nomad.
|
|
|
|
type CatalogAPI interface {
|
|
|
|
Datacenters() ([]string, error)
|
|
|
|
Service(service, tag string, q *api.QueryOptions) ([]*api.CatalogService, *api.QueryMeta, error)
|
|
|
|
}
|
|
|
|
|
|
|
|
// AgentAPI is the consul/api.Agent API used by Nomad.
|
|
|
|
type AgentAPI interface {
|
2017-04-08 00:10:26 +00:00
|
|
|
Services() (map[string]*api.AgentService, error)
|
|
|
|
Checks() (map[string]*api.AgentCheck, error)
|
2017-02-01 00:43:57 +00:00
|
|
|
CheckRegister(check *api.AgentCheckRegistration) error
|
|
|
|
CheckDeregister(checkID string) error
|
2018-03-15 00:37:54 +00:00
|
|
|
Self() (map[string]map[string]interface{}, error)
|
2017-02-01 00:43:57 +00:00
|
|
|
ServiceRegister(service *api.AgentServiceRegistration) error
|
|
|
|
ServiceDeregister(serviceID string) error
|
|
|
|
UpdateTTL(id, output, status string) error
|
|
|
|
}
|
|
|
|
|
consul: Use a stable identifier for services
The current implementation of Service Registration uses a hash of the
nomad-internal state of a service to register it with Consul, this means that
any update to the service invalidates this name and we then deregister, and
recreate the service in Consul.
While this behaviour slightly simplifies reasoning about service registration,
this becomes problematic when we add consul health checks to a service. When
the service is re-registered, so are the checks, which default to failing for
at least one check period.
This commit migrates us to using a stable identifier based on the
allocation, task, and service identifiers, and uses the difference
between the remote and local state to decide when to push updates.
It uses the existing hashing mechanic to decide when UpdateTask should
regenerate service registrations for providing to Sync, but this should
be removable as part of a future refactor.
It additionally introduces the _nomad-check- prefix for check
definitions, to allow for future allowing of consul features like
maintenance mode.
2019-04-10 08:39:24 +00:00
|
|
|
func agentServiceUpdateRequired(reg *api.AgentServiceRegistration, svc *api.AgentService) bool {
|
|
|
|
return !(reg.Kind == svc.Kind &&
|
|
|
|
reg.ID == svc.ID &&
|
|
|
|
reg.Port == svc.Port &&
|
|
|
|
reg.Address == svc.Address &&
|
|
|
|
reg.Name == svc.Service &&
|
|
|
|
reflect.DeepEqual(reg.Tags, svc.Tags))
|
|
|
|
}
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// operations are submitted to the main loop via commit() for synchronizing
|
|
|
|
// with Consul.
|
|
|
|
type operations struct {
|
support script checks for task group services (#6197)
In Nomad prior to Consul Connect, all Consul checks work the same
except for Script checks. Because the Task being checked is running in
its own container namespaces, the check is executed by Nomad in the
Task's context. If the Script check passes, Nomad uses the TTL check
feature of Consul to update the check status. This means in order to
run a Script check, we need to know what Task to execute it in.
To support Consul Connect, we need Group Services, and these need to
be registered in Consul along with their checks. We could push the
Service down into the Task, but this doesn't work if someone wants to
associate a service with a task's ports, but do script checks in
another task in the allocation.
Because Nomad is handling the Script check and not Consul anyways,
this moves the script check handling into the task runner so that the
task runner can own the script check's configuration and
lifecycle. This will allow us to pass the group service check
configuration down into a task without associating the service itself
with the task.
When tasks are checked for script checks, we walk back through their
task group to see if there are script checks associated with the
task. If so, we'll spin off script check tasklets for them. The
group-level service and any restart behaviors it needs are entirely
encapsulated within the group service hook.
2019-09-03 19:09:04 +00:00
|
|
|
regServices []*api.AgentServiceRegistration
|
|
|
|
regChecks []*api.AgentCheckRegistration
|
2017-04-08 00:10:26 +00:00
|
|
|
deregServices []string
|
|
|
|
deregChecks []string
|
|
|
|
}
|
|
|
|
|
2017-08-07 22:54:05 +00:00
|
|
|
// AllocRegistration holds the status of services registered for a particular
|
|
|
|
// allocations by task.
|
|
|
|
type AllocRegistration struct {
|
|
|
|
// Tasks maps the name of a task to its registered services and checks
|
|
|
|
Tasks map[string]*TaskRegistration
|
|
|
|
}
|
|
|
|
|
2017-08-10 20:07:03 +00:00
|
|
|
func (a *AllocRegistration) copy() *AllocRegistration {
|
2017-08-07 22:54:05 +00:00
|
|
|
c := &AllocRegistration{
|
|
|
|
Tasks: make(map[string]*TaskRegistration, len(a.Tasks)),
|
|
|
|
}
|
|
|
|
|
|
|
|
for k, v := range a.Tasks {
|
2017-08-10 20:07:03 +00:00
|
|
|
c.Tasks[k] = v.copy()
|
2017-08-07 22:54:05 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return c
|
|
|
|
}
|
|
|
|
|
|
|
|
// NumServices returns the number of registered services
|
|
|
|
func (a *AllocRegistration) NumServices() int {
|
|
|
|
if a == nil {
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
|
|
|
|
total := 0
|
|
|
|
for _, treg := range a.Tasks {
|
|
|
|
for _, sreg := range treg.Services {
|
|
|
|
if sreg.Service != nil {
|
|
|
|
total++
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return total
|
|
|
|
}
|
|
|
|
|
|
|
|
// NumChecks returns the number of registered checks
|
|
|
|
func (a *AllocRegistration) NumChecks() int {
|
|
|
|
if a == nil {
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
|
|
|
|
total := 0
|
|
|
|
for _, treg := range a.Tasks {
|
|
|
|
for _, sreg := range treg.Services {
|
|
|
|
total += len(sreg.Checks)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return total
|
|
|
|
}
|
|
|
|
|
|
|
|
// TaskRegistration holds the status of services registered for a particular
|
|
|
|
// task.
|
|
|
|
type TaskRegistration struct {
|
|
|
|
Services map[string]*ServiceRegistration
|
|
|
|
}
|
|
|
|
|
2017-08-10 20:07:03 +00:00
|
|
|
func (t *TaskRegistration) copy() *TaskRegistration {
|
2017-08-07 22:54:05 +00:00
|
|
|
c := &TaskRegistration{
|
|
|
|
Services: make(map[string]*ServiceRegistration, len(t.Services)),
|
|
|
|
}
|
|
|
|
|
|
|
|
for k, v := range t.Services {
|
2017-08-10 20:07:03 +00:00
|
|
|
c.Services[k] = v.copy()
|
2017-08-07 22:54:05 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return c
|
|
|
|
}
|
|
|
|
|
|
|
|
// ServiceRegistration holds the status of a registered Consul Service and its
|
|
|
|
// Checks.
|
|
|
|
type ServiceRegistration struct {
|
|
|
|
// serviceID and checkIDs are internal fields that track just the IDs of the
|
|
|
|
// services/checks registered in Consul. It is used to materialize the other
|
|
|
|
// fields when queried.
|
|
|
|
serviceID string
|
|
|
|
checkIDs map[string]struct{}
|
|
|
|
|
|
|
|
// Service is the AgentService registered in Consul.
|
|
|
|
Service *api.AgentService
|
|
|
|
|
|
|
|
// Checks is the status of the registered checks.
|
|
|
|
Checks []*api.AgentCheck
|
|
|
|
}
|
|
|
|
|
2017-08-10 20:07:03 +00:00
|
|
|
func (s *ServiceRegistration) copy() *ServiceRegistration {
|
|
|
|
// Copy does not copy the external fields but only the internal fields. This
|
|
|
|
// is so that the caller of AllocRegistrations can not access the internal
|
|
|
|
// fields and that method uses these fields to populate the external fields.
|
2017-08-07 22:54:05 +00:00
|
|
|
return &ServiceRegistration{
|
|
|
|
serviceID: s.serviceID,
|
|
|
|
checkIDs: helper.CopyMapStringStruct(s.checkIDs),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-02-01 00:43:57 +00:00
|
|
|
// ServiceClient handles task and agent service registration with Consul.
|
|
|
|
type ServiceClient struct {
|
2017-04-08 00:10:26 +00:00
|
|
|
client AgentAPI
|
2018-09-13 17:43:40 +00:00
|
|
|
logger log.Logger
|
2017-04-08 00:10:26 +00:00
|
|
|
retryInterval time.Duration
|
|
|
|
maxRetryInterval time.Duration
|
2018-04-17 19:36:50 +00:00
|
|
|
periodicInterval time.Duration
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// exitCh is closed when the main Run loop exits
|
|
|
|
exitCh chan struct{}
|
2017-02-01 00:43:57 +00:00
|
|
|
|
|
|
|
// shutdownCh is closed when the client should shutdown
|
|
|
|
shutdownCh chan struct{}
|
|
|
|
|
|
|
|
// shutdownWait is how long Shutdown() blocks waiting for the final
|
|
|
|
// sync() to finish. Defaults to defaultShutdownWait
|
|
|
|
shutdownWait time.Duration
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
opCh chan *operations
|
2017-02-01 00:43:57 +00:00
|
|
|
|
support script checks for task group services (#6197)
In Nomad prior to Consul Connect, all Consul checks work the same
except for Script checks. Because the Task being checked is running in
its own container namespaces, the check is executed by Nomad in the
Task's context. If the Script check passes, Nomad uses the TTL check
feature of Consul to update the check status. This means in order to
run a Script check, we need to know what Task to execute it in.
To support Consul Connect, we need Group Services, and these need to
be registered in Consul along with their checks. We could push the
Service down into the Task, but this doesn't work if someone wants to
associate a service with a task's ports, but do script checks in
another task in the allocation.
Because Nomad is handling the Script check and not Consul anyways,
this moves the script check handling into the task runner so that the
task runner can own the script check's configuration and
lifecycle. This will allow us to pass the group service check
configuration down into a task without associating the service itself
with the task.
When tasks are checked for script checks, we walk back through their
task group to see if there are script checks associated with the
task. If so, we'll spin off script check tasklets for them. The
group-level service and any restart behaviors it needs are entirely
encapsulated within the group service hook.
2019-09-03 19:09:04 +00:00
|
|
|
services map[string]*api.AgentServiceRegistration
|
|
|
|
checks map[string]*api.AgentCheckRegistration
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2019-06-14 14:57:46 +00:00
|
|
|
explicitlyDeregisteredServices map[string]bool
|
|
|
|
explicitlyDeregisteredChecks map[string]bool
|
|
|
|
|
2017-08-07 22:54:05 +00:00
|
|
|
// allocRegistrations stores the services and checks that are registered
|
|
|
|
// with Consul by allocation ID.
|
|
|
|
allocRegistrations map[string]*AllocRegistration
|
|
|
|
allocRegistrationsLock sync.RWMutex
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// agent services and checks record entries for the agent itself which
|
|
|
|
// should be removed on shutdown
|
2017-02-01 00:43:57 +00:00
|
|
|
agentServices map[string]struct{}
|
|
|
|
agentChecks map[string]struct{}
|
2017-04-08 00:10:26 +00:00
|
|
|
agentLock sync.Mutex
|
2017-07-24 19:12:02 +00:00
|
|
|
|
2018-03-11 18:34:27 +00:00
|
|
|
// seen is 1 if Consul has ever been seen; otherwise 0. Accessed with
|
2017-07-24 19:12:02 +00:00
|
|
|
// atomics.
|
2017-08-04 17:14:16 +00:00
|
|
|
seen int32
|
2017-08-26 05:40:18 +00:00
|
|
|
|
2019-06-14 14:57:46 +00:00
|
|
|
// deregisterProbationExpiry is the time before which consul sync shouldn't deregister
|
|
|
|
// unknown services.
|
|
|
|
// Used to mitigate risk of deleting restored services upon client restart.
|
|
|
|
deregisterProbationExpiry time.Time
|
|
|
|
|
2017-08-26 05:40:18 +00:00
|
|
|
// checkWatcher restarts checks that are unhealthy.
|
|
|
|
checkWatcher *checkWatcher
|
2018-06-01 19:48:25 +00:00
|
|
|
|
2018-06-01 20:59:53 +00:00
|
|
|
// isClientAgent specifies whether this Consul client is being used
|
|
|
|
// by a Nomad client.
|
2018-06-01 19:48:25 +00:00
|
|
|
isClientAgent bool
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewServiceClient creates a new Consul ServiceClient from an existing Consul API
|
2018-06-01 20:59:53 +00:00
|
|
|
// Client, logger and takes whether the client is being used by a Nomad Client agent.
|
|
|
|
// When being used by a Nomad client, this Consul client reconciles all services and
|
|
|
|
// checks created by Nomad on behalf of running tasks.
|
2018-09-13 17:43:40 +00:00
|
|
|
func NewServiceClient(consulClient AgentAPI, logger log.Logger, isNomadClient bool) *ServiceClient {
|
|
|
|
logger = logger.ResetNamed("consul.sync")
|
2017-02-01 00:43:57 +00:00
|
|
|
return &ServiceClient{
|
2019-06-14 14:57:46 +00:00
|
|
|
client: consulClient,
|
|
|
|
logger: logger,
|
|
|
|
retryInterval: defaultRetryInterval,
|
|
|
|
maxRetryInterval: defaultMaxRetryInterval,
|
|
|
|
periodicInterval: defaultPeriodicInterval,
|
|
|
|
exitCh: make(chan struct{}),
|
|
|
|
shutdownCh: make(chan struct{}),
|
|
|
|
shutdownWait: defaultShutdownWait,
|
|
|
|
opCh: make(chan *operations, 8),
|
|
|
|
services: make(map[string]*api.AgentServiceRegistration),
|
|
|
|
checks: make(map[string]*api.AgentCheckRegistration),
|
|
|
|
explicitlyDeregisteredServices: make(map[string]bool),
|
|
|
|
explicitlyDeregisteredChecks: make(map[string]bool),
|
|
|
|
allocRegistrations: make(map[string]*AllocRegistration),
|
|
|
|
agentServices: make(map[string]struct{}),
|
|
|
|
agentChecks: make(map[string]struct{}),
|
|
|
|
checkWatcher: newCheckWatcher(logger, consulClient),
|
|
|
|
isClientAgent: isNomadClient,
|
|
|
|
deregisterProbationExpiry: time.Now().Add(deregisterProbationPeriod),
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-07-25 19:13:05 +00:00
|
|
|
// seen is the value markSeen stores into, and hasSeen compares against,
// ServiceClient.seen.
const seen = 1
|
|
|
|
|
2017-07-24 23:48:40 +00:00
|
|
|
// markSeen marks Consul as having been seen (meaning at least one operation
|
2017-07-24 19:12:02 +00:00
|
|
|
// has succeeded).
|
2017-07-24 23:48:40 +00:00
|
|
|
func (c *ServiceClient) markSeen() {
|
2017-08-04 17:14:16 +00:00
|
|
|
atomic.StoreInt32(&c.seen, seen)
|
2017-07-24 19:12:02 +00:00
|
|
|
}
|
|
|
|
|
2017-07-24 23:48:40 +00:00
|
|
|
// hasSeen returns true if any Consul operation has ever succeeded. Useful to
|
2017-07-24 19:12:02 +00:00
|
|
|
// squelch errors if Consul isn't running.
|
2017-07-24 23:48:40 +00:00
|
|
|
func (c *ServiceClient) hasSeen() bool {
|
2017-08-04 17:14:16 +00:00
|
|
|
return atomic.LoadInt32(&c.seen) == seen
|
2017-07-24 19:12:02 +00:00
|
|
|
}
|
|
|
|
|
2017-02-01 00:43:57 +00:00
|
|
|
// Run the Consul main loop which retries operations against Consul. It should
|
|
|
|
// be called exactly once.
|
|
|
|
func (c *ServiceClient) Run() {
|
2017-04-08 00:10:26 +00:00
|
|
|
defer close(c.exitCh)
|
2017-08-26 05:40:18 +00:00
|
|
|
|
2018-03-15 00:37:54 +00:00
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
// init will be closed when Consul has been contacted
|
|
|
|
init := make(chan struct{})
|
|
|
|
go checkConsulTLSSkipVerify(ctx, c.logger, c.client, init)
|
|
|
|
|
|
|
|
// Process operations while waiting for initial contact with Consul but
|
|
|
|
// do not sync until contact has been made.
|
|
|
|
INIT:
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-init:
|
|
|
|
c.markSeen()
|
|
|
|
break INIT
|
|
|
|
case <-c.shutdownCh:
|
|
|
|
return
|
|
|
|
case ops := <-c.opCh:
|
|
|
|
c.merge(ops)
|
|
|
|
}
|
|
|
|
}
|
2018-09-13 17:43:40 +00:00
|
|
|
c.logger.Trace("able to contact Consul")
|
2018-03-15 00:37:54 +00:00
|
|
|
|
|
|
|
// Block until contact with Consul has been established
|
|
|
|
// Start checkWatcher
|
2017-08-26 05:40:18 +00:00
|
|
|
go c.checkWatcher.Run(ctx)
|
|
|
|
|
2018-04-17 19:36:50 +00:00
|
|
|
// Always immediately sync to reconcile Nomad and Consul's state
|
2017-04-08 00:10:26 +00:00
|
|
|
retryTimer := time.NewTimer(0)
|
2018-03-15 00:37:54 +00:00
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
failures := 0
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-retryTimer.C:
|
|
|
|
case <-c.shutdownCh:
|
2018-03-15 00:37:54 +00:00
|
|
|
// Cancel check watcher but sync one last time
|
|
|
|
cancel()
|
2017-04-08 00:10:26 +00:00
|
|
|
case ops := <-c.opCh:
|
|
|
|
c.merge(ops)
|
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
if err := c.sync(); err != nil {
|
2017-07-24 23:48:40 +00:00
|
|
|
if failures == 0 {
|
2017-12-08 01:08:25 +00:00
|
|
|
// Log on the first failure
|
2018-09-13 17:43:40 +00:00
|
|
|
c.logger.Warn("failed to update services in Consul", "error", err)
|
2017-12-08 01:08:25 +00:00
|
|
|
} else if failures%10 == 0 {
|
|
|
|
// Log every 10th consecutive failure
|
2018-09-13 17:43:40 +00:00
|
|
|
c.logger.Error("still unable to update services in Consul", "failures", failures, "error", err)
|
2017-04-08 00:10:26 +00:00
|
|
|
}
|
2017-12-08 01:08:25 +00:00
|
|
|
|
2017-07-24 22:37:53 +00:00
|
|
|
failures++
|
2017-04-08 00:10:26 +00:00
|
|
|
if !retryTimer.Stop() {
|
2017-04-18 23:36:20 +00:00
|
|
|
// Timer already expired, since the timer may
|
|
|
|
// or may not have been read in the select{}
|
|
|
|
// above, conditionally receive on it
|
2017-04-12 19:07:10 +00:00
|
|
|
select {
|
|
|
|
case <-retryTimer.C:
|
|
|
|
default:
|
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
}
|
|
|
|
backoff := c.retryInterval * time.Duration(failures)
|
|
|
|
if backoff > c.maxRetryInterval {
|
|
|
|
backoff = c.maxRetryInterval
|
|
|
|
}
|
|
|
|
retryTimer.Reset(backoff)
|
|
|
|
} else {
|
|
|
|
if failures > 0 {
|
2018-09-13 17:43:40 +00:00
|
|
|
c.logger.Info("successfully updated services in Consul")
|
2017-04-08 00:10:26 +00:00
|
|
|
failures = 0
|
|
|
|
}
|
2018-04-17 19:36:50 +00:00
|
|
|
|
2019-06-14 14:57:46 +00:00
|
|
|
// on successful sync, clear deregistered consul entities
|
|
|
|
c.clearExplicitlyDeregistered()
|
|
|
|
|
2018-04-17 19:36:50 +00:00
|
|
|
// Reset timer to periodic interval to periodically
|
|
|
|
// reconile with Consul
|
|
|
|
if !retryTimer.Stop() {
|
|
|
|
select {
|
|
|
|
case <-retryTimer.C:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
retryTimer.Reset(c.periodicInterval)
|
2017-04-08 00:10:26 +00:00
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
|
|
|
|
select {
|
2017-04-08 00:10:26 +00:00
|
|
|
case <-c.shutdownCh:
|
|
|
|
// Exit only after sync'ing all outstanding operations
|
|
|
|
if len(c.opCh) > 0 {
|
|
|
|
for len(c.opCh) > 0 {
|
|
|
|
c.merge(<-c.opCh)
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
continue
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
return
|
2017-04-08 00:10:26 +00:00
|
|
|
default:
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-18 00:07:42 +00:00
|
|
|
// commit operations unless already shutting down.
|
|
|
|
func (c *ServiceClient) commit(ops *operations) {
|
2017-02-01 00:43:57 +00:00
|
|
|
select {
|
2017-04-08 00:10:26 +00:00
|
|
|
case c.opCh <- ops:
|
|
|
|
case <-c.shutdownCh:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-06-14 14:57:46 +00:00
|
|
|
func (c *ServiceClient) clearExplicitlyDeregistered() {
|
|
|
|
c.explicitlyDeregisteredServices = map[string]bool{}
|
|
|
|
c.explicitlyDeregisteredChecks = map[string]bool{}
|
|
|
|
}
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// merge registrations into state map prior to sync'ing with Consul
|
|
|
|
func (c *ServiceClient) merge(ops *operations) {
|
|
|
|
for _, s := range ops.regServices {
|
|
|
|
c.services[s.ID] = s
|
|
|
|
}
|
|
|
|
for _, check := range ops.regChecks {
|
|
|
|
c.checks[check.ID] = check
|
|
|
|
}
|
|
|
|
for _, sid := range ops.deregServices {
|
|
|
|
delete(c.services, sid)
|
2019-06-14 14:57:46 +00:00
|
|
|
c.explicitlyDeregisteredServices[sid] = true
|
2017-04-08 00:10:26 +00:00
|
|
|
}
|
|
|
|
for _, cid := range ops.deregChecks {
|
|
|
|
delete(c.checks, cid)
|
2019-06-14 14:57:46 +00:00
|
|
|
c.explicitlyDeregisteredChecks[cid] = true
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2017-04-18 23:23:39 +00:00
|
|
|
metrics.SetGauge([]string{"client", "consul", "services"}, float32(len(c.services)))
|
|
|
|
metrics.SetGauge([]string{"client", "consul", "checks"}, float32(len(c.checks)))
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// sync enqueued operations.
|
|
|
|
func (c *ServiceClient) sync() error {
|
2017-04-08 00:10:26 +00:00
|
|
|
sreg, creg, sdereg, cdereg := 0, 0, 0, 0
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
consulServices, err := c.client.Services()
|
|
|
|
if err != nil {
|
2017-04-18 23:23:39 +00:00
|
|
|
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
|
2017-04-08 00:10:26 +00:00
|
|
|
return fmt.Errorf("error querying Consul services: %v", err)
|
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
consulChecks, err := c.client.Checks()
|
|
|
|
if err != nil {
|
2017-04-18 23:23:39 +00:00
|
|
|
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
|
2017-04-08 00:10:26 +00:00
|
|
|
return fmt.Errorf("error querying Consul checks: %v", err)
|
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2019-06-14 14:57:46 +00:00
|
|
|
inProbation := time.Now().Before(c.deregisterProbationExpiry)
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// Remove Nomad services in Consul but unknown locally
|
|
|
|
for id := range consulServices {
|
|
|
|
if _, ok := c.services[id]; ok {
|
|
|
|
// Known service, skip
|
|
|
|
continue
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2018-06-01 20:59:53 +00:00
|
|
|
|
|
|
|
// Ignore if this is not a Nomad managed service. Also ignore
|
|
|
|
// Nomad managed services if this is not a client agent.
|
|
|
|
// This is to prevent server agents from removing services
|
|
|
|
// registered by client agents
|
|
|
|
if !isNomadService(id) || !c.isClientAgent {
|
2017-04-08 00:10:26 +00:00
|
|
|
// Not managed by Nomad, skip
|
|
|
|
continue
|
|
|
|
}
|
2017-12-08 01:08:25 +00:00
|
|
|
|
2019-06-14 14:57:46 +00:00
|
|
|
// Ignore unknown services during probation
|
|
|
|
if inProbation && !c.explicitlyDeregisteredServices[id] {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2019-08-30 18:05:30 +00:00
|
|
|
// Ignore if this is a service for a Nomad managed sidecar proxy.
|
|
|
|
if isNomadSidecar(id, c.services) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// Unknown Nomad managed service; kill
|
|
|
|
if err := c.client.ServiceDeregister(id); err != nil {
|
2017-12-08 01:08:25 +00:00
|
|
|
if isOldNomadService(id) {
|
|
|
|
// Don't hard-fail on old entries. See #3620
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2017-04-18 23:23:39 +00:00
|
|
|
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
|
2017-04-08 00:10:26 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
sdereg++
|
2017-12-01 14:24:14 +00:00
|
|
|
metrics.IncrCounter([]string{"client", "consul", "service_deregistrations"}, 1)
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
consul: Use a stable identifier for services
The current implementation of Service Registration uses a hash of the
nomad-internal state of a service to register it with Consul, this means that
any update to the service invalidates this name and we then deregister, and
recreate the service in Consul.
While this behaviour slightly simplifies reasoning about service registration,
this becomes problematic when we add consul health checks to a service. When
the service is re-registered, so are the checks, which default to failing for
at least one check period.
This commit migrates us to using a stable identifier based on the
allocation, task, and service identifiers, and uses the difference
between the remote and local state to decide when to push updates.
It uses the existing hashing mechanic to decide when UpdateTask should
regenerate service registrations for providing to Sync, but this should
be removable as part of a future refactor.
It additionally introduces the _nomad-check- prefix for check
definitions, to allow for future allowing of consul features like
maintenance mode.
2019-04-10 08:39:24 +00:00
|
|
|
// Add Nomad services missing from Consul, or where the service has been updated.
|
2017-04-18 04:15:13 +00:00
|
|
|
for id, locals := range c.services {
|
consul: Use a stable identifier for services
The current implementation of Service Registration uses a hash of the
nomad-internal state of a service to register it with Consul, this means that
any update to the service invalidates this name and we then deregister, and
recreate the service in Consul.
While this behaviour slightly simplifies reasoning about service registration,
this becomes problematic when we add consul health checks to a service. When
the service is re-registered, so are the checks, which default to failing for
at least one check period.
This commit migrates us to using a stable identifier based on the
allocation, task, and service identifiers, and uses the difference
between the remote and local state to decide when to push updates.
It uses the existing hashing mechanic to decide when UpdateTask should
regenerate service registrations for providing to Sync, but this should
be removable as part of a future refactor.
It additionally introduces the _nomad-check- prefix for check
definitions, to allow for future allowing of consul features like
maintenance mode.
2019-04-10 08:39:24 +00:00
|
|
|
existingSvc, ok := consulServices[id]
|
|
|
|
|
|
|
|
if ok {
|
|
|
|
// There is an existing registration of this service in Consul, so here
|
|
|
|
// we validate to see if the service has been invalidated to see if it
|
|
|
|
// should be updated.
|
|
|
|
if !agentServiceUpdateRequired(locals, existingSvc) {
|
|
|
|
// No Need to update services that have not changed
|
|
|
|
continue
|
2017-04-18 04:15:13 +00:00
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
consul: Use a stable identifier for services
The current implementation of Service Registration uses a hash of the
nomad-internal state of a service to register it with Consul, this means that
any update to the service invalidates this name and we then deregister, and
recreate the service in Consul.
While this behaviour slightly simplifies reasoning about service registration,
this becomes problematic when we add consul health checks to a service. When
the service is re-registered, so are the checks, which default to failing for
at least one check period.
This commit migrates us to using a stable identifier based on the
allocation, task, and service identifiers, and uses the difference
between the remote and local state to decide when to push updates.
It uses the existing hashing mechanic to decide when UpdateTask should
regenerate service registrations for providing to Sync, but this should
be removable as part of a future refactor.
It additionally introduces the _nomad-check- prefix for check
definitions, to allow for future allowing of consul features like
maintenance mode.
2019-04-10 08:39:24 +00:00
|
|
|
|
|
|
|
if err = c.client.ServiceRegister(locals); err != nil {
|
|
|
|
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
sreg++
|
|
|
|
metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1)
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// Remove Nomad checks in Consul but unknown locally
|
|
|
|
for id, check := range consulChecks {
|
|
|
|
if _, ok := c.checks[id]; ok {
|
2017-04-18 04:15:13 +00:00
|
|
|
// Known check, leave it
|
2017-04-08 00:10:26 +00:00
|
|
|
continue
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2018-06-01 20:59:53 +00:00
|
|
|
|
|
|
|
// Ignore if this is not a Nomad managed check. Also ignore
|
|
|
|
// Nomad managed checks if this is not a client agent.
|
|
|
|
// This is to prevent server agents from removing checks
|
|
|
|
// registered by client agents
|
2019-04-25 11:48:19 +00:00
|
|
|
if !isNomadService(check.ServiceID) || !c.isClientAgent || !isNomadCheck(check.CheckID) {
|
2017-06-16 23:35:16 +00:00
|
|
|
// Service not managed by Nomad, skip
|
2017-04-08 00:10:26 +00:00
|
|
|
continue
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2017-12-08 01:08:25 +00:00
|
|
|
|
2019-06-14 14:57:46 +00:00
|
|
|
// Ignore unknown services during probation
|
|
|
|
if inProbation && !c.explicitlyDeregisteredChecks[id] {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2019-08-30 18:05:30 +00:00
|
|
|
// Ignore if this is a check for a Nomad managed sidecar proxy.
|
|
|
|
if isNomadSidecar(check.ServiceID, c.services) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2017-12-08 01:08:25 +00:00
|
|
|
// Unknown Nomad managed check; remove
|
2017-04-08 00:10:26 +00:00
|
|
|
if err := c.client.CheckDeregister(id); err != nil {
|
2017-12-08 01:08:25 +00:00
|
|
|
if isOldNomadService(check.ServiceID) {
|
|
|
|
// Don't hard-fail on old entries.
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2017-04-18 23:23:39 +00:00
|
|
|
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
|
2017-04-08 00:10:26 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
cdereg++
|
2017-12-01 14:24:14 +00:00
|
|
|
metrics.IncrCounter([]string{"client", "consul", "check_deregistrations"}, 1)
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// Add Nomad checks missing from Consul
|
|
|
|
for id, check := range c.checks {
|
2017-12-08 01:08:25 +00:00
|
|
|
if _, ok := consulChecks[id]; ok {
|
|
|
|
// Already in Consul; skipping
|
|
|
|
continue
|
2017-04-08 00:10:26 +00:00
|
|
|
}
|
2017-12-08 01:08:25 +00:00
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
if err := c.client.CheckRegister(check); err != nil {
|
2017-04-18 23:23:39 +00:00
|
|
|
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
|
2017-04-08 00:10:26 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
creg++
|
2017-12-01 14:24:14 +00:00
|
|
|
metrics.IncrCounter([]string{"client", "consul", "check_registrations"}, 1)
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
2019-02-02 20:18:30 +00:00
|
|
|
// Only log if something was actually synced
|
|
|
|
if sreg > 0 || sdereg > 0 || creg > 0 || cdereg > 0 {
|
|
|
|
c.logger.Debug("sync complete", "registered_services", sreg, "deregistered_services", sdereg,
|
|
|
|
"registered_checks", creg, "deregistered_checks", cdereg)
|
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-04-13 23:59:27 +00:00
|
|
|
// RegisterAgent registers Nomad agents (client or server). The
|
|
|
|
// Service.PortLabel should be a literal port to be parsed with SplitHostPort.
|
|
|
|
// Script checks are not supported and will return an error. Registration is
|
|
|
|
// asynchronous.
|
2017-02-01 00:43:57 +00:00
|
|
|
//
|
|
|
|
// Agents will be deregistered when Shutdown is called.
|
|
|
|
func (c *ServiceClient) RegisterAgent(role string, services []*structs.Service) error {
|
2017-04-08 00:10:26 +00:00
|
|
|
ops := operations{}
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2017-04-04 00:08:08 +00:00
|
|
|
for _, service := range services {
|
2017-02-01 00:43:57 +00:00
|
|
|
id := makeAgentServiceID(role, service)
|
2017-04-13 23:59:27 +00:00
|
|
|
|
|
|
|
// Unlike tasks, agents don't use port labels. Agent ports are
|
|
|
|
// stored directly in the PortLabel.
|
2017-02-01 00:43:57 +00:00
|
|
|
host, rawport, err := net.SplitHostPort(service.PortLabel)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("error parsing port label %q from service %q: %v", service.PortLabel, service.Name, err)
|
|
|
|
}
|
|
|
|
port, err := strconv.Atoi(rawport)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("error parsing port %q from service %q: %v", rawport, service.Name, err)
|
|
|
|
}
|
|
|
|
serviceReg := &api.AgentServiceRegistration{
|
|
|
|
ID: id,
|
|
|
|
Name: service.Name,
|
|
|
|
Tags: service.Tags,
|
|
|
|
Address: host,
|
|
|
|
Port: port,
|
2018-11-16 17:28:56 +00:00
|
|
|
// This enables the consul UI to show that Nomad registered this service
|
|
|
|
Meta: map[string]string{
|
|
|
|
"external-source": "nomad",
|
|
|
|
},
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
ops.regServices = append(ops.regServices, serviceReg)
|
2017-02-01 00:43:57 +00:00
|
|
|
|
|
|
|
for _, check := range service.Checks {
|
support script checks for task group services (#6197)
In Nomad prior to Consul Connect, all Consul checks work the same
except for Script checks. Because the Task being checked is running in
its own container namespaces, the check is executed by Nomad in the
Task's context. If the Script check passes, Nomad uses the TTL check
feature of Consul to update the check status. This means in order to
run a Script check, we need to know what Task to execute it in.
To support Consul Connect, we need Group Services, and these need to
be registered in Consul along with their checks. We could push the
Service down into the Task, but this doesn't work if someone wants to
associate a service with a task's ports, but do script checks in
another task in the allocation.
Because Nomad is handling the Script check and not Consul anyways,
this moves the script check handling into the task runner so that the
task runner can own the script check's configuration and
lifecycle. This will allow us to pass the group service check
configuration down into a task without associating the service itself
with the task.
When tasks are checked for script checks, we walk back through their
task group to see if there are script checks associated with the
task. If so, we'll spin off script check tasklets for them. The
group-level service and any restart behaviors it needs are entirely
encapsulated within the group service hook.
2019-09-03 19:09:04 +00:00
|
|
|
checkID := MakeCheckID(id, check)
|
2017-02-01 00:43:57 +00:00
|
|
|
if check.Type == structs.ServiceCheckScript {
|
|
|
|
return fmt.Errorf("service %q contains invalid check: agent checks do not support scripts", service.Name)
|
|
|
|
}
|
|
|
|
checkHost, checkPort := serviceReg.Address, serviceReg.Port
|
|
|
|
if check.PortLabel != "" {
|
2017-04-13 23:59:27 +00:00
|
|
|
// Unlike tasks, agents don't use port labels. Agent ports are
|
|
|
|
// stored directly in the PortLabel.
|
2017-02-01 00:43:57 +00:00
|
|
|
host, rawport, err := net.SplitHostPort(check.PortLabel)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("error parsing port label %q from check %q: %v", service.PortLabel, check.Name, err)
|
|
|
|
}
|
|
|
|
port, err := strconv.Atoi(rawport)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("error parsing port %q from check %q: %v", rawport, check.Name, err)
|
|
|
|
}
|
|
|
|
checkHost, checkPort = host, port
|
|
|
|
}
|
|
|
|
checkReg, err := createCheckReg(id, checkID, check, checkHost, checkPort)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("failed to add check %q: %v", check.Name, err)
|
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
ops.regChecks = append(ops.regChecks, checkReg)
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-18 00:07:42 +00:00
|
|
|
// Don't bother committing agent checks if we're already shutting down
|
|
|
|
c.agentLock.Lock()
|
|
|
|
defer c.agentLock.Unlock()
|
|
|
|
select {
|
|
|
|
case <-c.shutdownCh:
|
2017-04-08 00:10:26 +00:00
|
|
|
return nil
|
2017-04-18 00:07:42 +00:00
|
|
|
default:
|
2017-04-08 00:10:26 +00:00
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2017-04-18 00:07:42 +00:00
|
|
|
// Now add them to the registration queue
|
|
|
|
c.commit(&ops)
|
|
|
|
|
2017-02-01 00:43:57 +00:00
|
|
|
// Record IDs for deregistering on shutdown
|
2017-04-08 00:10:26 +00:00
|
|
|
for _, id := range ops.regServices {
|
2017-04-13 20:49:23 +00:00
|
|
|
c.agentServices[id.ID] = struct{}{}
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
for _, id := range ops.regChecks {
|
2017-04-13 20:49:23 +00:00
|
|
|
c.agentChecks[id.ID] = struct{}{}
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2017-04-04 00:08:08 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// serviceRegs creates service registrations, check registrations, and script
|
2017-08-07 22:54:05 +00:00
|
|
|
// checks from a service. It returns a service registration object with the
|
|
|
|
// service and check IDs populated.
|
2018-04-23 23:34:53 +00:00
|
|
|
func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, task *TaskServices) (
|
|
|
|
*ServiceRegistration, error) {
|
2017-04-04 00:08:08 +00:00
|
|
|
|
2017-08-07 22:54:05 +00:00
|
|
|
// Get the services ID
|
2019-08-31 20:53:23 +00:00
|
|
|
id := MakeTaskServiceID(task.AllocID, task.Name, service)
|
2017-08-07 22:54:05 +00:00
|
|
|
sreg := &ServiceRegistration{
|
|
|
|
serviceID: id,
|
|
|
|
checkIDs: make(map[string]struct{}, len(service.Checks)),
|
|
|
|
}
|
|
|
|
|
2017-12-05 19:39:42 +00:00
|
|
|
// Service address modes default to auto
|
2017-06-09 17:29:41 +00:00
|
|
|
addrMode := service.AddressMode
|
2017-12-05 19:39:42 +00:00
|
|
|
if addrMode == "" {
|
|
|
|
addrMode = structs.AddressModeAuto
|
2017-06-09 17:29:41 +00:00
|
|
|
}
|
2017-12-05 19:39:42 +00:00
|
|
|
|
|
|
|
// Determine the address to advertise based on the mode
|
2018-04-23 23:34:53 +00:00
|
|
|
ip, port, err := getAddress(addrMode, service.PortLabel, task.Networks, task.DriverNetwork)
|
2017-12-05 19:39:42 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("unable to get address for service %q: %v", service.Name, err)
|
2017-06-09 17:29:41 +00:00
|
|
|
}
|
2017-08-07 22:54:05 +00:00
|
|
|
|
2018-04-23 23:34:53 +00:00
|
|
|
// Determine whether to use tags or canary_tags
|
|
|
|
var tags []string
|
2018-05-23 20:07:47 +00:00
|
|
|
if task.Canary && len(service.CanaryTags) > 0 {
|
2018-04-23 23:34:53 +00:00
|
|
|
tags = make([]string, len(service.CanaryTags))
|
|
|
|
copy(tags, service.CanaryTags)
|
|
|
|
} else {
|
|
|
|
tags = make([]string, len(service.Tags))
|
|
|
|
copy(tags, service.Tags)
|
|
|
|
}
|
|
|
|
|
2019-08-14 22:02:00 +00:00
|
|
|
// newConnect returns (nil, nil) if there's no Connect-enabled service.
|
|
|
|
connect, err := newConnect(service.Name, service.Connect, task.Networks)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("invalid Consul Connect configuration for service %q: %v", service.Name, err)
|
|
|
|
}
|
|
|
|
|
2019-08-23 16:49:02 +00:00
|
|
|
meta := make(map[string]string, len(service.Meta))
|
|
|
|
for k, v := range service.Meta {
|
|
|
|
meta[k] = v
|
|
|
|
}
|
|
|
|
|
|
|
|
// This enables the consul UI to show that Nomad registered this service
|
|
|
|
meta["external-source"] = "nomad"
|
|
|
|
|
2017-08-07 22:54:05 +00:00
|
|
|
// Build the Consul Service registration request
|
2017-04-04 00:08:08 +00:00
|
|
|
serviceReg := &api.AgentServiceRegistration{
|
|
|
|
ID: id,
|
|
|
|
Name: service.Name,
|
2018-04-23 23:34:53 +00:00
|
|
|
Tags: tags,
|
2017-06-09 17:29:41 +00:00
|
|
|
Address: ip,
|
2017-04-04 00:08:08 +00:00
|
|
|
Port: port,
|
2019-08-23 16:49:02 +00:00
|
|
|
Meta: meta,
|
2019-08-14 22:02:00 +00:00
|
|
|
Connect: connect, // will be nil if no Connect stanza
|
2017-04-04 00:08:08 +00:00
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
ops.regServices = append(ops.regServices, serviceReg)
|
2017-08-07 22:54:05 +00:00
|
|
|
|
|
|
|
// Build the check registrations
|
2018-04-23 23:34:53 +00:00
|
|
|
checkIDs, err := c.checkRegs(ops, id, service, task)
|
2017-08-07 22:54:05 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
for _, cid := range checkIDs {
|
|
|
|
sreg.checkIDs[cid] = struct{}{}
|
|
|
|
}
|
|
|
|
return sreg, nil
|
2017-06-16 23:35:16 +00:00
|
|
|
}
|
|
|
|
|
2017-08-07 22:54:05 +00:00
|
|
|
// checkRegs registers the checks for the given service and returns the
|
|
|
|
// registered check ids.
|
2018-04-23 23:34:53 +00:00
|
|
|
func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *structs.Service,
|
|
|
|
task *TaskServices) ([]string, error) {
|
2017-04-04 00:08:08 +00:00
|
|
|
|
2017-08-07 22:54:05 +00:00
|
|
|
// Fast path
|
|
|
|
numChecks := len(service.Checks)
|
|
|
|
if numChecks == 0 {
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
checkIDs := make([]string, 0, numChecks)
|
2017-04-04 00:08:08 +00:00
|
|
|
for _, check := range service.Checks {
|
support script checks for task group services (#6197)
In Nomad prior to Consul Connect, all Consul checks work the same
except for Script checks. Because the Task being checked is running in
its own container namespaces, the check is executed by Nomad in the
Task's context. If the Script check passes, Nomad uses the TTL check
feature of Consul to update the check status. This means in order to
run a Script check, we need to know what Task to execute it in.
To support Consul Connect, we need Group Services, and these need to
be registered in Consul along with their checks. We could push the
Service down into the Task, but this doesn't work if someone wants to
associate a service with a task's ports, but do script checks in
another task in the allocation.
Because Nomad is handling the Script check and not Consul anyways,
this moves the script check handling into the task runner so that the
task runner can own the script check's configuration and
lifecycle. This will allow us to pass the group service check
configuration down into a task without associating the service itself
with the task.
When tasks are checked for script checks, we walk back through their
task group to see if there are script checks associated with the
task. If so, we'll spin off script check tasklets for them. The
group-level service and any restart behaviors it needs are entirely
encapsulated within the group service hook.
2019-09-03 19:09:04 +00:00
|
|
|
checkID := MakeCheckID(serviceID, check)
|
2017-08-07 22:54:05 +00:00
|
|
|
checkIDs = append(checkIDs, checkID)
|
2017-04-13 23:43:38 +00:00
|
|
|
if check.Type == structs.ServiceCheckScript {
|
2017-12-08 05:58:15 +00:00
|
|
|
// Skip getAddress for script checks
|
|
|
|
checkReg, err := createCheckReg(serviceID, checkID, check, "", 0)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("failed to add script check %q: %v", check.Name, err)
|
|
|
|
}
|
|
|
|
ops.regChecks = append(ops.regChecks, checkReg)
|
|
|
|
continue
|
2017-04-13 23:43:38 +00:00
|
|
|
}
|
2017-04-19 19:18:06 +00:00
|
|
|
|
2017-12-05 19:39:42 +00:00
|
|
|
// Default to the service's port but allow check to override
|
2017-06-09 17:29:41 +00:00
|
|
|
portLabel := check.PortLabel
|
|
|
|
if portLabel == "" {
|
|
|
|
// Default to the service's port label
|
|
|
|
portLabel = service.PortLabel
|
2017-04-13 23:43:38 +00:00
|
|
|
}
|
2017-12-05 19:39:42 +00:00
|
|
|
|
|
|
|
// Checks address mode defaults to host for pre-#3380 backward compat
|
|
|
|
addrMode := check.AddressMode
|
|
|
|
if addrMode == "" {
|
|
|
|
addrMode = structs.AddressModeHost
|
|
|
|
}
|
|
|
|
|
2018-04-23 23:34:53 +00:00
|
|
|
ip, port, err := getAddress(addrMode, portLabel, task.Networks, task.DriverNetwork)
|
2017-12-05 19:39:42 +00:00
|
|
|
if err != nil {
|
2017-12-19 00:18:42 +00:00
|
|
|
return nil, fmt.Errorf("error getting address for check %q: %v", check.Name, err)
|
2017-12-05 19:39:42 +00:00
|
|
|
}
|
|
|
|
|
2017-06-16 23:35:16 +00:00
|
|
|
checkReg, err := createCheckReg(serviceID, checkID, check, ip, port)
|
2017-04-04 00:08:08 +00:00
|
|
|
if err != nil {
|
2017-08-07 22:54:05 +00:00
|
|
|
return nil, fmt.Errorf("failed to add check %q: %v", check.Name, err)
|
2017-04-04 00:08:08 +00:00
|
|
|
}
|
2017-04-13 23:43:38 +00:00
|
|
|
ops.regChecks = append(ops.regChecks, checkReg)
|
2017-04-04 00:08:08 +00:00
|
|
|
}
|
2017-08-07 22:54:05 +00:00
|
|
|
return checkIDs, nil
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
2019-08-14 22:02:00 +00:00
|
|
|
//TODO(schmichael) remove
|
|
|
|
type noopRestarter struct{}
|
|
|
|
|
|
|
|
func (noopRestarter) Restart(context.Context, *structs.TaskEvent, bool) error { return nil }
|
|
|
|
|
|
|
|
// makeAllocTaskServices creates a TaskServices struct for a group service.
|
|
|
|
//
|
|
|
|
//TODO(schmichael) rename TaskServices and refactor this into a New method
|
|
|
|
func makeAllocTaskServices(alloc *structs.Allocation, tg *structs.TaskGroup) (*TaskServices, error) {
|
2019-11-01 14:15:11 +00:00
|
|
|
//COMPAT(0.11) AllocatedResources is only nil when upgrading directly
|
|
|
|
// from 0.8.
|
|
|
|
if alloc.AllocatedResources == nil || len(alloc.AllocatedResources.Shared.Networks) == 0 {
|
2019-08-14 22:02:00 +00:00
|
|
|
return nil, fmt.Errorf("unable to register a group service without a group network")
|
|
|
|
}
|
|
|
|
|
|
|
|
//TODO(schmichael) only support one network for now
|
|
|
|
net := alloc.AllocatedResources.Shared.Networks[0]
|
|
|
|
|
|
|
|
ts := &TaskServices{
|
|
|
|
AllocID: alloc.ID,
|
|
|
|
Name: "group-" + alloc.TaskGroup,
|
|
|
|
Services: tg.Services,
|
|
|
|
Networks: alloc.AllocatedResources.Shared.Networks,
|
|
|
|
|
|
|
|
//TODO(schmichael) there's probably a better way than hacking driver network
|
|
|
|
DriverNetwork: &drivers.DriverNetwork{
|
|
|
|
AutoAdvertise: true,
|
|
|
|
IP: net.IP,
|
|
|
|
// Copy PortLabels from group network
|
|
|
|
PortMap: net.PortLabels(),
|
|
|
|
},
|
|
|
|
|
|
|
|
// unsupported for group services
|
|
|
|
Restarter: noopRestarter{},
|
|
|
|
DriverExec: nil,
|
|
|
|
}
|
|
|
|
|
|
|
|
if alloc.DeploymentStatus != nil {
|
|
|
|
ts.Canary = alloc.DeploymentStatus.Canary
|
|
|
|
}
|
|
|
|
|
|
|
|
return ts, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// RegisterGroup services with Consul. Adds all task group-level service
|
|
|
|
// entries and checks to Consul.
|
|
|
|
func (c *ServiceClient) RegisterGroup(alloc *structs.Allocation) error {
|
|
|
|
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
|
|
|
if tg == nil {
|
|
|
|
return fmt.Errorf("task group %q not in allocation", alloc.TaskGroup)
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(tg.Services) == 0 {
|
|
|
|
// noop
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
ts, err := makeAllocTaskServices(alloc, tg)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return c.RegisterTask(ts)
|
|
|
|
}
|
|
|
|
|
|
|
|
// UpdateGroup services with Consul. Updates all task group-level service
|
|
|
|
// entries and checks to Consul.
|
|
|
|
func (c *ServiceClient) UpdateGroup(oldAlloc, newAlloc *structs.Allocation) error {
|
|
|
|
oldTG := oldAlloc.Job.LookupTaskGroup(oldAlloc.TaskGroup)
|
|
|
|
if oldTG == nil {
|
|
|
|
return fmt.Errorf("task group %q not in old allocation", oldAlloc.TaskGroup)
|
|
|
|
}
|
|
|
|
|
2019-11-01 14:15:11 +00:00
|
|
|
if len(oldTG.Services) == 0 {
|
|
|
|
// No old group services, simply add new group services
|
|
|
|
return c.RegisterGroup(newAlloc)
|
|
|
|
}
|
|
|
|
|
2019-08-14 22:02:00 +00:00
|
|
|
oldServices, err := makeAllocTaskServices(oldAlloc, oldTG)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
newTG := newAlloc.Job.LookupTaskGroup(newAlloc.TaskGroup)
|
|
|
|
if newTG == nil {
|
|
|
|
return fmt.Errorf("task group %q not in new allocation", newAlloc.TaskGroup)
|
|
|
|
}
|
|
|
|
|
|
|
|
newServices, err := makeAllocTaskServices(newAlloc, newTG)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return c.UpdateTask(oldServices, newServices)
|
|
|
|
}
|
|
|
|
|
|
|
|
// RemoveGroup services with Consul. Removes all task group-level service
|
|
|
|
// entries and checks from Consul.
|
|
|
|
func (c *ServiceClient) RemoveGroup(alloc *structs.Allocation) error {
|
|
|
|
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
|
|
|
if tg == nil {
|
|
|
|
return fmt.Errorf("task group %q not in allocation", alloc.TaskGroup)
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(tg.Services) == 0 {
|
|
|
|
// noop
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
ts, err := makeAllocTaskServices(alloc, tg)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
c.RemoveTask(ts)
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-08-07 21:13:05 +00:00
|
|
|
// RegisterTask with Consul. Adds all service entries and checks to Consul. If
|
2017-02-01 00:43:57 +00:00
|
|
|
// exec is nil and a script check exists an error is returned.
|
|
|
|
//
|
2017-06-09 17:29:41 +00:00
|
|
|
// If the service IP is set it used as the address in the service registration.
|
|
|
|
// Checks will always use the IP from the Task struct (host's IP).
|
|
|
|
//
|
2018-03-11 17:41:50 +00:00
|
|
|
// Actual communication with Consul is done asynchronously (see Run).
|
2018-04-23 23:34:53 +00:00
|
|
|
func (c *ServiceClient) RegisterTask(task *TaskServices) error {
|
2017-08-07 22:54:05 +00:00
|
|
|
// Fast path
|
|
|
|
numServices := len(task.Services)
|
|
|
|
if numServices == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
t := new(TaskRegistration)
|
|
|
|
t.Services = make(map[string]*ServiceRegistration, numServices)
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
ops := &operations{}
|
2017-04-04 00:08:08 +00:00
|
|
|
for _, service := range task.Services {
|
2018-04-23 23:34:53 +00:00
|
|
|
sreg, err := c.serviceRegs(ops, service, task)
|
2017-08-07 22:54:05 +00:00
|
|
|
if err != nil {
|
2017-04-04 00:08:08 +00:00
|
|
|
return err
|
|
|
|
}
|
2017-08-07 22:54:05 +00:00
|
|
|
t.Services[sreg.serviceID] = sreg
|
2017-04-04 00:08:08 +00:00
|
|
|
}
|
2017-08-07 22:54:05 +00:00
|
|
|
|
|
|
|
// Add the task to the allocation's registration
|
2018-04-23 23:34:53 +00:00
|
|
|
c.addTaskRegistration(task.AllocID, task.Name, t)
|
2017-08-07 22:54:05 +00:00
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
c.commit(ops)
|
2017-08-26 05:40:18 +00:00
|
|
|
|
|
|
|
// Start watching checks. Done after service registrations are built
|
|
|
|
// since an error building them could leak watches.
|
|
|
|
for _, service := range task.Services {
|
2019-08-31 20:53:23 +00:00
|
|
|
serviceID := MakeTaskServiceID(task.AllocID, task.Name, service)
|
2017-08-26 05:40:18 +00:00
|
|
|
for _, check := range service.Checks {
|
2017-09-14 16:58:35 +00:00
|
|
|
if check.TriggersRestarts() {
|
support script checks for task group services (#6197)
In Nomad prior to Consul Connect, all Consul checks work the same
except for Script checks. Because the Task being checked is running in
its own container namespaces, the check is executed by Nomad in the
Task's context. If the Script check passes, Nomad uses the TTL check
feature of Consul to update the check status. This means in order to
run a Script check, we need to know what Task to execute it in.
To support Consul Connect, we need Group Services, and these need to
be registered in Consul along with their checks. We could push the
Service down into the Task, but this doesn't work if someone wants to
associate a service with a task's ports, but do script checks in
another task in the allocation.
Because Nomad is handling the Script check and not Consul anyways,
this moves the script check handling into the task runner so that the
task runner can own the script check's configuration and
lifecycle. This will allow us to pass the group service check
configuration down into a task without associating the service itself
with the task.
When tasks are checked for script checks, we walk back through their
task group to see if there are script checks associated with the
task. If so, we'll spin off script check tasklets for them. The
group-level service and any restart behaviors it needs are entirely
encapsulated within the group service hook.
2019-09-03 19:09:04 +00:00
|
|
|
checkID := MakeCheckID(serviceID, check)
|
2018-04-23 23:34:53 +00:00
|
|
|
c.checkWatcher.Watch(task.AllocID, task.Name, checkID, check, task.Restarter)
|
2017-08-26 05:40:18 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2017-04-04 00:08:08 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// UpdateTask in Consul. Does not alter the service if only checks have
|
|
|
|
// changed.
|
2017-06-16 23:35:16 +00:00
|
|
|
//
|
|
|
|
// DriverNetwork must not change between invocations for the same allocation.
|
2018-04-23 23:34:53 +00:00
|
|
|
func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error {
|
2017-04-08 00:10:26 +00:00
|
|
|
ops := &operations{}
|
2017-04-04 00:08:08 +00:00
|
|
|
|
2017-12-05 19:39:42 +00:00
|
|
|
taskReg := new(TaskRegistration)
|
|
|
|
taskReg.Services = make(map[string]*ServiceRegistration, len(newTask.Services))
|
2017-08-07 22:54:05 +00:00
|
|
|
|
2018-04-23 23:34:53 +00:00
|
|
|
existingIDs := make(map[string]*structs.Service, len(old.Services))
|
|
|
|
for _, s := range old.Services {
|
2019-08-31 20:53:23 +00:00
|
|
|
existingIDs[MakeTaskServiceID(old.AllocID, old.Name, s)] = s
|
2017-04-04 00:08:08 +00:00
|
|
|
}
|
|
|
|
newIDs := make(map[string]*structs.Service, len(newTask.Services))
|
|
|
|
for _, s := range newTask.Services {
|
2019-08-31 20:53:23 +00:00
|
|
|
newIDs[MakeTaskServiceID(newTask.AllocID, newTask.Name, s)] = s
|
2017-04-04 00:08:08 +00:00
|
|
|
}
|
|
|
|
|
consul: Use a stable identifier for services
The current implementation of Service Registration uses a hash of the
nomad-internal state of a service to register it with Consul, this means that
any update to the service invalidates this name and we then deregister, and
recreate the service in Consul.
While this behaviour slightly simplifies reasoning about service registration,
this becomes problematic when we add consul health checks to a service. When
the service is re-registered, so are the checks, which default to failing for
at least one check period.
This commit migrates us to using a stable identifier based on the
allocation, task, and service identifiers, and uses the difference
between the remote and local state to decide when to push updates.
It uses the existing hashing mechanic to decide when UpdateTask should
regenerate service registrations for providing to Sync, but this should
be removable as part of a future refactor.
It additionally introduces the _nomad-check- prefix for check
definitions, to allow for future allowing of consul features like
maintenance mode.
2019-04-10 08:39:24 +00:00
|
|
|
// Loop over existing Service IDs to see if they have been removed
|
2017-04-04 00:08:08 +00:00
|
|
|
for existingID, existingSvc := range existingIDs {
|
|
|
|
newSvc, ok := newIDs[existingID]
|
consul: Use a stable identifier for services
The current implementation of Service Registration uses a hash of the
nomad-internal state of a service to register it with Consul, this means that
any update to the service invalidates this name and we then deregister, and
recreate the service in Consul.
While this behaviour slightly simplifies reasoning about service registration,
this becomes problematic when we add consul health checks to a service. When
the service is re-registered, so are the checks, which default to failing for
at least one check period.
This commit migrates us to using a stable identifier based on the
allocation, task, and service identifiers, and uses the difference
between the remote and local state to decide when to push updates.
It uses the existing hashing mechanic to decide when UpdateTask should
regenerate service registrations for providing to Sync, but this should
be removable as part of a future refactor.
It additionally introduces the _nomad-check- prefix for check
definitions, to allow for future allowing of consul features like
maintenance mode.
2019-04-10 08:39:24 +00:00
|
|
|
|
2017-04-04 00:08:08 +00:00
|
|
|
if !ok {
|
2017-08-07 21:13:05 +00:00
|
|
|
// Existing service entry removed
|
2017-04-08 00:10:26 +00:00
|
|
|
ops.deregServices = append(ops.deregServices, existingID)
|
2017-04-04 00:08:08 +00:00
|
|
|
for _, check := range existingSvc.Checks {
|
support script checks for task group services (#6197)
In Nomad prior to Consul Connect, all Consul checks work the same
except for Script checks. Because the Task being checked is running in
its own container namespaces, the check is executed by Nomad in the
Task's context. If the Script check passes, Nomad uses the TTL check
feature of Consul to update the check status. This means in order to
run a Script check, we need to know what Task to execute it in.
To support Consul Connect, we need Group Services, and these need to
be registered in Consul along with their checks. We could push the
Service down into the Task, but this doesn't work if someone wants to
associate a service with a task's ports, but do script checks in
another task in the allocation.
Because Nomad is handling the Script check and not Consul anyways,
this moves the script check handling into the task runner so that the
task runner can own the script check's configuration and
lifecycle. This will allow us to pass the group service check
configuration down into a task without associating the service itself
with the task.
When tasks are checked for script checks, we walk back through their
task group to see if there are script checks associated with the
task. If so, we'll spin off script check tasklets for them. The
group-level service and any restart behaviors it needs are entirely
encapsulated within the group service hook.
2019-09-03 19:09:04 +00:00
|
|
|
cid := MakeCheckID(existingID, check)
|
2017-08-26 05:40:18 +00:00
|
|
|
ops.deregChecks = append(ops.deregChecks, cid)
|
|
|
|
|
|
|
|
// Unwatch watched checks
|
2017-09-14 16:58:35 +00:00
|
|
|
if check.TriggersRestarts() {
|
2017-08-26 05:40:18 +00:00
|
|
|
c.checkWatcher.Unwatch(cid)
|
|
|
|
}
|
2017-04-04 00:08:08 +00:00
|
|
|
}
|
|
|
|
continue
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
consul: Use a stable identifier for services
The current implementation of Service Registration uses a hash of the
nomad-internal state of a service to register it with Consul, this means that
any update to the service invalidates this name and we then deregister, and
recreate the service in Consul.
While this behaviour slightly simplifies reasoning about service registration,
this becomes problematic when we add consul health checks to a service. When
the service is re-registered, so are the checks, which default to failing for
at least one check period.
This commit migrates us to using a stable identifier based on the
allocation, task, and service identifiers, and uses the difference
between the remote and local state to decide when to push updates.
It uses the existing hashing mechanic to decide when UpdateTask should
regenerate service registrations for providing to Sync, but this should
be removable as part of a future refactor.
It additionally introduces the _nomad-check- prefix for check
definitions, to allow for future allowing of consul features like
maintenance mode.
2019-04-10 08:39:24 +00:00
|
|
|
oldHash := existingSvc.Hash(old.AllocID, old.Name, old.Canary)
|
|
|
|
newHash := newSvc.Hash(newTask.AllocID, newTask.Name, newTask.Canary)
|
|
|
|
if oldHash == newHash {
|
|
|
|
// Service exists and hasn't changed, don't re-add it later
|
|
|
|
delete(newIDs, existingID)
|
|
|
|
}
|
2017-12-08 01:08:25 +00:00
|
|
|
|
2017-08-07 22:54:05 +00:00
|
|
|
// Service still exists so add it to the task's registration
|
|
|
|
sreg := &ServiceRegistration{
|
|
|
|
serviceID: existingID,
|
|
|
|
checkIDs: make(map[string]struct{}, len(newSvc.Checks)),
|
|
|
|
}
|
2017-12-05 19:39:42 +00:00
|
|
|
taskReg.Services[existingID] = sreg
|
2017-08-07 22:54:05 +00:00
|
|
|
|
2017-12-08 01:08:25 +00:00
|
|
|
// See if any checks were updated
|
2017-08-26 05:40:18 +00:00
|
|
|
existingChecks := make(map[string]*structs.ServiceCheck, len(existingSvc.Checks))
|
2017-04-04 00:08:08 +00:00
|
|
|
for _, check := range existingSvc.Checks {
|
support script checks for task group services (#6197)
In Nomad prior to Consul Connect, all Consul checks work the same
except for Script checks. Because the Task being checked is running in
its own container namespaces, the check is executed by Nomad in the
Task's context. If the Script check passes, Nomad uses the TTL check
feature of Consul to update the check status. This means in order to
run a Script check, we need to know what Task to execute it in.
To support Consul Connect, we need Group Services, and these need to
be registered in Consul along with their checks. We could push the
Service down into the Task, but this doesn't work if someone wants to
associate a service with a task's ports, but do script checks in
another task in the allocation.
Because Nomad is handling the Script check and not Consul anyways,
this moves the script check handling into the task runner so that the
task runner can own the script check's configuration and
lifecycle. This will allow us to pass the group service check
configuration down into a task without associating the service itself
with the task.
When tasks are checked for script checks, we walk back through their
task group to see if there are script checks associated with the
task. If so, we'll spin off script check tasklets for them. The
group-level service and any restart behaviors it needs are entirely
encapsulated within the group service hook.
2019-09-03 19:09:04 +00:00
|
|
|
existingChecks[MakeCheckID(existingID, check)] = check
|
2017-04-04 00:08:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Register new checks
|
|
|
|
for _, check := range newSvc.Checks {
|
support script checks for task group services (#6197)
In Nomad prior to Consul Connect, all Consul checks work the same
except for Script checks. Because the Task being checked is running in
its own container namespaces, the check is executed by Nomad in the
Task's context. If the Script check passes, Nomad uses the TTL check
feature of Consul to update the check status. This means in order to
run a Script check, we need to know what Task to execute it in.
To support Consul Connect, we need Group Services, and these need to
be registered in Consul along with their checks. We could push the
Service down into the Task, but this doesn't work if someone wants to
associate a service with a task's ports, but do script checks in
another task in the allocation.
Because Nomad is handling the Script check and not Consul anyways,
this moves the script check handling into the task runner so that the
task runner can own the script check's configuration and
lifecycle. This will allow us to pass the group service check
configuration down into a task without associating the service itself
with the task.
When tasks are checked for script checks, we walk back through their
task group to see if there are script checks associated with the
task. If so, we'll spin off script check tasklets for them. The
group-level service and any restart behaviors it needs are entirely
encapsulated within the group service hook.
2019-09-03 19:09:04 +00:00
|
|
|
checkID := MakeCheckID(existingID, check)
|
2017-04-08 00:10:26 +00:00
|
|
|
if _, exists := existingChecks[checkID]; exists {
|
consul: Use a stable identifier for services
The current implementation of Service Registration uses a hash of the
nomad-internal state of a service to register it with Consul, this means that
any update to the service invalidates this name and we then deregister, and
recreate the service in Consul.
While this behaviour slightly simplifies reasoning about service registration,
this becomes problematic when we add consul health checks to a service. When
the service is re-registered, so are the checks, which default to failing for
at least one check period.
This commit migrates us to using a stable identifier based on the
allocation, task, and service identifiers, and uses the difference
between the remote and local state to decide when to push updates.
It uses the existing hashing mechanic to decide when UpdateTask should
regenerate service registrations for providing to Sync, but this should
be removable as part of a future refactor.
It additionally introduces the _nomad-check- prefix for check
definitions, to allow for future allowing of consul features like
maintenance mode.
2019-04-10 08:39:24 +00:00
|
|
|
// Check is still required. Remove it from the map so it doesn't get
|
|
|
|
// deleted later.
|
2017-04-08 00:10:26 +00:00
|
|
|
delete(existingChecks, checkID)
|
2017-08-07 22:54:05 +00:00
|
|
|
sreg.checkIDs[checkID] = struct{}{}
|
2017-12-08 01:08:25 +00:00
|
|
|
}
|
2017-08-26 05:40:18 +00:00
|
|
|
|
2017-12-08 01:08:25 +00:00
|
|
|
// New check on an unchanged service; add them now
|
2018-04-23 23:34:53 +00:00
|
|
|
newCheckIDs, err := c.checkRegs(ops, existingID, newSvc, newTask)
|
2017-12-08 01:08:25 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-08-26 05:40:18 +00:00
|
|
|
|
2017-12-08 01:08:25 +00:00
|
|
|
for _, checkID := range newCheckIDs {
|
|
|
|
sreg.checkIDs[checkID] = struct{}{}
|
2017-08-26 05:40:18 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Update all watched checks as CheckRestart fields aren't part of ID
|
2017-09-14 16:58:35 +00:00
|
|
|
if check.TriggersRestarts() {
|
2018-04-23 23:34:53 +00:00
|
|
|
c.checkWatcher.Watch(newTask.AllocID, newTask.Name, checkID, check, newTask.Restarter)
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// Remove existing checks not in updated service
|
2017-08-26 05:40:18 +00:00
|
|
|
for cid, check := range existingChecks {
|
2017-04-08 00:10:26 +00:00
|
|
|
ops.deregChecks = append(ops.deregChecks, cid)
|
2017-08-26 05:40:18 +00:00
|
|
|
|
|
|
|
// Unwatch checks
|
2017-09-14 16:58:35 +00:00
|
|
|
if check.TriggersRestarts() {
|
2017-08-26 05:40:18 +00:00
|
|
|
c.checkWatcher.Unwatch(cid)
|
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
2017-04-04 00:08:08 +00:00
|
|
|
// Any remaining services should just be enqueued directly
|
|
|
|
for _, newSvc := range newIDs {
|
2018-04-23 23:34:53 +00:00
|
|
|
sreg, err := c.serviceRegs(ops, newSvc, newTask)
|
2017-04-04 00:08:08 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-08-07 22:54:05 +00:00
|
|
|
|
2017-12-05 19:39:42 +00:00
|
|
|
taskReg.Services[sreg.serviceID] = sreg
|
2017-04-04 00:08:08 +00:00
|
|
|
}
|
|
|
|
|
2017-08-07 22:54:05 +00:00
|
|
|
// Add the task to the allocation's registration
|
2018-04-23 23:34:53 +00:00
|
|
|
c.addTaskRegistration(newTask.AllocID, newTask.Name, taskReg)
|
2017-08-07 22:54:05 +00:00
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
c.commit(ops)
|
2017-08-26 05:40:18 +00:00
|
|
|
|
|
|
|
// Start watching checks. Done after service registrations are built
|
|
|
|
// since an error building them could leak watches.
|
|
|
|
for _, service := range newIDs {
|
2019-08-31 20:53:23 +00:00
|
|
|
serviceID := MakeTaskServiceID(newTask.AllocID, newTask.Name, service)
|
2017-08-26 05:40:18 +00:00
|
|
|
for _, check := range service.Checks {
|
2017-09-14 16:58:35 +00:00
|
|
|
if check.TriggersRestarts() {
|
support script checks for task group services (#6197)
In Nomad prior to Consul Connect, all Consul checks work the same
except for Script checks. Because the Task being checked is running in
its own container namespaces, the check is executed by Nomad in the
Task's context. If the Script check passes, Nomad uses the TTL check
feature of Consul to update the check status. This means in order to
run a Script check, we need to know what Task to execute it in.
To support Consul Connect, we need Group Services, and these need to
be registered in Consul along with their checks. We could push the
Service down into the Task, but this doesn't work if someone wants to
associate a service with a task's ports, but do script checks in
another task in the allocation.
Because Nomad is handling the Script check and not Consul anyways,
this moves the script check handling into the task runner so that the
task runner can own the script check's configuration and
lifecycle. This will allow us to pass the group service check
configuration down into a task without associating the service itself
with the task.
When tasks are checked for script checks, we walk back through their
task group to see if there are script checks associated with the
task. If so, we'll spin off script check tasklets for them. The
group-level service and any restart behaviors it needs are entirely
encapsulated within the group service hook.
2019-09-03 19:09:04 +00:00
|
|
|
checkID := MakeCheckID(serviceID, check)
|
2018-04-23 23:34:53 +00:00
|
|
|
c.checkWatcher.Watch(newTask.AllocID, newTask.Name, checkID, check, newTask.Restarter)
|
2017-08-26 05:40:18 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// RemoveTask from Consul. Removes all service entries and checks.
|
|
|
|
//
|
2018-03-11 17:41:50 +00:00
|
|
|
// Actual communication with Consul is done asynchronously (see Run).
|
2018-04-23 23:34:53 +00:00
|
|
|
func (c *ServiceClient) RemoveTask(task *TaskServices) {
|
2017-04-08 00:10:26 +00:00
|
|
|
ops := operations{}
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2017-04-04 00:08:08 +00:00
|
|
|
for _, service := range task.Services {
|
2019-08-31 20:53:23 +00:00
|
|
|
id := MakeTaskServiceID(task.AllocID, task.Name, service)
|
2017-04-08 00:10:26 +00:00
|
|
|
ops.deregServices = append(ops.deregServices, id)
|
2017-02-01 00:43:57 +00:00
|
|
|
|
|
|
|
for _, check := range service.Checks {
|
support script checks for task group services (#6197)
In Nomad prior to Consul Connect, all Consul checks work the same
except for Script checks. Because the Task being checked is running in
its own container namespaces, the check is executed by Nomad in the
Task's context. If the Script check passes, Nomad uses the TTL check
feature of Consul to update the check status. This means in order to
run a Script check, we need to know what Task to execute it in.
To support Consul Connect, we need Group Services, and these need to
be registered in Consul along with their checks. We could push the
Service down into the Task, but this doesn't work if someone wants to
associate a service with a task's ports, but do script checks in
another task in the allocation.
Because Nomad is handling the Script check and not Consul anyways,
this moves the script check handling into the task runner so that the
task runner can own the script check's configuration and
lifecycle. This will allow us to pass the group service check
configuration down into a task without associating the service itself
with the task.
When tasks are checked for script checks, we walk back through their
task group to see if there are script checks associated with the
task. If so, we'll spin off script check tasklets for them. The
group-level service and any restart behaviors it needs are entirely
encapsulated within the group service hook.
2019-09-03 19:09:04 +00:00
|
|
|
cid := MakeCheckID(id, check)
|
2017-08-26 05:40:18 +00:00
|
|
|
ops.deregChecks = append(ops.deregChecks, cid)
|
|
|
|
|
2017-09-14 16:58:35 +00:00
|
|
|
if check.TriggersRestarts() {
|
2017-08-26 05:40:18 +00:00
|
|
|
c.checkWatcher.Unwatch(cid)
|
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-08-07 22:54:05 +00:00
|
|
|
// Remove the task from the alloc's registrations
|
2018-04-23 23:34:53 +00:00
|
|
|
c.removeTaskRegistration(task.AllocID, task.Name)
|
2017-08-07 22:54:05 +00:00
|
|
|
|
2017-02-01 00:43:57 +00:00
|
|
|
// Now add them to the deregistration fields; main Run loop will update
|
2017-04-08 00:10:26 +00:00
|
|
|
c.commit(&ops)
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
2017-08-07 22:54:05 +00:00
|
|
|
// AllocRegistrations returns the registrations for the given allocation. If the
|
|
|
|
// allocation has no reservations, the response is a nil object.
|
|
|
|
func (c *ServiceClient) AllocRegistrations(allocID string) (*AllocRegistration, error) {
|
|
|
|
// Get the internal struct using the lock
|
|
|
|
c.allocRegistrationsLock.RLock()
|
|
|
|
regInternal, ok := c.allocRegistrations[allocID]
|
|
|
|
if !ok {
|
|
|
|
c.allocRegistrationsLock.RUnlock()
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Copy so we don't expose internal structs
|
2017-08-10 20:07:03 +00:00
|
|
|
reg := regInternal.copy()
|
2017-08-07 22:54:05 +00:00
|
|
|
c.allocRegistrationsLock.RUnlock()
|
|
|
|
|
|
|
|
// Query the services and checks to populate the allocation registrations.
|
|
|
|
services, err := c.client.Services()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
2017-07-04 19:24:27 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
checks, err := c.client.Checks()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2017-08-07 22:54:05 +00:00
|
|
|
// Populate the object
|
|
|
|
for _, treg := range reg.Tasks {
|
|
|
|
for serviceID, sreg := range treg.Services {
|
|
|
|
sreg.Service = services[serviceID]
|
|
|
|
for checkID := range sreg.checkIDs {
|
|
|
|
if check, ok := checks[checkID]; ok {
|
|
|
|
sreg.Checks = append(sreg.Checks, check)
|
|
|
|
}
|
|
|
|
}
|
2017-07-04 19:24:27 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-08-07 22:54:05 +00:00
|
|
|
return reg, nil
|
2017-07-04 19:24:27 +00:00
|
|
|
}
|
|
|
|
|
2019-09-16 20:26:06 +00:00
|
|
|
// UpdateTTL is used to update the TTL of a check. Typically this will only be
|
|
|
|
// called to heartbeat script checks.
|
support script checks for task group services (#6197)
In Nomad prior to Consul Connect, all Consul checks work the same
except for Script checks. Because the Task being checked is running in
its own container namespaces, the check is executed by Nomad in the
Task's context. If the Script check passes, Nomad uses the TTL check
feature of Consul to update the check status. This means in order to
run a Script check, we need to know what Task to execute it in.
To support Consul Connect, we need Group Services, and these need to
be registered in Consul along with their checks. We could push the
Service down into the Task, but this doesn't work if someone wants to
associate a service with a task's ports, but do script checks in
another task in the allocation.
Because Nomad is handling the Script check and not Consul anyways,
this moves the script check handling into the task runner so that the
task runner can own the script check's configuration and
lifecycle. This will allow us to pass the group service check
configuration down into a task without associating the service itself
with the task.
When tasks are checked for script checks, we walk back through their
task group to see if there are script checks associated with the
task. If so, we'll spin off script check tasklets for them. The
group-level service and any restart behaviors it needs are entirely
encapsulated within the group service hook.
2019-09-03 19:09:04 +00:00
|
|
|
func (c *ServiceClient) UpdateTTL(id, output, status string) error {
|
|
|
|
return c.client.UpdateTTL(id, output, status)
|
|
|
|
}
|
|
|
|
|
2018-03-11 18:40:53 +00:00
|
|
|
// Shutdown the Consul client. Update running task registrations and deregister
|
2017-04-18 00:07:42 +00:00
|
|
|
// agent from Consul. On first call blocks up to shutdownWait before giving up
|
|
|
|
// on syncing operations.
|
2017-02-01 00:43:57 +00:00
|
|
|
func (c *ServiceClient) Shutdown() error {
|
2017-04-18 00:07:42 +00:00
|
|
|
// Serialize Shutdown calls with RegisterAgent to prevent leaking agent
|
|
|
|
// entries.
|
|
|
|
c.agentLock.Lock()
|
2017-07-24 18:40:37 +00:00
|
|
|
defer c.agentLock.Unlock()
|
2017-02-01 00:43:57 +00:00
|
|
|
select {
|
|
|
|
case <-c.shutdownCh:
|
|
|
|
return nil
|
|
|
|
default:
|
2017-07-24 18:40:37 +00:00
|
|
|
close(c.shutdownCh)
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
2017-04-12 19:07:10 +00:00
|
|
|
// Give run loop time to sync, but don't block indefinitely
|
|
|
|
deadline := time.After(c.shutdownWait)
|
2017-02-01 00:43:57 +00:00
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// Wait for Run to finish any outstanding operations and exit
|
2017-02-01 00:43:57 +00:00
|
|
|
select {
|
2017-04-08 00:10:26 +00:00
|
|
|
case <-c.exitCh:
|
2017-02-01 00:43:57 +00:00
|
|
|
case <-deadline:
|
|
|
|
// Don't wait forever though
|
2017-07-24 18:40:37 +00:00
|
|
|
}
|
|
|
|
|
2017-07-24 23:48:40 +00:00
|
|
|
// If Consul was never seen nothing could be written so exit early
|
|
|
|
if !c.hasSeen() {
|
2017-07-24 19:12:02 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-07-24 18:40:37 +00:00
|
|
|
// Always attempt to deregister Nomad agent Consul entries, even if
|
|
|
|
// deadline was reached
|
|
|
|
for id := range c.agentServices {
|
|
|
|
if err := c.client.ServiceDeregister(id); err != nil {
|
2018-09-13 17:43:40 +00:00
|
|
|
c.logger.Error("failed deregistering agent service", "service_id", id, "error", err)
|
2017-07-24 18:40:37 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
for id := range c.agentChecks {
|
|
|
|
if err := c.client.CheckDeregister(id); err != nil {
|
2018-09-13 17:43:40 +00:00
|
|
|
c.logger.Error("failed deregistering agent check", "check_id", id, "error", err)
|
2017-07-24 18:40:37 +00:00
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
2017-04-12 19:07:10 +00:00
|
|
|
return nil
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
2017-08-07 22:54:05 +00:00
|
|
|
// addTaskRegistration adds the task registration for the given allocation.
|
|
|
|
func (c *ServiceClient) addTaskRegistration(allocID, taskName string, reg *TaskRegistration) {
|
|
|
|
c.allocRegistrationsLock.Lock()
|
|
|
|
defer c.allocRegistrationsLock.Unlock()
|
|
|
|
|
|
|
|
alloc, ok := c.allocRegistrations[allocID]
|
|
|
|
if !ok {
|
|
|
|
alloc = &AllocRegistration{
|
|
|
|
Tasks: make(map[string]*TaskRegistration),
|
|
|
|
}
|
|
|
|
c.allocRegistrations[allocID] = alloc
|
|
|
|
}
|
|
|
|
alloc.Tasks[taskName] = reg
|
|
|
|
}
|
|
|
|
|
|
|
|
// removeTaskRegistration removes the task registration for the given allocation.
|
|
|
|
func (c *ServiceClient) removeTaskRegistration(allocID, taskName string) {
|
|
|
|
c.allocRegistrationsLock.Lock()
|
|
|
|
defer c.allocRegistrationsLock.Unlock()
|
|
|
|
|
|
|
|
alloc, ok := c.allocRegistrations[allocID]
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Delete the task and if it is the last one also delete the alloc's
|
|
|
|
// registration
|
|
|
|
delete(alloc.Tasks, taskName)
|
|
|
|
if len(alloc.Tasks) == 0 {
|
|
|
|
delete(c.allocRegistrations, allocID)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-02-01 00:43:57 +00:00
|
|
|
// makeAgentServiceID creates a unique ID for identifying an agent service in
|
|
|
|
// Consul.
|
|
|
|
//
|
|
|
|
// Agent service IDs are of the form:
|
|
|
|
//
|
2017-12-08 01:08:25 +00:00
|
|
|
// {nomadServicePrefix}-{ROLE}-b32(sha1({Service.Name}-{Service.Tags...})
|
2017-12-12 00:50:15 +00:00
|
|
|
// Example Server ID: _nomad-server-fbbk265qn4tmt25nd4ep42tjvmyj3hr4
|
|
|
|
// Example Client ID: _nomad-client-ggnjpgl7yn7rgmvxzilmpvrzzvrszc7l
|
2017-02-01 00:43:57 +00:00
|
|
|
//
|
|
|
|
func makeAgentServiceID(role string, service *structs.Service) string {
|
2018-04-23 23:34:53 +00:00
|
|
|
return fmt.Sprintf("%s-%s-%s", nomadServicePrefix, role, service.Hash(role, "", false))
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
2019-08-12 22:41:39 +00:00
|
|
|
// MakeTaskServiceID creates a unique ID for identifying a task service in
|
2019-06-13 12:57:27 +00:00
|
|
|
// Consul.
|
2017-02-01 00:43:57 +00:00
|
|
|
//
|
2019-06-13 12:57:27 +00:00
|
|
|
// Example Service ID: _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-redis-http-http
|
2019-08-31 20:53:23 +00:00
|
|
|
func MakeTaskServiceID(allocID, taskName string, service *structs.Service) string {
|
2019-06-13 12:57:27 +00:00
|
|
|
return fmt.Sprintf("%s%s-%s-%s-%s", nomadTaskPrefix, allocID, taskName, service.Name, service.PortLabel)
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
support script checks for task group services (#6197)
In Nomad prior to Consul Connect, all Consul checks work the same
except for Script checks. Because the Task being checked is running in
its own container namespaces, the check is executed by Nomad in the
Task's context. If the Script check passes, Nomad uses the TTL check
feature of Consul to update the check status. This means in order to
run a Script check, we need to know what Task to execute it in.
To support Consul Connect, we need Group Services, and these need to
be registered in Consul along with their checks. We could push the
Service down into the Task, but this doesn't work if someone wants to
associate a service with a task's ports, but do script checks in
another task in the allocation.
Because Nomad is handling the Script check and not Consul anyways,
this moves the script check handling into the task runner so that the
task runner can own the script check's configuration and
lifecycle. This will allow us to pass the group service check
configuration down into a task without associating the service itself
with the task.
When tasks are checked for script checks, we walk back through their
task group to see if there are script checks associated with the
task. If so, we'll spin off script check tasklets for them. The
group-level service and any restart behaviors it needs are entirely
encapsulated within the group service hook.
2019-09-03 19:09:04 +00:00
|
|
|
// MakeCheckID creates a unique ID for a check.
|
2019-05-09 11:22:22 +00:00
|
|
|
//
|
|
|
|
// Example Check ID: _nomad-check-434ae42f9a57c5705344974ac38de2aee0ee089d
|
support script checks for task group services (#6197)
In Nomad prior to Consul Connect, all Consul checks work the same
except for Script checks. Because the Task being checked is running in
its own container namespaces, the check is executed by Nomad in the
Task's context. If the Script check passes, Nomad uses the TTL check
feature of Consul to update the check status. This means in order to
run a Script check, we need to know what Task to execute it in.
To support Consul Connect, we need Group Services, and these need to
be registered in Consul along with their checks. We could push the
Service down into the Task, but this doesn't work if someone wants to
associate a service with a task's ports, but do script checks in
another task in the allocation.
Because Nomad is handling the Script check and not Consul anyways,
this moves the script check handling into the task runner so that the
task runner can own the script check's configuration and
lifecycle. This will allow us to pass the group service check
configuration down into a task without associating the service itself
with the task.
When tasks are checked for script checks, we walk back through their
task group to see if there are script checks associated with the
task. If so, we'll spin off script check tasklets for them. The
group-level service and any restart behaviors it needs are entirely
encapsulated within the group service hook.
2019-09-03 19:09:04 +00:00
|
|
|
func MakeCheckID(serviceID string, check *structs.ServiceCheck) string {
|
consul: Use a stable identifier for services
The current implementation of Service Registration uses a hash of the
nomad-internal state of a service to register it with Consul, this means that
any update to the service invalidates this name and we then deregister, and
recreate the service in Consul.
While this behaviour slightly simplifies reasoning about service registration,
this becomes problematic when we add consul health checks to a service. When
the service is re-registered, so are the checks, which default to failing for
at least one check period.
This commit migrates us to using a stable identifier based on the
allocation, task, and service identifiers, and uses the difference
between the remote and local state to decide when to push updates.
It uses the existing hashing mechanic to decide when UpdateTask should
regenerate service registrations for providing to Sync, but this should
be removable as part of a future refactor.
It additionally introduces the _nomad-check- prefix for check
definitions, to allow for future allowing of consul features like
maintenance mode.
2019-04-10 08:39:24 +00:00
|
|
|
return fmt.Sprintf("%s%s", nomadCheckPrefix, check.Hash(serviceID))
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// createCheckReg creates a Check that can be registered with Consul.
|
|
|
|
//
|
2017-04-12 20:27:56 +00:00
|
|
|
// Script checks simply have a TTL set and the caller is responsible for
|
|
|
|
// running the script and heartbeating.
|
2017-02-01 00:43:57 +00:00
|
|
|
func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host string, port int) (*api.AgentCheckRegistration, error) {
|
|
|
|
chkReg := api.AgentCheckRegistration{
|
|
|
|
ID: checkID,
|
|
|
|
Name: check.Name,
|
|
|
|
ServiceID: serviceID,
|
|
|
|
}
|
|
|
|
chkReg.Status = check.InitialStatus
|
|
|
|
chkReg.Timeout = check.Timeout.String()
|
|
|
|
chkReg.Interval = check.Interval.String()
|
|
|
|
|
2017-12-19 00:18:42 +00:00
|
|
|
// Require an address for http or tcp checks
|
|
|
|
if port == 0 && check.RequiresPort() {
|
|
|
|
return nil, fmt.Errorf("%s checks require an address", check.Type)
|
|
|
|
}
|
|
|
|
|
2017-02-01 00:43:57 +00:00
|
|
|
switch check.Type {
|
|
|
|
case structs.ServiceCheckHTTP:
|
2017-04-26 18:22:01 +00:00
|
|
|
proto := check.Protocol
|
|
|
|
if proto == "" {
|
|
|
|
proto = "http"
|
2017-02-01 00:43:57 +00:00
|
|
|
}
|
2017-04-19 04:28:25 +00:00
|
|
|
if check.TLSSkipVerify {
|
|
|
|
chkReg.TLSSkipVerify = true
|
|
|
|
}
|
2017-02-01 00:43:57 +00:00
|
|
|
base := url.URL{
|
2017-04-26 18:22:01 +00:00
|
|
|
Scheme: proto,
|
2017-02-01 00:43:57 +00:00
|
|
|
Host: net.JoinHostPort(host, strconv.Itoa(port)),
|
|
|
|
}
|
|
|
|
relative, err := url.Parse(check.Path)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
url := base.ResolveReference(relative)
|
|
|
|
chkReg.HTTP = url.String()
|
2017-08-15 23:13:05 +00:00
|
|
|
chkReg.Method = check.Method
|
|
|
|
chkReg.Header = check.Header
|
2018-05-02 23:49:47 +00:00
|
|
|
|
2017-02-01 00:43:57 +00:00
|
|
|
case structs.ServiceCheckTCP:
|
|
|
|
chkReg.TCP = net.JoinHostPort(host, strconv.Itoa(port))
|
2018-05-02 23:49:47 +00:00
|
|
|
|
2017-02-01 00:43:57 +00:00
|
|
|
case structs.ServiceCheckScript:
|
|
|
|
chkReg.TTL = (check.Interval + ttlCheckBuffer).String()
|
2017-10-17 00:35:47 +00:00
|
|
|
// As of Consul 1.0.0 setting TTL and Interval is a 400
|
|
|
|
chkReg.Interval = ""
|
2018-05-02 23:49:47 +00:00
|
|
|
|
|
|
|
case structs.ServiceCheckGRPC:
|
2018-05-03 22:18:12 +00:00
|
|
|
chkReg.GRPC = fmt.Sprintf("%s/%s", net.JoinHostPort(host, strconv.Itoa(port)), check.GRPCService)
|
2018-05-02 23:49:47 +00:00
|
|
|
chkReg.GRPCUseTLS = check.GRPCUseTLS
|
|
|
|
if check.TLSSkipVerify {
|
|
|
|
chkReg.TLSSkipVerify = true
|
|
|
|
}
|
|
|
|
|
2017-02-01 00:43:57 +00:00
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("check type %+q not valid", check.Type)
|
|
|
|
}
|
|
|
|
return &chkReg, nil
|
|
|
|
}
|
2017-04-08 00:10:26 +00:00
|
|
|
|
2019-04-25 11:48:19 +00:00
|
|
|
// isNomadCheck returns true if the ID matches the pattern of a Nomad managed
|
|
|
|
// check.
|
|
|
|
func isNomadCheck(id string) bool {
|
|
|
|
return strings.HasPrefix(id, nomadCheckPrefix)
|
|
|
|
}
|
|
|
|
|
2017-04-08 00:10:26 +00:00
|
|
|
// isNomadService returns true if the ID matches the pattern of a Nomad managed
|
2017-12-08 01:08:25 +00:00
|
|
|
// service (new or old formats). Agent services return false as independent
|
|
|
|
// client and server agents may be running on the same machine. #2827
|
2017-04-08 00:10:26 +00:00
|
|
|
func isNomadService(id string) bool {
|
2017-12-08 01:08:25 +00:00
|
|
|
return strings.HasPrefix(id, nomadTaskPrefix) || isOldNomadService(id)
|
|
|
|
}
|
|
|
|
|
|
|
|
// isOldNomadService returns true if the ID matches an old pattern managed by
|
|
|
|
// Nomad.
|
|
|
|
//
|
|
|
|
// Pre-0.7.1 task service IDs are of the form:
|
|
|
|
//
|
|
|
|
// {nomadServicePrefix}-executor-{ALLOC_ID}-{Service.Name}-{Service.Tags...}
|
|
|
|
// Example Service ID: _nomad-executor-1234-echo-http-tag1-tag2-tag3
|
|
|
|
//
|
|
|
|
func isOldNomadService(id string) bool {
|
2017-07-18 20:23:01 +00:00
|
|
|
const prefix = nomadServicePrefix + "-executor"
|
|
|
|
return strings.HasPrefix(id, prefix)
|
2017-04-08 00:10:26 +00:00
|
|
|
}
|
2017-12-05 19:39:42 +00:00
|
|
|
|
2019-08-30 18:05:30 +00:00
|
|
|
// isNomadSidecar returns true if the ID matches a sidecar proxy for a Nomad
|
|
|
|
// managed service.
|
|
|
|
//
|
|
|
|
// For example if you have a Connect enabled service with the ID:
|
|
|
|
//
|
|
|
|
// _nomad-task-5229c7f8-376b-3ccc-edd9-981e238f7033-cache-redis-cache-db
|
|
|
|
//
|
|
|
|
// Consul will create a service for the sidecar proxy with the ID:
|
|
|
|
//
|
|
|
|
// _nomad-task-5229c7f8-376b-3ccc-edd9-981e238f7033-cache-redis-cache-db-sidecar-proxy
|
|
|
|
//
|
|
|
|
func isNomadSidecar(id string, services map[string]*api.AgentServiceRegistration) bool {
|
|
|
|
const suffix = "-sidecar-proxy"
|
|
|
|
if !strings.HasSuffix(id, suffix) {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
// Make sure the Nomad managed service for this proxy still exists.
|
|
|
|
_, ok := services[id[:len(id)-len(suffix)]]
|
|
|
|
return ok
|
|
|
|
}
|
|
|
|
|
2017-12-19 00:18:42 +00:00
|
|
|
// getAddress returns the IP and port to use for a service or check. If no port
|
|
|
|
// label is specified (an empty value), zero values are returned because no
|
|
|
|
// address could be resolved.
|
2019-01-04 23:01:35 +00:00
|
|
|
func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet *drivers.DriverNetwork) (string, int, error) {
|
2017-12-05 19:39:42 +00:00
|
|
|
switch addrMode {
|
|
|
|
case structs.AddressModeAuto:
|
|
|
|
if driverNet.Advertise() {
|
|
|
|
addrMode = structs.AddressModeDriver
|
|
|
|
} else {
|
|
|
|
addrMode = structs.AddressModeHost
|
|
|
|
}
|
|
|
|
return getAddress(addrMode, portLabel, networks, driverNet)
|
|
|
|
case structs.AddressModeHost:
|
2017-12-20 23:02:34 +00:00
|
|
|
if portLabel == "" {
|
|
|
|
if len(networks) != 1 {
|
|
|
|
// If no networks are specified return zero
|
|
|
|
// values. Consul will advertise the host IP
|
|
|
|
// with no port. This is the pre-0.7.1 behavior
|
|
|
|
// some people rely on.
|
|
|
|
return "", 0, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return networks[0].IP, 0, nil
|
|
|
|
}
|
|
|
|
|
2017-12-05 19:39:42 +00:00
|
|
|
// Default path: use host ip:port
|
|
|
|
ip, port := networks.Port(portLabel)
|
2017-12-08 20:27:57 +00:00
|
|
|
if ip == "" && port <= 0 {
|
2017-12-08 05:58:15 +00:00
|
|
|
return "", 0, fmt.Errorf("invalid port %q: port label not found", portLabel)
|
|
|
|
}
|
2017-12-05 19:39:42 +00:00
|
|
|
return ip, port, nil
|
|
|
|
|
|
|
|
case structs.AddressModeDriver:
|
|
|
|
// Require a driver network if driver address mode is used
|
|
|
|
if driverNet == nil {
|
|
|
|
return "", 0, fmt.Errorf(`cannot use address_mode="driver": no driver network exists`)
|
|
|
|
}
|
|
|
|
|
2017-12-20 23:02:34 +00:00
|
|
|
// If no port label is specified just return the IP
|
|
|
|
if portLabel == "" {
|
|
|
|
return driverNet.IP, 0, nil
|
|
|
|
}
|
|
|
|
|
2017-12-05 19:39:42 +00:00
|
|
|
// If the port is a label, use the driver's port (not the host's)
|
|
|
|
if port, ok := driverNet.PortMap[portLabel]; ok {
|
|
|
|
return driverNet.IP, port, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// If port isn't a label, try to parse it as a literal port number
|
|
|
|
port, err := strconv.Atoi(portLabel)
|
|
|
|
if err != nil {
|
2018-01-12 23:32:51 +00:00
|
|
|
// Don't include Atoi error message as user likely
|
|
|
|
// never intended it to be a numeric and it creates a
|
|
|
|
// confusing error message
|
|
|
|
return "", 0, fmt.Errorf("invalid port label %q: port labels in driver address_mode must be numeric or in the driver's port map", portLabel)
|
2017-12-05 19:39:42 +00:00
|
|
|
}
|
2017-12-08 20:27:57 +00:00
|
|
|
if port <= 0 {
|
2018-01-12 23:32:51 +00:00
|
|
|
return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel)
|
2017-12-08 06:04:22 +00:00
|
|
|
}
|
2017-12-05 19:39:42 +00:00
|
|
|
|
|
|
|
return driverNet.IP, port, nil
|
|
|
|
|
|
|
|
default:
|
|
|
|
// Shouldn't happen due to validation, but enforce invariants
|
|
|
|
return "", 0, fmt.Errorf("invalid address mode %q", addrMode)
|
|
|
|
}
|
|
|
|
}
|
2019-08-14 22:02:00 +00:00
|
|
|
|
|
|
|
// newConnect creates a new Consul AgentServiceConnect struct based on a Nomad
|
|
|
|
// Connect struct. If the nomad Connect struct is nil, nil will be returned to
|
|
|
|
// disable Connect for this service.
|
|
|
|
func newConnect(serviceName string, nc *structs.ConsulConnect, networks structs.Networks) (*api.AgentServiceConnect, error) {
|
|
|
|
if nc == nil {
|
|
|
|
// No Connect stanza, returning nil is fine
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
cc := &api.AgentServiceConnect{
|
|
|
|
Native: nc.Native,
|
|
|
|
}
|
|
|
|
|
|
|
|
if nc.SidecarService == nil {
|
|
|
|
return cc, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
net, port, err := getConnectPort(serviceName, networks)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Bind to netns IP(s):port
|
|
|
|
proxyConfig := map[string]interface{}{}
|
2019-09-23 18:30:48 +00:00
|
|
|
localServiceAddress := ""
|
|
|
|
localServicePort := 0
|
|
|
|
if nc.SidecarService.Proxy != nil {
|
|
|
|
localServiceAddress = nc.SidecarService.Proxy.LocalServiceAddress
|
|
|
|
localServicePort = nc.SidecarService.Proxy.LocalServicePort
|
|
|
|
if nc.SidecarService.Proxy.Config != nil {
|
|
|
|
proxyConfig = nc.SidecarService.Proxy.Config
|
|
|
|
}
|
2019-08-14 22:02:00 +00:00
|
|
|
}
|
|
|
|
proxyConfig["bind_address"] = "0.0.0.0"
|
|
|
|
proxyConfig["bind_port"] = port.To
|
|
|
|
|
|
|
|
// Advertise host IP:port
|
|
|
|
cc.SidecarService = &api.AgentServiceRegistration{
|
2019-10-08 19:19:09 +00:00
|
|
|
Tags: helper.CopySliceString(nc.SidecarService.Tags),
|
2019-08-14 22:02:00 +00:00
|
|
|
Address: net.IP,
|
|
|
|
Port: port.Value,
|
|
|
|
|
|
|
|
// Automatically configure the proxy to bind to all addresses
|
|
|
|
// within the netns.
|
|
|
|
Proxy: &api.AgentServiceConnectProxyConfig{
|
2019-09-23 18:30:48 +00:00
|
|
|
LocalServiceAddress: localServiceAddress,
|
|
|
|
LocalServicePort: localServicePort,
|
|
|
|
Config: proxyConfig,
|
2019-08-14 22:02:00 +00:00
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
// If no further proxy settings were explicitly configured, exit early
|
|
|
|
if nc.SidecarService.Proxy == nil {
|
|
|
|
return cc, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
numUpstreams := len(nc.SidecarService.Proxy.Upstreams)
|
|
|
|
if numUpstreams == 0 {
|
|
|
|
return cc, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
upstreams := make([]api.Upstream, numUpstreams)
|
|
|
|
for i, nu := range nc.SidecarService.Proxy.Upstreams {
|
|
|
|
upstreams[i].DestinationName = nu.DestinationName
|
|
|
|
upstreams[i].LocalBindPort = nu.LocalBindPort
|
|
|
|
}
|
|
|
|
cc.SidecarService.Proxy.Upstreams = upstreams
|
|
|
|
|
|
|
|
return cc, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// getConnectPort returns the network and port for the Connect proxy sidecar
|
|
|
|
// defined for this service. An error is returned if the network and port
|
|
|
|
// cannot be determined.
|
|
|
|
func getConnectPort(serviceName string, networks structs.Networks) (*structs.NetworkResource, structs.Port, error) {
|
|
|
|
if n := len(networks); n != 1 {
|
|
|
|
return nil, structs.Port{}, fmt.Errorf("Connect only supported with exactly 1 network (found %d)", n)
|
|
|
|
}
|
|
|
|
|
|
|
|
port, ok := networks[0].PortForService(serviceName)
|
|
|
|
if !ok {
|
|
|
|
return nil, structs.Port{}, fmt.Errorf("No Connect port defined for service %q", serviceName)
|
|
|
|
}
|
|
|
|
|
|
|
|
return networks[0], port, nil
|
|
|
|
}
|