open-nomad/command/agent/consul/unit_test.go
Mahmood Ali e07413c420 Avoid de-registering slowly restored services
When a nomad client restarts or is upgraded, nomad restores state from the
running tasks and starts the sync loop.  If the sync loop runs early, it may
deregister services from Consul prematurely even when Consul has the
running service as healthy.

This is not ideal, as re-registering the service means potentially
waiting a whole service health check interval before declaring the
service healthy.

We attempt to mitigate this by introducing an initialization probation
period.  During this time, we only deregister services and checks that
were explicitly deregistered, and leave unrecognized ones alone.  This
serves as a grace period for restoring to complete, or for operators to
restore should they recognize they restored with the wrong nomad data
directory.
2019-06-14 11:15:21 -04:00

1920 lines
54 KiB
Go

package consul
import (
"context"
"fmt"
"reflect"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// xPort is the host port bound to the "x" port label in testTask.
const xPort = 1234

// yPort is the host port bound to the "y" port label in testTask.
const yPort = 1235
// testTask builds the TaskServices fixture shared by the tests in this file:
// a freshly generated alloc ID, one service ("taskname-service") on port
// label "x", dynamic ports x=xPort and y=yPort, a restartRecorder as the
// restarter and a mockExec as the script executor.
func testTask() *TaskServices {
	service := &structs.Service{
		Name:      "taskname-service",
		PortLabel: "x",
		Tags:      []string{"tag1", "tag2"},
	}
	network := &structs.NetworkResource{
		DynamicPorts: []structs.Port{
			{Label: "x", Value: xPort},
			{Label: "y", Value: yPort},
		},
	}

	return &TaskServices{
		AllocID:    uuid.Generate(),
		Name:       "taskname",
		Restarter:  &restartRecorder{},
		Services:   []*structs.Service{service},
		Networks:   []*structs.NetworkResource{network},
		DriverExec: newMockExec(),
	}
}
// mockExec implements the ScriptExecutor interface for tests. Its Exec method
// delegates to ExecFunc when that field is non-nil and otherwise reports
// success.
type mockExec struct {
// execs receives a tick whenever Exec is called. Exec never blocks on this
// channel: the tick is dropped if the buffer is full.
execs chan int
// ExecFunc, if non-nil, is called by Exec in place of the default behavior.
ExecFunc func(ctx context.Context, cmd string, args []string) ([]byte, int, error)
}
// newMockExec returns a mockExec with no ExecFunc installed and an execs
// channel buffered for 100 ticks.
func newMockExec() *mockExec {
	m := new(mockExec)
	m.execs = make(chan int, 100)
	return m
}
// Exec records the invocation on m.execs and then either delegates to
// m.ExecFunc, passing a context that times out after dur, or, when no
// ExecFunc is set, returns the output "ok" with exit code 0 and no error.
func (m *mockExec) Exec(dur time.Duration, cmd string, args []string) ([]byte, int, error) {
	// Non-blocking tick: drop it if nobody is draining the channel.
	select {
	case m.execs <- 1:
	default:
	}

	if m.ExecFunc != nil {
		ctx, cancel := context.WithTimeout(context.Background(), dur)
		defer cancel()
		return m.ExecFunc(ctx, cmd, args)
	}

	// Default impl is just "ok"
	return []byte("ok"), 0, nil
}
// restartRecorder is a minimal TaskRestarter implementation that simply
// counts how many restarts were triggered.
type restartRecorder struct {
// restarts is the number of Restart calls; Restart updates it atomically.
restarts int64
}
// Restart records a restart by atomically incrementing the counter. All
// arguments are ignored and the returned error is always nil.
func (r *restartRecorder) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
atomic.AddInt64(&r.restarts, 1)
return nil
}
// testFakeCtx bundles the objects a test needs: a ServiceClient, the fake
// Consul agent (MockAgent) it talks to, and a test task with its script
// executor.
type testFakeCtx struct {
// ServiceClient is the client under test.
ServiceClient *ServiceClient
// FakeConsul is the mock Consul agent; tests inspect its recorded state.
FakeConsul *MockAgent
// Task is the task whose services the tests register, update and remove.
Task *TaskServices
// MockExec is the script executor tests use to observe or stub script runs.
MockExec *mockExec
}
// errNoOps is returned by syncOnce when there are no operations waiting on
// the ServiceClient's op channel.
var errNoOps = fmt.Errorf("testing error: no pending operations")
// syncOnce simulates a single iteration of the ServiceClient.Run loop: it
// takes one pending batch of operations off opCh, merges it and calls sync().
// It returns the error from sync(), or errNoOps when nothing was pending. The
// explicitly-deregistered bookkeeping is cleared only after a successful sync.
func (t *testFakeCtx) syncOnce() error {
	client := t.ServiceClient

	select {
	case pending := <-client.opCh:
		client.merge(pending)
		if err := client.sync(); err != nil {
			return err
		}
		client.clearExplicitlyDeregistered()
		return nil
	default:
	}

	return errNoOps
}
// setupFake returns a testFakeCtx wired together for a test: a ServiceClient
// backed by a fresh MockAgent, plus the fixture task from testTask and that
// task's mockExec.
func setupFake(t *testing.T) *testFakeCtx {
	agent := NewMockAgent()
	task := testTask()

	client := NewServiceClient(agent, testlog.HCLogger(t), true)
	// Back-date the probation expiry so the client starts out of probation
	// by default.
	client.deregisterProbationExpiry = time.Now().Add(-time.Minute)

	fake := &testFakeCtx{
		ServiceClient: client,
		FakeConsul:    agent,
		Task:          task,
	}
	fake.MockExec = task.DriverExec.(*mockExec)
	return fake
}
// TestConsul_ChangeTags asserts that changing a service's tags via UpdateTask
// is reflected in the fake Consul agent after the next sync, without adding
// or removing a service.
func TestConsul_ChangeTags(t *testing.T) {
	fake := setupFake(t)
	require := require.New(t)

	// Initial registration.
	require.NoError(fake.ServiceClient.RegisterTask(fake.Task))
	require.NoError(fake.syncOnce())
	require.Equal(1, len(fake.FakeConsul.services), "Expected 1 service to be registered with Consul")

	// The alloc registration should list one service and no checks.
	allocReg, err := fake.ServiceClient.AllocRegistrations(fake.Task.AllocID)
	require.NoError(err)
	require.NotNil(allocReg, "Unexpected nil alloc registration")
	require.Equal(1, allocReg.NumServices())
	require.Equal(0, allocReg.NumChecks())

	for _, registered := range fake.FakeConsul.services {
		require.Equal(registered.Name, fake.Task.Services[0].Name)
		require.Equal(registered.Tags, fake.Task.Services[0].Tags)
	}

	// Snapshot the task before mutating it so UpdateTask can diff old vs new.
	before := fake.Task.Copy()
	fake.Task.Services[0].Tags[0] = "newtag"

	// Push the update through and sync it.
	require.NoError(fake.ServiceClient.UpdateTask(before, fake.Task))
	require.NoError(fake.syncOnce())
	require.Equal(1, len(fake.FakeConsul.services), "Expected 1 service to be registered with Consul")

	// The registered service must now carry the new tag.
	for _, registered := range fake.FakeConsul.services {
		require.Equal(registered.Name, fake.Task.Services[0].Name)
		require.Equal(registered.Tags, fake.Task.Services[0].Tags)
		require.Equal("newtag", registered.Tags[0])
	}
}
// TestConsul_ChangePorts asserts that changing the ports on a service updates
// it in Consul. Pre-0.7.1 ports were not part of the service ID and this was a
// slightly different code path than changing tags.
func TestConsul_ChangePorts(t *testing.T) {
	fake := setupFake(t)
	require := require.New(t)

	// buildChecks returns the three checks used by this test. Only c3's port
	// label varies between the initial and the updated task definition.
	buildChecks := func(c3PortLabel string) []*structs.ServiceCheck {
		return []*structs.ServiceCheck{
			{
				Name:      "c1",
				Type:      "tcp",
				Interval:  time.Second,
				Timeout:   time.Second,
				PortLabel: "x",
			},
			{
				Name:     "c2",
				Type:     "script",
				Interval: 9000 * time.Hour,
				Timeout:  time.Second,
			},
			{
				Name:      "c3",
				Type:      "http",
				Protocol:  "http",
				Path:      "/",
				Interval:  time.Second,
				Timeout:   time.Second,
				PortLabel: c3PortLabel,
			},
		}
	}

	// Register the service on port x with c3 explicitly on port y.
	fake.Task.Services[0].Checks = buildChecks("y")
	require.NoError(fake.ServiceClient.RegisterTask(fake.Task))
	require.NoError(fake.syncOnce())
	require.Equal(1, len(fake.FakeConsul.services), "Expected 1 service to be registered with Consul")

	for _, svc := range fake.FakeConsul.services {
		require.Equal(fake.Task.Services[0].Name, svc.Name)
		require.Equal(fake.Task.Services[0].Tags, svc.Tags)
		require.Equal(xPort, svc.Port)
	}

	require.Len(fake.FakeConsul.checks, 3)

	// Remember each check's original ID so we can tell whether it changed.
	var tcpKey, scriptKey, httpKey string
	for id, check := range fake.FakeConsul.checks {
		switch check.Name {
		case "c1":
			tcpKey = id
			require.Equal(fmt.Sprintf(":%d", xPort), check.TCP)
		case "c2":
			scriptKey = id
			select {
			case <-fake.MockExec.execs:
				// Here we validate there is nothing left on the channel
				require.Equal(0, len(fake.MockExec.execs))
			case <-time.After(3 * time.Second):
				t.Fatalf("script not called in time")
			}
		case "c3":
			httpKey = id
			require.Equal(fmt.Sprintf("http://:%d/", yPort), check.HTTP)
		default:
			t.Fatalf("unexpected check: %q", check.Name)
		}
	}

	require.NotEmpty(tcpKey)
	require.NotEmpty(scriptKey)
	require.NotEmpty(httpKey)

	// Move the service to port y and drop c3's explicit port label; c3
	// should then default to the service's port label (y).
	before := fake.Task.Copy()
	fake.Task.Services[0].PortLabel = "y"
	fake.Task.Services[0].Checks = buildChecks("")

	require.NoError(fake.ServiceClient.UpdateTask(before, fake.Task))
	require.NoError(fake.syncOnce())
	require.Equal(1, len(fake.FakeConsul.services), "Expected 1 service to be registered with Consul")

	for _, svc := range fake.FakeConsul.services {
		require.Equal(fake.Task.Services[0].Name, svc.Name)
		require.Equal(fake.Task.Services[0].Tags, svc.Tags)
		require.Equal(yPort, svc.Port)
	}

	require.Equal(3, len(fake.FakeConsul.checks))

	// Every check ID is expected to change because the service itself was
	// re-registered.
	for id, check := range fake.FakeConsul.checks {
		switch check.Name {
		case "c1":
			require.NotEqual(tcpKey, id)
			require.Equal(fmt.Sprintf(":%d", xPort), check.TCP)
		case "c2":
			require.NotEqual(scriptKey, id)
		case "c3":
			require.NotEqual(httpKey, id)
			require.Equal(fmt.Sprintf("http://:%d/", yPort), check.HTTP)
		default:
			t.Errorf("Unknown check: %q", id)
		}
	}
}
// TestConsul_ChangeChecks asserts that updating only the checks on a service
// properly syncs with Consul. It covers three phases: the initial
// registration with one check, an update that modifies that check and adds a
// second one, and an update that only alters a CheckRestart setting.
func TestConsul_ChangeChecks(t *testing.T) {
	ctx := setupFake(t)
	ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
		{
			Name:      "c1",
			Type:      "tcp",
			Interval:  time.Second,
			Timeout:   time.Second,
			PortLabel: "x",
			CheckRestart: &structs.CheckRestart{
				Limit: 3,
			},
		},
	}

	if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
		t.Fatalf("unexpected error registering task: %v", err)
	}

	if err := ctx.syncOnce(); err != nil {
		t.Fatalf("unexpected error syncing task: %v", err)
	}

	if n := len(ctx.FakeConsul.services); n != 1 {
		t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
	}

	// Assert a check restart watch update was enqueued and clear it
	if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 {
		t.Fatalf("expected 1 check restart update but found %d", n)
	}

	upd := <-ctx.ServiceClient.checkWatcher.checkUpdateCh
	c1ID := upd.checkID

	// Query the alloc's registrations now and again after the update. The
	// check IDs should change.
	reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID)
	if err != nil {
		t.Fatalf("Looking up alloc registration failed: %v", err)
	}
	if reg1 == nil {
		// err is known to be nil here, so there is nothing useful to format.
		t.Fatal("Nil alloc registrations")
	}
	if num := reg1.NumServices(); num != 1 {
		t.Fatalf("Wrong number of services: got %d; want 1", num)
	}
	if num := reg1.NumChecks(); num != 1 {
		t.Fatalf("Wrong number of checks: got %d; want 1", num)
	}

	origServiceKey := ""
	for k, v := range ctx.FakeConsul.services {
		origServiceKey = k
		if v.Name != ctx.Task.Services[0].Name {
			t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name)
		}
		if v.Port != xPort {
			t.Errorf("expected Port x=%v but found: %v", xPort, v.Port)
		}
	}

	if n := len(ctx.FakeConsul.checks); n != 1 {
		t.Fatalf("expected 1 check but found %d:\n%#v", n, ctx.FakeConsul.checks)
	}
	for _, v := range ctx.FakeConsul.checks {
		if v.Name != "c1" {
			t.Fatalf("expected check c1 but found %q", v.Name)
		}
	}

	// Now add a check and modify the original
	origTask := ctx.Task.Copy()
	ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
		{
			Name:      "c1",
			Type:      "tcp",
			Interval:  2 * time.Second,
			Timeout:   time.Second,
			PortLabel: "x",
			CheckRestart: &structs.CheckRestart{
				Limit: 3,
			},
		},
		{
			Name:      "c2",
			Type:      "http",
			Path:      "/",
			Interval:  time.Second,
			Timeout:   time.Second,
			PortLabel: "x",
		},
	}
	if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
		t.Fatalf("unexpected error updating task: %v", err)
	}

	// Assert 2 check restart watch updates were enqueued
	if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 2 {
		t.Fatalf("expected 2 check restart updates but found %d", n)
	}

	// First the new watch
	upd = <-ctx.ServiceClient.checkWatcher.checkUpdateCh
	if upd.checkID == c1ID || upd.remove {
		t.Fatalf("expected check watch update to be an add of checkID=%q but found remove=%t checkID=%q",
			c1ID, upd.remove, upd.checkID)
	}

	// Then remove the old watch
	upd = <-ctx.ServiceClient.checkWatcher.checkUpdateCh
	if upd.checkID != c1ID || !upd.remove {
		t.Fatalf("expected check watch update to be a removal of checkID=%q but found remove=%t checkID=%q",
			c1ID, upd.remove, upd.checkID)
	}

	if err := ctx.syncOnce(); err != nil {
		t.Fatalf("unexpected error syncing task: %v", err)
	}

	if n := len(ctx.FakeConsul.services); n != 1 {
		t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
	}

	// Only the checks changed, so the service must keep its original key.
	if _, ok := ctx.FakeConsul.services[origServiceKey]; !ok {
		t.Errorf("unexpected key change; was: %q -- but found %#v", origServiceKey, ctx.FakeConsul.services)
	}

	if n := len(ctx.FakeConsul.checks); n != 2 {
		t.Fatalf("expected 2 checks but found %d:\n%#v", n, ctx.FakeConsul.checks)
	}

	for k, v := range ctx.FakeConsul.checks {
		switch v.Name {
		case "c1":
			if expected := fmt.Sprintf(":%d", xPort); v.TCP != expected {
				t.Errorf("expected Port x=%v but found: %v", expected, v.TCP)
			}

			// Remember c1's new ID for the final phase of the test.
			c1ID = k
		case "c2":
			if expected := fmt.Sprintf("http://:%d/", xPort); v.HTTP != expected {
				t.Errorf("expected Port x=%v but found: %v", expected, v.HTTP)
			}
		default:
			t.Errorf("Unknown check: %q", k)
		}
	}

	// Check again and ensure the IDs changed
	reg2, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID)
	if err != nil {
		t.Fatalf("Looking up alloc registration failed: %v", err)
	}
	if reg2 == nil {
		t.Fatal("Nil alloc registrations")
	}
	if num := reg2.NumServices(); num != 1 {
		t.Fatalf("Wrong number of services: got %d; want 1", num)
	}
	if num := reg2.NumChecks(); num != 2 {
		t.Fatalf("Wrong number of checks: got %d; want 2", num)
	}

	// None of the check IDs from the first registration may survive into the
	// second one, while the task and service IDs must.
	for task, treg := range reg1.Tasks {
		otherTaskReg, ok := reg2.Tasks[task]
		if !ok {
			t.Fatalf("Task %q not in second reg", task)
		}

		for sID, sreg := range treg.Services {
			otherServiceReg, ok := otherTaskReg.Services[sID]
			if !ok {
				t.Fatalf("service ID changed")
			}

			for oldID := range sreg.checkIDs {
				if _, ok := otherServiceReg.checkIDs[oldID]; ok {
					t.Fatalf("check IDs should change")
				}
			}
		}
	}

	// Alter a CheckRestart and make sure the watcher is updated but nothing else
	origTask = ctx.Task.Copy()
	ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
		{
			Name:      "c1",
			Type:      "tcp",
			Interval:  2 * time.Second,
			Timeout:   time.Second,
			PortLabel: "x",
			CheckRestart: &structs.CheckRestart{
				Limit: 11,
			},
		},
		{
			Name:      "c2",
			Type:      "http",
			Path:      "/",
			Interval:  time.Second,
			Timeout:   time.Second,
			PortLabel: "x",
		},
	}
	if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
		t.Fatalf("unexpected error updating task: %v", err)
	}
	if err := ctx.syncOnce(); err != nil {
		t.Fatalf("unexpected error syncing task: %v", err)
	}

	if n := len(ctx.FakeConsul.checks); n != 2 {
		t.Fatalf("expected 2 checks but found %d:\n%#v", n, ctx.FakeConsul.checks)
	}

	for k, v := range ctx.FakeConsul.checks {
		if v.Name == "c1" {
			if k != c1ID {
				t.Errorf("expected c1 to still have id %q but found %q", c1ID, k)
			}
			break
		}
	}

	// Assert exactly one check restart watch update was enqueued for the
	// changed CheckRestart, and drain it.
	if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 {
		t.Fatalf("expected 1 check restart update but found %d", n)
	}
	<-ctx.ServiceClient.checkWatcher.checkUpdateCh
}
// TestConsul_RegServices tests basic service registration: registering a
// task, registering a renamed service alongside it, verifying that nothing
// reaches Consul before a sync, and removing the task again. It also checks
// the check-restart watcher updates enqueued along the way.
func TestConsul_RegServices(t *testing.T) {
	ctx := setupFake(t)

	// Add a check w/restarting
	ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
		{
			Name:     "testcheck",
			Type:     "tcp",
			Interval: 100 * time.Millisecond,
			CheckRestart: &structs.CheckRestart{
				Limit: 3,
			},
		},
	}

	if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
		t.Fatalf("unexpected error registering task: %v", err)
	}

	if err := ctx.syncOnce(); err != nil {
		t.Fatalf("unexpected error syncing task: %v", err)
	}

	if n := len(ctx.FakeConsul.services); n != 1 {
		t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
	}

	for _, v := range ctx.FakeConsul.services {
		if v.Name != ctx.Task.Services[0].Name {
			t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name)
		}
		if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
			t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags)
		}
		if v.Port != xPort {
			t.Errorf("expected Port=%d != %d", xPort, v.Port)
		}
	}

	// Assert the check update is pending
	if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 {
		t.Fatalf("expected 1 check restart update but found %d", n)
	}

	// Assert the check update is properly formed
	checkUpd := <-ctx.ServiceClient.checkWatcher.checkUpdateCh
	if checkUpd.checkRestart.allocID != ctx.Task.AllocID {
		// Report the alloc ID that was actually expected, not a placeholder.
		t.Fatalf("expected check's allocid to be %q but found %q", ctx.Task.AllocID, checkUpd.checkRestart.allocID)
	}
	if expected := 200 * time.Millisecond; checkUpd.checkRestart.timeLimit != expected {
		t.Fatalf("expected check's time limit to be %v but found %v", expected, checkUpd.checkRestart.timeLimit)
	}

	// Make a change which will register a new service
	ctx.Task.Services[0].Name = "taskname-service2"
	ctx.Task.Services[0].Tags[0] = "tag3"
	if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
		t.Fatalf("unexpected error registering task: %v", err)
	}

	// Assert check update is pending
	if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 {
		t.Fatalf("expected 1 check restart update but found %d", n)
	}

	// Assert the check update's id has changed
	checkUpd2 := <-ctx.ServiceClient.checkWatcher.checkUpdateCh
	if checkUpd.checkID == checkUpd2.checkID {
		t.Fatalf("expected new check update to have a new ID but both have: %q", checkUpd.checkID)
	}

	// Make sure changes don't take effect until sync() is called (since
	// Run() isn't running)
	if n := len(ctx.FakeConsul.services); n != 1 {
		t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
	}
	for _, v := range ctx.FakeConsul.services {
		if reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
			t.Errorf("expected Tags to differ, changes applied before sync()")
		}
	}

	// Now sync() and re-check for the applied updates
	if err := ctx.syncOnce(); err != nil {
		t.Fatalf("unexpected error syncing task: %v", err)
	}
	if n := len(ctx.FakeConsul.services); n != 2 {
		t.Fatalf("expected 2 services but found %d:\n%#v", n, ctx.FakeConsul.services)
	}

	// The renamed service must appear exactly once, carrying the new tags.
	found := false
	for _, v := range ctx.FakeConsul.services {
		if v.Name == ctx.Task.Services[0].Name {
			if found {
				t.Fatalf("found new service name %q twice", v.Name)
			}
			found = true
			if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
				t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags)
			}
		}
	}
	if !found {
		t.Fatalf("did not find new service %q", ctx.Task.Services[0].Name)
	}

	// Remove the new task
	ctx.ServiceClient.RemoveTask(ctx.Task)
	if err := ctx.syncOnce(); err != nil {
		t.Fatalf("unexpected error syncing task: %v", err)
	}
	if n := len(ctx.FakeConsul.services); n != 1 {
		t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
	}
	for _, v := range ctx.FakeConsul.services {
		if v.Name != "taskname-service" {
			t.Errorf("expected original task to survive not %q", v.Name)
		}
	}

	// Assert check update is pending
	if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 {
		t.Fatalf("expected 1 check restart update but found %d", n)
	}

	// Assert the check update's id is correct and that it's a removal
	checkUpd3 := <-ctx.ServiceClient.checkWatcher.checkUpdateCh
	if checkUpd2.checkID != checkUpd3.checkID {
		t.Fatalf("expected checkid %q but found %q", checkUpd2.checkID, checkUpd3.checkID)
	}
	if !checkUpd3.remove {
		t.Fatalf("expected check watch removal update but found: %#v", checkUpd3)
	}
}
// TestConsul_ShutdownOK exercises the happy path of ServiceClient.Shutdown:
// with a task (carrying a script check) and an agent service registered,
// Shutdown returns no error and the script check ends up passing.
func TestConsul_ShutdownOK(t *testing.T) {
	require := require.New(t)
	fake := setupFake(t)

	// A script check is attached so we can verify its TTL gets updated. The
	// huge interval makes the check block until shutdown.
	scriptCheck := &structs.ServiceCheck{
		Name:          "scriptcheck",
		Type:          "script",
		Command:       "true",
		Interval:      9000 * time.Hour,
		Timeout:       10 * time.Second,
		InitialStatus: "warning",
	}
	fake.Task.Services[0].Checks = []*structs.ServiceCheck{scriptCheck}

	go fake.ServiceClient.Run()

	// Register the task, then an agent service.
	if err := fake.ServiceClient.RegisterTask(fake.Task); err != nil {
		t.Fatalf("unexpected error registering task: %v", err)
	}

	agentServices := []*structs.Service{
		{
			Name:      "http",
			Tags:      []string{"nomad"},
			PortLabel: "localhost:2345",
		},
	}
	if err := fake.ServiceClient.RegisterAgent("client", agentServices); err != nil {
		t.Fatalf("unexpected error registering agent: %v", err)
	}

	// Poll until the client reports hasSeen() before shutting down.
	testutil.WaitForResult(
		func() (bool, error) {
			return fake.ServiceClient.hasSeen(), fmt.Errorf("error contacting Consul")
		},
		func(err error) {
			t.Fatalf("err: %v", err)
		},
	)

	// Shutdown should block until scripts finish
	if err := fake.ServiceClient.Shutdown(); err != nil {
		t.Errorf("unexpected error shutting down client: %v", err)
	}

	// UpdateTTL should have been called once for the script check and once
	// for shutdown
	if n := len(fake.FakeConsul.checkTTLs); n != 1 {
		t.Fatalf("expected 1 checkTTL entry but found: %d", n)
	}
	for _, updates := range fake.FakeConsul.checkTTLs {
		require.Equalf(2, updates, "expected 2 updates but found %d", updates)
	}

	for _, check := range fake.FakeConsul.checks {
		if check.Status != "passing" {
			t.Fatalf("expected check to be passing but found %q", check.Status)
		}
	}
}
// TestConsul_ShutdownSlow tests the slow but ok path for the shutdown logic in
// ServiceClient: a script check that takes about a second to run must be
// allowed to finish when the shutdown deadline is longer than that.
func TestConsul_ShutdownSlow(t *testing.T) {
	t.Parallel()
	ctx := setupFake(t)

	// Add a script check to make sure its TTL gets updated
	ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
		{
			Name:    "scriptcheck",
			Type:    "script",
			Command: "true",
			// Make check block until shutdown
			Interval:      9000 * time.Hour,
			Timeout:       5 * time.Second,
			InitialStatus: "warning",
		},
	}

	// Make Exec slow, but not too slow. waiter is closed on the first call so
	// the test knows the script has started; the select guards against
	// closing it a second time.
	waiter := make(chan struct{})
	ctx.MockExec.ExecFunc = func(_ context.Context, cmd string, args []string) ([]byte, int, error) {
		select {
		case <-waiter:
		default:
			close(waiter)
		}
		time.Sleep(time.Second)
		return []byte{}, 0, nil
	}

	// Make shutdown wait time just a bit longer than ctx.Exec takes
	ctx.ServiceClient.shutdownWait = 3 * time.Second

	go ctx.ServiceClient.Run()

	// Register a task and agent
	if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
		t.Fatalf("unexpected error registering task: %v", err)
	}

	// wait for Exec to get called before shutting down
	<-waiter

	// Shutdown should block until all enqueued operations finish.
	preShutdown := time.Now()
	if err := ctx.ServiceClient.Shutdown(); err != nil {
		t.Errorf("unexpected error shutting down client: %v", err)
	}

	// Shutdown time should have taken: ~1s <= shutdown <= 3s
	// actual timing might be less than 1s, to account for shutdown invocation overhead
	shutdownTime := time.Since(preShutdown)
	if shutdownTime < 900*time.Millisecond || shutdownTime > ctx.ServiceClient.shutdownWait {
		t.Errorf("expected shutdown to take >1s and <%s but took: %s", ctx.ServiceClient.shutdownWait, shutdownTime)
	}

	// UpdateTTL should have been called once for the script check
	if n := len(ctx.FakeConsul.checkTTLs); n != 1 {
		t.Fatalf("expected 1 checkTTL entry but found: %d", n)
	}
	for _, v := range ctx.FakeConsul.checkTTLs {
		if v != 1 {
			t.Fatalf("expected script check to be updated once but found %d", v)
		}
	}
	for _, v := range ctx.FakeConsul.checks {
		if v.Status != "passing" {
			t.Fatalf("expected check to be passing but found %q", v.Status)
		}
	}
}
// TestConsul_ShutdownBlocked tests the blocked past deadline path for the
// shutdown logic in ServiceClient: when a script check never returns,
// Shutdown must give up after shutdownWait and report an error.
func TestConsul_ShutdownBlocked(t *testing.T) {
	t.Parallel()
	ctx := setupFake(t)

	// Add a script check to make sure its TTL gets updated
	ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
		{
			Name:    "scriptcheck",
			Type:    "script",
			Command: "true",
			// Make check block until shutdown
			Interval:      9000 * time.Hour,
			Timeout:       9000 * time.Hour,
			InitialStatus: "warning",
		},
	}

	block := make(chan struct{})
	defer close(block) // cleanup after test

	// Make Exec block forever; waiter signals that the script has started.
	waiter := make(chan struct{})
	ctx.MockExec.ExecFunc = func(_ context.Context, cmd string, args []string) ([]byte, int, error) {
		close(waiter)
		<-block
		return []byte{}, 0, nil
	}

	// Use a short shutdown deadline since we're intentionally blocking forever
	ctx.ServiceClient.shutdownWait = time.Second

	go ctx.ServiceClient.Run()

	// Register a task and agent
	if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
		t.Fatalf("unexpected error registering task: %v", err)
	}

	// Wait for exec to be called
	<-waiter

	// Shutdown cannot complete because the script never returns; it should
	// time out and return an error.
	preShutdown := time.Now()
	err := ctx.ServiceClient.Shutdown()
	if err == nil {
		t.Errorf("expected a timed out error from shutdown")
	}

	// Shutdown time should have taken shutdownWait; to avoid timing
	// related errors simply test for wait <= shutdown <= wait+3s
	shutdownTime := time.Since(preShutdown)
	maxWait := ctx.ServiceClient.shutdownWait + (3 * time.Second)
	if shutdownTime < ctx.ServiceClient.shutdownWait || shutdownTime > maxWait {
		t.Errorf("expected shutdown to take >%s and <%s but took: %s", ctx.ServiceClient.shutdownWait, maxWait, shutdownTime)
	}

	// UpdateTTL should not have been called for the script check
	if n := len(ctx.FakeConsul.checkTTLs); n != 0 {
		t.Fatalf("expected 0 checkTTL entry but found: %d", n)
	}
	for _, v := range ctx.FakeConsul.checks {
		if expected := "warning"; v.Status != expected {
			t.Fatalf("expected check to be %q but found %q", expected, v.Status)
		}
	}
}
// TestConsul_CancelScript asserts that removing a script check removes all
// objects related to that check and that the removed script is not executed
// again.
func TestConsul_CancelScript(t *testing.T) {
	ctx := setupFake(t)
	ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
		{
			Name:     "scriptcheckDel",
			Type:     "script",
			Interval: 9000 * time.Hour,
			Timeout:  9000 * time.Hour,
		},
		{
			Name:     "scriptcheckKeep",
			Type:     "script",
			Interval: 9000 * time.Hour,
			Timeout:  9000 * time.Hour,
		},
	}

	if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
		t.Fatalf("unexpected error registering task: %v", err)
	}

	if err := ctx.syncOnce(); err != nil {
		t.Fatalf("unexpected error syncing task: %v", err)
	}

	if len(ctx.FakeConsul.checks) != 2 {
		t.Errorf("expected 2 checks but found %d", len(ctx.FakeConsul.checks))
	}

	// Both bookkeeping maps must agree; fail if either one is off.
	if len(ctx.ServiceClient.scripts) != 2 || len(ctx.ServiceClient.runningScripts) != 2 {
		t.Errorf("expected 2 running script but found scripts=%d runningScripts=%d",
			len(ctx.ServiceClient.scripts), len(ctx.ServiceClient.runningScripts))
	}

	// Each of the two scripts should run once.
	for i := 0; i < 2; i++ {
		select {
		case <-ctx.MockExec.execs:
			// Script ran as expected!
		case <-time.After(3 * time.Second):
			t.Fatalf("timed out waiting for script check to run")
		}
	}

	// Remove a check and update the task
	origTask := ctx.Task.Copy()
	ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
		{
			Name:     "scriptcheckKeep",
			Type:     "script",
			Interval: 9000 * time.Hour,
			Timeout:  9000 * time.Hour,
		},
	}

	if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
		t.Fatalf("unexpected error updating task: %v", err)
	}

	if err := ctx.syncOnce(); err != nil {
		t.Fatalf("unexpected error syncing task: %v", err)
	}

	if len(ctx.FakeConsul.checks) != 1 {
		t.Errorf("expected 1 check but found %d", len(ctx.FakeConsul.checks))
	}

	if len(ctx.ServiceClient.scripts) != 1 || len(ctx.ServiceClient.runningScripts) != 1 {
		t.Errorf("expected 1 running script but found scripts=%d runningScripts=%d",
			len(ctx.ServiceClient.scripts), len(ctx.ServiceClient.runningScripts))
	}

	// Make sure exec wasn't called again
	select {
	case <-ctx.MockExec.execs:
		t.Errorf("unexpected execution of script; was goroutine not cancelled?")
	case <-time.After(100 * time.Millisecond):
		// No unexpected script execs
	}

	// Don't leak goroutines
	for _, scriptHandle := range ctx.ServiceClient.runningScripts {
		scriptHandle.cancel()
	}
}
// TestConsul_DriverNetwork_AutoUse asserts that if a driver network has
// auto-use set then services should advertise it unless explicitly set to
// host. Checks should always use host.
func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
	t.Parallel()
	ctx := setupFake(t)

	ctx.Task.Services = []*structs.Service{
		{
			Name:        "auto-advertise-x",
			PortLabel:   "x",
			AddressMode: structs.AddressModeAuto,
			Checks: []*structs.ServiceCheck{
				{
					Name:     "default-check-x",
					Type:     "tcp",
					Interval: time.Second,
					Timeout:  time.Second,
				},
				{
					Name:      "weird-y-check",
					Type:      "http",
					Interval:  time.Second,
					Timeout:   time.Second,
					PortLabel: "y",
				},
			},
		},
		{
			Name:        "driver-advertise-y",
			PortLabel:   "y",
			AddressMode: structs.AddressModeDriver,
			Checks: []*structs.ServiceCheck{
				{
					Name:     "default-check-y",
					Type:     "tcp",
					Interval: time.Second,
					Timeout:  time.Second,
				},
			},
		},
		{
			Name:        "host-advertise-y",
			PortLabel:   "y",
			AddressMode: structs.AddressModeHost,
		},
	}

	ctx.Task.DriverNetwork = &drivers.DriverNetwork{
		PortMap: map[string]int{
			"x": 8888,
			"y": 9999,
		},
		IP:            "172.18.0.2",
		AutoAdvertise: true,
	}

	if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
		t.Fatalf("unexpected error registering task: %v", err)
	}

	if err := ctx.syncOnce(); err != nil {
		t.Fatalf("unexpected error syncing task: %v", err)
	}

	if n := len(ctx.FakeConsul.services); n != 3 {
		t.Fatalf("expected 3 services but found: %d", n)
	}

	// Checks always target the host ports, so derive the expected addresses
	// from the host port constants.
	hostTCPx := fmt.Sprintf(":%d", xPort)
	hostTCPy := fmt.Sprintf(":%d", yPort)
	hostHTTPy := fmt.Sprintf("http://:%d", yPort)

	for _, v := range ctx.FakeConsul.services {
		switch v.Name {
		case ctx.Task.Services[0].Name: // x
			// Since DriverNetwork.AutoAdvertise=true, driver ports should be used
			if v.Port != ctx.Task.DriverNetwork.PortMap["x"] {
				t.Errorf("expected service %s's port to be %d but found %d",
					v.Name, ctx.Task.DriverNetwork.PortMap["x"], v.Port)
			}
			// The order of checks in Consul is not guaranteed to
			// be the same as their order in the Task definition,
			// so check in a loop
			if expected := 2; len(v.Checks) != expected {
				t.Errorf("expected %d checks but found %d", expected, len(v.Checks))
			}
			for _, c := range v.Checks {
				// No name on AgentServiceChecks, use type
				switch {
				case c.TCP != "":
					// Checks should always use host port though
					if c.TCP != hostTCPx {
						t.Errorf("expected service %s check 1's port to be %d but found %q",
							v.Name, xPort, c.TCP)
					}
				case c.HTTP != "":
					if c.HTTP != hostHTTPy {
						t.Errorf("expected service %s check 2's port to be %d but found %q",
							v.Name, yPort, c.HTTP)
					}
				default:
					t.Errorf("unexpected check %#v on service %q", c, v.Name)
				}
			}
		case ctx.Task.Services[1].Name: // y
			// Service should be container ip:port
			if v.Address != ctx.Task.DriverNetwork.IP {
				t.Errorf("expected service %s's address to be %s but found %s",
					v.Name, ctx.Task.DriverNetwork.IP, v.Address)
			}
			if v.Port != ctx.Task.DriverNetwork.PortMap["y"] {
				t.Errorf("expected service %s's port to be %d but found %d",
					v.Name, ctx.Task.DriverNetwork.PortMap["y"], v.Port)
			}
			// Fail with a clear message instead of panicking on the index
			// below if the check is missing.
			if n := len(v.Checks); n != 1 {
				t.Fatalf("expected service %s to have 1 check but found %d", v.Name, n)
			}
			// Check should be host ip:port
			if v.Checks[0].TCP != hostTCPy {
				t.Errorf("expected service %s check's port to be %d but found %s",
					v.Name, yPort, v.Checks[0].TCP)
			}
		case ctx.Task.Services[2].Name: // y + host mode
			if v.Port != yPort {
				t.Errorf("expected service %s's port to be %d but found %d",
					v.Name, yPort, v.Port)
			}
		default:
			t.Errorf("unexpected service name: %q", v.Name)
		}
	}
}
// TestConsul_DriverNetwork_NoAutoUse asserts that if a driver network doesn't
// set auto-use only services which request the driver's network should
// advertise it.
func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
	t.Parallel()
	ctx := setupFake(t)

	ctx.Task.Services = []*structs.Service{
		{
			Name:        "auto-advertise-x",
			PortLabel:   "x",
			AddressMode: structs.AddressModeAuto,
		},
		{
			Name:        "driver-advertise-y",
			PortLabel:   "y",
			AddressMode: structs.AddressModeDriver,
		},
		{
			Name:        "host-advertise-y",
			PortLabel:   "y",
			AddressMode: structs.AddressModeHost,
		},
	}

	ctx.Task.DriverNetwork = &drivers.DriverNetwork{
		PortMap: map[string]int{
			"x": 8888,
			"y": 9999,
		},
		IP:            "172.18.0.2",
		AutoAdvertise: false,
	}

	if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
		t.Fatalf("unexpected error registering task: %v", err)
	}

	if err := ctx.syncOnce(); err != nil {
		t.Fatalf("unexpected error syncing task: %v", err)
	}

	if n := len(ctx.FakeConsul.services); n != 3 {
		t.Fatalf("expected 3 services but found: %d", n)
	}

	for _, v := range ctx.FakeConsul.services {
		switch v.Name {
		case ctx.Task.Services[0].Name: // x + auto
			// Since DriverNetwork.AutoAdvertise=false, host ports should be used
			if v.Port != xPort {
				t.Errorf("expected service %s's port to be %d but found %d",
					v.Name, xPort, v.Port)
			}
		case ctx.Task.Services[1].Name: // y + driver mode
			// Service should be container ip:port
			if v.Address != ctx.Task.DriverNetwork.IP {
				t.Errorf("expected service %s's address to be %s but found %s",
					v.Name, ctx.Task.DriverNetwork.IP, v.Address)
			}
			if v.Port != ctx.Task.DriverNetwork.PortMap["y"] {
				t.Errorf("expected service %s's port to be %d but found %d",
					v.Name, ctx.Task.DriverNetwork.PortMap["y"], v.Port)
			}
		case ctx.Task.Services[2].Name: // y + host mode
			if v.Port != yPort {
				t.Errorf("expected service %s's port to be %d but found %d",
					v.Name, yPort, v.Port)
			}
		default:
			t.Errorf("unexpected service name: %q", v.Name)
		}
	}
}
// TestConsul_DriverNetwork_Change asserts that when a driver network is
// specified and a service switches address mode, the port advertised in
// Consul is updated accordingly.
func TestConsul_DriverNetwork_Change(t *testing.T) {
	t.Parallel()
	fake := setupFake(t)

	fake.Task.Services = []*structs.Service{
		{
			Name:        "service-foo",
			PortLabel:   "x",
			AddressMode: structs.AddressModeAuto,
		},
	}
	fake.Task.DriverNetwork = &drivers.DriverNetwork{
		PortMap: map[string]int{
			"x": 8888,
			"y": 9999,
		},
		IP:            "172.18.0.2",
		AutoAdvertise: false,
	}

	// syncAndAssertPort runs one sync and verifies that the single
	// registered service advertises the wanted port.
	syncAndAssertPort := func(port int) {
		if err := fake.syncOnce(); err != nil {
			t.Fatalf("unexpected error syncing task: %v", err)
		}

		if n := len(fake.FakeConsul.services); n != 1 {
			t.Fatalf("expected 1 service but found: %d", n)
		}

		for _, registered := range fake.FakeConsul.services {
			if registered.Name != fake.Task.Services[0].Name {
				t.Errorf("unexpected service name: %q", registered.Name)
				continue
			}
			if registered.Port != port {
				t.Errorf("expected service %s's port to be %d but found %d",
					registered.Name, port, registered.Port)
			}
		}
	}

	// Auto mode without auto-advertise: the host port x is advertised.
	if err := fake.ServiceClient.RegisterTask(fake.Task); err != nil {
		t.Fatalf("unexpected error registering task: %v", err)
	}
	syncAndAssertPort(xPort)

	// Switching to Host mode should not change anything.
	before := fake.Task.Copy()
	fake.Task.Services[0].AddressMode = structs.AddressModeHost
	if err := fake.ServiceClient.UpdateTask(before, fake.Task); err != nil {
		t.Fatalf("unexpected error updating task: %v", err)
	}
	syncAndAssertPort(xPort)

	// Switching to Driver mode *should* change the IP and port.
	before = fake.Task.Copy()
	fake.Task.Services[0].AddressMode = structs.AddressModeDriver
	if err := fake.ServiceClient.UpdateTask(before, fake.Task); err != nil {
		t.Fatalf("unexpected error updating task: %v", err)
	}
	syncAndAssertPort(fake.Task.DriverNetwork.PortMap["x"])
}
// TestConsul_CanaryTags asserts that a service's CanaryTags are registered
// while Canary=true and are no longer used once the canary flag is cleared.
func TestConsul_CanaryTags(t *testing.T) {
	t.Parallel()
	must := require.New(t)
	ctx := setupFake(t)

	wantTags := []string{"tag1", "canary"}
	ctx.Task.Services[0].CanaryTags = wantTags
	ctx.Task.Canary = true

	must.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
	must.NoError(ctx.syncOnce())
	must.Len(ctx.FakeConsul.services, 1)
	for _, svc := range ctx.FakeConsul.services {
		must.Equal(wantTags, svc.Tags)
	}

	// Turn the canary flag off; the registered tags must stop being the
	// canary tags.
	before := ctx.Task.Copy()
	ctx.Task.Canary = false
	must.NoError(ctx.ServiceClient.UpdateTask(before, ctx.Task))
	must.NoError(ctx.syncOnce())
	must.Len(ctx.FakeConsul.services, 1)
	for _, svc := range ctx.FakeConsul.services {
		must.NotEqual(wantTags, svc.Tags)
	}

	// Removing the task leaves nothing registered.
	ctx.ServiceClient.RemoveTask(ctx.Task)
	must.NoError(ctx.syncOnce())
	must.Len(ctx.FakeConsul.services, 0)
}
// TestConsul_CanaryTags_NoTags asserts that the regular Tags are registered
// when Canary=true but no canary tags were specified, and that they stay the
// same after the canary flag is cleared.
func TestConsul_CanaryTags_NoTags(t *testing.T) {
	t.Parallel()
	must := require.New(t)
	ctx := setupFake(t)

	wantTags := []string{"tag1", "foo"}
	ctx.Task.Services[0].Tags = wantTags
	ctx.Task.Canary = true

	must.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
	must.NoError(ctx.syncOnce())
	must.Len(ctx.FakeConsul.services, 1)
	for _, svc := range ctx.FakeConsul.services {
		must.Equal(wantTags, svc.Tags)
	}

	// Turn the canary flag off; the registered tags must not change.
	before := ctx.Task.Copy()
	ctx.Task.Canary = false
	must.NoError(ctx.ServiceClient.UpdateTask(before, ctx.Task))
	must.NoError(ctx.syncOnce())
	must.Len(ctx.FakeConsul.services, 1)
	for _, svc := range ctx.FakeConsul.services {
		must.Equal(wantTags, svc.Tags)
	}

	// Removing the task leaves nothing registered.
	ctx.ServiceClient.RemoveTask(ctx.Task)
	must.NoError(ctx.syncOnce())
	must.Len(ctx.FakeConsul.services, 0)
}
// TestConsul_PeriodicSync asserts that Nomad periodically reconciles with
// Consul by running the service client briefly and counting the hits seen by
// the fake Consul.
func TestConsul_PeriodicSync(t *testing.T) {
	t.Parallel()

	const (
		// Lowered periodic sync interval to speed up the test.
		syncInterval = 2 * time.Millisecond
		// How long the client is left running before hits are counted.
		runFor = 10 * time.Millisecond
		// Each sync() calls multiple Consul APIs, so at least this many
		// hits are expected within runFor.
		minHits = 5
	)

	ctx := setupFake(t)
	defer ctx.ServiceClient.Shutdown()

	ctx.ServiceClient.periodicInterval = syncInterval

	go ctx.ServiceClient.Run()

	select {
	case <-time.After(runFor):
	case <-ctx.ServiceClient.exitCh:
		t.Fatalf("exited unexpectedly")
	}

	if hits := ctx.FakeConsul.getHits(); hits < minHits {
		t.Fatalf("expected at least %d hits but found %d", minHits, hits)
	}
}
// TestIsNomadService asserts the isNomadService helper returns true for Nomad
// task IDs and false for unknown IDs and Nomad agent IDs (see #2827).
func TestIsNomadService(t *testing.T) {
	t.Parallel()

	cases := []struct {
		serviceID string
		want      bool
	}{
		// Nomad agent (client/server) IDs are not task services
		{"_nomad-client-nomad-client-http", false},
		{"_nomad-server-nomad-serf", false},

		// Pre-0.7.1 style IDs still match
		{"_nomad-executor-abc", true},
		{"_nomad-executor", true},

		// Post-0.7.1 style IDs match
		{"_nomad-task-FBBK265QN4TMT25ND4EP42TJVMYJ3HR4", true},

		// Anything else is not a Nomad task service
		{"not-nomad", false},
		{"_nomad", false},
	}

	for _, tc := range cases {
		t.Run(tc.serviceID, func(t *testing.T) {
			if got := isNomadService(tc.serviceID); got != tc.want {
				t.Errorf("%q should be %t but found %t", tc.serviceID, tc.want, got)
			}
		})
	}
}
// TestCreateCheckReg_HTTP asserts Nomad ServiceCheck structs are properly
// converted to Consul API AgentCheckRegistrations for HTTP checks.
func TestCreateCheckReg_HTTP(t *testing.T) {
	t.Parallel()

	const (
		serviceID = "testService"
		host      = "localhost"
		port      = 41111
	)

	check := &structs.ServiceCheck{
		Name:      "name",
		Type:      "http",
		Path:      "/path",
		PortLabel: "label",
		Method:    "POST",
		Header: map[string][]string{
			"Foo": {"bar"},
		},
	}
	checkID := check.Hash(serviceID)

	// Timeout and Interval are left unset on the check, so the expected
	// registration carries them as "0s".
	want := &api.AgentCheckRegistration{
		ID:        checkID,
		Name:      "name",
		ServiceID: serviceID,
		AgentServiceCheck: api.AgentServiceCheck{
			Timeout:  "0s",
			Interval: "0s",
			HTTP:     fmt.Sprintf("http://%s:%d/path", host, port),
			Method:   "POST",
			Header: map[string][]string{
				"Foo": {"bar"},
			},
		},
	}

	got, err := createCheckReg(serviceID, checkID, check, host, port)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	diff := pretty.Diff(got, want)
	if len(diff) != 0 {
		t.Fatalf("diff:\n%s\n", strings.Join(diff, "\n"))
	}
}
// TestCreateCheckReg_GRPC asserts Nomad ServiceCheck structs are properly
// converted to Consul API AgentCheckRegistrations for GRPC checks.
func TestCreateCheckReg_GRPC(t *testing.T) {
	t.Parallel()

	const serviceID = "testService"

	check := &structs.ServiceCheck{
		Name:          "name",
		Type:          "grpc",
		PortLabel:     "label",
		GRPCService:   "foo.Bar",
		GRPCUseTLS:    true,
		TLSSkipVerify: true,
		Timeout:       time.Second,
		Interval:      time.Minute,
	}
	checkID := check.Hash(serviceID)

	// The expected GRPC target is "<host>:<port>/<GRPCService>" and the
	// durations appear in their string form.
	want := &api.AgentCheckRegistration{
		ID:        checkID,
		Name:      "name",
		ServiceID: serviceID,
		AgentServiceCheck: api.AgentServiceCheck{
			Timeout:       "1s",
			Interval:      "1m0s",
			GRPC:          "localhost:8080/foo.Bar",
			GRPCUseTLS:    true,
			TLSSkipVerify: true,
		},
	}

	got, err := createCheckReg(serviceID, checkID, check, "localhost", 8080)
	require.NoError(t, err)
	require.Equal(t, want, got)
}
// TestGetAddress asserts Nomad uses the correct ip and port for services and
// checks depending on port labels, driver networks, and address mode.
func TestGetAddress(t *testing.T) {
	const HostIP = "127.0.0.1"

	cases := []struct {
		Name string

		// Parameters
		Mode      string
		PortLabel string
		Host      map[string]int // will be converted to structs.Networks
		Driver    *drivers.DriverNetwork

		// Results
		ExpectedIP   string
		ExpectedPort int
		ExpectedErr  string
	}{
		// Valid Configurations
		{
			Name:      "ExampleService",
			Mode:      structs.AddressModeAuto,
			PortLabel: "db",
			Host:      map[string]int{"db": 12435},
			Driver: &drivers.DriverNetwork{
				PortMap: map[string]int{"db": 6379},
				IP:      "10.1.2.3",
			},
			ExpectedIP:   HostIP,
			ExpectedPort: 12435,
		},
		{
			Name:      "Host",
			Mode:      structs.AddressModeHost,
			PortLabel: "db",
			Host:      map[string]int{"db": 12345},
			Driver: &drivers.DriverNetwork{
				PortMap: map[string]int{"db": 6379},
				IP:      "10.1.2.3",
			},
			ExpectedIP:   HostIP,
			ExpectedPort: 12345,
		},
		{
			Name:      "Driver",
			Mode:      structs.AddressModeDriver,
			PortLabel: "db",
			Host:      map[string]int{"db": 12345},
			Driver: &drivers.DriverNetwork{
				PortMap: map[string]int{"db": 6379},
				IP:      "10.1.2.3",
			},
			ExpectedIP:   "10.1.2.3",
			ExpectedPort: 6379,
		},
		{
			Name:      "AutoDriver",
			Mode:      structs.AddressModeAuto,
			PortLabel: "db",
			Host:      map[string]int{"db": 12345},
			Driver: &drivers.DriverNetwork{
				PortMap:       map[string]int{"db": 6379},
				IP:            "10.1.2.3",
				AutoAdvertise: true,
			},
			ExpectedIP:   "10.1.2.3",
			ExpectedPort: 6379,
		},
		{
			Name:      "DriverCustomPort",
			Mode:      structs.AddressModeDriver,
			PortLabel: "7890",
			Host:      map[string]int{"db": 12345},
			Driver: &drivers.DriverNetwork{
				PortMap: map[string]int{"db": 6379},
				IP:      "10.1.2.3",
			},
			ExpectedIP:   "10.1.2.3",
			ExpectedPort: 7890,
		},

		// Invalid Configurations
		{
			Name:        "DriverWithoutNetwork",
			Mode:        structs.AddressModeDriver,
			PortLabel:   "db",
			Host:        map[string]int{"db": 12345},
			Driver:      nil,
			ExpectedErr: "no driver network exists",
		},
		{
			Name:      "DriverBadPort",
			Mode:      structs.AddressModeDriver,
			PortLabel: "bad-port-label",
			Host:      map[string]int{"db": 12345},
			Driver: &drivers.DriverNetwork{
				PortMap: map[string]int{"db": 6379},
				IP:      "10.1.2.3",
			},
			ExpectedErr: "invalid port",
		},
		{
			Name:      "DriverZeroPort",
			Mode:      structs.AddressModeDriver,
			PortLabel: "0",
			Driver: &drivers.DriverNetwork{
				IP: "10.1.2.3",
			},
			ExpectedErr: "invalid port",
		},
		{
			Name:        "HostBadPort",
			Mode:        structs.AddressModeHost,
			PortLabel:   "bad-port-label",
			ExpectedErr: "invalid port",
		},
		{
			Name:        "InvalidMode",
			Mode:        "invalid-mode",
			PortLabel:   "80",
			ExpectedErr: "invalid address mode",
		},

		// No port label: these cases expect only an IP, a zero port and no
		// error.
		{
			Name:       "NoPort_AutoMode",
			Mode:       structs.AddressModeAuto,
			ExpectedIP: HostIP,
		},
		{
			Name:       "NoPort_HostMode",
			Mode:       structs.AddressModeHost,
			ExpectedIP: HostIP,
		},
		{
			Name: "NoPort_DriverMode",
			Mode: structs.AddressModeDriver,
			Driver: &drivers.DriverNetwork{
				IP: "10.1.2.3",
			},
			ExpectedIP: "10.1.2.3",
		},
	}

	for _, tc := range cases {
		t.Run(tc.Name, func(t *testing.T) {
			// Turn the host label->port map into the reserved ports of a
			// single network bound to HostIP.
			reserved := make([]structs.Port, 0, len(tc.Host))
			for label, value := range tc.Host {
				reserved = append(reserved, structs.Port{Label: label, Value: value})
			}
			networks := []*structs.NetworkResource{
				{
					IP:            HostIP,
					ReservedPorts: reserved,
				},
			}

			gotIP, gotPort, err := getAddress(tc.Mode, tc.PortLabel, networks, tc.Driver)

			// IP and port are compared for every case, including the ones
			// that expect an error.
			assert.Equal(t, tc.ExpectedIP, gotIP, "IP mismatch")
			assert.Equal(t, tc.ExpectedPort, gotPort, "Port mismatch")

			switch {
			case tc.ExpectedErr == "":
				assert.Nil(t, err)
			case err == nil:
				t.Fatalf("expected error containing %q but err=nil", tc.ExpectedErr)
			default:
				assert.Contains(t, err.Error(), tc.ExpectedErr)
			}
		})
	}
}
// TestConsul_ServiceName_Duplicates registers a task holding two services
// that share a name (on different ports, each with its own tags and check)
// plus a third differently named service, and asserts that all three are
// registered with the expected tags and number of checks.
func TestConsul_ServiceName_Duplicates(t *testing.T) {
	t.Parallel()
	ctx := setupFake(t)
	must := require.New(t)

	ctx.Task.Services = []*structs.Service{
		{
			Name:      "best-service",
			PortLabel: "x",
			Tags: []string{
				"foo",
			},
			Checks: []*structs.ServiceCheck{
				{
					Name:     "check-a",
					Type:     "tcp",
					Interval: time.Second,
					Timeout:  time.Second,
				},
			},
		},
		{
			Name:      "best-service",
			PortLabel: "y",
			Tags: []string{
				"bar",
			},
			Checks: []*structs.ServiceCheck{
				{
					Name:     "checky-mccheckface",
					Type:     "tcp",
					Interval: time.Second,
					Timeout:  time.Second,
				},
			},
		},
		{
			Name:      "worst-service",
			PortLabel: "y",
		},
	}

	must.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
	must.NoError(ctx.syncOnce())
	must.Len(ctx.FakeConsul.services, 3)

	// The two same-named services are told apart by the port they advertise.
	for _, svc := range ctx.FakeConsul.services {
		switch {
		case svc.Name == ctx.Task.Services[0].Name && svc.Port == xPort:
			must.ElementsMatch(svc.Tags, ctx.Task.Services[0].Tags)
			must.Len(svc.Checks, 1)
		case svc.Name == ctx.Task.Services[1].Name && svc.Port == yPort:
			must.ElementsMatch(svc.Tags, ctx.Task.Services[1].Tags)
			must.Len(svc.Checks, 1)
		case svc.Name == ctx.Task.Services[2].Name:
			must.Len(svc.Checks, 0)
		}
	}
}
// TestConsul_ServiceDeregistration_OutProbation asserts that in steady state
// (once the deregistration probation period has expired) we remove any
// services and checks we don't recognize locally, in addition to those that
// were explicitly removed.
func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
	t.Parallel()
	ctx := setupFake(t)
	must := require.New(t)

	// Probation already expired an hour ago.
	ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour)

	// newTask builds a test task exposing one service with a single tcp check.
	newTask := func(serviceName, portLabel string) *TaskServices {
		task := testTask()
		task.Services = []*structs.Service{
			{
				Name:      serviceName,
				PortLabel: portLabel,
				Checks: []*structs.ServiceCheck{
					{
						Name:     "check",
						Type:     "tcp",
						Interval: time.Second,
						Timeout:  time.Second,
					},
				},
			},
		}
		return task
	}

	remainingTask := newTask("remaining-service", "x")
	remainingTaskServiceID := makeTaskServiceID(remainingTask.AllocID,
		remainingTask.Name, remainingTask.Services[0], false)

	must.NoError(ctx.ServiceClient.RegisterTask(remainingTask))
	must.NoError(ctx.syncOnce())
	must.Len(ctx.FakeConsul.services, 1)
	must.Len(ctx.FakeConsul.checks, 1)

	explicitlyRemovedTask := newTask("explicitly-removed-service", "y")
	explicitlyRemovedTaskServiceID := makeTaskServiceID(explicitlyRemovedTask.AllocID,
		explicitlyRemovedTask.Name, explicitlyRemovedTask.Services[0], false)

	must.NoError(ctx.ServiceClient.RegisterTask(explicitlyRemovedTask))
	must.NoError(ctx.syncOnce())
	must.Len(ctx.FakeConsul.services, 2)
	must.Len(ctx.FakeConsul.checks, 2)

	// we register a task through nomad API then remove it out of band
	outofbandTask := newTask("unknown-service", "x")
	outofbandTaskServiceID := makeTaskServiceID(outofbandTask.AllocID,
		outofbandTask.Name, outofbandTask.Services[0], false)

	must.NoError(ctx.ServiceClient.RegisterTask(outofbandTask))
	must.NoError(ctx.syncOnce())
	must.Len(ctx.FakeConsul.services, 3)

	// remove outofbandTask from local services so it appears unknown to client
	must.Len(ctx.ServiceClient.services, 3)
	must.Len(ctx.ServiceClient.checks, 3)

	outofbandCheckID := makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])
	delete(ctx.ServiceClient.services, outofbandTaskServiceID)
	delete(ctx.ServiceClient.checks, outofbandCheckID)

	must.Len(ctx.ServiceClient.services, 2)
	must.Len(ctx.ServiceClient.checks, 2)

	// Sync and ensure that explicitly removed service as well as outofbandTask were removed
	ctx.ServiceClient.RemoveTask(explicitlyRemovedTask)
	must.NoError(ctx.syncOnce())
	must.NoError(ctx.ServiceClient.sync())

	remainingCheckID := makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0])
	explicitlyRemovedCheckID := makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0])

	must.Len(ctx.FakeConsul.services, 1)
	must.Len(ctx.FakeConsul.checks, 1)

	must.Contains(ctx.FakeConsul.services, remainingTaskServiceID)
	must.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID)
	must.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)

	must.Contains(ctx.FakeConsul.checks, remainingCheckID)
	must.NotContains(ctx.FakeConsul.checks, outofbandCheckID)
	must.NotContains(ctx.FakeConsul.checks, explicitlyRemovedCheckID)
}
// TestConsul_ServiceDeregistration_InProbation asserts that during
// initialization we only deregister services that were explicitly removed and
// leave unknown services untouched. This adds a grace period for restoring
// recovered tasks before deregistering them. Once the probation expiry is in
// the past, the unknown services and checks are removed as well.
func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
	t.Parallel()
	ctx := setupFake(t)
	must := require.New(t)

	// Probation stays active for another hour.
	ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(1 * time.Hour)

	// newTask builds a test task exposing one service with a single tcp check.
	newTask := func(serviceName, portLabel string) *TaskServices {
		task := testTask()
		task.Services = []*structs.Service{
			{
				Name:      serviceName,
				PortLabel: portLabel,
				Checks: []*structs.ServiceCheck{
					{
						Name:     "check",
						Type:     "tcp",
						Interval: time.Second,
						Timeout:  time.Second,
					},
				},
			},
		}
		return task
	}

	remainingTask := newTask("remaining-service", "x")
	remainingTaskServiceID := makeTaskServiceID(remainingTask.AllocID,
		remainingTask.Name, remainingTask.Services[0], false)

	must.NoError(ctx.ServiceClient.RegisterTask(remainingTask))
	must.NoError(ctx.syncOnce())
	must.Len(ctx.FakeConsul.services, 1)
	must.Len(ctx.FakeConsul.checks, 1)

	explicitlyRemovedTask := newTask("explicitly-removed-service", "y")
	explicitlyRemovedTaskServiceID := makeTaskServiceID(explicitlyRemovedTask.AllocID,
		explicitlyRemovedTask.Name, explicitlyRemovedTask.Services[0], false)

	must.NoError(ctx.ServiceClient.RegisterTask(explicitlyRemovedTask))
	must.NoError(ctx.syncOnce())
	must.Len(ctx.FakeConsul.services, 2)
	must.Len(ctx.FakeConsul.checks, 2)

	// we register a task through nomad API then remove it out of band
	outofbandTask := newTask("unknown-service", "x")
	outofbandTaskServiceID := makeTaskServiceID(outofbandTask.AllocID,
		outofbandTask.Name, outofbandTask.Services[0], false)

	must.NoError(ctx.ServiceClient.RegisterTask(outofbandTask))
	must.NoError(ctx.syncOnce())
	must.Len(ctx.FakeConsul.services, 3)

	// remove outofbandTask from local services so it appears unknown to client
	must.Len(ctx.ServiceClient.services, 3)
	must.Len(ctx.ServiceClient.checks, 3)

	outofbandCheckID := makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0])
	delete(ctx.ServiceClient.services, outofbandTaskServiceID)
	delete(ctx.ServiceClient.checks, outofbandCheckID)

	must.Len(ctx.ServiceClient.services, 2)
	must.Len(ctx.ServiceClient.checks, 2)

	// Sync and ensure that explicitly removed service was removed, but outofbandTask remains
	ctx.ServiceClient.RemoveTask(explicitlyRemovedTask)
	must.NoError(ctx.syncOnce())
	must.NoError(ctx.ServiceClient.sync())

	remainingCheckID := makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0])
	explicitlyRemovedCheckID := makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0])

	must.Len(ctx.FakeConsul.services, 2)
	must.Len(ctx.FakeConsul.checks, 2)

	must.Contains(ctx.FakeConsul.services, remainingTaskServiceID)
	must.Contains(ctx.FakeConsul.services, outofbandTaskServiceID)
	must.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)

	must.Contains(ctx.FakeConsul.checks, remainingCheckID)
	must.Contains(ctx.FakeConsul.checks, outofbandCheckID)
	must.NotContains(ctx.FakeConsul.checks, explicitlyRemovedCheckID)

	// after probation, outofband services and checks are removed
	ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour)

	must.NoError(ctx.ServiceClient.sync())
	must.Len(ctx.FakeConsul.services, 1)
	must.Len(ctx.FakeConsul.checks, 1)

	must.Contains(ctx.FakeConsul.services, remainingTaskServiceID)
	must.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID)
	must.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)

	must.Contains(ctx.FakeConsul.checks, remainingCheckID)
	must.NotContains(ctx.FakeConsul.checks, outofbandCheckID)
	must.NotContains(ctx.FakeConsul.checks, explicitlyRemovedCheckID)
}