nsd: always set deregister flag after deregistration of group (#16289)
* services: always set deregister flag after deregistration of group

  This PR fixes a bug where the group service hook's deregister flag was not set in some cases, causing the hook to attempt deregistration twice during job updates (alloc replacement). The tests ... used to assert the wrong behavior (remove twice); they have been corrected to assert that we remove only once. This bug was "silent" with the Consul provider because the error logs for double deregistration only show up in the Consul logs; with the Nomad provider the error logs appear in the Nomad agent logs.

* services: cleanup group service hook tests
Parent: 14927e93bc
Commit: 07543f8bdf
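Before the diff, here is a minimal, self-contained sketch of the idempotent-deregistration pattern this commit introduces. The type and field names below are simplified stand-ins for illustration only (the real hook is in the diff that follows): `deregisterLocked` returns early if the flag is already set and always sets it after removal, so PreKill and Postrun can both call it without deregistering the workload twice.

```go
package main

import (
	"fmt"
	"sync"
)

// hook is a simplified stand-in for the group service hook.
type hook struct {
	mu           sync.Mutex
	deregistered bool
	services     []string
}

// deregisterLocked removes services at most once. Caller must hold mu.
func (h *hook) deregisterLocked() {
	if h.deregistered {
		return // already removed, e.g. PreKill ran before Postrun
	}
	if len(h.services) > 0 {
		fmt.Println("removing workload services:", h.services)
	}
	h.deregistered = true // always set the flag after deregistration
}

// PreKill and Postrun both funnel through deregisterLocked under the lock.
func (h *hook) PreKill() {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.deregisterLocked()
}

func (h *hook) Postrun() {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.deregisterLocked() // no-op if PreKill already deregistered
}

func main() {
	h := &hook{services: []string{"web"}}
	h.PreKill()
	h.Postrun() // prints nothing: the flag prevents a second removal
}
```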
@@ -0,0 +1,3 @@
+```release-note:bug
+services: Fixed a bug where a service would be deregistered twice
+```

@@ -5,7 +5,7 @@ import (
 	"sync"
 	"time"

-	log "github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
 	"github.com/hashicorp/nomad/client/serviceregistration"
 	"github.com/hashicorp/nomad/client/serviceregistration/wrapper"

@@ -39,7 +39,7 @@ type groupServiceHook struct {
 	// and check registration and deregistration.
 	serviceRegWrapper *wrapper.HandlerWrapper

-	logger log.Logger
+	logger hclog.Logger

 	// The following fields may be updated
 	canary bool

@@ -60,7 +60,7 @@ type groupServiceHookConfig struct {
 	taskEnvBuilder *taskenv.Builder
 	networkStatus structs.NetworkStatus
 	shutdownDelayCtx context.Context
-	logger log.Logger
+	logger hclog.Logger

 	// providerNamespace is the Nomad or Consul namespace in which service
 	// registrations will be made.

@@ -118,23 +118,26 @@ func (h *groupServiceHook) Prerun() error {
 		h.prerun = true
 		h.mu.Unlock()
 	}()
-	return h.prerunLocked()
+	return h.preRunLocked()
 }

-func (h *groupServiceHook) prerunLocked() error {
+// caller must hold h.lock
+func (h *groupServiceHook) preRunLocked() error {
 	if len(h.services) == 0 {
 		return nil
 	}

-	services := h.getWorkloadServices()
+	services := h.getWorkloadServicesLocked()
 	return h.serviceRegWrapper.RegisterWorkload(services)
 }

+// Update is run when a job submitter modifies service(s) (but not much else -
+// otherwise a full alloc replacement would occur).
 func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
 	h.mu.Lock()
 	defer h.mu.Unlock()

-	oldWorkloadServices := h.getWorkloadServices()
+	oldWorkloadServices := h.getWorkloadServicesLocked()

 	// Store new updated values out of request
 	canary := false

@@ -166,7 +169,7 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
 	h.providerNamespace = req.Alloc.ServiceProviderNamespace()

 	// Create new task services struct with those new values
-	newWorkloadServices := h.getWorkloadServices()
+	newWorkloadServices := h.getWorkloadServicesLocked()

 	if !h.prerun {
 		// Update called before Prerun. Update alloc and exit to allow

@@ -186,21 +189,20 @@ func (h *groupServiceHook) PreTaskRestart() error {
 	}()

 	h.preKillLocked()
-	return h.prerunLocked()
+	return h.preRunLocked()
 }

 func (h *groupServiceHook) PreKill() {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	h.preKillLocked()
+	helper.WithLock(&h.mu, h.preKillLocked)
 }

-// implements the PreKill hook but requires the caller hold the lock
+// implements the PreKill hook
+//
+// caller must hold h.lock
 func (h *groupServiceHook) preKillLocked() {
 	// If we have a shutdown delay deregister group services and then wait
 	// before continuing to kill tasks.
-	h.deregister()
-	h.deregistered = true
+	h.deregisterLocked()

 	if h.delay == 0 {
 		return

@@ -220,24 +222,31 @@ func (h *groupServiceHook) preKillLocked() {
 }

 func (h *groupServiceHook) Postrun() error {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-
-	if !h.deregistered {
-		h.deregister()
-	}
+	helper.WithLock(&h.mu, h.deregisterLocked)
 	return nil
 }

-// deregister services from Consul/Nomad service provider.
-func (h *groupServiceHook) deregister() {
+// deregisterLocked will deregister services from Consul/Nomad service provider.
+//
+// caller must hold h.lock
+func (h *groupServiceHook) deregisterLocked() {
+	if h.deregistered {
+		return
+	}
+
 	if len(h.services) > 0 {
-		workloadServices := h.getWorkloadServices()
+		workloadServices := h.getWorkloadServicesLocked()
 		h.serviceRegWrapper.RemoveWorkload(workloadServices)
 	}
+
+	h.deregistered = true
 }

-func (h *groupServiceHook) getWorkloadServices() *serviceregistration.WorkloadServices {
+// getWorkloadServicesLocked returns the set of workload services currently
+// on the hook.
+//
+// caller must hold h.lock
+func (h *groupServiceHook) getWorkloadServicesLocked() *serviceregistration.WorkloadServices {
 	// Interpolate with the task's environment
 	interpolatedServices := taskenv.InterpolateServices(h.taskEnvBuilder.Build(), h.services)

@@ -14,7 +14,7 @@ import (
 	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/stretchr/testify/require"
+	"github.com/shoenig/test/must"
 )

 var _ interfaces.RunnerPrerunHook = (*groupServiceHook)(nil)

@@ -50,22 +50,21 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger: logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())

 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))

-	require.NoError(t, h.Postrun())
+	must.NoError(t, h.Postrun())

-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.PreTaskRestart())

 	ops := consulMockClient.GetOps()
-	require.Len(t, ops, 5)
-	require.Equal(t, "add", ops[0].Op)    // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op)    // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op)    // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op)    // Restart -> preRun
 }

 // TestGroupServiceHook_ShutdownDelayUpdate asserts calling group service hooks

@@ -92,23 +91,23 @@ func TestGroupServiceHook_ShutdownDelayUpdate(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger: logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())

 	// Incease shutdown Delay
 	alloc.Job.TaskGroups[0].ShutdownDelay = pointer.Of(15 * time.Second)
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))

 	// Assert that update updated the delay value
-	require.Equal(t, h.delay, 15*time.Second)
+	must.Eq(t, h.delay, 15*time.Second)

 	// Remove shutdown delay
 	alloc.Job.TaskGroups[0].ShutdownDelay = nil
 	req = &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))

 	// Assert that update updated the delay value
-	require.Equal(t, h.delay, 0*time.Second)
+	must.Eq(t, h.delay, 0*time.Second)
 }

 // TestGroupServiceHook_GroupServices asserts group service hooks with group

@@ -133,22 +132,21 @@ func TestGroupServiceHook_GroupServices(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger: logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())

 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))

-	require.NoError(t, h.Postrun())
+	must.NoError(t, h.Postrun())

-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.PreTaskRestart())

 	ops := consulMockClient.GetOps()
-	require.Len(t, ops, 5)
-	require.Equal(t, "add", ops[0].Op)    // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op)    // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op)    // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op)    // Restart -> preRun
 }

 // TestGroupServiceHook_GroupServices_Nomad asserts group service hooks with

@@ -179,25 +177,24 @@ func TestGroupServiceHook_GroupServices_Nomad(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger: logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())

 	// Trigger our hook requests.
 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
-	require.NoError(t, h.Postrun())
-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.Update(req))
+	must.NoError(t, h.Postrun())
+	must.NoError(t, h.PreTaskRestart())

 	// Ensure the Nomad mock provider has the expected operations.
 	ops := nomadMockClient.GetOps()
-	require.Len(t, ops, 5)
-	require.Equal(t, "add", ops[0].Op)    // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op)    // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op)    // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op)    // Restart -> preRun

 	// Ensure the Consul mock provider has zero operations.
-	require.Len(t, consulMockClient.GetOps(), 0)
+	must.SliceEmpty(t, consulMockClient.GetOps())
 }

 // TestGroupServiceHook_Error asserts group service hooks with group

@@ -234,22 +231,21 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) {
 		taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region),
 		logger: logger,
 	})
-	require.NoError(t, h.Prerun())
+	must.NoError(t, h.Prerun())

 	req := &interfaces.RunnerUpdateRequest{Alloc: alloc}
-	require.NoError(t, h.Update(req))
+	must.NoError(t, h.Update(req))

-	require.NoError(t, h.Postrun())
+	must.NoError(t, h.Postrun())

-	require.NoError(t, h.PreTaskRestart())
+	must.NoError(t, h.PreTaskRestart())

 	ops := consulMockClient.GetOps()
-	require.Len(t, ops, 5)
-	require.Equal(t, "add", ops[0].Op)    // Prerun
-	require.Equal(t, "update", ops[1].Op) // Update
-	require.Equal(t, "remove", ops[2].Op) // Postrun
-	require.Equal(t, "remove", ops[3].Op) // Restart -> preKill
-	require.Equal(t, "add", ops[4].Op)    // Restart -> preRun
+	must.Len(t, 4, ops)
+	must.Eq(t, "add", ops[0].Op)    // Prerun
+	must.Eq(t, "update", ops[1].Op) // Update
+	must.Eq(t, "remove", ops[2].Op) // Postrun
+	must.Eq(t, "add", ops[3].Op)    // Restart -> preRun
 }

 func TestGroupServiceHook_getWorkloadServices(t *testing.T) {

@@ -284,6 +280,6 @@ func TestGroupServiceHook_getWorkloadServices(t *testing.T) {
 		logger: logger,
 	})

-	services := h.getWorkloadServices()
-	require.Len(t, services.Services, 1)
+	services := h.getWorkloadServicesLocked()
+	must.Len(t, 1, services.Services)
 }

@@ -157,6 +157,8 @@ func (s *ServiceRegistrationHandler) removeWorkload(
 		defer wg.Done()

 		// Stop check watcher
+		//
+		// todo(shoenig) - shouldn't we only unwatch checks for the given serviceSpec ?
 		for _, service := range workload.Services {
 			for _, check := range service.Checks {
 				checkID := string(structs.NomadCheckID(workload.AllocInfo.AllocID, workload.AllocInfo.Group, check))