59e0b67c7f
Fixes #6041 Unlike all other Consul operations, boostrapping requires Consul be available. This PR tries Consul 3 times with a backoff to account for the group services being asynchronously registered with Consul.
1920 lines
54 KiB
Go
1920 lines
54 KiB
Go
package consul
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"reflect"
|
|
"strings"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hashicorp/consul/api"
|
|
"github.com/hashicorp/nomad/helper/testlog"
|
|
"github.com/hashicorp/nomad/helper/uuid"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/hashicorp/nomad/plugins/drivers"
|
|
"github.com/hashicorp/nomad/testutil"
|
|
"github.com/kr/pretty"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
const (
|
|
// Ports used in testTask
|
|
xPort = 1234
|
|
yPort = 1235
|
|
)
|
|
|
|
func testTask() *TaskServices {
|
|
return &TaskServices{
|
|
AllocID: uuid.Generate(),
|
|
Name: "taskname",
|
|
Restarter: &restartRecorder{},
|
|
Services: []*structs.Service{
|
|
{
|
|
Name: "taskname-service",
|
|
PortLabel: "x",
|
|
Tags: []string{"tag1", "tag2"},
|
|
},
|
|
},
|
|
Networks: []*structs.NetworkResource{
|
|
{
|
|
DynamicPorts: []structs.Port{
|
|
{Label: "x", Value: xPort},
|
|
{Label: "y", Value: yPort},
|
|
},
|
|
},
|
|
},
|
|
DriverExec: newMockExec(),
|
|
}
|
|
}
|
|
|
|
// mockExec implements the ScriptExecutor interface and will use an alternate
|
|
// implementation t.ExecFunc if non-nil.
|
|
type mockExec struct {
|
|
// Ticked whenever a script is called
|
|
execs chan int
|
|
|
|
// If non-nil will be called by script checks
|
|
ExecFunc func(ctx context.Context, cmd string, args []string) ([]byte, int, error)
|
|
}
|
|
|
|
func newMockExec() *mockExec {
|
|
return &mockExec{
|
|
execs: make(chan int, 100),
|
|
}
|
|
}
|
|
|
|
func (m *mockExec) Exec(dur time.Duration, cmd string, args []string) ([]byte, int, error) {
|
|
select {
|
|
case m.execs <- 1:
|
|
default:
|
|
}
|
|
if m.ExecFunc == nil {
|
|
// Default impl is just "ok"
|
|
return []byte("ok"), 0, nil
|
|
}
|
|
ctx, cancel := context.WithTimeout(context.Background(), dur)
|
|
defer cancel()
|
|
return m.ExecFunc(ctx, cmd, args)
|
|
}
|
|
|
|
// restartRecorder is a minimal TaskRestarter implementation that simply
|
|
// counts how many restarts were triggered.
|
|
type restartRecorder struct {
|
|
restarts int64
|
|
}
|
|
|
|
func (r *restartRecorder) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error {
|
|
atomic.AddInt64(&r.restarts, 1)
|
|
return nil
|
|
}
|
|
|
|
// testFakeCtx contains a fake Consul AgentAPI
|
|
type testFakeCtx struct {
|
|
ServiceClient *ServiceClient
|
|
FakeConsul *MockAgent
|
|
Task *TaskServices
|
|
MockExec *mockExec
|
|
}
|
|
|
|
var errNoOps = fmt.Errorf("testing error: no pending operations")
|
|
|
|
// syncOps simulates one iteration of the ServiceClient.Run loop and returns
|
|
// any errors returned by sync() or errNoOps if no pending operations.
|
|
func (t *testFakeCtx) syncOnce() error {
|
|
select {
|
|
case ops := <-t.ServiceClient.opCh:
|
|
t.ServiceClient.merge(ops)
|
|
err := t.ServiceClient.sync()
|
|
if err == nil {
|
|
t.ServiceClient.clearExplicitlyDeregistered()
|
|
}
|
|
return err
|
|
default:
|
|
return errNoOps
|
|
}
|
|
}
|
|
|
|
// setupFake creates a testFakeCtx with a ServiceClient backed by a fakeConsul.
|
|
// A test Task is also provided.
|
|
func setupFake(t *testing.T) *testFakeCtx {
|
|
fc := NewMockAgent()
|
|
tt := testTask()
|
|
|
|
// by default start fake client being out of probation
|
|
sc := NewServiceClient(fc, testlog.HCLogger(t), true)
|
|
sc.deregisterProbationExpiry = time.Now().Add(-1 * time.Minute)
|
|
|
|
return &testFakeCtx{
|
|
ServiceClient: sc,
|
|
FakeConsul: fc,
|
|
Task: tt,
|
|
MockExec: tt.DriverExec.(*mockExec),
|
|
}
|
|
}
|
|
|
|
func TestConsul_ChangeTags(t *testing.T) {
|
|
ctx := setupFake(t)
|
|
require := require.New(t)
|
|
|
|
require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
|
|
require.NoError(ctx.syncOnce())
|
|
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
|
|
|
|
// Validate the alloc registration
|
|
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID)
|
|
require.NoError(err)
|
|
require.NotNil(reg1, "Unexpected nil alloc registration")
|
|
require.Equal(1, reg1.NumServices())
|
|
require.Equal(0, reg1.NumChecks())
|
|
|
|
for _, v := range ctx.FakeConsul.services {
|
|
require.Equal(v.Name, ctx.Task.Services[0].Name)
|
|
require.Equal(v.Tags, ctx.Task.Services[0].Tags)
|
|
}
|
|
|
|
// Update the task definition
|
|
origTask := ctx.Task.Copy()
|
|
ctx.Task.Services[0].Tags[0] = "newtag"
|
|
|
|
// Register and sync the update
|
|
require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task))
|
|
require.NoError(ctx.syncOnce())
|
|
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
|
|
|
|
// Validate the metadata changed
|
|
for _, v := range ctx.FakeConsul.services {
|
|
require.Equal(v.Name, ctx.Task.Services[0].Name)
|
|
require.Equal(v.Tags, ctx.Task.Services[0].Tags)
|
|
require.Equal("newtag", v.Tags[0])
|
|
}
|
|
}
|
|
|
|
// TestConsul_ChangePorts asserts that changing the ports on a service updates
|
|
// it in Consul. Pre-0.7.1 ports were not part of the service ID and this was a
|
|
// slightly different code path than changing tags.
|
|
func TestConsul_ChangePorts(t *testing.T) {
|
|
ctx := setupFake(t)
|
|
require := require.New(t)
|
|
|
|
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
|
{
|
|
Name: "c1",
|
|
Type: "tcp",
|
|
Interval: time.Second,
|
|
Timeout: time.Second,
|
|
PortLabel: "x",
|
|
},
|
|
{
|
|
Name: "c2",
|
|
Type: "script",
|
|
Interval: 9000 * time.Hour,
|
|
Timeout: time.Second,
|
|
},
|
|
{
|
|
Name: "c3",
|
|
Type: "http",
|
|
Protocol: "http",
|
|
Path: "/",
|
|
Interval: time.Second,
|
|
Timeout: time.Second,
|
|
PortLabel: "y",
|
|
},
|
|
}
|
|
|
|
require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
|
|
require.NoError(ctx.syncOnce())
|
|
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
|
|
|
|
for _, v := range ctx.FakeConsul.services {
|
|
require.Equal(ctx.Task.Services[0].Name, v.Name)
|
|
require.Equal(ctx.Task.Services[0].Tags, v.Tags)
|
|
require.Equal(xPort, v.Port)
|
|
}
|
|
|
|
require.Len(ctx.FakeConsul.checks, 3)
|
|
|
|
origTCPKey := ""
|
|
origScriptKey := ""
|
|
origHTTPKey := ""
|
|
for k, v := range ctx.FakeConsul.checks {
|
|
switch v.Name {
|
|
case "c1":
|
|
origTCPKey = k
|
|
require.Equal(fmt.Sprintf(":%d", xPort), v.TCP)
|
|
case "c2":
|
|
origScriptKey = k
|
|
select {
|
|
case <-ctx.MockExec.execs:
|
|
// Here we validate there is nothing left on the channel
|
|
require.Equal(0, len(ctx.MockExec.execs))
|
|
case <-time.After(3 * time.Second):
|
|
t.Fatalf("script not called in time")
|
|
}
|
|
case "c3":
|
|
origHTTPKey = k
|
|
require.Equal(fmt.Sprintf("http://:%d/", yPort), v.HTTP)
|
|
default:
|
|
t.Fatalf("unexpected check: %q", v.Name)
|
|
}
|
|
}
|
|
|
|
require.NotEmpty(origTCPKey)
|
|
require.NotEmpty(origScriptKey)
|
|
require.NotEmpty(origHTTPKey)
|
|
|
|
// Now update the PortLabel on the Service and Check c3
|
|
origTask := ctx.Task.Copy()
|
|
ctx.Task.Services[0].PortLabel = "y"
|
|
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
|
{
|
|
Name: "c1",
|
|
Type: "tcp",
|
|
Interval: time.Second,
|
|
Timeout: time.Second,
|
|
PortLabel: "x",
|
|
},
|
|
{
|
|
Name: "c2",
|
|
Type: "script",
|
|
Interval: 9000 * time.Hour,
|
|
Timeout: time.Second,
|
|
},
|
|
{
|
|
Name: "c3",
|
|
Type: "http",
|
|
Protocol: "http",
|
|
Path: "/",
|
|
Interval: time.Second,
|
|
Timeout: time.Second,
|
|
// Removed PortLabel; should default to service's (y)
|
|
},
|
|
}
|
|
|
|
require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task))
|
|
require.NoError(ctx.syncOnce())
|
|
require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul")
|
|
|
|
for _, v := range ctx.FakeConsul.services {
|
|
require.Equal(ctx.Task.Services[0].Name, v.Name)
|
|
require.Equal(ctx.Task.Services[0].Tags, v.Tags)
|
|
require.Equal(yPort, v.Port)
|
|
}
|
|
|
|
require.Equal(3, len(ctx.FakeConsul.checks))
|
|
|
|
for k, v := range ctx.FakeConsul.checks {
|
|
switch v.Name {
|
|
case "c1":
|
|
// C1 is changed because the service was re-registered
|
|
require.NotEqual(origTCPKey, k)
|
|
require.Equal(fmt.Sprintf(":%d", xPort), v.TCP)
|
|
case "c2":
|
|
// C2 is changed because the service was re-registered
|
|
require.NotEqual(origScriptKey, k)
|
|
case "c3":
|
|
require.NotEqual(origHTTPKey, k)
|
|
require.Equal(fmt.Sprintf("http://:%d/", yPort), v.HTTP)
|
|
default:
|
|
t.Errorf("Unknown check: %q", k)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestConsul_ChangeChecks asserts that updating only the checks on a service
|
|
// properly syncs with Consul.
|
|
func TestConsul_ChangeChecks(t *testing.T) {
|
|
ctx := setupFake(t)
|
|
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
|
{
|
|
Name: "c1",
|
|
Type: "tcp",
|
|
Interval: time.Second,
|
|
Timeout: time.Second,
|
|
PortLabel: "x",
|
|
CheckRestart: &structs.CheckRestart{
|
|
Limit: 3,
|
|
},
|
|
},
|
|
}
|
|
|
|
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
|
|
t.Fatalf("unexpected error registering task: %v", err)
|
|
}
|
|
|
|
if err := ctx.syncOnce(); err != nil {
|
|
t.Fatalf("unexpected error syncing task: %v", err)
|
|
}
|
|
|
|
if n := len(ctx.FakeConsul.services); n != 1 {
|
|
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
|
|
}
|
|
|
|
// Assert a check restart watch update was enqueued and clear it
|
|
if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 {
|
|
t.Fatalf("expected 1 check restart update but found %d", n)
|
|
}
|
|
upd := <-ctx.ServiceClient.checkWatcher.checkUpdateCh
|
|
c1ID := upd.checkID
|
|
|
|
// Query the allocs registrations and then again when we update. The IDs
|
|
// should change
|
|
reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID)
|
|
if err != nil {
|
|
t.Fatalf("Looking up alloc registration failed: %v", err)
|
|
}
|
|
if reg1 == nil {
|
|
t.Fatalf("Nil alloc registrations: %v", err)
|
|
}
|
|
if num := reg1.NumServices(); num != 1 {
|
|
t.Fatalf("Wrong number of services: got %d; want 1", num)
|
|
}
|
|
if num := reg1.NumChecks(); num != 1 {
|
|
t.Fatalf("Wrong number of checks: got %d; want 1", num)
|
|
}
|
|
|
|
origServiceKey := ""
|
|
for k, v := range ctx.FakeConsul.services {
|
|
origServiceKey = k
|
|
if v.Name != ctx.Task.Services[0].Name {
|
|
t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name)
|
|
}
|
|
if v.Port != xPort {
|
|
t.Errorf("expected Port x=%v but found: %v", xPort, v.Port)
|
|
}
|
|
}
|
|
|
|
if n := len(ctx.FakeConsul.checks); n != 1 {
|
|
t.Fatalf("expected 1 check but found %d:\n%#v", n, ctx.FakeConsul.checks)
|
|
}
|
|
for _, v := range ctx.FakeConsul.checks {
|
|
if v.Name != "c1" {
|
|
t.Fatalf("expected check c1 but found %q", v.Name)
|
|
}
|
|
}
|
|
|
|
// Now add a check and modify the original
|
|
origTask := ctx.Task.Copy()
|
|
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
|
{
|
|
Name: "c1",
|
|
Type: "tcp",
|
|
Interval: 2 * time.Second,
|
|
Timeout: time.Second,
|
|
PortLabel: "x",
|
|
CheckRestart: &structs.CheckRestart{
|
|
Limit: 3,
|
|
},
|
|
},
|
|
{
|
|
Name: "c2",
|
|
Type: "http",
|
|
Path: "/",
|
|
Interval: time.Second,
|
|
Timeout: time.Second,
|
|
PortLabel: "x",
|
|
},
|
|
}
|
|
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
|
|
t.Fatalf("unexpected error registering task: %v", err)
|
|
}
|
|
|
|
// Assert 2 check restart watch updates was enqueued
|
|
if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 2 {
|
|
t.Fatalf("expected 2 check restart updates but found %d", n)
|
|
}
|
|
|
|
// First the new watch
|
|
upd = <-ctx.ServiceClient.checkWatcher.checkUpdateCh
|
|
if upd.checkID == c1ID || upd.remove {
|
|
t.Fatalf("expected check watch update to be an add of checkID=%q but found remove=%t checkID=%q",
|
|
c1ID, upd.remove, upd.checkID)
|
|
}
|
|
|
|
// Then remove the old watch
|
|
upd = <-ctx.ServiceClient.checkWatcher.checkUpdateCh
|
|
if upd.checkID != c1ID || !upd.remove {
|
|
t.Fatalf("expected check watch update to be a removal of checkID=%q but found remove=%t checkID=%q",
|
|
c1ID, upd.remove, upd.checkID)
|
|
}
|
|
|
|
if err := ctx.syncOnce(); err != nil {
|
|
t.Fatalf("unexpected error syncing task: %v", err)
|
|
}
|
|
|
|
if n := len(ctx.FakeConsul.services); n != 1 {
|
|
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
|
|
}
|
|
|
|
if _, ok := ctx.FakeConsul.services[origServiceKey]; !ok {
|
|
t.Errorf("unexpected key change; was: %q -- but found %#v", origServiceKey, ctx.FakeConsul.services)
|
|
}
|
|
|
|
if n := len(ctx.FakeConsul.checks); n != 2 {
|
|
t.Fatalf("expected 2 check but found %d:\n%#v", n, ctx.FakeConsul.checks)
|
|
}
|
|
|
|
for k, v := range ctx.FakeConsul.checks {
|
|
switch v.Name {
|
|
case "c1":
|
|
if expected := fmt.Sprintf(":%d", xPort); v.TCP != expected {
|
|
t.Errorf("expected Port x=%v but found: %v", expected, v.TCP)
|
|
}
|
|
|
|
// update id
|
|
c1ID = k
|
|
case "c2":
|
|
if expected := fmt.Sprintf("http://:%d/", xPort); v.HTTP != expected {
|
|
t.Errorf("expected Port x=%v but found: %v", expected, v.HTTP)
|
|
}
|
|
default:
|
|
t.Errorf("Unknown check: %q", k)
|
|
}
|
|
}
|
|
|
|
// Check again and ensure the IDs changed
|
|
reg2, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID)
|
|
if err != nil {
|
|
t.Fatalf("Looking up alloc registration failed: %v", err)
|
|
}
|
|
if reg2 == nil {
|
|
t.Fatalf("Nil alloc registrations: %v", err)
|
|
}
|
|
if num := reg2.NumServices(); num != 1 {
|
|
t.Fatalf("Wrong number of services: got %d; want 1", num)
|
|
}
|
|
if num := reg2.NumChecks(); num != 2 {
|
|
t.Fatalf("Wrong number of checks: got %d; want 2", num)
|
|
}
|
|
|
|
for task, treg := range reg1.Tasks {
|
|
otherTaskReg, ok := reg2.Tasks[task]
|
|
if !ok {
|
|
t.Fatalf("Task %q not in second reg", task)
|
|
}
|
|
|
|
for sID, sreg := range treg.Services {
|
|
otherServiceReg, ok := otherTaskReg.Services[sID]
|
|
if !ok {
|
|
t.Fatalf("service ID changed")
|
|
}
|
|
|
|
for newID := range sreg.checkIDs {
|
|
if _, ok := otherServiceReg.checkIDs[newID]; ok {
|
|
t.Fatalf("check IDs should change")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Alter a CheckRestart and make sure the watcher is updated but nothing else
|
|
origTask = ctx.Task.Copy()
|
|
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
|
{
|
|
Name: "c1",
|
|
Type: "tcp",
|
|
Interval: 2 * time.Second,
|
|
Timeout: time.Second,
|
|
PortLabel: "x",
|
|
CheckRestart: &structs.CheckRestart{
|
|
Limit: 11,
|
|
},
|
|
},
|
|
{
|
|
Name: "c2",
|
|
Type: "http",
|
|
Path: "/",
|
|
Interval: time.Second,
|
|
Timeout: time.Second,
|
|
PortLabel: "x",
|
|
},
|
|
}
|
|
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
|
|
t.Fatalf("unexpected error registering task: %v", err)
|
|
}
|
|
if err := ctx.syncOnce(); err != nil {
|
|
t.Fatalf("unexpected error syncing task: %v", err)
|
|
}
|
|
|
|
if n := len(ctx.FakeConsul.checks); n != 2 {
|
|
t.Fatalf("expected 2 check but found %d:\n%#v", n, ctx.FakeConsul.checks)
|
|
}
|
|
|
|
for k, v := range ctx.FakeConsul.checks {
|
|
if v.Name == "c1" {
|
|
if k != c1ID {
|
|
t.Errorf("expected c1 to still have id %q but found %q", c1ID, k)
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
// Assert a check restart watch update was enqueued for a removal and an add
|
|
if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 {
|
|
t.Fatalf("expected 1 check restart update but found %d", n)
|
|
}
|
|
<-ctx.ServiceClient.checkWatcher.checkUpdateCh
|
|
}
|
|
|
|
// TestConsul_RegServices tests basic service registration.
|
|
func TestConsul_RegServices(t *testing.T) {
|
|
ctx := setupFake(t)
|
|
|
|
// Add a check w/restarting
|
|
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
|
{
|
|
Name: "testcheck",
|
|
Type: "tcp",
|
|
Interval: 100 * time.Millisecond,
|
|
CheckRestart: &structs.CheckRestart{
|
|
Limit: 3,
|
|
},
|
|
},
|
|
}
|
|
|
|
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
|
|
t.Fatalf("unexpected error registering task: %v", err)
|
|
}
|
|
|
|
if err := ctx.syncOnce(); err != nil {
|
|
t.Fatalf("unexpected error syncing task: %v", err)
|
|
}
|
|
|
|
if n := len(ctx.FakeConsul.services); n != 1 {
|
|
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
|
|
}
|
|
|
|
for _, v := range ctx.FakeConsul.services {
|
|
if v.Name != ctx.Task.Services[0].Name {
|
|
t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name)
|
|
}
|
|
if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
|
|
t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags)
|
|
}
|
|
if v.Port != xPort {
|
|
t.Errorf("expected Port=%d != %d", xPort, v.Port)
|
|
}
|
|
}
|
|
|
|
// Assert the check update is pending
|
|
if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 {
|
|
t.Fatalf("expected 1 check restart update but found %d", n)
|
|
}
|
|
|
|
// Assert the check update is properly formed
|
|
checkUpd := <-ctx.ServiceClient.checkWatcher.checkUpdateCh
|
|
if checkUpd.checkRestart.allocID != ctx.Task.AllocID {
|
|
t.Fatalf("expected check's allocid to be %q but found %q", "allocid", checkUpd.checkRestart.allocID)
|
|
}
|
|
if expected := 200 * time.Millisecond; checkUpd.checkRestart.timeLimit != expected {
|
|
t.Fatalf("expected check's time limit to be %v but found %v", expected, checkUpd.checkRestart.timeLimit)
|
|
}
|
|
|
|
// Make a change which will register a new service
|
|
ctx.Task.Services[0].Name = "taskname-service2"
|
|
ctx.Task.Services[0].Tags[0] = "tag3"
|
|
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
|
|
t.Fatalf("unexpected error registering task: %v", err)
|
|
}
|
|
|
|
// Assert check update is pending
|
|
if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 {
|
|
t.Fatalf("expected 1 check restart update but found %d", n)
|
|
}
|
|
|
|
// Assert the check update's id has changed
|
|
checkUpd2 := <-ctx.ServiceClient.checkWatcher.checkUpdateCh
|
|
if checkUpd.checkID == checkUpd2.checkID {
|
|
t.Fatalf("expected new check update to have a new ID both both have: %q", checkUpd.checkID)
|
|
}
|
|
|
|
// Make sure changes don't take affect until sync() is called (since
|
|
// Run() isn't running)
|
|
if n := len(ctx.FakeConsul.services); n != 1 {
|
|
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
|
|
}
|
|
for _, v := range ctx.FakeConsul.services {
|
|
if reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
|
|
t.Errorf("expected Tags to differ, changes applied before sync()")
|
|
}
|
|
}
|
|
|
|
// Now sync() and re-check for the applied updates
|
|
if err := ctx.syncOnce(); err != nil {
|
|
t.Fatalf("unexpected error syncing task: %v", err)
|
|
}
|
|
if n := len(ctx.FakeConsul.services); n != 2 {
|
|
t.Fatalf("expected 2 services but found %d:\n%#v", n, ctx.FakeConsul.services)
|
|
}
|
|
found := false
|
|
for _, v := range ctx.FakeConsul.services {
|
|
if v.Name == ctx.Task.Services[0].Name {
|
|
if found {
|
|
t.Fatalf("found new service name %q twice", v.Name)
|
|
}
|
|
found = true
|
|
if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) {
|
|
t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags)
|
|
}
|
|
}
|
|
}
|
|
if !found {
|
|
t.Fatalf("did not find new service %q", ctx.Task.Services[0].Name)
|
|
}
|
|
|
|
// Remove the new task
|
|
ctx.ServiceClient.RemoveTask(ctx.Task)
|
|
if err := ctx.syncOnce(); err != nil {
|
|
t.Fatalf("unexpected error syncing task: %v", err)
|
|
}
|
|
if n := len(ctx.FakeConsul.services); n != 1 {
|
|
t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services)
|
|
}
|
|
for _, v := range ctx.FakeConsul.services {
|
|
if v.Name != "taskname-service" {
|
|
t.Errorf("expected original task to survive not %q", v.Name)
|
|
}
|
|
}
|
|
|
|
// Assert check update is pending
|
|
if n := len(ctx.ServiceClient.checkWatcher.checkUpdateCh); n != 1 {
|
|
t.Fatalf("expected 1 check restart update but found %d", n)
|
|
}
|
|
|
|
// Assert the check update's id is correct and that it's a removal
|
|
checkUpd3 := <-ctx.ServiceClient.checkWatcher.checkUpdateCh
|
|
if checkUpd2.checkID != checkUpd3.checkID {
|
|
t.Fatalf("expected checkid %q but found %q", checkUpd2.checkID, checkUpd3.checkID)
|
|
}
|
|
if !checkUpd3.remove {
|
|
t.Fatalf("expected check watch removal update but found: %#v", checkUpd3)
|
|
}
|
|
}
|
|
|
|
// TestConsul_ShutdownOK tests the ok path for the shutdown logic in
|
|
// ServiceClient.
|
|
func TestConsul_ShutdownOK(t *testing.T) {
|
|
require := require.New(t)
|
|
ctx := setupFake(t)
|
|
|
|
// Add a script check to make sure its TTL gets updated
|
|
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
|
{
|
|
Name: "scriptcheck",
|
|
Type: "script",
|
|
Command: "true",
|
|
// Make check block until shutdown
|
|
Interval: 9000 * time.Hour,
|
|
Timeout: 10 * time.Second,
|
|
InitialStatus: "warning",
|
|
},
|
|
}
|
|
|
|
go ctx.ServiceClient.Run()
|
|
|
|
// Register a task and agent
|
|
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
|
|
t.Fatalf("unexpected error registering task: %v", err)
|
|
}
|
|
|
|
agentServices := []*structs.Service{
|
|
{
|
|
Name: "http",
|
|
Tags: []string{"nomad"},
|
|
PortLabel: "localhost:2345",
|
|
},
|
|
}
|
|
if err := ctx.ServiceClient.RegisterAgent("client", agentServices); err != nil {
|
|
t.Fatalf("unexpected error registering agent: %v", err)
|
|
}
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
return ctx.ServiceClient.hasSeen(), fmt.Errorf("error contacting Consul")
|
|
}, func(err error) {
|
|
t.Fatalf("err: %v", err)
|
|
})
|
|
|
|
// Shutdown should block until scripts finish
|
|
if err := ctx.ServiceClient.Shutdown(); err != nil {
|
|
t.Errorf("unexpected error shutting down client: %v", err)
|
|
}
|
|
|
|
// UpdateTTL should have been called once for the script check and once
|
|
// for shutdown
|
|
if n := len(ctx.FakeConsul.checkTTLs); n != 1 {
|
|
t.Fatalf("expected 1 checkTTL entry but found: %d", n)
|
|
}
|
|
for _, v := range ctx.FakeConsul.checkTTLs {
|
|
require.Equalf(2, v, "expected 2 updates but found %d", v)
|
|
}
|
|
for _, v := range ctx.FakeConsul.checks {
|
|
if v.Status != "passing" {
|
|
t.Fatalf("expected check to be passing but found %q", v.Status)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestConsul_ShutdownSlow tests the slow but ok path for the shutdown logic in
|
|
// ServiceClient.
|
|
func TestConsul_ShutdownSlow(t *testing.T) {
|
|
t.Parallel()
|
|
ctx := setupFake(t)
|
|
|
|
// Add a script check to make sure its TTL gets updated
|
|
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
|
{
|
|
Name: "scriptcheck",
|
|
Type: "script",
|
|
Command: "true",
|
|
// Make check block until shutdown
|
|
Interval: 9000 * time.Hour,
|
|
Timeout: 5 * time.Second,
|
|
InitialStatus: "warning",
|
|
},
|
|
}
|
|
|
|
// Make Exec slow, but not too slow
|
|
waiter := make(chan struct{})
|
|
ctx.MockExec.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
|
|
select {
|
|
case <-waiter:
|
|
default:
|
|
close(waiter)
|
|
}
|
|
time.Sleep(time.Second)
|
|
return []byte{}, 0, nil
|
|
}
|
|
|
|
// Make shutdown wait time just a bit longer than ctx.Exec takes
|
|
ctx.ServiceClient.shutdownWait = 3 * time.Second
|
|
|
|
go ctx.ServiceClient.Run()
|
|
|
|
// Register a task and agent
|
|
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
|
|
t.Fatalf("unexpected error registering task: %v", err)
|
|
}
|
|
|
|
// wait for Exec to get called before shutting down
|
|
<-waiter
|
|
|
|
// Shutdown should block until all enqueued operations finish.
|
|
preShutdown := time.Now()
|
|
if err := ctx.ServiceClient.Shutdown(); err != nil {
|
|
t.Errorf("unexpected error shutting down client: %v", err)
|
|
}
|
|
|
|
// Shutdown time should have taken: ~1s <= shutdown <= 3s
|
|
// actual timing might be less than 1s, to account for shutdown invocation overhead
|
|
shutdownTime := time.Now().Sub(preShutdown)
|
|
if shutdownTime < 900*time.Millisecond || shutdownTime > ctx.ServiceClient.shutdownWait {
|
|
t.Errorf("expected shutdown to take >1s and <%s but took: %s", ctx.ServiceClient.shutdownWait, shutdownTime)
|
|
}
|
|
|
|
// UpdateTTL should have been called once for the script check
|
|
if n := len(ctx.FakeConsul.checkTTLs); n != 1 {
|
|
t.Fatalf("expected 1 checkTTL entry but found: %d", n)
|
|
}
|
|
for _, v := range ctx.FakeConsul.checkTTLs {
|
|
if v != 1 {
|
|
t.Fatalf("expected script check to be updated once but found %d", v)
|
|
}
|
|
}
|
|
for _, v := range ctx.FakeConsul.checks {
|
|
if v.Status != "passing" {
|
|
t.Fatalf("expected check to be passing but found %q", v.Status)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestConsul_ShutdownBlocked tests the blocked past deadline path for the
|
|
// shutdown logic in ServiceClient.
|
|
func TestConsul_ShutdownBlocked(t *testing.T) {
|
|
t.Parallel()
|
|
ctx := setupFake(t)
|
|
|
|
// Add a script check to make sure its TTL gets updated
|
|
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
|
{
|
|
Name: "scriptcheck",
|
|
Type: "script",
|
|
Command: "true",
|
|
// Make check block until shutdown
|
|
Interval: 9000 * time.Hour,
|
|
Timeout: 9000 * time.Hour,
|
|
InitialStatus: "warning",
|
|
},
|
|
}
|
|
|
|
block := make(chan struct{})
|
|
defer close(block) // cleanup after test
|
|
|
|
// Make Exec block forever
|
|
waiter := make(chan struct{})
|
|
ctx.MockExec.ExecFunc = func(ctx context.Context, cmd string, args []string) ([]byte, int, error) {
|
|
close(waiter)
|
|
<-block
|
|
return []byte{}, 0, nil
|
|
}
|
|
|
|
// Use a short shutdown deadline since we're intentionally blocking forever
|
|
ctx.ServiceClient.shutdownWait = time.Second
|
|
|
|
go ctx.ServiceClient.Run()
|
|
|
|
// Register a task and agent
|
|
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
|
|
t.Fatalf("unexpected error registering task: %v", err)
|
|
}
|
|
|
|
// Wait for exec to be called
|
|
<-waiter
|
|
|
|
// Shutdown should block until all enqueued operations finish.
|
|
preShutdown := time.Now()
|
|
err := ctx.ServiceClient.Shutdown()
|
|
if err == nil {
|
|
t.Errorf("expected a timed out error from shutdown")
|
|
}
|
|
|
|
// Shutdown time should have taken shutdownWait; to avoid timing
|
|
// related errors simply test for wait <= shutdown <= wait+3s
|
|
shutdownTime := time.Now().Sub(preShutdown)
|
|
maxWait := ctx.ServiceClient.shutdownWait + (3 * time.Second)
|
|
if shutdownTime < ctx.ServiceClient.shutdownWait || shutdownTime > maxWait {
|
|
t.Errorf("expected shutdown to take >%s and <%s but took: %s", ctx.ServiceClient.shutdownWait, maxWait, shutdownTime)
|
|
}
|
|
|
|
// UpdateTTL should not have been called for the script check
|
|
if n := len(ctx.FakeConsul.checkTTLs); n != 0 {
|
|
t.Fatalf("expected 0 checkTTL entry but found: %d", n)
|
|
}
|
|
for _, v := range ctx.FakeConsul.checks {
|
|
if expected := "warning"; v.Status != expected {
|
|
t.Fatalf("expected check to be %q but found %q", expected, v.Status)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestConsul_RemoveScript assert removing a script check removes all objects
|
|
// related to that check.
|
|
func TestConsul_CancelScript(t *testing.T) {
|
|
ctx := setupFake(t)
|
|
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
|
{
|
|
Name: "scriptcheckDel",
|
|
Type: "script",
|
|
Interval: 9000 * time.Hour,
|
|
Timeout: 9000 * time.Hour,
|
|
},
|
|
{
|
|
Name: "scriptcheckKeep",
|
|
Type: "script",
|
|
Interval: 9000 * time.Hour,
|
|
Timeout: 9000 * time.Hour,
|
|
},
|
|
}
|
|
|
|
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
|
|
t.Fatalf("unexpected error registering task: %v", err)
|
|
}
|
|
|
|
if err := ctx.syncOnce(); err != nil {
|
|
t.Fatalf("unexpected error syncing task: %v", err)
|
|
}
|
|
|
|
if len(ctx.FakeConsul.checks) != 2 {
|
|
t.Errorf("expected 2 checks but found %d", len(ctx.FakeConsul.checks))
|
|
}
|
|
|
|
if len(ctx.ServiceClient.scripts) != 2 && len(ctx.ServiceClient.runningScripts) != 2 {
|
|
t.Errorf("expected 2 running script but found scripts=%d runningScripts=%d",
|
|
len(ctx.ServiceClient.scripts), len(ctx.ServiceClient.runningScripts))
|
|
}
|
|
|
|
for i := 0; i < 2; i++ {
|
|
select {
|
|
case <-ctx.MockExec.execs:
|
|
// Script ran as expected!
|
|
case <-time.After(3 * time.Second):
|
|
t.Fatalf("timed out waiting for script check to run")
|
|
}
|
|
}
|
|
|
|
// Remove a check and update the task
|
|
origTask := ctx.Task.Copy()
|
|
ctx.Task.Services[0].Checks = []*structs.ServiceCheck{
|
|
{
|
|
Name: "scriptcheckKeep",
|
|
Type: "script",
|
|
Interval: 9000 * time.Hour,
|
|
Timeout: 9000 * time.Hour,
|
|
},
|
|
}
|
|
|
|
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
|
|
t.Fatalf("unexpected error registering task: %v", err)
|
|
}
|
|
|
|
if err := ctx.syncOnce(); err != nil {
|
|
t.Fatalf("unexpected error syncing task: %v", err)
|
|
}
|
|
|
|
if len(ctx.FakeConsul.checks) != 1 {
|
|
t.Errorf("expected 1 check but found %d", len(ctx.FakeConsul.checks))
|
|
}
|
|
|
|
if len(ctx.ServiceClient.scripts) != 1 && len(ctx.ServiceClient.runningScripts) != 1 {
|
|
t.Errorf("expected 1 running script but found scripts=%d runningScripts=%d",
|
|
len(ctx.ServiceClient.scripts), len(ctx.ServiceClient.runningScripts))
|
|
}
|
|
|
|
// Make sure exec wasn't called again
|
|
select {
|
|
case <-ctx.MockExec.execs:
|
|
t.Errorf("unexpected execution of script; was goroutine not cancelled?")
|
|
case <-time.After(100 * time.Millisecond):
|
|
// No unexpected script execs
|
|
}
|
|
|
|
// Don't leak goroutines
|
|
for _, scriptHandle := range ctx.ServiceClient.runningScripts {
|
|
scriptHandle.cancel()
|
|
}
|
|
}
|
|
|
|
// TestConsul_DriverNetwork_AutoUse asserts that if a driver network has
|
|
// auto-use set then services should advertise it unless explicitly set to
|
|
// host. Checks should always use host.
|
|
func TestConsul_DriverNetwork_AutoUse(t *testing.T) {
|
|
t.Parallel()
|
|
ctx := setupFake(t)
|
|
|
|
ctx.Task.Services = []*structs.Service{
|
|
{
|
|
Name: "auto-advertise-x",
|
|
PortLabel: "x",
|
|
AddressMode: structs.AddressModeAuto,
|
|
Checks: []*structs.ServiceCheck{
|
|
{
|
|
Name: "default-check-x",
|
|
Type: "tcp",
|
|
Interval: time.Second,
|
|
Timeout: time.Second,
|
|
},
|
|
{
|
|
Name: "weird-y-check",
|
|
Type: "http",
|
|
Interval: time.Second,
|
|
Timeout: time.Second,
|
|
PortLabel: "y",
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Name: "driver-advertise-y",
|
|
PortLabel: "y",
|
|
AddressMode: structs.AddressModeDriver,
|
|
Checks: []*structs.ServiceCheck{
|
|
{
|
|
Name: "default-check-y",
|
|
Type: "tcp",
|
|
Interval: time.Second,
|
|
Timeout: time.Second,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Name: "host-advertise-y",
|
|
PortLabel: "y",
|
|
AddressMode: structs.AddressModeHost,
|
|
},
|
|
}
|
|
|
|
ctx.Task.DriverNetwork = &drivers.DriverNetwork{
|
|
PortMap: map[string]int{
|
|
"x": 8888,
|
|
"y": 9999,
|
|
},
|
|
IP: "172.18.0.2",
|
|
AutoAdvertise: true,
|
|
}
|
|
|
|
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
|
|
t.Fatalf("unexpected error registering task: %v", err)
|
|
}
|
|
|
|
if err := ctx.syncOnce(); err != nil {
|
|
t.Fatalf("unexpected error syncing task: %v", err)
|
|
}
|
|
|
|
if n := len(ctx.FakeConsul.services); n != 3 {
|
|
t.Fatalf("expected 2 services but found: %d", n)
|
|
}
|
|
|
|
for _, v := range ctx.FakeConsul.services {
|
|
switch v.Name {
|
|
case ctx.Task.Services[0].Name: // x
|
|
// Since DriverNetwork.AutoAdvertise=true, driver ports should be used
|
|
if v.Port != ctx.Task.DriverNetwork.PortMap["x"] {
|
|
t.Errorf("expected service %s's port to be %d but found %d",
|
|
v.Name, ctx.Task.DriverNetwork.PortMap["x"], v.Port)
|
|
}
|
|
// The order of checks in Consul is not guaranteed to
|
|
// be the same as their order in the Task definition,
|
|
// so check in a loop
|
|
if expected := 2; len(v.Checks) != expected {
|
|
t.Errorf("expected %d checks but found %d", expected, len(v.Checks))
|
|
}
|
|
for _, c := range v.Checks {
|
|
// No name on AgentServiceChecks, use type
|
|
switch {
|
|
case c.TCP != "":
|
|
// Checks should always use host port though
|
|
if c.TCP != ":1234" { // xPort
|
|
t.Errorf("expected service %s check 1's port to be %d but found %q",
|
|
v.Name, xPort, c.TCP)
|
|
}
|
|
case c.HTTP != "":
|
|
if c.HTTP != "http://:1235" { // yPort
|
|
t.Errorf("expected service %s check 2's port to be %d but found %q",
|
|
v.Name, yPort, c.HTTP)
|
|
}
|
|
default:
|
|
t.Errorf("unexpected check %#v on service %q", c, v.Name)
|
|
}
|
|
}
|
|
case ctx.Task.Services[1].Name: // y
|
|
// Service should be container ip:port
|
|
if v.Address != ctx.Task.DriverNetwork.IP {
|
|
t.Errorf("expected service %s's address to be %s but found %s",
|
|
v.Name, ctx.Task.DriverNetwork.IP, v.Address)
|
|
}
|
|
if v.Port != ctx.Task.DriverNetwork.PortMap["y"] {
|
|
t.Errorf("expected service %s's port to be %d but found %d",
|
|
v.Name, ctx.Task.DriverNetwork.PortMap["x"], v.Port)
|
|
}
|
|
// Check should be host ip:port
|
|
if v.Checks[0].TCP != ":1235" { // yPort
|
|
t.Errorf("expected service %s check's port to be %d but found %s",
|
|
v.Name, yPort, v.Checks[0].TCP)
|
|
}
|
|
case ctx.Task.Services[2].Name: // y + host mode
|
|
if v.Port != yPort {
|
|
t.Errorf("expected service %s's port to be %d but found %d",
|
|
v.Name, yPort, v.Port)
|
|
}
|
|
default:
|
|
t.Errorf("unexpected service name: %q", v.Name)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestConsul_DriverNetwork_NoAutoUse asserts that if a driver network doesn't
|
|
// set auto-use only services which request the driver's network should
|
|
// advertise it.
|
|
func TestConsul_DriverNetwork_NoAutoUse(t *testing.T) {
|
|
t.Parallel()
|
|
ctx := setupFake(t)
|
|
|
|
ctx.Task.Services = []*structs.Service{
|
|
{
|
|
Name: "auto-advertise-x",
|
|
PortLabel: "x",
|
|
AddressMode: structs.AddressModeAuto,
|
|
},
|
|
{
|
|
Name: "driver-advertise-y",
|
|
PortLabel: "y",
|
|
AddressMode: structs.AddressModeDriver,
|
|
},
|
|
{
|
|
Name: "host-advertise-y",
|
|
PortLabel: "y",
|
|
AddressMode: structs.AddressModeHost,
|
|
},
|
|
}
|
|
|
|
ctx.Task.DriverNetwork = &drivers.DriverNetwork{
|
|
PortMap: map[string]int{
|
|
"x": 8888,
|
|
"y": 9999,
|
|
},
|
|
IP: "172.18.0.2",
|
|
AutoAdvertise: false,
|
|
}
|
|
|
|
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
|
|
t.Fatalf("unexpected error registering task: %v", err)
|
|
}
|
|
|
|
if err := ctx.syncOnce(); err != nil {
|
|
t.Fatalf("unexpected error syncing task: %v", err)
|
|
}
|
|
|
|
if n := len(ctx.FakeConsul.services); n != 3 {
|
|
t.Fatalf("expected 3 services but found: %d", n)
|
|
}
|
|
|
|
for _, v := range ctx.FakeConsul.services {
|
|
switch v.Name {
|
|
case ctx.Task.Services[0].Name: // x + auto
|
|
// Since DriverNetwork.AutoAdvertise=false, host ports should be used
|
|
if v.Port != xPort {
|
|
t.Errorf("expected service %s's port to be %d but found %d",
|
|
v.Name, xPort, v.Port)
|
|
}
|
|
case ctx.Task.Services[1].Name: // y + driver mode
|
|
// Service should be container ip:port
|
|
if v.Address != ctx.Task.DriverNetwork.IP {
|
|
t.Errorf("expected service %s's address to be %s but found %s",
|
|
v.Name, ctx.Task.DriverNetwork.IP, v.Address)
|
|
}
|
|
if v.Port != ctx.Task.DriverNetwork.PortMap["y"] {
|
|
t.Errorf("expected service %s's port to be %d but found %d",
|
|
v.Name, ctx.Task.DriverNetwork.PortMap["x"], v.Port)
|
|
}
|
|
case ctx.Task.Services[2].Name: // y + host mode
|
|
if v.Port != yPort {
|
|
t.Errorf("expected service %s's port to be %d but found %d",
|
|
v.Name, yPort, v.Port)
|
|
}
|
|
default:
|
|
t.Errorf("unexpected service name: %q", v.Name)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestConsul_DriverNetwork_Change asserts that if a driver network is
|
|
// specified and a service updates its use its properly updated in Consul.
|
|
func TestConsul_DriverNetwork_Change(t *testing.T) {
|
|
t.Parallel()
|
|
ctx := setupFake(t)
|
|
|
|
ctx.Task.Services = []*structs.Service{
|
|
{
|
|
Name: "service-foo",
|
|
PortLabel: "x",
|
|
AddressMode: structs.AddressModeAuto,
|
|
},
|
|
}
|
|
|
|
ctx.Task.DriverNetwork = &drivers.DriverNetwork{
|
|
PortMap: map[string]int{
|
|
"x": 8888,
|
|
"y": 9999,
|
|
},
|
|
IP: "172.18.0.2",
|
|
AutoAdvertise: false,
|
|
}
|
|
|
|
syncAndAssertPort := func(port int) {
|
|
if err := ctx.syncOnce(); err != nil {
|
|
t.Fatalf("unexpected error syncing task: %v", err)
|
|
}
|
|
|
|
if n := len(ctx.FakeConsul.services); n != 1 {
|
|
t.Fatalf("expected 1 service but found: %d", n)
|
|
}
|
|
|
|
for _, v := range ctx.FakeConsul.services {
|
|
switch v.Name {
|
|
case ctx.Task.Services[0].Name:
|
|
if v.Port != port {
|
|
t.Errorf("expected service %s's port to be %d but found %d",
|
|
v.Name, port, v.Port)
|
|
}
|
|
default:
|
|
t.Errorf("unexpected service name: %q", v.Name)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Initial service should advertise host port x
|
|
if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil {
|
|
t.Fatalf("unexpected error registering task: %v", err)
|
|
}
|
|
|
|
syncAndAssertPort(xPort)
|
|
|
|
// UpdateTask to use Host (shouldn't change anything)
|
|
origTask := ctx.Task.Copy()
|
|
ctx.Task.Services[0].AddressMode = structs.AddressModeHost
|
|
|
|
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
|
|
t.Fatalf("unexpected error updating task: %v", err)
|
|
}
|
|
|
|
syncAndAssertPort(xPort)
|
|
|
|
// UpdateTask to use Driver (*should* change IP and port)
|
|
origTask = ctx.Task.Copy()
|
|
ctx.Task.Services[0].AddressMode = structs.AddressModeDriver
|
|
|
|
if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil {
|
|
t.Fatalf("unexpected error updating task: %v", err)
|
|
}
|
|
|
|
syncAndAssertPort(ctx.Task.DriverNetwork.PortMap["x"])
|
|
}
|
|
|
|
// TestConsul_CanaryTags asserts CanaryTags are used when Canary=true
|
|
func TestConsul_CanaryTags(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
ctx := setupFake(t)
|
|
|
|
canaryTags := []string{"tag1", "canary"}
|
|
ctx.Task.Canary = true
|
|
ctx.Task.Services[0].CanaryTags = canaryTags
|
|
|
|
require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
|
|
require.NoError(ctx.syncOnce())
|
|
require.Len(ctx.FakeConsul.services, 1)
|
|
for _, service := range ctx.FakeConsul.services {
|
|
require.Equal(canaryTags, service.Tags)
|
|
}
|
|
|
|
// Disable canary and assert tags are not the canary tags
|
|
origTask := ctx.Task.Copy()
|
|
ctx.Task.Canary = false
|
|
require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task))
|
|
require.NoError(ctx.syncOnce())
|
|
require.Len(ctx.FakeConsul.services, 1)
|
|
for _, service := range ctx.FakeConsul.services {
|
|
require.NotEqual(canaryTags, service.Tags)
|
|
}
|
|
|
|
ctx.ServiceClient.RemoveTask(ctx.Task)
|
|
require.NoError(ctx.syncOnce())
|
|
require.Len(ctx.FakeConsul.services, 0)
|
|
}
|
|
|
|
// TestConsul_CanaryTags_NoTags asserts Tags are used when Canary=true and there
|
|
// are no specified canary tags
|
|
func TestConsul_CanaryTags_NoTags(t *testing.T) {
|
|
t.Parallel()
|
|
require := require.New(t)
|
|
ctx := setupFake(t)
|
|
|
|
tags := []string{"tag1", "foo"}
|
|
ctx.Task.Canary = true
|
|
ctx.Task.Services[0].Tags = tags
|
|
|
|
require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
|
|
require.NoError(ctx.syncOnce())
|
|
require.Len(ctx.FakeConsul.services, 1)
|
|
for _, service := range ctx.FakeConsul.services {
|
|
require.Equal(tags, service.Tags)
|
|
}
|
|
|
|
// Disable canary and assert tags dont change
|
|
origTask := ctx.Task.Copy()
|
|
ctx.Task.Canary = false
|
|
require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task))
|
|
require.NoError(ctx.syncOnce())
|
|
require.Len(ctx.FakeConsul.services, 1)
|
|
for _, service := range ctx.FakeConsul.services {
|
|
require.Equal(tags, service.Tags)
|
|
}
|
|
|
|
ctx.ServiceClient.RemoveTask(ctx.Task)
|
|
require.NoError(ctx.syncOnce())
|
|
require.Len(ctx.FakeConsul.services, 0)
|
|
}
|
|
|
|
// TestConsul_PeriodicSync asserts that Nomad periodically reconciles with
|
|
// Consul.
|
|
func TestConsul_PeriodicSync(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
ctx := setupFake(t)
|
|
defer ctx.ServiceClient.Shutdown()
|
|
|
|
// Lower periodic sync interval to speed up test
|
|
ctx.ServiceClient.periodicInterval = 2 * time.Millisecond
|
|
|
|
// Run for 10ms and assert hits >= 5 because each sync() calls multiple
|
|
// Consul APIs
|
|
go ctx.ServiceClient.Run()
|
|
|
|
select {
|
|
case <-ctx.ServiceClient.exitCh:
|
|
t.Fatalf("exited unexpectedly")
|
|
case <-time.After(10 * time.Millisecond):
|
|
}
|
|
|
|
minHits := 5
|
|
if hits := ctx.FakeConsul.getHits(); hits < minHits {
|
|
t.Fatalf("expected at least %d hits but found %d", minHits, hits)
|
|
}
|
|
}
|
|
|
|
// TestIsNomadService asserts the isNomadService helper returns true for Nomad
|
|
// task IDs and false for unknown IDs and Nomad agent IDs (see #2827).
|
|
func TestIsNomadService(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
tests := []struct {
|
|
id string
|
|
result bool
|
|
}{
|
|
{"_nomad-client-nomad-client-http", false},
|
|
{"_nomad-server-nomad-serf", false},
|
|
|
|
// Pre-0.7.1 style IDs still match
|
|
{"_nomad-executor-abc", true},
|
|
{"_nomad-executor", true},
|
|
|
|
// Post-0.7.1 style IDs match
|
|
{"_nomad-task-FBBK265QN4TMT25ND4EP42TJVMYJ3HR4", true},
|
|
|
|
{"not-nomad", false},
|
|
{"_nomad", false},
|
|
}
|
|
|
|
for _, test := range tests {
|
|
t.Run(test.id, func(t *testing.T) {
|
|
actual := isNomadService(test.id)
|
|
if actual != test.result {
|
|
t.Errorf("%q should be %t but found %t", test.id, test.result, actual)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// TestCreateCheckReg_HTTP asserts Nomad ServiceCheck structs are properly
|
|
// converted to Consul API AgentCheckRegistrations for HTTP checks.
|
|
func TestCreateCheckReg_HTTP(t *testing.T) {
|
|
t.Parallel()
|
|
check := &structs.ServiceCheck{
|
|
Name: "name",
|
|
Type: "http",
|
|
Path: "/path",
|
|
PortLabel: "label",
|
|
Method: "POST",
|
|
Header: map[string][]string{
|
|
"Foo": {"bar"},
|
|
},
|
|
}
|
|
|
|
serviceID := "testService"
|
|
checkID := check.Hash(serviceID)
|
|
host := "localhost"
|
|
port := 41111
|
|
|
|
expected := &api.AgentCheckRegistration{
|
|
ID: checkID,
|
|
Name: "name",
|
|
ServiceID: serviceID,
|
|
AgentServiceCheck: api.AgentServiceCheck{
|
|
Timeout: "0s",
|
|
Interval: "0s",
|
|
HTTP: fmt.Sprintf("http://%s:%d/path", host, port),
|
|
Method: "POST",
|
|
Header: map[string][]string{
|
|
"Foo": {"bar"},
|
|
},
|
|
},
|
|
}
|
|
|
|
actual, err := createCheckReg(serviceID, checkID, check, host, port)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if diff := pretty.Diff(actual, expected); len(diff) > 0 {
|
|
t.Fatalf("diff:\n%s\n", strings.Join(diff, "\n"))
|
|
}
|
|
}
|
|
|
|
// TestCreateCheckReg_GRPC asserts Nomad ServiceCheck structs are properly
|
|
// converted to Consul API AgentCheckRegistrations for GRPC checks.
|
|
func TestCreateCheckReg_GRPC(t *testing.T) {
|
|
t.Parallel()
|
|
check := &structs.ServiceCheck{
|
|
Name: "name",
|
|
Type: "grpc",
|
|
PortLabel: "label",
|
|
GRPCService: "foo.Bar",
|
|
GRPCUseTLS: true,
|
|
TLSSkipVerify: true,
|
|
Timeout: time.Second,
|
|
Interval: time.Minute,
|
|
}
|
|
|
|
serviceID := "testService"
|
|
checkID := check.Hash(serviceID)
|
|
|
|
expected := &api.AgentCheckRegistration{
|
|
ID: checkID,
|
|
Name: "name",
|
|
ServiceID: serviceID,
|
|
AgentServiceCheck: api.AgentServiceCheck{
|
|
Timeout: "1s",
|
|
Interval: "1m0s",
|
|
GRPC: "localhost:8080/foo.Bar",
|
|
GRPCUseTLS: true,
|
|
TLSSkipVerify: true,
|
|
},
|
|
}
|
|
|
|
actual, err := createCheckReg(serviceID, checkID, check, "localhost", 8080)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expected, actual)
|
|
}
|
|
|
|
// TestGetAddress asserts Nomad uses the correct ip and port for services and
|
|
// checks depending on port labels, driver networks, and address mode.
|
|
func TestGetAddress(t *testing.T) {
|
|
const HostIP = "127.0.0.1"
|
|
|
|
cases := []struct {
|
|
Name string
|
|
|
|
// Parameters
|
|
Mode string
|
|
PortLabel string
|
|
Host map[string]int // will be converted to structs.Networks
|
|
Driver *drivers.DriverNetwork
|
|
|
|
// Results
|
|
ExpectedIP string
|
|
ExpectedPort int
|
|
ExpectedErr string
|
|
}{
|
|
// Valid Configurations
|
|
{
|
|
Name: "ExampleService",
|
|
Mode: structs.AddressModeAuto,
|
|
PortLabel: "db",
|
|
Host: map[string]int{"db": 12435},
|
|
Driver: &drivers.DriverNetwork{
|
|
PortMap: map[string]int{"db": 6379},
|
|
IP: "10.1.2.3",
|
|
},
|
|
ExpectedIP: HostIP,
|
|
ExpectedPort: 12435,
|
|
},
|
|
{
|
|
Name: "Host",
|
|
Mode: structs.AddressModeHost,
|
|
PortLabel: "db",
|
|
Host: map[string]int{"db": 12345},
|
|
Driver: &drivers.DriverNetwork{
|
|
PortMap: map[string]int{"db": 6379},
|
|
IP: "10.1.2.3",
|
|
},
|
|
ExpectedIP: HostIP,
|
|
ExpectedPort: 12345,
|
|
},
|
|
{
|
|
Name: "Driver",
|
|
Mode: structs.AddressModeDriver,
|
|
PortLabel: "db",
|
|
Host: map[string]int{"db": 12345},
|
|
Driver: &drivers.DriverNetwork{
|
|
PortMap: map[string]int{"db": 6379},
|
|
IP: "10.1.2.3",
|
|
},
|
|
ExpectedIP: "10.1.2.3",
|
|
ExpectedPort: 6379,
|
|
},
|
|
{
|
|
Name: "AutoDriver",
|
|
Mode: structs.AddressModeAuto,
|
|
PortLabel: "db",
|
|
Host: map[string]int{"db": 12345},
|
|
Driver: &drivers.DriverNetwork{
|
|
PortMap: map[string]int{"db": 6379},
|
|
IP: "10.1.2.3",
|
|
AutoAdvertise: true,
|
|
},
|
|
ExpectedIP: "10.1.2.3",
|
|
ExpectedPort: 6379,
|
|
},
|
|
{
|
|
Name: "DriverCustomPort",
|
|
Mode: structs.AddressModeDriver,
|
|
PortLabel: "7890",
|
|
Host: map[string]int{"db": 12345},
|
|
Driver: &drivers.DriverNetwork{
|
|
PortMap: map[string]int{"db": 6379},
|
|
IP: "10.1.2.3",
|
|
},
|
|
ExpectedIP: "10.1.2.3",
|
|
ExpectedPort: 7890,
|
|
},
|
|
|
|
// Invalid Configurations
|
|
{
|
|
Name: "DriverWithoutNetwork",
|
|
Mode: structs.AddressModeDriver,
|
|
PortLabel: "db",
|
|
Host: map[string]int{"db": 12345},
|
|
Driver: nil,
|
|
ExpectedErr: "no driver network exists",
|
|
},
|
|
{
|
|
Name: "DriverBadPort",
|
|
Mode: structs.AddressModeDriver,
|
|
PortLabel: "bad-port-label",
|
|
Host: map[string]int{"db": 12345},
|
|
Driver: &drivers.DriverNetwork{
|
|
PortMap: map[string]int{"db": 6379},
|
|
IP: "10.1.2.3",
|
|
},
|
|
ExpectedErr: "invalid port",
|
|
},
|
|
{
|
|
Name: "DriverZeroPort",
|
|
Mode: structs.AddressModeDriver,
|
|
PortLabel: "0",
|
|
Driver: &drivers.DriverNetwork{
|
|
IP: "10.1.2.3",
|
|
},
|
|
ExpectedErr: "invalid port",
|
|
},
|
|
{
|
|
Name: "HostBadPort",
|
|
Mode: structs.AddressModeHost,
|
|
PortLabel: "bad-port-label",
|
|
ExpectedErr: "invalid port",
|
|
},
|
|
{
|
|
Name: "InvalidMode",
|
|
Mode: "invalid-mode",
|
|
PortLabel: "80",
|
|
ExpectedErr: "invalid address mode",
|
|
},
|
|
{
|
|
Name: "NoPort_AutoMode",
|
|
Mode: structs.AddressModeAuto,
|
|
ExpectedIP: HostIP,
|
|
},
|
|
{
|
|
Name: "NoPort_HostMode",
|
|
Mode: structs.AddressModeHost,
|
|
ExpectedIP: HostIP,
|
|
},
|
|
{
|
|
Name: "NoPort_DriverMode",
|
|
Mode: structs.AddressModeDriver,
|
|
Driver: &drivers.DriverNetwork{
|
|
IP: "10.1.2.3",
|
|
},
|
|
ExpectedIP: "10.1.2.3",
|
|
},
|
|
}
|
|
|
|
for _, tc := range cases {
|
|
t.Run(tc.Name, func(t *testing.T) {
|
|
// convert host port map into a structs.Networks
|
|
networks := []*structs.NetworkResource{
|
|
{
|
|
IP: HostIP,
|
|
ReservedPorts: make([]structs.Port, len(tc.Host)),
|
|
},
|
|
}
|
|
|
|
i := 0
|
|
for label, port := range tc.Host {
|
|
networks[0].ReservedPorts[i].Label = label
|
|
networks[0].ReservedPorts[i].Value = port
|
|
i++
|
|
}
|
|
|
|
// Run getAddress
|
|
ip, port, err := getAddress(tc.Mode, tc.PortLabel, networks, tc.Driver)
|
|
|
|
// Assert the results
|
|
assert.Equal(t, tc.ExpectedIP, ip, "IP mismatch")
|
|
assert.Equal(t, tc.ExpectedPort, port, "Port mismatch")
|
|
if tc.ExpectedErr == "" {
|
|
assert.Nil(t, err)
|
|
} else {
|
|
if err == nil {
|
|
t.Fatalf("expected error containing %q but err=nil", tc.ExpectedErr)
|
|
} else {
|
|
assert.Contains(t, err.Error(), tc.ExpectedErr)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestConsul_ServiceName_Duplicates(t *testing.T) {
|
|
t.Parallel()
|
|
ctx := setupFake(t)
|
|
require := require.New(t)
|
|
|
|
ctx.Task.Services = []*structs.Service{
|
|
{
|
|
Name: "best-service",
|
|
PortLabel: "x",
|
|
Tags: []string{
|
|
"foo",
|
|
},
|
|
Checks: []*structs.ServiceCheck{
|
|
{
|
|
Name: "check-a",
|
|
Type: "tcp",
|
|
Interval: time.Second,
|
|
Timeout: time.Second,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Name: "best-service",
|
|
PortLabel: "y",
|
|
Tags: []string{
|
|
"bar",
|
|
},
|
|
Checks: []*structs.ServiceCheck{
|
|
{
|
|
Name: "checky-mccheckface",
|
|
Type: "tcp",
|
|
Interval: time.Second,
|
|
Timeout: time.Second,
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Name: "worst-service",
|
|
PortLabel: "y",
|
|
},
|
|
}
|
|
|
|
require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task))
|
|
|
|
require.NoError(ctx.syncOnce())
|
|
|
|
require.Len(ctx.FakeConsul.services, 3)
|
|
|
|
for _, v := range ctx.FakeConsul.services {
|
|
if v.Name == ctx.Task.Services[0].Name && v.Port == xPort {
|
|
require.ElementsMatch(v.Tags, ctx.Task.Services[0].Tags)
|
|
require.Len(v.Checks, 1)
|
|
} else if v.Name == ctx.Task.Services[1].Name && v.Port == yPort {
|
|
require.ElementsMatch(v.Tags, ctx.Task.Services[1].Tags)
|
|
require.Len(v.Checks, 1)
|
|
} else if v.Name == ctx.Task.Services[2].Name {
|
|
require.Len(v.Checks, 0)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestConsul_ServiceDeregistration_OutOfProbation asserts that during in steady
|
|
// state we remove any services we don't reconize locally
|
|
func TestConsul_ServiceDeregistration_OutProbation(t *testing.T) {
|
|
t.Parallel()
|
|
ctx := setupFake(t)
|
|
require := require.New(t)
|
|
|
|
ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour)
|
|
|
|
remainingTask := testTask()
|
|
remainingTask.Services = []*structs.Service{
|
|
{
|
|
Name: "remaining-service",
|
|
PortLabel: "x",
|
|
Checks: []*structs.ServiceCheck{
|
|
{
|
|
Name: "check",
|
|
Type: "tcp",
|
|
Interval: time.Second,
|
|
Timeout: time.Second,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
remainingTaskServiceID := MakeTaskServiceID(remainingTask.AllocID,
|
|
remainingTask.Name, remainingTask.Services[0], false)
|
|
|
|
require.NoError(ctx.ServiceClient.RegisterTask(remainingTask))
|
|
require.NoError(ctx.syncOnce())
|
|
require.Len(ctx.FakeConsul.services, 1)
|
|
require.Len(ctx.FakeConsul.checks, 1)
|
|
|
|
explicitlyRemovedTask := testTask()
|
|
explicitlyRemovedTask.Services = []*structs.Service{
|
|
{
|
|
Name: "explicitly-removed-service",
|
|
PortLabel: "y",
|
|
Checks: []*structs.ServiceCheck{
|
|
{
|
|
Name: "check",
|
|
Type: "tcp",
|
|
Interval: time.Second,
|
|
Timeout: time.Second,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
explicitlyRemovedTaskServiceID := MakeTaskServiceID(explicitlyRemovedTask.AllocID,
|
|
explicitlyRemovedTask.Name, explicitlyRemovedTask.Services[0], false)
|
|
|
|
require.NoError(ctx.ServiceClient.RegisterTask(explicitlyRemovedTask))
|
|
|
|
require.NoError(ctx.syncOnce())
|
|
require.Len(ctx.FakeConsul.services, 2)
|
|
require.Len(ctx.FakeConsul.checks, 2)
|
|
|
|
// we register a task through nomad API then remove it out of band
|
|
outofbandTask := testTask()
|
|
outofbandTask.Services = []*structs.Service{
|
|
{
|
|
Name: "unknown-service",
|
|
PortLabel: "x",
|
|
Checks: []*structs.ServiceCheck{
|
|
{
|
|
Name: "check",
|
|
Type: "tcp",
|
|
Interval: time.Second,
|
|
Timeout: time.Second,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
outofbandTaskServiceID := MakeTaskServiceID(outofbandTask.AllocID,
|
|
outofbandTask.Name, outofbandTask.Services[0], false)
|
|
|
|
require.NoError(ctx.ServiceClient.RegisterTask(outofbandTask))
|
|
require.NoError(ctx.syncOnce())
|
|
|
|
require.Len(ctx.FakeConsul.services, 3)
|
|
|
|
// remove outofbandTask from local services so it appears unknown to client
|
|
require.Len(ctx.ServiceClient.services, 3)
|
|
require.Len(ctx.ServiceClient.checks, 3)
|
|
|
|
delete(ctx.ServiceClient.services, outofbandTaskServiceID)
|
|
delete(ctx.ServiceClient.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
|
|
|
|
require.Len(ctx.ServiceClient.services, 2)
|
|
require.Len(ctx.ServiceClient.checks, 2)
|
|
|
|
// Sync and ensure that explicitly removed service as well as outofbandTask were removed
|
|
|
|
ctx.ServiceClient.RemoveTask(explicitlyRemovedTask)
|
|
require.NoError(ctx.syncOnce())
|
|
require.NoError(ctx.ServiceClient.sync())
|
|
require.Len(ctx.FakeConsul.services, 1)
|
|
require.Len(ctx.FakeConsul.checks, 1)
|
|
|
|
require.Contains(ctx.FakeConsul.services, remainingTaskServiceID)
|
|
require.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID)
|
|
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)
|
|
|
|
require.Contains(ctx.FakeConsul.checks, makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
|
|
require.NotContains(ctx.FakeConsul.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
|
|
require.NotContains(ctx.FakeConsul.checks, makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
|
|
}
|
|
|
|
// TestConsul_ServiceDeregistration_InProbation asserts that during initialization
|
|
// we only deregister services that were explicitly removed and leave unknown
|
|
// services untouched. This adds a grace period for restoring recovered tasks
|
|
// before deregistering them
|
|
func TestConsul_ServiceDeregistration_InProbation(t *testing.T) {
|
|
t.Parallel()
|
|
ctx := setupFake(t)
|
|
require := require.New(t)
|
|
|
|
ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(1 * time.Hour)
|
|
|
|
remainingTask := testTask()
|
|
remainingTask.Services = []*structs.Service{
|
|
{
|
|
Name: "remaining-service",
|
|
PortLabel: "x",
|
|
Checks: []*structs.ServiceCheck{
|
|
{
|
|
Name: "check",
|
|
Type: "tcp",
|
|
Interval: time.Second,
|
|
Timeout: time.Second,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
remainingTaskServiceID := MakeTaskServiceID(remainingTask.AllocID,
|
|
remainingTask.Name, remainingTask.Services[0], false)
|
|
|
|
require.NoError(ctx.ServiceClient.RegisterTask(remainingTask))
|
|
require.NoError(ctx.syncOnce())
|
|
require.Len(ctx.FakeConsul.services, 1)
|
|
require.Len(ctx.FakeConsul.checks, 1)
|
|
|
|
explicitlyRemovedTask := testTask()
|
|
explicitlyRemovedTask.Services = []*structs.Service{
|
|
{
|
|
Name: "explicitly-removed-service",
|
|
PortLabel: "y",
|
|
Checks: []*structs.ServiceCheck{
|
|
{
|
|
Name: "check",
|
|
Type: "tcp",
|
|
Interval: time.Second,
|
|
Timeout: time.Second,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
explicitlyRemovedTaskServiceID := MakeTaskServiceID(explicitlyRemovedTask.AllocID,
|
|
explicitlyRemovedTask.Name, explicitlyRemovedTask.Services[0], false)
|
|
|
|
require.NoError(ctx.ServiceClient.RegisterTask(explicitlyRemovedTask))
|
|
|
|
require.NoError(ctx.syncOnce())
|
|
require.Len(ctx.FakeConsul.services, 2)
|
|
require.Len(ctx.FakeConsul.checks, 2)
|
|
|
|
// we register a task through nomad API then remove it out of band
|
|
outofbandTask := testTask()
|
|
outofbandTask.Services = []*structs.Service{
|
|
{
|
|
Name: "unknown-service",
|
|
PortLabel: "x",
|
|
Checks: []*structs.ServiceCheck{
|
|
{
|
|
Name: "check",
|
|
Type: "tcp",
|
|
Interval: time.Second,
|
|
Timeout: time.Second,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
outofbandTaskServiceID := MakeTaskServiceID(outofbandTask.AllocID,
|
|
outofbandTask.Name, outofbandTask.Services[0], false)
|
|
|
|
require.NoError(ctx.ServiceClient.RegisterTask(outofbandTask))
|
|
require.NoError(ctx.syncOnce())
|
|
|
|
require.Len(ctx.FakeConsul.services, 3)
|
|
|
|
// remove outofbandTask from local services so it appears unknown to client
|
|
require.Len(ctx.ServiceClient.services, 3)
|
|
require.Len(ctx.ServiceClient.checks, 3)
|
|
|
|
delete(ctx.ServiceClient.services, outofbandTaskServiceID)
|
|
delete(ctx.ServiceClient.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
|
|
|
|
require.Len(ctx.ServiceClient.services, 2)
|
|
require.Len(ctx.ServiceClient.checks, 2)
|
|
|
|
// Sync and ensure that explicitly removed service was removed, but outofbandTask remains
|
|
|
|
ctx.ServiceClient.RemoveTask(explicitlyRemovedTask)
|
|
require.NoError(ctx.syncOnce())
|
|
require.NoError(ctx.ServiceClient.sync())
|
|
require.Len(ctx.FakeConsul.services, 2)
|
|
require.Len(ctx.FakeConsul.checks, 2)
|
|
|
|
require.Contains(ctx.FakeConsul.services, remainingTaskServiceID)
|
|
require.Contains(ctx.FakeConsul.services, outofbandTaskServiceID)
|
|
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)
|
|
|
|
require.Contains(ctx.FakeConsul.checks, makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
|
|
require.Contains(ctx.FakeConsul.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
|
|
require.NotContains(ctx.FakeConsul.checks, makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
|
|
|
|
// after probation, outofband services and checks are removed
|
|
ctx.ServiceClient.deregisterProbationExpiry = time.Now().Add(-1 * time.Hour)
|
|
|
|
require.NoError(ctx.ServiceClient.sync())
|
|
require.Len(ctx.FakeConsul.services, 1)
|
|
require.Len(ctx.FakeConsul.checks, 1)
|
|
|
|
require.Contains(ctx.FakeConsul.services, remainingTaskServiceID)
|
|
require.NotContains(ctx.FakeConsul.services, outofbandTaskServiceID)
|
|
require.NotContains(ctx.FakeConsul.services, explicitlyRemovedTaskServiceID)
|
|
|
|
require.Contains(ctx.FakeConsul.checks, makeCheckID(remainingTaskServiceID, remainingTask.Services[0].Checks[0]))
|
|
require.NotContains(ctx.FakeConsul.checks, makeCheckID(outofbandTaskServiceID, outofbandTask.Services[0].Checks[0]))
|
|
require.NotContains(ctx.FakeConsul.checks, makeCheckID(explicitlyRemovedTaskServiceID, explicitlyRemovedTask.Services[0].Checks[0]))
|
|
|
|
}
|