Merge pull request #3619 from hashicorp/f-3380-custom-ports

Allow custom ports for services and checks when using driver address_mode
This commit is contained in:
Michael Schurter 2017-12-11 11:43:33 -08:00 committed by GitHub
commit 52bb3f592d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 1070 additions and 199 deletions

View File

@ -1,6 +1,9 @@
## 0.7.1 (Unreleased)
__BACKWARDS INCOMPATIBILITIES:__
* client: The format of service IDs in Consul has changed. If you rely upon
Nomad's service IDs (*not* service names; those are stable), you will need
to update your code. [GH-3632]
* config: Nomad no longer parses Atlas configuration stanzas. Atlas has been
deprecated since earlier this year. If you have an Atlas stanza in your
config file it will have to be removed.
@ -22,10 +25,13 @@ IMPROVEMENTS:
* api: Environment variables are ignored during service name validation [GH-3532]
* cli: Allocation create and modify times are displayed in a human readable
relative format like `6 h ago` [GH-3449]
* client: Support `address_mode` on checks [GH-3619]
* client: Sticky volume migrations are now atomic. [GH-3563]
* client: Added metrics to track state transitions of allocations [GH-3061]
* client: When `network_interface` is unspecified use interface attached to
default route [GH-3546]
* client: Support numeric ports on services and checks when
`address_mode="driver"` [GH-3619]
* driver/docker: Detect OOM kill event [GH-3459]
* driver/docker: Adds support for adding host device to container via
`--device` [GH-2938]
@ -54,9 +60,13 @@ BUG FIXES:
explicitly [GH-3520]
* cli: Fix passing Consul address via flags [GH-3504]
* cli: Fix panic when running `keyring` commands [GH-3509]
* client: Fix advertising services with tags that require URL escaping
[GH-3632]
* client: Fix a panic when restoring an allocation with a dead leader task
[GH-3502]
* client: Fix crash when following logs from a Windows node [GH-3608]
* client: Fix service/check updating when just interpolated variables change
[GH-3619]
* client: Fix allocation accounting in GC and trigger GCs on allocation
updates [GH-3445]
* driver/rkt: Remove pods on shutdown [GH-3562]

View File

@ -154,6 +154,7 @@ type ServiceCheck struct {
Path string
Protocol string
PortLabel string `mapstructure:"port"`
AddressMode string `mapstructure:"address_mode"`
Interval time.Duration
Timeout time.Duration
InitialStatus string `mapstructure:"initial_status"`

View File

@ -209,12 +209,12 @@ func setupTaskEnv(t *testing.T, driver string) (*allocdir.TaskDir, map[string]st
"NOMAD_PORT_two": "443",
"NOMAD_HOST_PORT_two": "443",
"NOMAD_ADDR_admin": "1.2.3.4:8081",
"NOMAD_ADDR_web_main": "192.168.0.100:5000",
"NOMAD_ADDR_web_admin": "192.168.0.100:5000",
"NOMAD_ADDR_web_http": "192.168.0.100:2000",
"NOMAD_IP_web_main": "192.168.0.100",
"NOMAD_IP_web_admin": "192.168.0.100",
"NOMAD_IP_web_http": "192.168.0.100",
"NOMAD_PORT_web_http": "2000",
"NOMAD_PORT_web_main": "5000",
"NOMAD_PORT_web_admin": "5000",
"NOMAD_IP_admin": "1.2.3.4",
"NOMAD_PORT_admin": "8081",
"NOMAD_HOST_PORT_admin": "8081",

View File

@ -10,6 +10,7 @@ import (
"log"
"os"
"strconv"
"strings"
"time"
"github.com/mitchellh/mapstructure"
@ -61,6 +62,17 @@ type MockDriverConfig struct {
// SignalErr is the error message that the task returns if signalled
SignalErr string `mapstructure:"signal_error"`
// DriverIP will be returned as the DriverNetwork.IP from Start()
DriverIP string `mapstructure:"driver_ip"`
// DriverAdvertise will be returned as DriverNetwork.AutoAdvertise from
// Start().
DriverAdvertise bool `mapstructure:"driver_advertise"`
// DriverPortMap will parse a label:number pair and return it in
// DriverNetwork.PortMap from Start().
DriverPortMap string `mapstructure:"driver_port_map"`
}
// MockDriver is a driver which is used for testing purposes
@ -114,6 +126,23 @@ func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse
return nil, structs.NewRecoverableError(errors.New(driverConfig.StartErr), driverConfig.StartErrRecoverable)
}
// Create the driver network
net := &cstructs.DriverNetwork{
IP: driverConfig.DriverIP,
AutoAdvertise: driverConfig.DriverAdvertise,
}
if raw := driverConfig.DriverPortMap; len(raw) > 0 {
parts := strings.Split(raw, ":")
if len(parts) != 2 {
return nil, fmt.Errorf("malformed port map: %q", raw)
}
port, err := strconv.Atoi(parts[1])
if err != nil {
return nil, fmt.Errorf("malformed port map: %q -- error: %v", raw, err)
}
net.PortMap = map[string]int{parts[0]: port}
}
h := mockDriverHandle{
taskName: task.Name,
runFor: driverConfig.RunFor,
@ -133,7 +162,8 @@ func (m *MockDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse
}
m.logger.Printf("[DEBUG] driver.mock: starting task %q", task.Name)
go h.run()
return &StartResponse{Handle: &h}, nil
return &StartResponse{Handle: &h, Network: net}, nil
}
// Cleanup deletes all keys except for Config.Options["cleanup_fail_on"] for
@ -265,10 +295,20 @@ func (h *mockDriverHandle) Kill() error {
select {
case <-h.doneCh:
case <-time.After(h.killAfter):
close(h.doneCh)
select {
case <-h.doneCh:
// already closed
default:
close(h.doneCh)
}
case <-time.After(h.killTimeout):
h.logger.Printf("[DEBUG] driver.mock: terminating task %q", h.taskName)
close(h.doneCh)
select {
case <-h.doneCh:
// already closed
default:
close(h.doneCh)
}
}
return nil
}
@ -286,7 +326,12 @@ func (h *mockDriverHandle) run() {
for {
select {
case <-timer.C:
close(h.doneCh)
select {
case <-h.doneCh:
// already closed
default:
close(h.doneCh)
}
case <-h.doneCh:
h.logger.Printf("[DEBUG] driver.mock: finished running task %q", h.taskName)
h.waitCh <- dstructs.NewWaitResult(h.exitCode, h.exitSignal, h.exitErr)

View File

@ -1631,7 +1631,12 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
// Merge in the task resources
updatedTask.Resources = update.TaskResources[updatedTask.Name]
// Update the task's environment for interpolating in services/checks
// Interpolate the old task with the old env before updating the env, as
// updating services in Consul needs both the old and new interpolations
// to find differences.
oldInterpolatedTask := interpolateServices(r.envBuilder.Build(), r.task)
// Now it's safe to update the environment
r.envBuilder.UpdateTask(update, updatedTask)
var mErr multierror.Error
@ -1650,7 +1655,8 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
}
// Update services in Consul
if err := r.updateServices(drv, r.handle, r.task, updatedTask); err != nil {
newInterpolatedTask := interpolateServices(r.envBuilder.Build(), updatedTask)
if err := r.updateServices(drv, r.handle, oldInterpolatedTask, newInterpolatedTask); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("error updating services and checks in Consul: %v", err))
}
}
@ -1667,19 +1673,17 @@ func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
return mErr.ErrorOrNil()
}
// updateServices and checks with Consul.
func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, old, new *structs.Task) error {
// updateServices and checks with Consul. Tasks must be interpolated!
func (r *TaskRunner) updateServices(d driver.Driver, h driver.ScriptExecutor, oldTask, newTask *structs.Task) error {
var exec driver.ScriptExecutor
if d.Abilities().Exec {
// Only set the script executor if the driver supports it
exec = h
}
newInterpolatedTask := interpolateServices(r.envBuilder.Build(), new)
oldInterpolatedTask := interpolateServices(r.envBuilder.Build(), old)
r.driverNetLock.Lock()
net := r.driverNet.Copy()
r.driverNetLock.Unlock()
return r.consul.UpdateTask(r.alloc.ID, oldInterpolatedTask, newInterpolatedTask, r, exec, net)
return r.consul.UpdateTask(r.alloc.ID, oldTask, newTask, r, exec, net)
}
// handleDestroy kills the task handle. In the case that killing fails,

View File

@ -73,15 +73,17 @@ func (m *MockTaskStateUpdater) String() string {
}
type taskRunnerTestCtx struct {
upd *MockTaskStateUpdater
tr *TaskRunner
allocDir *allocdir.AllocDir
vault *vaultclient.MockVaultClient
consul *mockConsulServiceClient
upd *MockTaskStateUpdater
tr *TaskRunner
allocDir *allocdir.AllocDir
vault *vaultclient.MockVaultClient
consul *consul.MockAgent
consulClient *consul.ServiceClient
}
// Cleanup calls Destroy on the task runner and alloc dir
func (ctx *taskRunnerTestCtx) Cleanup() {
ctx.consulClient.Shutdown()
ctx.tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
ctx.allocDir.Destroy()
}
@ -117,9 +119,6 @@ func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocat
upd := &MockTaskStateUpdater{}
task := alloc.Job.TaskGroups[0].Tasks[0]
// Initialize the port listing. This should be done by the offer process but
// we have a mock so that doesn't happen.
task.Resources.Networks[0].ReservedPorts = []structs.Port{{Label: "", Value: 80}}
allocDir := allocdir.NewAllocDir(testLogger(), filepath.Join(conf.AllocDir, alloc.ID))
if err := allocDir.Build(); err != nil {
@ -143,17 +142,20 @@ func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocat
}
vclient := vaultclient.NewMockVaultClient()
cclient := newMockConsulServiceClient()
tr := NewTaskRunner(logger, conf, db, upd.Update, taskDir, alloc, task, vclient, cclient)
cclient := consul.NewMockAgent()
serviceClient := consul.NewServiceClient(cclient, true, logger)
go serviceClient.Run()
tr := NewTaskRunner(logger, conf, db, upd.Update, taskDir, alloc, task, vclient, serviceClient)
if !restarts {
tr.restartTracker = noRestartsTracker()
}
return &taskRunnerTestCtx{
upd: upd,
tr: tr,
allocDir: allocDir,
vault: vclient,
consul: cclient,
upd: upd,
tr: tr,
allocDir: allocDir,
vault: vclient,
consul: cclient,
consulClient: serviceClient,
}
}
@ -339,7 +341,12 @@ func TestTaskRunner_Update(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Services[0].Checks[0].Args[0] = "${NOMAD_META_foo}"
task.Services[0].Checks[0] = &structs.ServiceCheck{
Name: "http-check",
Type: "http",
PortLabel: "http",
Path: "${NOMAD_META_foo}",
}
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "100s",
@ -350,6 +357,8 @@ func TestTaskRunner_Update(t *testing.T) {
go ctx.tr.Run()
defer ctx.Cleanup()
testWaitForTaskToStart(t, ctx)
// Update the task definition
updateAlloc := ctx.tr.alloc.Copy()
@ -363,10 +372,9 @@ func TestTaskRunner_Update(t *testing.T) {
// Update meta to make sure service checks are interpolated correctly
// #2180
newTask.Meta["foo"] = "UPDATE"
newTask.Meta["foo"] = "/UPDATE"
// Update the kill timeout
testWaitForTaskToStart(t, ctx)
oldHandle := ctx.tr.handle.ID()
newTask.KillTimeout = time.Hour
ctx.tr.Update(updateAlloc)
@ -380,25 +388,22 @@ func TestTaskRunner_Update(t *testing.T) {
return false, fmt.Errorf("Task not copied")
}
if ctx.tr.restartTracker.policy.Mode != newMode {
return false, fmt.Errorf("restart policy not ctx.updated")
return false, fmt.Errorf("expected restart policy %q but found %q", newMode, ctx.tr.restartTracker.policy.Mode)
}
if ctx.tr.handle.ID() == oldHandle {
return false, fmt.Errorf("handle not ctx.updated")
}
// Make sure Consul services were interpolated correctly during
// the update #2180
consul := ctx.tr.consul.(*mockConsulServiceClient)
consul.mu.Lock()
defer consul.mu.Unlock()
if len(consul.ops) < 2 {
return false, fmt.Errorf("expected at least 2 consul ops found: %d", len(consul.ops))
checks := ctx.consul.CheckRegs()
if n := len(checks); n != 1 {
return false, fmt.Errorf("expected 1 check but found %d", n)
}
lastOp := consul.ops[len(consul.ops)-1]
if lastOp.op != "update" {
return false, fmt.Errorf("expected last consul op to be update not %q", lastOp.op)
}
if found := lastOp.task.Services[0].Checks[0].Args[0]; found != "UPDATE" {
return false, fmt.Errorf("expected consul check to be UPDATE but found: %q", found)
for _, check := range checks {
if found := check.HTTP; !strings.HasSuffix(found, "/UPDATE") {
return false, fmt.Errorf("expected consul check path to end with /UPDATE but found: %q", found)
}
}
return true, nil
}, func(err error) {
@ -635,12 +640,16 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) {
}
ctx := testTaskRunnerFromAlloc(t, true, alloc)
// Use mockConsulServiceClient
consul := newMockConsulServiceClient()
ctx.tr.consul = consul
ctx.tr.MarkReceived()
ctx.tr.Run()
defer ctx.Cleanup()
// Assert it is properly registered and unregistered
consul := ctx.tr.consul.(*mockConsulServiceClient)
if expected := 4; len(consul.ops) != expected {
t.Errorf("expected %d consul ops but found: %d", expected, len(consul.ops))
}
@ -1742,6 +1751,8 @@ func TestTaskRunner_ShutdownDelay(t *testing.T) {
alloc := mock.Alloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Services[0].Tags = []string{"tag1"}
task.Services = task.Services[:1] // only need 1 for this test
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": "1000s",
@ -1758,34 +1769,39 @@ func TestTaskRunner_ShutdownDelay(t *testing.T) {
// Wait for the task to start
testWaitForTaskToStart(t, ctx)
testutil.WaitForResult(func() (bool, error) {
services, _ := ctx.consul.Services()
if n := len(services); n != 1 {
return false, fmt.Errorf("expected 1 service found %d", n)
}
for _, s := range services {
if !reflect.DeepEqual(s.Tags, task.Services[0].Tags) {
return false, fmt.Errorf("expected tags=%q but found %q",
strings.Join(task.Services[0].Tags, ","), strings.Join(s.Tags, ","))
}
}
return true, nil
}, func(err error) {
services, _ := ctx.consul.Services()
for _, s := range services {
t.Logf("Service: %#v", s)
}
t.Fatalf("err: %v", err)
})
// Begin the tear down
ctx.tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
destroyed := time.Now()
// Service should get removed quickly; loop until RemoveTask is called
found := false
deadline := destroyed.Add(task.ShutdownDelay)
for time.Now().Before(deadline) {
time.Sleep(5 * time.Millisecond)
ctx.consul.mu.Lock()
n := len(ctx.consul.ops)
if n < 2 {
ctx.consul.mu.Unlock()
continue
testutil.WaitForResult(func() (bool, error) {
services, _ := ctx.consul.Services()
if n := len(services); n == 1 {
return false, fmt.Errorf("expected 0 services found %d", n)
}
lastOp := ctx.consul.ops[n-1].op
ctx.consul.mu.Unlock()
if lastOp == "remove" {
found = true
break
}
}
if !found {
t.Errorf("task was not removed from Consul first")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Wait for actual exit
select {
@ -1893,3 +1909,125 @@ func TestTaskRunner_CheckWatcher_Restart(t *testing.T) {
}
}
}
// TestTaskRunner_DriverNetwork asserts that a driver's network is properly
// used in services and checks.
//
// The mock driver is configured with a driver IP and a single label:port
// mapping ("http" -> 80). Services and checks are then declared with a mix of
// "host" and "driver" address modes, using both port labels and literal
// numeric ports, and the registrations observed on the mock Consul agent are
// compared against the expected addresses.
func TestTaskRunner_DriverNetwork(t *testing.T) {
	t.Parallel()
	alloc := mock.Alloc()
	task := alloc.Job.TaskGroups[0].Tasks[0]
	task.Driver = "mock_driver"
	task.Config = map[string]interface{}{
		"exit_code":       0,
		"run_for":         "100s",
		"driver_ip":       "10.1.2.3",
		"driver_port_map": "http:80",
	}

	// Create services and checks with custom address modes to exercise
	// address detection logic
	task.Services = []*structs.Service{
		{
			Name:        "host-service",
			PortLabel:   "http",
			AddressMode: "host",
			Checks: []*structs.ServiceCheck{
				{
					// Literal numeric port; only valid with driver mode
					Name:        "driver-check",
					Type:        "tcp",
					PortLabel:   "1234",
					AddressMode: "driver",
				},
			},
		},
		{
			Name:        "driver-service",
			PortLabel:   "5678",
			AddressMode: "driver",
			Checks: []*structs.ServiceCheck{
				{
					// No AddressMode set on this check
					Name:      "host-check",
					Type:      "tcp",
					PortLabel: "http",
				},
				{
					// Label resolved through the driver's port map
					Name:        "driver-label-check",
					Type:        "tcp",
					PortLabel:   "http",
					AddressMode: "driver",
				},
			},
		},
	}

	ctx := testTaskRunnerFromAlloc(t, false, alloc)
	ctx.tr.MarkReceived()
	go ctx.tr.Run()
	defer ctx.Cleanup()

	// Wait for the task to start
	testWaitForTaskToStart(t, ctx)

	testutil.WaitForResult(func() (bool, error) {
		services, _ := ctx.consul.Services()
		if n := len(services); n != 2 {
			return false, fmt.Errorf("expected 2 services, but found %d", n)
		}
		for _, s := range services {
			switch s.Service {
			case "host-service":
				if expected := "192.168.0.100"; s.Address != expected {
					return false, fmt.Errorf("expected host-service to have IP=%s but found %s",
						expected, s.Address)
				}
			case "driver-service":
				if expected := "10.1.2.3"; s.Address != expected {
					return false, fmt.Errorf("expected driver-service to have IP=%s but found %s",
						expected, s.Address)
				}
				if expected := 5678; s.Port != expected {
					return false, fmt.Errorf("expected driver-service to have port=%d but found %d",
						expected, s.Port)
				}
			default:
				return false, fmt.Errorf("unexpected service: %q", s.Service)
			}
		}

		checks := ctx.consul.CheckRegs()
		if n := len(checks); n != 3 {
			return false, fmt.Errorf("expected 3 checks, but found %d", n)
		}
		for _, check := range checks {
			switch check.Name {
			case "driver-check":
				if expected := "10.1.2.3:1234"; check.TCP != expected {
					return false, fmt.Errorf("expected driver-check to have address %q but found %q", expected, check.TCP)
				}
			case "driver-label-check":
				if expected := "10.1.2.3:80"; check.TCP != expected {
					return false, fmt.Errorf("expected driver-label-check to have address %q but found %q", expected, check.TCP)
				}
			case "host-check":
				// Only the IP prefix is asserted here; the port is not checked
				if expected := "192.168.0.100:"; !strings.HasPrefix(check.TCP, expected) {
					return false, fmt.Errorf("expected host-check to have address start with %q but found %q", expected, check.TCP)
				}
			default:
				return false, fmt.Errorf("unexpected check: %q", check.Name)
			}
		}

		return true, nil
	}, func(err error) {
		// Dump everything registered to aid debugging. Use t.Log rather than
		// t.Logf: the pretty-printed value is data, not a format string, and
		// may legitimately contain '%' characters.
		services, _ := ctx.consul.Services()
		for _, s := range services {
			t.Log(pretty.Sprint("Service: ", s))
		}
		for _, c := range ctx.consul.CheckRegs() {
			t.Log(pretty.Sprint("Check: ", c))
		}
		t.Fatalf("error: %v", err)
	})
}

View File

@ -80,6 +80,7 @@ func (c *MockAgent) Services() (map[string]*api.AgentService, error) {
return r, nil
}
// Checks implements the Agent API Checks method.
func (c *MockAgent) Checks() (map[string]*api.AgentCheck, error) {
c.mu.Lock()
defer c.mu.Unlock()
@ -98,6 +99,19 @@ func (c *MockAgent) Checks() (map[string]*api.AgentCheck, error) {
return r, nil
}
// CheckRegs returns the raw AgentCheckRegistrations currently held by this
// mock agent. The agent's mutex is held while c.checks is read, and the
// result is a freshly allocated slice, so callers may keep it after the call
// returns. The slice elements are the same pointers stored in c.checks.
func (c *MockAgent) CheckRegs() []*api.AgentCheckRegistration {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Size the result up front and fill it by index.
	out := make([]*api.AgentCheckRegistration, len(c.checks))
	i := 0
	for _, reg := range c.checks {
		out[i] = reg
		i++
	}
	return out
}
func (c *MockAgent) CheckRegister(check *api.AgentCheckRegistration) error {
c.mu.Lock()
defer c.mu.Unlock()

View File

@ -2,7 +2,10 @@ package consul
import (
"context"
"crypto/sha1"
"encoding/base32"
"fmt"
"io"
"log"
"net"
"net/url"
@ -21,10 +24,14 @@ import (
)
const (
// nomadServicePrefix is the first prefix that scopes all Nomad registered
// services
// nomadServicePrefix is the prefix that scopes all Nomad registered
// services (both agent and task entries).
nomadServicePrefix = "_nomad"
// nomadTaskPrefix is the prefix that scopes Nomad registered services
// for tasks.
nomadTaskPrefix = nomadServicePrefix + "-task-"
// defaultRetryInterval is how quickly to retry syncing services and
// checks to Consul when an error occurs. Will backoff up to a max.
defaultRetryInterval = time.Second
@ -288,8 +295,13 @@ func (c *ServiceClient) Run() {
if err := c.sync(); err != nil {
if failures == 0 {
// Log on the first failure
c.logger.Printf("[WARN] consul.sync: failed to update services in Consul: %v", err)
} else if failures%10 == 0 {
// Log every 10th consecutive failure
c.logger.Printf("[ERR] consul.sync: still unable to update services in Consul after %d failures; latest error: %v", failures, err)
}
failures++
if !retryTimer.Stop() {
// Timer already expired, since the timer may
@ -389,8 +401,14 @@ func (c *ServiceClient) sync() error {
// Not managed by Nomad, skip
continue
}
// Unknown Nomad managed service; kill
if err := c.client.ServiceDeregister(id); err != nil {
if isOldNomadService(id) {
// Don't hard-fail on old entries. See #3620
continue
}
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
}
@ -398,29 +416,16 @@ func (c *ServiceClient) sync() error {
metrics.IncrCounter([]string{"client", "consul", "service_deregistrations"}, 1)
}
// Track services whose ports have changed as their checks may also
// need updating
portsChanged := make(map[string]struct{}, len(c.services))
// Add Nomad services missing from Consul
for id, locals := range c.services {
if remotes, ok := consulServices[id]; ok {
// Make sure Port and Address are stable since
// PortLabel and AddressMode aren't included in the
// service ID.
if locals.Port == remotes.Port && locals.Address == remotes.Address {
// Already exists in Consul; skip
continue
if _, ok := consulServices[id]; !ok {
if err = c.client.ServiceRegister(locals); err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
}
// Port changed, reregister it and its checks
portsChanged[id] = struct{}{}
sreg++
metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1)
}
if err = c.client.ServiceRegister(locals); err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
}
sreg++
metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1)
}
// Remove Nomad checks in Consul but unknown locally
@ -433,8 +438,14 @@ func (c *ServiceClient) sync() error {
// Service not managed by Nomad, skip
continue
}
// Unknown Nomad managed check; kill
// Unknown Nomad managed check; remove
if err := c.client.CheckDeregister(id); err != nil {
if isOldNomadService(check.ServiceID) {
// Don't hard-fail on old entries.
continue
}
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
}
@ -444,12 +455,11 @@ func (c *ServiceClient) sync() error {
// Add Nomad checks missing from Consul
for id, check := range c.checks {
if check, ok := consulChecks[id]; ok {
if _, changed := portsChanged[check.ServiceID]; !changed {
// Already in Consul and ports didn't change; skipping
continue
}
if _, ok := consulChecks[id]; ok {
// Already in Consul; skipping
continue
}
if err := c.client.CheckRegister(check); err != nil {
metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1)
return err
@ -569,23 +579,16 @@ func (c *ServiceClient) serviceRegs(ops *operations, allocID string, service *st
checkIDs: make(map[string]struct{}, len(service.Checks)),
}
// Determine the address to advertise
// Service address modes default to auto
addrMode := service.AddressMode
if addrMode == structs.AddressModeAuto {
if net.Advertise() {
addrMode = structs.AddressModeDriver
} else {
// No driver network or shouldn't default to driver's network
addrMode = structs.AddressModeHost
}
if addrMode == "" {
addrMode = structs.AddressModeAuto
}
ip, port := task.Resources.Networks.Port(service.PortLabel)
if addrMode == structs.AddressModeDriver {
if net == nil {
return nil, fmt.Errorf("service %s cannot use driver's IP because driver didn't set one", service.Name)
}
ip = net.IP
port = net.PortMap[service.PortLabel]
// Determine the address to advertise based on the mode
ip, port, err := getAddress(addrMode, service.PortLabel, task.Resources.Networks, net)
if err != nil {
return nil, fmt.Errorf("unable to get address for service %q: %v", service.Name, err)
}
// Build the Consul Service registration request
@ -639,15 +642,33 @@ func (c *ServiceClient) checkRegs(ops *operations, allocID, serviceID string, se
ops.scripts = append(ops.scripts, newScriptCheck(
allocID, task.Name, checkID, check, exec, c.client, c.logger, c.shutdownCh))
// Skip getAddress for script checks
checkReg, err := createCheckReg(serviceID, checkID, check, "", 0)
if err != nil {
return nil, fmt.Errorf("failed to add script check %q: %v", check.Name, err)
}
ops.regChecks = append(ops.regChecks, checkReg)
continue
}
// Checks should always use the host ip:port
// Default to the service's port but allow check to override
portLabel := check.PortLabel
if portLabel == "" {
// Default to the service's port label
portLabel = service.PortLabel
}
ip, port := task.Resources.Networks.Port(portLabel)
// Checks address mode defaults to host for pre-#3380 backward compat
addrMode := check.AddressMode
if addrMode == "" {
addrMode = structs.AddressModeHost
}
ip, port, err := getAddress(addrMode, portLabel, task.Resources.Networks, net)
if err != nil {
return nil, fmt.Errorf("unable to get address for check %q: %v", check.Name, err)
}
checkReg, err := createCheckReg(serviceID, checkID, check, ip, port)
if err != nil {
return nil, fmt.Errorf("failed to add check %q: %v", check.Name, err)
@ -709,8 +730,8 @@ func (c *ServiceClient) RegisterTask(allocID string, task *structs.Task, restart
func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Task, restarter TaskRestarter, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error {
ops := &operations{}
t := new(TaskRegistration)
t.Services = make(map[string]*ServiceRegistration, len(newTask.Services))
taskReg := new(TaskRegistration)
taskReg.Services = make(map[string]*ServiceRegistration, len(newTask.Services))
existingIDs := make(map[string]*structs.Service, len(existing.Services))
for _, s := range existing.Services {
@ -740,22 +761,17 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta
continue
}
// Service exists and hasn't changed, don't re-add it later
delete(newIDs, existingID)
// Service still exists so add it to the task's registration
sreg := &ServiceRegistration{
serviceID: existingID,
checkIDs: make(map[string]struct{}, len(newSvc.Checks)),
}
t.Services[existingID] = sreg
taskReg.Services[existingID] = sreg
// PortLabel and AddressMode aren't included in the ID, so we
// have to compare manually.
serviceUnchanged := newSvc.PortLabel == existingSvc.PortLabel && newSvc.AddressMode == existingSvc.AddressMode
if serviceUnchanged {
// Service exists and hasn't changed, don't add it later
delete(newIDs, existingID)
}
// Check to see what checks were updated
// See if any checks were updated
existingChecks := make(map[string]*structs.ServiceCheck, len(existingSvc.Checks))
for _, check := range existingSvc.Checks {
existingChecks[makeCheckID(existingID, check)] = check
@ -768,17 +784,16 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta
// Check exists, so don't remove it
delete(existingChecks, checkID)
sreg.checkIDs[checkID] = struct{}{}
} else if serviceUnchanged {
// New check on an unchanged service; add them now
newCheckIDs, err := c.checkRegs(ops, allocID, existingID, newSvc, newTask, exec, net)
if err != nil {
return err
}
}
for _, checkID := range newCheckIDs {
sreg.checkIDs[checkID] = struct{}{}
// New check on an unchanged service; add them now
newCheckIDs, err := c.checkRegs(ops, allocID, existingID, newSvc, newTask, exec, net)
if err != nil {
return err
}
}
for _, checkID := range newCheckIDs {
sreg.checkIDs[checkID] = struct{}{}
}
@ -806,11 +821,11 @@ func (c *ServiceClient) UpdateTask(allocID string, existing, newTask *structs.Ta
return err
}
t.Services[sreg.serviceID] = sreg
taskReg.Services[sreg.serviceID] = sreg
}
// Add the task to the allocation's registration
c.addTaskRegistration(allocID, newTask.Name, t)
c.addTaskRegistration(allocID, newTask.Name, taskReg)
c.commit(ops)
@ -988,36 +1003,27 @@ func (c *ServiceClient) removeTaskRegistration(allocID, taskName string) {
//
// Agent service IDs are of the form:
//
// {nomadServicePrefix}-{ROLE}-{Service.Name}-{Service.Tags...}
// Example Server ID: _nomad-server-nomad-serf
// Example Client ID: _nomad-client-nomad-client-http
// {nomadServicePrefix}-{ROLE}-b32(sha1({Service.Name}-{Service.Tags...}))
// Example Server ID: _nomad-server-FBBK265QN4TMT25ND4EP42TJVMYJ3HR4
// Example Client ID: _nomad-client-GGNJPGL7YN7RGMVXZILMPVRZZVRSZC7L
//
func makeAgentServiceID(role string, service *structs.Service) string {
parts := make([]string, len(service.Tags)+3)
parts[0] = nomadServicePrefix
parts[1] = role
parts[2] = service.Name
copy(parts[3:], service.Tags)
return strings.Join(parts, "-")
h := sha1.New()
io.WriteString(h, service.Name)
for _, tag := range service.Tags {
io.WriteString(h, tag)
}
b32 := base32.StdEncoding.EncodeToString(h.Sum(nil))
return fmt.Sprintf("%s-%s-%s", nomadServicePrefix, role, b32)
}
// makeTaskServiceID creates a unique ID for identifying a task service in
// Consul.
//
// Task service IDs are of the form:
//
// {nomadServicePrefix}-executor-{ALLOC_ID}-{Service.Name}-{Service.Tags...}
// Example Service ID: _nomad-executor-1234-echo-http-tag1-tag2-tag3
// Consul. All structs.Service fields are included in the ID's hash except
// Checks. This allows updates to merely compare IDs.
//
// Example Service ID: _nomad-task-TNM333JKJPM5AK4FAS3VXQLXFDWOF4VH
func makeTaskServiceID(allocID, taskName string, service *structs.Service) string {
parts := make([]string, len(service.Tags)+5)
parts[0] = nomadServicePrefix
parts[1] = "executor"
parts[2] = allocID
parts[3] = taskName
parts[4] = service.Name
copy(parts[5:], service.Tags)
return strings.Join(parts, "-")
return nomadTaskPrefix + service.Hash(allocID, taskName)
}
// makeCheckID creates a unique ID for a check.
@ -1073,9 +1079,68 @@ func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host
}
// isNomadService returns true if the ID matches the pattern of a Nomad managed
// service. Agent services return false as independent client and server agents
// may be running on the same machine. #2827
// service (new or old formats). Agent services return false as independent
// client and server agents may be running on the same machine. #2827
func isNomadService(id string) bool {
return strings.HasPrefix(id, nomadTaskPrefix) || isOldNomadService(id)
}
// isOldNomadService reports whether id uses the task service ID format that
// Nomad generated before 0.7.1.
//
// Pre-0.7.1 task service IDs are of the form:
//
//	{nomadServicePrefix}-executor-{ALLOC_ID}-{Service.Name}-{Service.Tags...}
//	Example Service ID: _nomad-executor-1234-echo-http-tag1-tag2-tag3
//
// Only the leading "{nomadServicePrefix}-executor" portion is inspected; the
// remainder of the ID is not validated.
func isOldNomadService(id string) bool {
	return strings.HasPrefix(id, nomadServicePrefix+"-executor")
}
// getAddress returns the ip and port to use for a service or check. An error
// is returned if an ip and port cannot be determined.
//
// addrMode selects where the address comes from:
//
//   - structs.AddressModeAuto resolves to driver mode when
//     driverNet.Advertise() reports true and to host mode otherwise, then
//     re-evaluates with the resolved mode.
//   - structs.AddressModeHost looks portLabel up in networks. It fails only
//     when both the returned ip is empty and the port is non-positive.
//   - structs.AddressModeDriver requires a non-nil driverNet. portLabel is
//     first looked up in driverNet.PortMap; if it is not a known label it is
//     parsed as a literal port number, which must be greater than zero.
//
// Any other addrMode is rejected with an error.
func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet *cstructs.DriverNetwork) (string, int, error) {
	switch addrMode {
	case structs.AddressModeAuto:
		// NOTE(review): Advertise is invoked before any nil check on
		// driverNet, so it is presumably safe on a nil receiver -- confirm.
		if driverNet.Advertise() {
			addrMode = structs.AddressModeDriver
		} else {
			addrMode = structs.AddressModeHost
		}
		return getAddress(addrMode, portLabel, networks, driverNet)

	case structs.AddressModeHost:
		// Default path: use host ip:port
		ip, port := networks.Port(portLabel)
		if ip == "" && port <= 0 {
			return "", 0, fmt.Errorf("invalid port %q: port label not found", portLabel)
		}
		return ip, port, nil

	case structs.AddressModeDriver:
		// Require a driver network if driver address mode is used
		if driverNet == nil {
			return "", 0, fmt.Errorf(`cannot use address_mode="driver": no driver network exists`)
		}

		// If the port is a label, use the driver's port (not the host's)
		if port, ok := driverNet.PortMap[portLabel]; ok {
			return driverNet.IP, port, nil
		}

		// If port isn't a label, try to parse it as a literal port number
		port, err := strconv.Atoi(portLabel)
		if err != nil {
			return "", 0, fmt.Errorf("invalid port %q: %v", portLabel, err)
		}
		if port <= 0 {
			// Covers negative literals as well as zero, so describe the
			// actual constraint rather than only the zero case.
			return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel)
		}

		return driverNet.IP, port, nil

	default:
		// Shouldn't happen due to validation, but enforce invariants
		return "", 0, fmt.Errorf("invalid address mode %q", addrMode)
	}
}

View File

@ -116,7 +116,12 @@ func TestConsul_Integration(t *testing.T) {
{
Name: "httpd2",
PortLabel: "http",
Tags: []string{"test", "http2"},
Tags: []string{
"test",
// Use URL-unfriendly tags to test #3620
"public-test.ettaviation.com:80/ redirect=302,https://test.ettaviation.com",
"public-test.ettaviation.com:443/",
},
},
}

View File

@ -16,6 +16,7 @@ import (
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
)
const (
@ -219,8 +220,8 @@ func TestConsul_ChangeTags(t *testing.T) {
}
// TestConsul_ChangePorts asserts that changing the ports on a service updates
// it in Consul. Since ports are not part of the service ID this is a slightly
// different code path than changing tags.
// it in Consul. Pre-0.7.1 ports were not part of the service ID and this was a
// slightly different code path than changing tags.
func TestConsul_ChangePorts(t *testing.T) {
ctx := setupFake()
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
@ -348,8 +349,8 @@ func TestConsul_ChangePorts(t *testing.T) {
}
for k, v := range ctx.FakeConsul.services {
if k != origServiceKey {
t.Errorf("unexpected key change; was: %q -- but found %q", origServiceKey, k)
if k == origServiceKey {
t.Errorf("expected key change; still: %q", k)
}
if v.Name != ctx.Task.Services[0].Name {
t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name)
@ -369,15 +370,15 @@ func TestConsul_ChangePorts(t *testing.T) {
for k, v := range ctx.FakeConsul.checks {
switch v.Name {
case "c1":
if k != origTCPKey {
t.Errorf("unexpected key change for %s from %q to %q", v.Name, origTCPKey, k)
if k == origTCPKey {
t.Errorf("expected key change for %s from %q", v.Name, origTCPKey)
}
if expected := fmt.Sprintf(":%d", xPort); v.TCP != expected {
t.Errorf("expected Port x=%v but found: %v", expected, v.TCP)
}
case "c2":
if k != origScriptKey {
t.Errorf("unexpected key change for %s from %q to %q", v.Name, origScriptKey, k)
if k == origScriptKey {
t.Errorf("expected key change for %s from %q", v.Name, origScriptKey)
}
select {
case <-ctx.execs:
@ -1382,9 +1383,16 @@ func TestIsNomadService(t *testing.T) {
}{
{"_nomad-client-nomad-client-http", false},
{"_nomad-server-nomad-serf", false},
// Pre-0.7.1 style IDs still match
{"_nomad-executor-abc", true},
{"_nomad-executor", true},
// Post-0.7.1 style IDs match
{"_nomad-task-FBBK265QN4TMT25ND4EP42TJVMYJ3HR4", true},
{"not-nomad", false},
{"_nomad", false},
}
for _, test := range tests {
@ -1440,3 +1448,160 @@ func TestCreateCheckReg(t *testing.T) {
t.Fatalf("diff:\n%s\n", strings.Join(diff, "\n"))
}
}
// TestGetAddress runs getAddress through a table of address modes, port
// labels, host port assignments and driver networks, and checks the resolved
// ip, port and error for each combination.
func TestGetAddress(t *testing.T) {
	const HostIP = "127.0.0.1"
	const driverIP = "10.1.2.3"

	// dbDriver returns a fresh driver network that maps the "db" label to
	// the container port 6379.
	dbDriver := func(autoAdvertise bool) *cstructs.DriverNetwork {
		return &cstructs.DriverNetwork{
			PortMap:       map[string]int{"db": 6379},
			IP:            driverIP,
			AutoAdvertise: autoAdvertise,
		}
	}

	cases := []struct {
		Name string

		// Inputs
		Mode      string
		PortLabel string
		Host      map[string]int // label -> host port; turned into structs.Networks below
		Driver    *cstructs.DriverNetwork

		// Expectations
		IP          string
		Port        int
		ErrContains string
	}{
		{
			Name:      "ExampleService",
			Mode:      structs.AddressModeAuto,
			PortLabel: "db",
			Host:      map[string]int{"db": 12435},
			Driver:    dbDriver(false),
			IP:        HostIP,
			Port:      12435,
		},
		{
			Name:      "Host",
			Mode:      structs.AddressModeHost,
			PortLabel: "db",
			Host:      map[string]int{"db": 12345},
			Driver:    dbDriver(false),
			IP:        HostIP,
			Port:      12345,
		},
		{
			Name:      "Driver",
			Mode:      structs.AddressModeDriver,
			PortLabel: "db",
			Host:      map[string]int{"db": 12345},
			Driver:    dbDriver(false),
			IP:        driverIP,
			Port:      6379,
		},
		{
			Name:      "AutoDriver",
			Mode:      structs.AddressModeAuto,
			PortLabel: "db",
			Host:      map[string]int{"db": 12345},
			Driver:    dbDriver(true),
			IP:        driverIP,
			Port:      6379,
		},
		{
			Name:      "DriverCustomPort",
			Mode:      structs.AddressModeDriver,
			PortLabel: "7890",
			Host:      map[string]int{"db": 12345},
			Driver:    dbDriver(false),
			IP:        driverIP,
			Port:      7890,
		},
		{
			Name:        "DriverWithoutNetwork",
			Mode:        structs.AddressModeDriver,
			PortLabel:   "db",
			Host:        map[string]int{"db": 12345},
			Driver:      nil,
			ErrContains: "no driver network exists",
		},
		{
			Name:        "DriverBadPort",
			Mode:        structs.AddressModeDriver,
			PortLabel:   "bad-port-label",
			Host:        map[string]int{"db": 12345},
			Driver:      dbDriver(false),
			ErrContains: "invalid port",
		},
		{
			Name:        "DriverZeroPort",
			Mode:        structs.AddressModeDriver,
			PortLabel:   "0",
			Driver:      &cstructs.DriverNetwork{IP: driverIP},
			ErrContains: "invalid port",
		},
		{
			Name:        "HostBadPort",
			Mode:        structs.AddressModeHost,
			PortLabel:   "bad-port-label",
			ErrContains: "invalid port",
		},
		{
			Name:        "InvalidMode",
			Mode:        "invalid-mode",
			ErrContains: "invalid address mode",
		},
	}

	for _, tc := range cases {
		t.Run(tc.Name, func(t *testing.T) {
			// Turn the label -> port map into reserved ports on a
			// single host network.
			reserved := make([]structs.Port, 0, len(tc.Host))
			for label, value := range tc.Host {
				reserved = append(reserved, structs.Port{Label: label, Value: value})
			}
			networks := []*structs.NetworkResource{
				{IP: HostIP, ReservedPorts: reserved},
			}

			gotIP, gotPort, err := getAddress(tc.Mode, tc.PortLabel, networks, tc.Driver)

			// IP and port are compared in every case; error cases
			// expect the zero values.
			assert.Equal(t, tc.IP, gotIP, "IP mismatch")
			assert.Equal(t, tc.Port, gotPort, "Port mismatch")

			if tc.ErrContains == "" {
				assert.Nil(t, err)
				return
			}
			if err == nil {
				t.Fatalf("expected error containing %q but err=nil", tc.ErrContains)
			}
			assert.Contains(t, err.Error(), tc.ErrContains)
		})
	}
}

View File

@ -710,6 +710,7 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
Path: check.Path,
Protocol: check.Protocol,
PortLabel: check.PortLabel,
AddressMode: check.AddressMode,
Interval: check.Interval,
Timeout: check.Timeout,
InitialStatus: check.InitialStatus,

View File

@ -1222,6 +1222,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Path: "/check",
Protocol: "http",
PortLabel: "foo",
AddressMode: "driver",
Interval: 4 * time.Second,
Timeout: 2 * time.Second,
InitialStatus: "ok",
@ -1418,6 +1419,7 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Path: "/check",
Protocol: "http",
PortLabel: "foo",
AddressMode: "driver",
Interval: 4 * time.Second,
Timeout: 2 * time.Second,
InitialStatus: "ok",

View File

@ -981,6 +981,7 @@ func parseChecks(service *api.Service, checkObjs *ast.ObjectList) error {
"header",
"method",
"check_restart",
"address_mode",
}
if err := helper.CheckHCLKeys(co.Val, valid); err != nil {
return multierror.Prefix(err, "check ->")

View File

@ -583,6 +583,54 @@ func TestParse(t *testing.T) {
},
false,
},
{
"service-check-driver-address.hcl",
&api.Job{
ID: helper.StringToPtr("address_mode_driver"),
Name: helper.StringToPtr("address_mode_driver"),
Type: helper.StringToPtr("service"),
TaskGroups: []*api.TaskGroup{
{
Name: helper.StringToPtr("group"),
Tasks: []*api.Task{
{
Name: "task",
Services: []*api.Service{
{
Name: "http-service",
PortLabel: "http",
AddressMode: "auto",
Checks: []api.ServiceCheck{
{
Name: "http-check",
Type: "http",
Path: "/",
PortLabel: "http",
AddressMode: "driver",
},
},
},
{
Name: "random-service",
PortLabel: "9000",
AddressMode: "driver",
Checks: []api.ServiceCheck{
{
Name: "random-check",
Type: "tcp",
PortLabel: "9001",
AddressMode: "driver",
},
},
},
},
},
},
},
},
},
false,
},
}
for _, tc := range cases {

View File

@ -0,0 +1,38 @@
# Parser test fixture: one task with two services that exercise address_mode
# on both the service and its check.
job "address_mode_driver" {
type = "service"
group "group" {
task "task" {
# Service in "auto" mode that refers to its port by label; its check
# explicitly selects "driver" mode.
service {
name = "http-service"
port = "http"
address_mode = "auto"
check {
name = "http-check"
type = "http"
path = "/"
port = "http"
address_mode = "driver"
}
}
# Service and check both in "driver" mode, each using a numeric port
# rather than a port label.
service {
name = "random-service"
port = "9000"
address_mode = "driver"
check {
name = "random-check"
type = "tcp"
port = "9001"
address_mode = "driver"
}
}
}
}
}

View File

@ -41,7 +41,7 @@ func Node() *structs.Node {
{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []structs.Port{{Label: "main", Value: 22}},
ReservedPorts: []structs.Port{{Label: "ssh", Value: 22}},
MBits: 1,
},
},
@ -128,8 +128,11 @@ func Job() *structs.Job {
MemoryMB: 256,
Networks: []*structs.NetworkResource{
{
MBits: 50,
DynamicPorts: []structs.Port{{Label: "http"}, {Label: "admin"}},
MBits: 50,
DynamicPorts: []structs.Port{
{Label: "http"},
{Label: "admin"},
},
},
},
},
@ -273,7 +276,7 @@ func Alloc() *structs.Allocation {
{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []structs.Port{{Label: "main", Value: 5000}},
ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}},
MBits: 50,
DynamicPorts: []structs.Port{{Label: "http"}},
},
@ -287,7 +290,7 @@ func Alloc() *structs.Allocation {
{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []structs.Port{{Label: "main", Value: 5000}},
ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}},
MBits: 50,
DynamicPorts: []structs.Port{{Label: "http"}},
},

View File

@ -3507,6 +3507,12 @@ func TestTaskDiff(t *testing.T) {
Type: DiffTypeEdited,
Name: "Check",
Fields: []*FieldDiff{
{
Type: DiffTypeNone,
Name: "AddressMode",
Old: "",
New: "",
},
{
Type: DiffTypeNone,
Name: "Command",

View File

@ -6,6 +6,7 @@ import (
"crypto/sha1"
"crypto/sha256"
"crypto/sha512"
"encoding/base32"
"encoding/hex"
"errors"
"fmt"
@ -44,6 +45,9 @@ var (
// validPolicyName is used to validate a policy name
validPolicyName = regexp.MustCompile("^[a-zA-Z0-9-]{1,128}$")
// b32 is a lowercase base32 encoding for use in URL friendly service hashes
b32 = base32.NewEncoding(strings.ToLower("abcdefghijklmnopqrstuvwxyz234567"))
)
type MessageType uint8
@ -2866,6 +2870,7 @@ type ServiceCheck struct {
Path string // path of the health check url for http type check
Protocol string // Protocol to use if check is http, defaults to http
PortLabel string // The port to use for tcp/http checks
AddressMode string // 'host' to use host ip:port or 'driver' to use driver's
Interval time.Duration // Interval of the check
Timeout time.Duration // Timeout of the response from the check before consul fails the check
InitialStatus string // Initial status of the check
@ -2911,6 +2916,7 @@ func (sc *ServiceCheck) Canonicalize(serviceName string) {
// validate a Service's ServiceCheck
func (sc *ServiceCheck) validate() error {
// Validate Type
switch strings.ToLower(sc.Type) {
case ServiceCheckTCP:
case ServiceCheckHTTP:
@ -2926,6 +2932,7 @@ func (sc *ServiceCheck) validate() error {
return fmt.Errorf(`invalid type (%+q), must be one of "http", "tcp", or "script" type`, sc.Type)
}
// Validate interval and timeout
if sc.Interval == 0 {
return fmt.Errorf("missing required value interval. Interval cannot be less than %v", minCheckInterval)
} else if sc.Interval < minCheckInterval {
@ -2938,9 +2945,9 @@ func (sc *ServiceCheck) validate() error {
return fmt.Errorf("timeout (%v) is lower than required minimum timeout %v", sc.Timeout, minCheckInterval)
}
// Validate InitialStatus
switch sc.InitialStatus {
case "":
// case api.HealthUnknown: TODO: Add when Consul releases 0.7.1
case api.HealthPassing:
case api.HealthWarning:
case api.HealthCritical:
@ -2949,6 +2956,16 @@ func (sc *ServiceCheck) validate() error {
}
// Validate AddressMode
switch sc.AddressMode {
case "", AddressModeHost, AddressModeDriver:
// Ok
case AddressModeAuto:
return fmt.Errorf("invalid address_mode %q - %s only valid for services", sc.AddressMode, AddressModeAuto)
default:
return fmt.Errorf("invalid address_mode %q", sc.AddressMode)
}
return sc.CheckRestart.Validate()
}
@ -3001,6 +3018,11 @@ func (sc *ServiceCheck) Hash(serviceID string) string {
io.WriteString(h, strings.Join(headers, ""))
}
// Only include AddressMode if set to maintain ID stability with Nomad <0.7.1
if len(sc.AddressMode) > 0 {
io.WriteString(h, sc.AddressMode)
}
return fmt.Sprintf("%x", h.Sum(nil))
}
@ -3122,15 +3144,24 @@ func (s *Service) ValidateName(name string) error {
return nil
}
// Hash calculates the hash of the check based on it's content and the service
// which owns it
func (s *Service) Hash() string {
// Hash returns a base32 encoded hash of a Service's contents excluding checks
// as they're hashed independently.
func (s *Service) Hash(allocID, taskName string) string {
h := sha1.New()
io.WriteString(h, allocID)
io.WriteString(h, taskName)
io.WriteString(h, s.Name)
io.WriteString(h, strings.Join(s.Tags, ""))
io.WriteString(h, s.PortLabel)
io.WriteString(h, s.AddressMode)
return fmt.Sprintf("%x", h.Sum(nil))
for _, tag := range s.Tags {
io.WriteString(h, tag)
}
// Base32 is used for encoding the hash as sha1 hashes can always be
// encoded without padding, only 4 bytes larger than base64, and saves
// 8 bytes vs hex. Since these hashes are used in Consul URLs it's nice
// to have a reasonably compact URL-safe representation.
return b32.EncodeToString(h.Sum(nil))
}
const (
@ -3439,7 +3470,13 @@ func validateServices(t *Task) error {
// Ensure that services don't ask for non-existent ports and their names are
// unique.
servicePorts := make(map[string][]string)
servicePorts := make(map[string]map[string]struct{})
addServicePort := func(label, service string) {
if _, ok := servicePorts[label]; !ok {
servicePorts[label] = map[string]struct{}{}
}
servicePorts[label][service] = struct{}{}
}
knownServices := make(map[string]struct{})
for i, service := range t.Services {
if err := service.Validate(); err != nil {
@ -3455,16 +3492,63 @@ func validateServices(t *Task) error {
knownServices[service.Name+service.PortLabel] = struct{}{}
if service.PortLabel != "" {
servicePorts[service.PortLabel] = append(servicePorts[service.PortLabel], service.Name)
if service.AddressMode == "driver" {
// Numeric port labels are valid for address_mode=driver
_, err := strconv.Atoi(service.PortLabel)
if err != nil {
// Not a numeric port label, add it to list to check
addServicePort(service.PortLabel, service.Name)
}
} else {
addServicePort(service.PortLabel, service.Name)
}
}
// Ensure that check names are unique.
// Ensure that check names are unique and have valid ports
knownChecks := make(map[string]struct{})
for _, check := range service.Checks {
if _, ok := knownChecks[check.Name]; ok {
mErr.Errors = append(mErr.Errors, fmt.Errorf("check %q is duplicate", check.Name))
}
knownChecks[check.Name] = struct{}{}
if !check.RequiresPort() {
// No need to continue validating check if it doesn't need a port
continue
}
effectivePort := check.PortLabel
if effectivePort == "" {
// Inherits from service
effectivePort = service.PortLabel
}
if effectivePort == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("check %q is missing a port", check.Name))
continue
}
isNumeric := false
portNumber, err := strconv.Atoi(effectivePort)
if err == nil {
isNumeric = true
}
// Numeric ports are fine for address_mode = "driver"
if check.AddressMode == "driver" && isNumeric {
if portNumber <= 0 {
mErr.Errors = append(mErr.Errors, fmt.Errorf("check %q has invalid numeric port %d", check.Name, portNumber))
}
continue
}
if isNumeric {
mErr.Errors = append(mErr.Errors, fmt.Errorf(`check %q cannot use a numeric port %d without setting address_mode="driver"`, check.Name, portNumber))
continue
}
// PortLabel must exist, report errors by its parent service
addServicePort(effectivePort, service.Name)
}
}
@ -3483,7 +3567,14 @@ func validateServices(t *Task) error {
for servicePort, services := range servicePorts {
_, ok := portLabels[servicePort]
if !ok {
joined := strings.Join(services, ", ")
names := make([]string, 0, len(services))
for name := range services {
names = append(names, name)
}
// Keep order deterministic
sort.Strings(names)
joined := strings.Join(names, ", ")
err := fmt.Errorf("port label %q referenced by services %v does not exist", servicePort, joined)
mErr.Errors = append(mErr.Errors, err)
}

View File

@ -1180,6 +1180,102 @@ func TestTask_Validate_Service_Check(t *testing.T) {
}
}
// TestTask_Validate_Service_Check_AddressMode asserts that checks do not
// inherit address mode but do inherit ports.
//
// The task declares a single "http" port label. Numeric ports are only
// accepted when the service or check itself sets address_mode="driver", so a
// check that inherits the numeric port "80" without setting its own address
// mode must be rejected.
func TestTask_Validate_Service_Check_AddressMode(t *testing.T) {
	task := &Task{
		Resources: &Resources{
			Networks: []*NetworkResource{
				{
					DynamicPorts: []Port{
						{
							Label: "http",
							Value: 9999,
						},
					},
				},
			},
		},
		Services: []*Service{
			{
				// Should fail: numeric port label without driver mode
				Name:        "invalid-driver",
				PortLabel:   "80",
				AddressMode: "host",
			},
			{
				Name:        "http-driver",
				PortLabel:   "80",
				AddressMode: "driver",
				Checks: []*ServiceCheck{
					{
						// Should fail
						Name:     "invalid-check-1",
						Type:     "tcp",
						Interval: time.Second,
						Timeout:  time.Second,
					},
					{
						// Should fail
						Name:      "invalid-check-2",
						Type:      "tcp",
						PortLabel: "80",
						Interval:  time.Second,
						Timeout:   time.Second,
					},
					{
						// Should fail
						Name:      "invalid-check-3",
						Type:      "tcp",
						PortLabel: "missing-port-label",
						Interval:  time.Second,
						Timeout:   time.Second,
					},
					{
						// Should pass
						Name:     "valid-script-check",
						Type:     "script",
						Command:  "ok",
						Interval: time.Second,
						Timeout:  time.Second,
					},
					{
						// Should pass
						Name:      "valid-host-check",
						Type:      "tcp",
						PortLabel: "http",
						Interval:  time.Second,
						Timeout:   time.Second,
					},
					{
						// Should pass
						Name:        "valid-driver-check",
						Type:        "tcp",
						AddressMode: "driver",
						Interval:    time.Second,
						Timeout:     time.Second,
					},
				},
			},
		},
	}

	err := validateServices(task)
	if err == nil {
		t.Fatalf("expected errors but task validated successfully")
	}

	// Use a checked assertion so an unexpected error type fails the test
	// with a readable message instead of a panic.
	merr, ok := err.(*multierror.Error)
	if !ok {
		t.Fatalf("expected a *multierror.Error but got %T: %v", err, err)
	}
	errs := merr.Errors

	expected := []string{
		`check "invalid-check-1" cannot use a numeric port`,
		`check "invalid-check-2" cannot use a numeric port`,
		`port label "80" referenced`,
		`port label "missing-port-label" referenced`,
	}
	if len(errs) != len(expected) {
		for i, err := range errs {
			t.Logf("errs[%d] -> %s", i, err)
		}
		t.Fatalf("expected %d errors but found %d", len(expected), len(errs))
	}

	// The port-label errors are produced while ranging over a map, so their
	// relative order is not guaranteed. Match each expected message against
	// the full set of errors rather than a fixed index.
	combined := ""
	for _, e := range errs {
		combined += e.Error() + "\n"
	}
	for _, want := range expected {
		assert.Contains(t, combined, want)
	}
}
func TestTask_Validate_Service_Check_CheckRestart(t *testing.T) {
invalidCheckRestart := &CheckRestart{
Limit: -1,

View File

@ -1893,7 +1893,7 @@ func TestServiceSched_JobModify_InPlace(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
// Verify the network did not change
rp := structs.Port{Label: "main", Value: 5000}
rp := structs.Port{Label: "admin", Value: 5000}
for _, alloc := range out {
for _, resources := range alloc.TaskResources {
if resources.Networks[0].ReservedPorts[0] != rp {

View File

@ -777,7 +777,7 @@ func TestSystemSched_JobModify_InPlace(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
// Verify the network did not change
rp := structs.Port{Label: "main", Value: 5000}
rp := structs.Port{Label: "admin", Value: 5000}
for _, alloc := range out {
for _, resources := range alloc.TaskResources {
if resources.Networks[0].ReservedPorts[0] != rp {

View File

@ -98,20 +98,32 @@ does not automatically enable service discovery.
- `port` `(string: <required>)` - Specifies the label of the port on which this
service is running. Note this is the _label_ of the port and not the port
number. The port label must match one defined in the [`network`][network]
stanza.
number unless `address_mode = driver`. The port label must match one defined
in the [`network`][network] stanza unless you're also using
`address_mode="driver"`. Numeric ports may be used when in driver addressing
mode.
- `tags` `(array<string>: [])` - Specifies the list of tags to associate with
this service. If this is not supplied, no tags will be assigned to the service
when it is registered.
- `address_mode` `(string: "auto")` - Specifies what address (host or
driver-specific) this service should advertise. `host` indicates the host IP
and port. `driver` advertises the IP used in the driver (e.g. Docker's internal
IP) and uses the ports specified in the port map. The default is `auto` which
behaves the same as `host` unless the driver determines its IP should be used.
This setting supported Docker since Nomad 0.6 and rkt since Nomad 0.7. It
will advertise the container IP if a network plugin is used (e.g. weave).
driver-specific) this service should advertise. This setting is supported in
Docker since Nomad 0.6 and rkt since Nomad 0.7. See [below for
examples.](#using-driver-address-mode) Valid options are:
- `auto` - Allows the driver to determine whether the host or driver address
should be used. This behaves like `host` by default, and automatic selection
is only implemented by the Docker driver. If you use a Docker network plugin
such as weave, the driver's address will be advertised automatically.
- `driver` - Use the IP specified by the driver, and the port specified in a
port map. A numeric port may be specified since port maps aren't required
by all network plugins. This mode is useful for advertising SDN and overlay
network addresses. The task will fail if the driver network cannot be
determined. This mode is only implemented for Docker and rkt.
- `host` - Use the host IP and port.
### `check` Parameters
@ -120,6 +132,13 @@ the script will run inside the Docker container. If your task is running in a
chroot, it will run in the chroot. Please keep this in mind when authoring check
scripts.
- `address_mode` `(string: "host")` - Same as `address_mode` on `service`.
Unlike services, checks do not have an `auto` address mode as there's no way
for Nomad to know which is the best address to use for checks. Consul needs
access to the address for any HTTP or TCP checks. Added in Nomad 0.7.1. See
[below for details.](#using-driver-address-mode) Unlike `port`, this setting
is *not* inherited from the `service`.
- `args` `(array<string>: [])` - Specifies additional arguments to the
`command`. This only applies to script-based health checks.
@ -157,11 +176,13 @@ scripts.
- `port` `(string: <required>)` - Specifies the label of the port on which the
check will be performed. Note this is the _label_ of the port and not the port
number. The port label must match one defined in the [`network`][network]
stanza. If a port value was declared on the `service`, this will inherit from
that value if not supplied. If supplied, this value takes precedence over the
`service.port` value. This is useful for services which operate on multiple
ports. Checks will *always use the host IP and ports*.
number unless `address_mode = driver`. The port label must match one defined
in the [`network`][network] stanza. If a port value was declared on the
`service`, this will inherit from that value if not supplied. If supplied,
this value takes precedence over the `service.port` value. This is useful for
services which operate on multiple ports. Checks will use the host IP and
ports by default. In Nomad 0.7.1 or later numeric ports may be used if
`address_mode="driver"` is set on the check.
- `protocol` `(string: "http")` - Specifies the protocol for the http-based
health checks. Valid options are `http` and `https`.
@ -324,6 +345,123 @@ service {
}
```
### Using Driver Address Mode
The [Docker](/docs/drivers/docker.html#network_mode) and
[rkt](/docs/drivers/rkt.html#net) drivers support the `driver` setting for the
`address_mode` parameter in both `service` and `check` stanzas. The driver
address mode allows advertising and health checking the IP and port assigned to
a task by the driver. This way if you're using a network plugin like Weave with
Docker, you can advertise the Weave address in Consul instead of the host's
address.
For example if you were running the example Redis job in an environment with
Weave but Consul was running on the host you could use the following
configuration:
```hcl
job "example" {
datacenters = ["dc1"]
group "cache" {
task "redis" {
driver = "docker"
config {
image = "redis:3.2"
network_mode = "weave"
port_map {
db = 6379
}
}
resources {
cpu = 500 # 500 MHz
memory = 256 # 256MB
network {
mbits = 10
port "db" {}
}
}
service {
name = "weave-redis"
port = "db"
check {
name = "host-redis-check"
type = "tcp"
interval = "10s"
timeout = "2s"
}
}
}
}
}
```
No explicit `address_mode` required!
Services default to the `auto` address mode. When a Docker network mode other
than "host" or "bridge" is used, services will automatically advertise the
driver's address (in this case Weave's). The service will advertise the
container's port: 6379.
However since Consul is often run on the host without access to the Weave
network, `check` stanzas default to `host` address mode. The TCP check will run
against the host's IP and the dynamic host port assigned by Nomad.
Note that the `check` still inherits the `service` stanza's `db` port label,
but each will resolve the port label according to their address mode.
If Consul has access to the Weave network the job could be configured like
this:
```hcl
job "example" {
datacenters = ["dc1"]
group "cache" {
task "redis" {
driver = "docker"
config {
image = "redis:3.2"
network_mode = "weave"
# No port map required!
}
resources {
cpu = 500 # 500 MHz
memory = 256 # 256MB
network {
mbits = 10
}
}
service {
name = "weave-redis"
port = 6379
address_mode = "driver"
check {
name = "host-redis-check"
type = "tcp"
interval = "10s"
timeout = "2s"
port = 6379
address_mode = "driver"
}
}
}
}
}
```
In this case Nomad doesn't need to assign Redis any host ports. The `service`
and `check` stanzas can both specify the port number to advertise and check
directly since Nomad isn't managing any port assignments.
- - -
<sup><small>1</small></sup><small> Script checks are not supported for the