Merge pull request #9720 from hashicorp/b-consul-sreg-update

consul: always include task services hook
This commit is contained in:
Seth Hoenig 2021-01-05 08:48:44 -06:00 committed by GitHub
commit 0ed44a82b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 211 additions and 117 deletions

View File

@ -5,6 +5,7 @@ IMPROVEMENTS:
* consul/connect: interpolate the connect, service meta, and service canary meta blocks with the task environment [[GH-9586](https://github.com/hashicorp/nomad/pull/9586)]
BUG FIXES:
* consul: Fixed a bug where updating a task to include services would not work [[GH-9707](https://github.com/hashicorp/nomad/issues/9707)]
* template: Fixed multiple issues in template src/dest and artifact dest interpolation [[GH-9671](https://github.com/hashicorp/nomad/issues/9671)]
* template: Fixed a bug where dynamic secrets did not trigger the template `change_mode` after a client restart. [[GH-9636](https://github.com/hashicorp/nomad/issues/9636)]
* server: Fixed a bug where new servers may bootstrap prematurely when configured with `bootstrap_expect = 0`.

View File

@ -101,8 +101,8 @@ func (tr *TaskRunner) initHooks() {
}))
}
// If there are any services, add the service hook
if len(task.Services) != 0 {
// Always add the service hook. A task with no services on initial registration
// may be updated to include services, which must be handled with this hook.
tr.runnerHooks = append(tr.runnerHooks, newServiceHook(serviceHookConfig{
alloc: tr.Alloc(),
task: tr.Task(),
@ -110,7 +110,6 @@ func (tr *TaskRunner) initHooks() {
restarter: tr,
logger: hookLogger,
}))
}
// If this is a Connect sidecar proxy (or a Connect Native) service,
// add the sidsHook for requesting a Service Identity token (if ACLs).

View File

@ -17,6 +17,9 @@ import (
const (
consulJobBasic = "consul/input/consul_example.nomad"
consulJobCanaryTags = "consul/input/canary_tags.nomad"
consulJobRegisterOnUpdatePart1 = "consul/input/services_empty.nomad"
consulJobRegisterOnUpdatePart2 = "consul/input/services_present.nomad"
)
type ConsulE2ETest struct {
@ -47,24 +50,25 @@ func (tc *ConsulE2ETest) AfterEach(f *framework.F) {
}
for _, id := range tc.jobIds {
tc.Nomad().Jobs().Deregister(id, true, nil)
_, _, err := tc.Nomad().Jobs().Deregister(id, true, nil)
require.NoError(f.T(), err)
}
tc.jobIds = []string{}
tc.Nomad().System().GarbageCollect()
require.NoError(f.T(), tc.Nomad().System().GarbageCollect())
}
// TestConsulRegistration asserts that a job registers services with tags in Consul.
func (tc *ConsulE2ETest) TestConsulRegistration(f *framework.F) {
t := f.T()
r := require.New(t)
nomadClient := tc.Nomad()
catalog := tc.Consul().Catalog()
jobId := "consul" + uuid.Generate()[0:8]
jobId := "consul" + uuid.Short()
tc.jobIds = append(tc.jobIds, jobId)
allocs := e2eutil.RegisterAndWaitForAllocs(f.T(), nomadClient, consulJobBasic, jobId, "")
require.Equal(t, 3, len(allocs))
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocs)
allocations := e2eutil.RegisterAndWaitForAllocs(f.T(), nomadClient, consulJobBasic, jobId, "")
require.Equal(t, 3, len(allocations))
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocations)
e2eutil.WaitForAllocsRunning(t, tc.Nomad(), allocIDs)
expectedTags := []string{
@ -73,42 +77,58 @@ func (tc *ConsulE2ETest) TestConsulRegistration(f *framework.F) {
}
// Assert services get registered
testutil.WaitForResult(func() (bool, error) {
services, _, err := catalog.Service("consul-example", "", nil)
if err != nil {
return false, fmt.Errorf("error contacting Consul: %v", err)
}
if expected := 3; len(services) != expected {
return false, fmt.Errorf("expected %d services but found %d", expected, len(services))
}
e2eutil.RequireConsulRegistered(r, tc.Consul(), "consul-example", 3)
services, _, err := tc.Consul().Catalog().Service("consul-example", "", nil)
require.NoError(t, err)
for _, s := range services {
// If we've made it this far the tags should *always* match
require.True(t, helper.CompareSliceSetString(expectedTags, s.ServiceTags))
require.ElementsMatch(t, expectedTags, s.ServiceTags)
}
return true, nil
}, func(err error) {
t.Fatalf("error waiting for services to be registered: %v", err)
})
// Stop the job
e2eutil.WaitForJobStopped(t, nomadClient, jobId)
// Verify that services were deregistered in Consul
testutil.WaitForResult(func() (bool, error) {
s, _, err := catalog.Service("consul-example", "", nil)
if err != nil {
return false, err
// Verify that services were de-registered in Consul
e2eutil.RequireConsulDeregistered(r, tc.Consul(), "consul-example")
}
return len(s) == 0, fmt.Errorf("expected 0 services but found: %v", s)
}, func(err error) {
t.Fatalf("error waiting for services to be deregistered: %v", err)
})
func (tc *ConsulE2ETest) TestConsulRegisterOnUpdate(f *framework.F) {
t := f.T()
r := require.New(t)
nomadClient := tc.Nomad()
catalog := tc.Consul().Catalog()
jobID := "consul" + uuid.Short()
tc.jobIds = append(tc.jobIds, jobID)
// Initial job has no services for task.
allocations := e2eutil.RegisterAndWaitForAllocs(t, nomadClient, consulJobRegisterOnUpdatePart1, jobID, "")
require.Equal(t, 1, len(allocations))
allocIDs := e2eutil.AllocIDsFromAllocationListStubs(allocations)
e2eutil.WaitForAllocsRunning(t, tc.Nomad(), allocIDs)
// Assert service not yet registered.
results, _, err := catalog.Service("nc-service", "", nil)
require.NoError(t, err)
require.Empty(t, results)
// On update, add services for task.
allocations = e2eutil.RegisterAndWaitForAllocs(t, nomadClient, consulJobRegisterOnUpdatePart2, jobID, "")
require.Equal(t, 1, len(allocations))
allocIDs = e2eutil.AllocIDsFromAllocationListStubs(allocations)
e2eutil.WaitForAllocsRunning(t, tc.Nomad(), allocIDs)
// Assert service is now registered.
e2eutil.RequireConsulRegistered(r, tc.Consul(), "nc-service", 1)
}
// TestCanaryInplaceUpgrades verifies setting and unsetting canary tags
func (tc *ConsulE2ETest) TestCanaryInplaceUpgrades(f *framework.F) {
t := f.T()
// TODO(shoenig) https://github.com/hashicorp/nomad/issues/9627
t.Skip("THIS TEST IS BROKEN (#9627)")
nomadClient := tc.Nomad()
consulClient := tc.Consul()
jobId := "consul" + uuid.Generate()[0:8]

View File

@ -23,7 +23,7 @@ job "consul-example" {
healthy_deadline = "5m"
}
group "cache" {
group "group" {
count = 3
network {
@ -41,7 +41,7 @@ job "consul-example" {
size = 300
}
task "redis" {
task "example" {
driver = "docker"
config {

View File

@ -0,0 +1,29 @@
job "consul-register-on-update" {
datacenters = ["dc1"]
type = "service"
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
group "echo" {
task "busybox-nc" {
driver = "docker"
config {
image = "busybox:1"
command = "nc"
args = [
"-ll",
"-p",
"1234",
"-e",
"/bin/cat"]
}
# no initial service definition
}
}
}

View File

@ -0,0 +1,31 @@
job "consul-register-on-update" {
datacenters = ["dc1"]
type = "service"
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
group "echo" {
task "busybox-nc" {
driver = "docker"
config {
image = "busybox:1"
command = "nc"
args = [
"-ll",
"-p",
"1234",
"-e",
"/bin/cat"]
}
service {
name = "nc-service"
}
}
}
}

View File

@ -32,64 +32,65 @@ func (tc *ScriptChecksE2ETest) BeforeAll(f *framework.F) {
// and associated script checks. It updates, stops, etc. the job to verify
// that script checks are re-registered as expected.
func (tc *ScriptChecksE2ETest) TestGroupScriptCheck(f *framework.F) {
r := require.New(f.T())
nomadClient := tc.Nomad()
uuid := uuid.Generate()
require := require.New(f.T())
consulClient := tc.Consul()
jobId := "checks_group" + uuid[0:8]
jobId := "checks_group" + uuid.Short()
tc.jobIds = append(tc.jobIds, jobId)
// Job run: verify that checks were registered in Consul
allocs := e2eutil.RegisterAndWaitForAllocs(f.T(),
nomadClient, "consul/input/checks_group.nomad", jobId, "")
require.Equal(1, len(allocs))
e2eutil.RequireConsulStatus(require, consulClient, "group-service-1", capi.HealthPassing)
e2eutil.RequireConsulStatus(require, consulClient, "group-service-2", capi.HealthWarning)
e2eutil.RequireConsulStatus(require, consulClient, "group-service-3", capi.HealthCritical)
r.Equal(1, len(allocs))
e2eutil.RequireConsulStatus(r, consulClient, "group-service-1", capi.HealthPassing)
e2eutil.RequireConsulStatus(r, consulClient, "group-service-2", capi.HealthWarning)
e2eutil.RequireConsulStatus(r, consulClient, "group-service-3", capi.HealthCritical)
// Check in warning state becomes healthy after check passes
_, _, err := exec(nomadClient, allocs,
[]string{"/bin/sh", "-c", "touch /tmp/${NOMAD_ALLOC_ID}-alive-2b"})
require.NoError(err)
e2eutil.RequireConsulStatus(require, consulClient, "group-service-2", capi.HealthPassing)
r.NoError(err)
e2eutil.RequireConsulStatus(r, consulClient, "group-service-2", capi.HealthPassing)
// Job update: verify checks are re-registered in Consul
allocs = e2eutil.RegisterAndWaitForAllocs(f.T(),
nomadClient, "consul/input/checks_group_update.nomad", jobId, "")
require.Equal(1, len(allocs))
e2eutil.RequireConsulStatus(require, consulClient, "group-service-1", capi.HealthPassing)
e2eutil.RequireConsulStatus(require, consulClient, "group-service-2", capi.HealthPassing)
e2eutil.RequireConsulStatus(require, consulClient, "group-service-3", capi.HealthCritical)
r.Equal(1, len(allocs))
e2eutil.RequireConsulStatus(r, consulClient, "group-service-1", capi.HealthPassing)
e2eutil.RequireConsulStatus(r, consulClient, "group-service-2", capi.HealthPassing)
e2eutil.RequireConsulStatus(r, consulClient, "group-service-3", capi.HealthCritical)
// Verify we don't have any linger script checks running on the client
out, _, err := exec(nomadClient, allocs, []string{"pgrep", "sleep"})
require.NoError(err)
r.NoError(err)
running := strings.Split(strings.TrimSpace(out.String()), "\n")
require.LessOrEqual(len(running), 2) // task itself + 1 check == 2
r.LessOrEqual(len(running), 2) // task itself + 1 check == 2
// Clean job stop: verify that checks were deregistered in Consul
nomadClient.Jobs().Deregister(jobId, false, nil) // nomad job stop
e2eutil.RequireConsulDeregistered(require, consulClient, "group-service-1")
e2eutil.RequireConsulDeregistered(require, consulClient, "group-service-2")
e2eutil.RequireConsulDeregistered(require, consulClient, "group-service-3")
_, _, err = nomadClient.Jobs().Deregister(jobId, false, nil) // nomad job stop
r.NoError(err)
e2eutil.RequireConsulDeregistered(r, consulClient, "group-service-1")
e2eutil.RequireConsulDeregistered(r, consulClient, "group-service-2")
e2eutil.RequireConsulDeregistered(r, consulClient, "group-service-3")
// Restore for next test
allocs = e2eutil.RegisterAndWaitForAllocs(f.T(),
nomadClient, "consul/input/checks_group.nomad", jobId, "")
require.Equal(2, len(allocs))
e2eutil.RequireConsulStatus(require, consulClient, "group-service-1", capi.HealthPassing)
e2eutil.RequireConsulStatus(require, consulClient, "group-service-2", capi.HealthWarning)
e2eutil.RequireConsulStatus(require, consulClient, "group-service-3", capi.HealthCritical)
r.Equal(2, len(allocs))
e2eutil.RequireConsulStatus(r, consulClient, "group-service-1", capi.HealthPassing)
e2eutil.RequireConsulStatus(r, consulClient, "group-service-2", capi.HealthWarning)
e2eutil.RequireConsulStatus(r, consulClient, "group-service-3", capi.HealthCritical)
// Crash a task: verify that checks become healthy again
_, _, err = exec(nomadClient, allocs, []string{"pkill", "sleep"})
if err != nil && err.Error() != "plugin is shut down" {
require.FailNow("unexpected error: %v", err)
r.FailNow("unexpected error: %v", err)
}
e2eutil.RequireConsulStatus(require, consulClient, "group-service-1", capi.HealthPassing)
e2eutil.RequireConsulStatus(require, consulClient, "group-service-2", capi.HealthWarning)
e2eutil.RequireConsulStatus(require, consulClient, "group-service-3", capi.HealthCritical)
e2eutil.RequireConsulStatus(r, consulClient, "group-service-1", capi.HealthPassing)
e2eutil.RequireConsulStatus(r, consulClient, "group-service-2", capi.HealthWarning)
e2eutil.RequireConsulStatus(r, consulClient, "group-service-3", capi.HealthCritical)
// TODO(tgross) ...
// Restart client: verify that checks are re-registered
@ -99,78 +100,82 @@ func (tc *ScriptChecksE2ETest) TestGroupScriptCheck(f *framework.F) {
// and associated script checks. It updates, stops, etc. the job to verify
// that script checks are re-registered as expected.
func (tc *ScriptChecksE2ETest) TestTaskScriptCheck(f *framework.F) {
r := require.New(f.T())
nomadClient := tc.Nomad()
uuid := uuid.Generate()
require := require.New(f.T())
consulClient := tc.Consul()
jobId := "checks_task" + uuid[0:8]
jobId := "checks_task" + uuid.Short()
tc.jobIds = append(tc.jobIds, jobId)
// Job run: verify that checks were registered in Consul
allocs := e2eutil.RegisterAndWaitForAllocs(f.T(),
nomadClient, "consul/input/checks_task.nomad", jobId, "")
require.Equal(1, len(allocs))
e2eutil.RequireConsulStatus(require, consulClient, "task-service-1", capi.HealthPassing)
e2eutil.RequireConsulStatus(require, consulClient, "task-service-2", capi.HealthWarning)
e2eutil.RequireConsulStatus(require, consulClient, "task-service-3", capi.HealthCritical)
r.Equal(1, len(allocs))
e2eutil.RequireConsulStatus(r, consulClient, "task-service-1", capi.HealthPassing)
e2eutil.RequireConsulStatus(r, consulClient, "task-service-2", capi.HealthWarning)
e2eutil.RequireConsulStatus(r, consulClient, "task-service-3", capi.HealthCritical)
// Check in warning state becomes healthy after check passes
_, _, err := exec(nomadClient, allocs,
[]string{"/bin/sh", "-c", "touch ${NOMAD_TASK_DIR}/alive-2b"})
require.NoError(err)
e2eutil.RequireConsulStatus(require, consulClient, "task-service-2", capi.HealthPassing)
r.NoError(err)
e2eutil.RequireConsulStatus(r, consulClient, "task-service-2", capi.HealthPassing)
// Job update: verify checks are re-registered in Consul
allocs = e2eutil.RegisterAndWaitForAllocs(f.T(),
nomadClient, "consul/input/checks_task_update.nomad", jobId, "")
require.Equal(1, len(allocs))
e2eutil.RequireConsulStatus(require, consulClient, "task-service-1", capi.HealthPassing)
e2eutil.RequireConsulStatus(require, consulClient, "task-service-2", capi.HealthPassing)
e2eutil.RequireConsulStatus(require, consulClient, "task-service-3", capi.HealthCritical)
r.Equal(1, len(allocs))
e2eutil.RequireConsulStatus(r, consulClient, "task-service-1", capi.HealthPassing)
e2eutil.RequireConsulStatus(r, consulClient, "task-service-2", capi.HealthPassing)
e2eutil.RequireConsulStatus(r, consulClient, "task-service-3", capi.HealthCritical)
// Verify we don't have any linger script checks running on the client
out, _, err := exec(nomadClient, allocs, []string{"pgrep", "sleep"})
require.NoError(err)
r.NoError(err)
running := strings.Split(strings.TrimSpace(out.String()), "\n")
require.LessOrEqual(len(running), 2) // task itself + 1 check == 2
r.LessOrEqual(len(running), 2) // task itself + 1 check == 2
// Clean job stop: verify that checks were deregistered in Consul
nomadClient.Jobs().Deregister(jobId, false, nil) // nomad job stop
e2eutil.RequireConsulDeregistered(require, consulClient, "task-service-1")
e2eutil.RequireConsulDeregistered(require, consulClient, "task-service-2")
e2eutil.RequireConsulDeregistered(require, consulClient, "task-service-3")
_, _, err = nomadClient.Jobs().Deregister(jobId, false, nil) // nomad job stop
r.NoError(err)
e2eutil.RequireConsulDeregistered(r, consulClient, "task-service-1")
e2eutil.RequireConsulDeregistered(r, consulClient, "task-service-2")
e2eutil.RequireConsulDeregistered(r, consulClient, "task-service-3")
// Restore for next test
allocs = e2eutil.RegisterAndWaitForAllocs(f.T(),
nomadClient, "consul/input/checks_task.nomad", jobId, "")
require.Equal(2, len(allocs))
e2eutil.RequireConsulStatus(require, consulClient, "task-service-1", capi.HealthPassing)
e2eutil.RequireConsulStatus(require, consulClient, "task-service-2", capi.HealthWarning)
e2eutil.RequireConsulStatus(require, consulClient, "task-service-3", capi.HealthCritical)
r.Equal(2, len(allocs))
e2eutil.RequireConsulStatus(r, consulClient, "task-service-1", capi.HealthPassing)
e2eutil.RequireConsulStatus(r, consulClient, "task-service-2", capi.HealthWarning)
e2eutil.RequireConsulStatus(r, consulClient, "task-service-3", capi.HealthCritical)
// Crash a task: verify that checks become healthy again
_, _, err = exec(nomadClient, allocs, []string{"pkill", "sleep"})
if err != nil && err.Error() != "plugin is shut down" {
require.FailNow("unexpected error: %v", err)
r.FailNow("unexpected error: %v", err)
}
e2eutil.RequireConsulStatus(require, consulClient, "task-service-1", capi.HealthPassing)
e2eutil.RequireConsulStatus(require, consulClient, "task-service-2", capi.HealthWarning)
e2eutil.RequireConsulStatus(require, consulClient, "task-service-3", capi.HealthCritical)
e2eutil.RequireConsulStatus(r, consulClient, "task-service-1", capi.HealthPassing)
e2eutil.RequireConsulStatus(r, consulClient, "task-service-2", capi.HealthWarning)
e2eutil.RequireConsulStatus(r, consulClient, "task-service-3", capi.HealthCritical)
// TODO(tgross) ...
// Restart client: verify that checks are re-registered
}
func (tc *ScriptChecksE2ETest) AfterEach(f *framework.F) {
r := require.New(f.T())
nomadClient := tc.Nomad()
jobs := nomadClient.Jobs()
// Stop all jobs in test
for _, id := range tc.jobIds {
jobs.Deregister(id, true, nil)
_, _, err := jobs.Deregister(id, true, nil)
r.NoError(err)
}
// Garbage collect
nomadClient.System().GarbageCollect()
r.NoError(nomadClient.System().GarbageCollect())
}
func exec(client *api.Client, allocs []*api.AllocationListStub, command []string) (bytes.Buffer, bytes.Buffer, error) {

View File

@ -7,10 +7,8 @@ import (
"github.com/stretchr/testify/require"
)
// RequireConsulStatus asserts the aggregate health of the service converges to
// the expected status
func RequireConsulStatus(require *require.Assertions,
client *capi.Client, serviceName, expectedStatus string) {
// RequireConsulStatus asserts the aggregate health of the service converges to the expected status.
func RequireConsulStatus(require *require.Assertions, client *capi.Client, serviceName, expectedStatus string) {
require.Eventually(func() bool {
_, status := serviceStatus(require, client, serviceName)
return status == expectedStatus
@ -20,10 +18,8 @@ func RequireConsulStatus(require *require.Assertions,
)
}
// serviceStatus gets the aggregate health of the service and returns
// the []ServiceEntry for further checking
func serviceStatus(require *require.Assertions,
client *capi.Client, serviceName string) ([]*capi.ServiceEntry, string) {
// serviceStatus gets the aggregate health of the service and returns the []ServiceEntry for further checking.
func serviceStatus(require *require.Assertions, client *capi.Client, serviceName string) ([]*capi.ServiceEntry, string) {
services, _, err := client.Health().Service(serviceName, "", false, nil)
require.NoError(err, "expected no error for %q, got %v", serviceName, err)
if len(services) > 0 {
@ -32,12 +28,20 @@ func serviceStatus(require *require.Assertions,
return nil, "(unknown status)"
}
// RequireConsulDeregistered asserts that the service eventually is deregistered from Consul
func RequireConsulDeregistered(require *require.Assertions,
client *capi.Client, serviceName string) {
// RequireConsulDeregistered asserts that the service eventually is de-registered from Consul.
func RequireConsulDeregistered(require *require.Assertions, client *capi.Client, service string) {
require.Eventually(func() bool {
services, _, err := client.Health().Service(serviceName, "", false, nil)
require.NoError(err, "expected no error for %q, got %v", serviceName, err)
services, _, err := client.Health().Service(service, "", false, nil)
require.NoError(err)
return len(services) == 0
}, 5*time.Second, time.Second)
}
// RequireConsulRegistered assert that the service is registered in Consul.
func RequireConsulRegistered(require *require.Assertions, client *capi.Client, service string, count int) {
require.Eventually(func() bool {
services, _, err := client.Catalog().Service(service, "", nil)
require.NoError(err)
return len(services) == count
}, 5*time.Second, time.Second)
}

View File

@ -1,14 +1,14 @@
package uuid
import (
crand "crypto/rand"
"crypto/rand"
"fmt"
)
// Generate is used to generate a random UUID
// Generate is used to generate a random UUID.
func Generate() string {
buf := make([]byte, 16)
if _, err := crand.Read(buf); err != nil {
if _, err := rand.Read(buf); err != nil {
panic(fmt.Errorf("failed to read random bytes: %v", err))
}
@ -19,3 +19,8 @@ func Generate() string {
buf[8:10],
buf[10:16])
}
// Short is used to generate the first 8 characters of a UUID.
func Short() string {
return Generate()[0:8]
}