From 07543f8bdf8f8b5fd7d1947684a64c2b8183b1ee Mon Sep 17 00:00:00 2001
From: Seth Hoenig
Date: Fri, 17 Mar 2023 09:44:21 -0500
Subject: [PATCH] nsd: always set deregister flag after deregistration of group (#16289)

* services: always set deregister flag after deregistration of group

This PR fixes a bug where the group service hook's deregister flag was
not set in some cases, causing the hook to attempt deregistrations twice
during job updates (alloc replacement). In the tests ... we used to
assert on the wrong behavior (remove twice), which has now been corrected
to assert that we remove only once.

This bug was "silent" in the Consul provider world because the error logs
for double deregistration only show up in Consul logs; with the Nomad
provider the error logs are in the Nomad agent logs.

* services: cleanup group service hook tests
---
 .changelog/16289.txt                          |  3 +
 client/allocrunner/group_service_hook.go      | 59 +++++++-----
 client/allocrunner/group_service_hook_test.go | 94 +++++++++----------
 client/serviceregistration/nsd/nsd.go         |  2 +
 4 files changed, 84 insertions(+), 74 deletions(-)
 create mode 100644 .changelog/16289.txt

diff --git a/.changelog/16289.txt b/.changelog/16289.txt
new file mode 100644
index 000000000..f11e0dd46
--- /dev/null
+++ b/.changelog/16289.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+services: Fixed a bug where a service would be deregistered twice
+```
diff --git a/client/allocrunner/group_service_hook.go b/client/allocrunner/group_service_hook.go
index e350f7773..0d37b6301 100644
--- a/client/allocrunner/group_service_hook.go
+++ b/client/allocrunner/group_service_hook.go
@@ -5,7 +5,7 @@ import (
 	"sync"
 	"time"
 
-	log "github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
 	"github.com/hashicorp/nomad/client/serviceregistration"
 	"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
@@ -39,7 +39,7 @@ type groupServiceHook struct {
 	// and check registration and deregistration.
 	serviceRegWrapper *wrapper.HandlerWrapper
 
-	logger log.Logger
+	logger hclog.Logger
 
 	// The following fields may be updated
 	canary bool
@@ -60,7 +60,7 @@ type groupServiceHookConfig struct {
 	taskEnvBuilder   *taskenv.Builder
 	networkStatus    structs.NetworkStatus
 	shutdownDelayCtx context.Context
-	logger           log.Logger
+	logger           hclog.Logger
 
 	// providerNamespace is the Nomad or Consul namespace in which service
 	// registrations will be made.
@@ -118,23 +118,26 @@ func (h *groupServiceHook) Prerun() error {
 		h.prerun = true
 		h.mu.Unlock()
 	}()
-	return h.prerunLocked()
+	return h.preRunLocked()
 }
 
-func (h *groupServiceHook) prerunLocked() error {
+// caller must hold h.mu
+func (h *groupServiceHook) preRunLocked() error {
 	if len(h.services) == 0 {
 		return nil
 	}
 
-	services := h.getWorkloadServices()
+	services := h.getWorkloadServicesLocked()
 	return h.serviceRegWrapper.RegisterWorkload(services)
 }
 
+// Update is run when a job submitter modifies service(s) (but not much else -
+// otherwise a full alloc replacement would occur).
 func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
 	h.mu.Lock()
 	defer h.mu.Unlock()
 
-	oldWorkloadServices := h.getWorkloadServices()
+	oldWorkloadServices := h.getWorkloadServicesLocked()
 
 	// Store new updated values out of request
 	canary := false
@@ -166,7 +169,7 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
 	h.providerNamespace = req.Alloc.ServiceProviderNamespace()
 
 	// Create new task services struct with those new values
-	newWorkloadServices := h.getWorkloadServices()
+	newWorkloadServices := h.getWorkloadServicesLocked()
 
 	if !h.prerun {
 		// Update called before Prerun. Update alloc and exit to allow
@@ -186,21 +189,20 @@ func (h *groupServiceHook) PreTaskRestart() error {
 	}()
 
 	h.preKillLocked()
-	return h.prerunLocked()
+	return h.preRunLocked()
 }
 
 func (h *groupServiceHook) PreKill() {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	h.preKillLocked()
+	helper.WithLock(&h.mu, h.preKillLocked)
 }
 
-// implements the PreKill hook but requires the caller hold the lock
+// implements the PreKill hook
+//
+// caller must hold h.mu
 func (h *groupServiceHook) preKillLocked() {
 	// If we have a shutdown delay, deregister group services and then wait
 	// before continuing to kill tasks.
-	h.deregister()
-	h.deregistered = true
+	h.deregisterLocked()
 
 	if h.delay == 0 {
 		return
@@ -220,24 +222,31 @@
 }
 
 func (h *groupServiceHook) Postrun() error {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-
-	if !h.deregistered {
-		h.deregister()
-	}
+	helper.WithLock(&h.mu, h.deregisterLocked)
 	return nil
 }
 
-// deregister services from Consul/Nomad service provider.
-func (h *groupServiceHook) deregister() {
+// deregisterLocked will deregister services from Consul/Nomad service provider.
+//
+// caller must hold h.mu
+func (h *groupServiceHook) deregisterLocked() {
+	if h.deregistered {
+		return
+	}
+
 	if len(h.services) > 0 {
-		workloadServices := h.getWorkloadServices()
+		workloadServices := h.getWorkloadServicesLocked()
 		h.serviceRegWrapper.RemoveWorkload(workloadServices)
 	}
+
+	h.deregistered = true
 }
 
-func (h *groupServiceHook) getWorkloadServices() *serviceregistration.WorkloadServices {
+// getWorkloadServicesLocked returns the set of workload services currently
+// on the hook.
+//
+// caller must hold h.mu
+func (h *groupServiceHook) getWorkloadServicesLocked() *serviceregistration.WorkloadServices {
 	// Interpolate with the task's environment
 	interpolatedServices := taskenv.InterpolateServices(h.taskEnvBuilder.Build(), h.services)
diff --git a/client/allocrunner/group_service_hook_test.go b/client/allocrunner/group_service_hook_test.go
index e05df8cbc..606dab8f3 100644
--- a/client/allocrunner/group_service_hook_test.go
+++ b/client/allocrunner/group_service_hook_test.go
@@ -14,7 +14,7 @@ import (
 	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/stretchr/testify/require"
+	"github.com/shoenig/test/must"
 )
 
 var _ interfaces.RunnerPrerunHook = (*groupServiceHook)(nil)
@@ -50,22 +50,21 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger:         logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
-	require.NoError(t, h.Postrun())
+	must.NoError(t, h.Postrun())
 
-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.PreTaskRestart())
 
 	ops := consulMockClient.GetOps()
-	require.Len(t, ops, 5)
-	require.Equal(t, "add", ops[0].Op)    // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op)    // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op)    // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op)    // Restart -> preRun
 }
 
 // TestGroupServiceHook_ShutdownDelayUpdate asserts calling group service hooks
@@ -92,23 +91,23 @@ func TestGroupServiceHook_ShutdownDelayUpdate(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger:         logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	// Increase shutdown delay
 	alloc.Job.TaskGroups[0].ShutdownDelay = pointer.Of(15 * time.Second)
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
 	// Assert that update updated the delay value
-	require.Equal(t, h.delay, 15*time.Second)
+	must.Eq(t, h.delay, 15*time.Second)
 
 	// Remove shutdown delay
 	alloc.Job.TaskGroups[0].ShutdownDelay = nil
 	req = &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
 	// Assert that update updated the delay value
-	require.Equal(t, h.delay, 0*time.Second)
+	must.Eq(t, h.delay, 0*time.Second)
 }
 
 // TestGroupServiceHook_GroupServices asserts group service hooks with group
@@ -133,22 +132,21 @@ func TestGroupServiceHook_GroupServices(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger:         logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
-	require.NoError(t, h.Postrun())
+	must.NoError(t, h.Postrun())
 
-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.PreTaskRestart())
 
 	ops := consulMockClient.GetOps()
-	require.Len(t, ops, 5)
-	require.Equal(t, "add", ops[0].Op)    // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op)    // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op)    // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op)    // Restart -> preRun
 }
 
 // TestGroupServiceHook_GroupServices_Nomad asserts group service hooks with
@@ -179,25 +177,24 @@ func TestGroupServiceHook_GroupServices_Nomad(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger:         logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	// Trigger our hook requests.
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
-	require.NoError(t, h.Postrun())
-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.Update(req))
+	must.NoError(t, h.Postrun())
+	must.NoError(t, h.PreTaskRestart())
 
 	// Ensure the Nomad mock provider has the expected operations.
 	ops := nomadMockClient.GetOps()
-	require.Len(t, ops, 5)
-	require.Equal(t, "add", ops[0].Op)    // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op)    // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op)    // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op)    // Restart -> preRun
 
 	// Ensure the Consul mock provider has zero operations.
-	require.Len(t, consulMockClient.GetOps(), 0)
+	must.SliceEmpty(t, consulMockClient.GetOps())
 }
 
 // TestGroupServiceHook_Error asserts group service hooks with group
@@ -234,22 +231,21 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger:         logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())
 
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))
 
-	require.NoError(t, h.Postrun())
+	must.NoError(t, h.Postrun())
 
-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.PreTaskRestart())
 
 	ops := consulMockClient.GetOps()
-	require.Len(t, ops, 5)
-	require.Equal(t, "add", ops[0].Op)    // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op)    // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op)    // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op)    // Restart -> preRun
 }
 
 func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
@@ -284,6 +280,6 @@ func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
 		logger:         logger,
 	})
 
-	services := h.getWorkloadServices()
-	require.Len(t, services.Services, 1)
+	services := h.getWorkloadServicesLocked()
+	must.Len(t, 1, services.Services)
 }
diff --git a/client/serviceregistration/nsd/nsd.go b/client/serviceregistration/nsd/nsd.go
index 807f4d492..adbea8a0b 100644
--- a/client/serviceregistration/nsd/nsd.go
+++ b/client/serviceregistration/nsd/nsd.go
@@ -157,6 +157,8 @@ func (s *ServiceRegistrationHandler) removeWorkload(
 		defer wg.Done()
 
 		// Stop check watcher
+		//
+		// todo(shoenig) - shouldn't we only unwatch checks for the given serviceSpec?
 		for _, service := range workload.Services {
 			for _, check := range service.Checks {
 				checkID := string(structs.NomadCheckID(workload.AllocInfo.AllocID, workload.AllocInfo.Group, check))
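
Stripped of the Nomad-specific plumbing, the fix reduces to an idempotency guard: all deregistration funnels through a single locked helper that checks the flag, does the work, and then always sets the flag, so whichever of PreKill/Postrun runs first wins and the later call becomes a no-op. Below is a minimal standalone sketch of that pattern — `hook` and `registry` here are hypothetical stand-ins for `groupServiceHook` and the service registration wrapper, not the actual Nomad types:

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for the service registration wrapper;
// it records operations so the effect of the guard is observable.
type registry struct{ ops []string }

func (r *registry) RemoveWorkload() { r.ops = append(r.ops, "remove") }

// hook mirrors the relevant shape of groupServiceHook: a mutex guarding a
// deregistered flag that is always set after the first deregistration.
type hook struct {
	mu           sync.Mutex
	deregistered bool
	reg          *registry
}

// deregisterLocked removes services exactly once; caller must hold h.mu.
func (h *hook) deregisterLocked() {
	if h.deregistered {
		return // already deregistered; skip the second "remove"
	}
	h.reg.RemoveWorkload()
	h.deregistered = true // always set after deregistration
}

// PreKill and Postrun both funnel through deregisterLocked.
func (h *hook) PreKill() {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.deregisterLocked()
}

func (h *hook) Postrun() {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.deregisterLocked()
}

func main() {
	h := &hook{reg: &registry{}}
	h.PreKill()            // performs the one real removal
	h.Postrun()            // no-op thanks to the flag
	fmt.Println(h.reg.ops) // [remove]
}
```

This is also why the tests above now expect four operations ending in a single "remove" rather than the five (with a double "remove") previously asserted.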