diff --git a/.changelog/15862.txt b/.changelog/15862.txt new file mode 100644 index 000000000..d4a428614 --- /dev/null +++ b/.changelog/15862.txt @@ -0,0 +1,3 @@ +```release-note:bug +services: Fixed a bug where services would fail to register if task initially fails +``` diff --git a/client/allocrunner/group_service_hook.go b/client/allocrunner/group_service_hook.go index 7ce96bf6b..e350f7773 100644 --- a/client/allocrunner/group_service_hook.go +++ b/client/allocrunner/group_service_hook.go @@ -229,7 +229,7 @@ func (h *groupServiceHook) Postrun() error { return nil } -// deregister services from Consul. +// deregister services from Consul/Nomad service provider. func (h *groupServiceHook) deregister() { if len(h.services) > 0 { workloadServices := h.getWorkloadServices() diff --git a/client/serviceregistration/nsd/nsd.go b/client/serviceregistration/nsd/nsd.go index 1138c4458..7ebc5c149 100644 --- a/client/serviceregistration/nsd/nsd.go +++ b/client/serviceregistration/nsd/nsd.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strings" + "sync" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-multierror" @@ -136,13 +137,24 @@ func (s *ServiceRegistrationHandler) RegisterWorkload(workload *serviceregistrat // enabled. This covers situations where the feature is disabled, yet still has // allocations which, when stopped need their registrations removed. func (s *ServiceRegistrationHandler) RemoveWorkload(workload *serviceregistration.WorkloadServices) { + wg := new(sync.WaitGroup) + wg.Add(len(workload.Services)) + for _, serviceSpec := range workload.Services { - go s.removeWorkload(workload, serviceSpec) + go s.removeWorkload(wg, workload, serviceSpec) } + + // wait for all workload removals to complete + wg.Wait() } func (s *ServiceRegistrationHandler) removeWorkload( - workload *serviceregistration.WorkloadServices, serviceSpec *structs.Service) { + wg *sync.WaitGroup, + workload *serviceregistration.WorkloadServices, + serviceSpec *structs.Service, +) { + // unblock wait group when we are done + defer wg.Done() // Stop check watcher for _, service := range workload.Services { diff --git a/e2e/servicediscovery/input/checks_task_restart_helper.nomad b/e2e/servicediscovery/input/checks_task_restart_helper.nomad new file mode 100644 index 000000000..83539d2e1 --- /dev/null +++ b/e2e/servicediscovery/input/checks_task_restart_helper.nomad @@ -0,0 +1,46 @@ +variable "nodeID" { + type = string +} + +variable "cmd" { + type = string +} + +variable "delay" { + type = string +} + +job "checks_task_restart_helper" { + datacenters = ["dc1"] + type = "batch" + + group "group" { + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + constraint { + attribute = "${node.unique_id}" + value = "${var.nodeID}" + } + + reschedule { + attempts = 0 + unlimited = false + } + + task "touch" { + driver = "raw_exec" + config { + command = "bash" + args = ["-c", "sleep ${var.delay} && ${var.cmd} /tmp/nsd-checks-task-restart-test.txt"] + } + resources { + cpu = 50 + memory = 32 + } + } + } +} diff --git a/e2e/servicediscovery/input/checks_task_restart_main.nomad b/e2e/servicediscovery/input/checks_task_restart_main.nomad new file mode 100644 index 000000000..580c41401 --- /dev/null +++ b/e2e/servicediscovery/input/checks_task_restart_main.nomad @@ -0,0 +1,46 @@ +job "checks_task_restart" { + datacenters = ["dc1"] + type = "service" + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + group "group" { + network { + mode = "host" + port "http" {} + } + + service { + provider = "nomad" + name = "nsd-checks-task-restart-test" + port = "http" + check { + name = "alive" + type = "http" + path = "/nsd-checks-task-restart-test.txt" + interval = "2s" + timeout = "1s" + check_restart { + limit = 10 + grace = "1s" + } + } + } + + task "python" { + driver = "raw_exec" + user = "nobody" + config { + command = "python3" + args = ["-m", "http.server", "${NOMAD_PORT_http}", "--directory", "/tmp"] + } + resources { + cpu = 50 + memory = 64 + } + } + } +} diff --git a/e2e/servicediscovery/nomad_checks_test.go b/e2e/servicediscovery/nomad_checks_test.go index e72b6fb0f..3be9a6f20 100644 --- a/e2e/servicediscovery/nomad_checks_test.go +++ b/e2e/servicediscovery/nomad_checks_test.go @@ -9,7 +9,10 @@ import ( "github.com/hashicorp/nomad/e2e/e2eutil" "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test" "github.com/shoenig/test/must" + "github.com/shoenig/test/wait" "github.com/stretchr/testify/require" ) @@ -100,3 +103,68 @@ func testChecksSad(t *testing.T) { return strings.Contains(output, `

Error code explanation: HTTPStatus.NOT_IMPLEMENTED - Server does not support this operation.

`) }, 5*time.Second, 200*time.Millisecond) } + +func testChecksServiceReRegisterAfterCheckRestart(t *testing.T) { + const ( + jobChecksAfterRestartMain = "./input/checks_task_restart_main.nomad" + jobChecksAfterRestartHelper = "./input/checks_task_restart_helper.nomad" + ) + + nomadClient := e2eutil.NomadClient(t) + + idJobMain := "nsd-check-restart-services-" + uuid.Short() + idJobHelper := "nsd-check-restart-services-helper-" + uuid.Short() + jobIDs := []string{idJobMain, idJobHelper} + + // Defer a cleanup function to remove the job. This will trigger if the + // test fails, unless the cancel function is called. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + defer e2eutil.CleanupJobsAndGCWithContext(t, ctx, &jobIDs) + + // register the main job + allocStubs := e2eutil.RegisterAndWaitForAllocs(t, nomadClient, jobChecksAfterRestartMain, idJobMain, "") + must.Len(t, 1, allocStubs) + + // wait for task restart due to failing health check + must.Wait(t, wait.InitialSuccess( + wait.BoolFunc(func() bool { + allocEvents, err := e2eutil.AllocTaskEventsForJob(idJobMain, "") + if err != nil { + t.Log("failed to get task events for job", idJobMain, err) + return false + } + for _, events := range allocEvents { + for _, event := range events { + if event["Type"] == "Restarting" { + return true + } + } + } + return false + }), + wait.Timeout(30*time.Second), + wait.Gap(3*time.Second), + )) + + runHelper := func(command string) { + vars := []string{"-var", "nodeID=" + allocStubs[0].NodeID, "-var", "cmd=touch", "-var", "delay=3s"} + err := e2eutil.RegisterWithArgs(idJobHelper, jobChecksAfterRestartHelper, vars...) + test.NoError(t, err) + } + + // register helper job, triggering check to start passing + runHelper("touch") + defer func() { + runHelper("rm") + }() + + // wait for main task to become healthy + e2eutil.WaitForAllocStatus(t, nomadClient, allocStubs[0].ID, structs.AllocClientStatusRunning) + + // finally assert we have services + services := nomadClient.Services() + serviceStubs, _, err := services.Get("nsd-checks-task-restart-test", nil) + must.NoError(t, err) + must.Len(t, 1, serviceStubs) +} diff --git a/e2e/servicediscovery/service_discovery_test.go b/e2e/servicediscovery/service_discovery_test.go index 00596895a..0fd863562 100644 --- a/e2e/servicediscovery/service_discovery_test.go +++ b/e2e/servicediscovery/service_discovery_test.go @@ -45,6 +45,7 @@ func TestServiceDiscovery(t *testing.T) { t.Run("TestServiceDiscovery_SimpleLoadBalancing", testSimpleLoadBalancing) t.Run("TestServiceDiscovery_ChecksHappy", testChecksHappy) t.Run("TestServiceDiscovery_ChecksSad", testChecksSad) + t.Run("TestServiceDiscovery_ServiceRegisterAfterCheckRestart", testChecksServiceReRegisterAfterCheckRestart) } // testMultiProvider tests service discovery where multi providers are used