nsd: block on removal of services (#15862)

* nsd: block on removal of services

This PR uses a WaitGroup to ensure workload removals are complete
before returning from ServiceRegistrationHandler.RemoveWorkload of
the nomad service provider. The de-registration of individual services
still occurs asynchrously, but we must block on the parent removal
call so that we do not race with further operations on the same set
of services - e.g. in the case of a task restart where we de-register
and then re-register the services in quick succession.

Fixes #15032

* nsd: add e2e test for initial failing check and restart
This commit is contained in:
Seth Hoenig 2023-01-26 08:17:57 -06:00 committed by GitHub
parent 2a5c423ae0
commit 7375fd40fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 179 additions and 3 deletions

3
.changelog/15862.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
services: Fixed a bug where services would fail to register if task initially fails
```

View File

@ -229,7 +229,7 @@ func (h *groupServiceHook) Postrun() error {
return nil
}
// deregister services from Consul.
// deregister services from Consul/Nomad service provider.
func (h *groupServiceHook) deregister() {
if len(h.services) > 0 {
workloadServices := h.getWorkloadServices()

View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
@ -136,13 +137,24 @@ func (s *ServiceRegistrationHandler) RegisterWorkload(workload *serviceregistrat
// enabled. This covers situations where the feature is disabled, yet still has
// allocations which, when stopped need their registrations removed.
func (s *ServiceRegistrationHandler) RemoveWorkload(workload *serviceregistration.WorkloadServices) {
wg := new(sync.WaitGroup)
wg.Add(len(workload.Services))
for _, serviceSpec := range workload.Services {
go s.removeWorkload(workload, serviceSpec)
go s.removeWorkload(wg, workload, serviceSpec)
}
// wait for all workload removals to complete
wg.Wait()
}
func (s *ServiceRegistrationHandler) removeWorkload(
workload *serviceregistration.WorkloadServices, serviceSpec *structs.Service) {
wg *sync.WaitGroup,
workload *serviceregistration.WorkloadServices,
serviceSpec *structs.Service,
) {
// unblock wait group when we are done
defer wg.Done()
// Stop check watcher
for _, service := range workload.Services {

View File

@ -0,0 +1,46 @@
variable "nodeID" {
type = string
}
variable "cmd" {
type = string
}
variable "delay" {
type = string
}
job "checks_task_restart_helper" {
datacenters = ["dc1"]
type = "batch"
group "group" {
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
constraint {
attribute = "${node.unique_id}"
value = "${var.nodeID}"
}
reschedule {
attempts = 0
unlimited = false
}
task "touch" {
driver = "raw_exec"
config {
command = "bash"
args = ["-c", "sleep ${var.delay} && ${var.cmd} /tmp/nsd-checks-task-restart-test.txt"]
}
resources {
cpu = 50
memory = 32
}
}
}
}

View File

@ -0,0 +1,46 @@
job "checks_task_restart" {
datacenters = ["dc1"]
type = "service"
constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}
group "group" {
network {
mode = "host"
port "http" {}
}
service {
provider = "nomad"
name = "nsd-checks-task-restart-test"
port = "http"
check {
name = "alive"
type = "http"
path = "/nsd-checks-task-restart-test.txt"
interval = "2s"
timeout = "1s"
check_restart {
limit = 10
grace = "1s"
}
}
}
task "python" {
driver = "raw_exec"
user = "nobody"
config {
command = "python3"
args = ["-m", "http.server", "${NOMAD_PORT_http}", "--directory", "/tmp"]
}
resources {
cpu = 50
memory = 64
}
}
}
}

View File

@ -9,7 +9,10 @@ import (
"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/require"
)
@ -100,3 +103,68 @@ func testChecksSad(t *testing.T) {
return strings.Contains(output, `<p>Error code explanation: HTTPStatus.NOT_IMPLEMENTED - Server does not support this operation.</p>`)
}, 5*time.Second, 200*time.Millisecond)
}
func testChecksServiceReRegisterAfterCheckRestart(t *testing.T) {
const (
jobChecksAfterRestartMain = "./input/checks_task_restart_main.nomad"
jobChecksAfterRestartHelper = "./input/checks_task_restart_helper.nomad"
)
nomadClient := e2eutil.NomadClient(t)
idJobMain := "nsd-check-restart-services-" + uuid.Short()
idJobHelper := "nsd-check-restart-services-helper-" + uuid.Short()
jobIDs := []string{idJobMain, idJobHelper}
// Defer a cleanup function to remove the job. This will trigger if the
// test fails, unless the cancel function is called.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
defer e2eutil.CleanupJobsAndGCWithContext(t, ctx, &jobIDs)
// register the main job
allocStubs := e2eutil.RegisterAndWaitForAllocs(t, nomadClient, jobChecksAfterRestartMain, idJobMain, "")
must.Len(t, 1, allocStubs)
// wait for task restart due to failing health check
must.Wait(t, wait.InitialSuccess(
wait.BoolFunc(func() bool {
allocEvents, err := e2eutil.AllocTaskEventsForJob(idJobMain, "")
if err != nil {
t.Log("failed to get task events for job", idJobMain, err)
return false
}
for _, events := range allocEvents {
for _, event := range events {
if event["Type"] == "Restarting" {
return true
}
}
}
return false
}),
wait.Timeout(30*time.Second),
wait.Gap(3*time.Second),
))
runHelper := func(command string) {
vars := []string{"-var", "nodeID=" + allocStubs[0].NodeID, "-var", "cmd=touch", "-var", "delay=3s"}
err := e2eutil.RegisterWithArgs(idJobHelper, jobChecksAfterRestartHelper, vars...)
test.NoError(t, err)
}
// register helper job, triggering check to start passing
runHelper("touch")
defer func() {
runHelper("rm")
}()
// wait for main task to become healthy
e2eutil.WaitForAllocStatus(t, nomadClient, allocStubs[0].ID, structs.AllocClientStatusRunning)
// finally assert we have services
services := nomadClient.Services()
serviceStubs, _, err := services.Get("nsd-checks-task-restart-test", nil)
must.NoError(t, err)
must.Len(t, 1, serviceStubs)
}

View File

@ -45,6 +45,7 @@ func TestServiceDiscovery(t *testing.T) {
t.Run("TestServiceDiscovery_SimpleLoadBalancing", testSimpleLoadBalancing)
t.Run("TestServiceDiscovery_ChecksHappy", testChecksHappy)
t.Run("TestServiceDiscovery_ChecksSad", testChecksSad)
t.Run("TestServiceDiscovery_ServiceRegisterAfterCheckRestart", testChecksServiceReRegisterAfterCheckRestart)
}
// testMultiProvider tests service discovery where multi providers are used