open-nomad/nomad/volumewatcher/volumes_watcher_test.go
Tim Gross 759310d13a
CSI: volume watcher shutdown fixes (#12439)
The volume watcher design was based on deploymentwatcher and drainer,
but has an important difference: we don't want to maintain a goroutine
for the lifetime of the volume. So we stop the volumewatcher goroutine
for a volume when that volume has no more claims to free. But the
shutdown races with updates on the parent goroutine, and it's possible
to drop updates. Fortunately these updates are picked up on the next
core GC job, but we're most likely to hit this race when we're
replacing an allocation and that's the time we least want to wait.

Wait until the volume has "settled" before stopping this goroutine, so
that the race between shutdown and the parent goroutine sending on
`<-updateCh` is pushed past the window in which we most care about
quickly freeing claims.

* Fixes a resource leak when volumewatchers are no longer needed. The
  volume is nil and can't ever be started again, so the volume's
  `watcher` should be removed from the top-level `Watcher`.

* De-flakes the GC job test: the test throws an error because the
  claimed node doesn't exist and is unreachable. This flaked instead of
  failed because we didn't correctly wait for the first pass through the
  volumewatcher.

  Make the GC job wait for the volumewatcher to reach the quiescent
  timeout window state before running the GC eval under test, so that
  we're sure the GC job's work isn't being picked up by processing one
  of the earlier claims. Update the claims used so that we're sure the
  GC pass won't hit a node unpublish error.

* Adds trace logging to unpublish operations
2022-04-04 10:46:45 -04:00

277 lines
8.7 KiB
Go

package volumewatcher
import (
"testing"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
// TestVolumeWatch_EnableDisable tests the watcher registration logic that needs
// to happen during leader step-up/step-down: after the watcher is enabled and a
// claimed volume is written to the state store, exactly one per-volume watcher
// must be registered, and after the watcher is disabled none may remain.
func TestVolumeWatch_EnableDisable(t *testing.T) {
	ci.Parallel(t)

	srv := &MockRPCServer{}
	srv.state = state.TestStateStore(t)
	index := uint64(100)

	watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
	watcher.quiescentTimeout = 100 * time.Millisecond
	watcher.SetEnabled(true, srv.State(), "")

	// watcherCount reads the size of the watcher map under the read lock.
	// Every read of watcher.watchers in this test goes through it so that
	// the read cannot race with the watcher's own goroutines.
	watcherCount := func() int {
		watcher.wlock.RLock()
		defer watcher.wlock.RUnlock()
		return len(watcher.watchers)
	}

	plugin := mock.CSIPlugin()
	node := testNode(plugin, srv.State())
	alloc := mock.Alloc()
	alloc.ClientStatus = structs.AllocClientStatusComplete
	vol := testVolume(plugin, alloc, node.ID)

	index++
	err := srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol})
	require.NoError(t, err)

	// need to have just enough of a volume and claim in place so that
	// the watcher doesn't immediately stop and unload itself
	claim := &structs.CSIVolumeClaim{
		Mode:  structs.CSIVolumeClaimGC,
		State: structs.CSIVolumeClaimStateNodeDetached,
	}
	index++
	err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
	require.NoError(t, err)

	// the volume update should cause a watcher to be registered
	require.Eventually(t, func() bool {
		return watcherCount() == 1
	}, time.Second, 10*time.Millisecond)

	// disabling must leave no registered watchers behind
	watcher.SetEnabled(false, nil, "")
	require.Equal(t, 0, watcherCount())
}
// TestVolumeWatch_LeadershipTransition tests the correct behavior of
// claim reaping across leader step-up/step-down: a claim written while no
// watcher is enabled must still be processed once a fresh watcher is enabled.
func TestVolumeWatch_LeadershipTransition(t *testing.T) {
	ci.Parallel(t)

	srv := &MockRPCServer{}
	srv.state = state.TestStateStore(t)
	index := uint64(100)

	watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
	watcher.quiescentTimeout = 100 * time.Millisecond

	// watcherCount reads the size of the current watcher's map under its
	// read lock so the read cannot race with the watcher's own goroutines.
	// The closure captures the variable, so it follows the reassignment of
	// watcher further down.
	watcherCount := func() int {
		watcher.wlock.RLock()
		defer watcher.wlock.RUnlock()
		return len(watcher.watchers)
	}

	plugin := mock.CSIPlugin()
	node := testNode(plugin, srv.State())
	alloc := mock.Alloc()
	alloc.ClientStatus = structs.AllocClientStatusRunning
	vol := testVolume(plugin, alloc, node.ID)

	index++
	err := srv.State().UpsertAllocs(structs.MsgTypeTestSetup, index,
		[]*structs.Allocation{alloc})
	require.NoError(t, err)

	watcher.SetEnabled(true, srv.State(), "")

	index++
	err = srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol})
	require.NoError(t, err)

	// we should get or start up a watcher when we get an update for
	// the volume from the state store
	require.Eventually(t, func() bool {
		return watcherCount() == 1
	}, time.Second, 10*time.Millisecond)

	// check the lookup result before dereferencing it so a failed lookup
	// fails the test instead of panicking
	vol, err = srv.State().CSIVolumeByID(nil, vol.Namespace, vol.ID)
	require.NoError(t, err)
	require.NotNil(t, vol)
	require.Len(t, vol.PastClaims, 0, "expected to have 0 PastClaims")
	require.Equal(t, 0, srv.countCSIUnpublish, "expected no CSI.Unpublish RPC calls")

	// trying to test a dropped watch is racy, so to reliably simulate
	// this condition, step-down the watcher first and then perform
	// the writes to the volume before starting the new watcher. no
	// watches for that change will fire on the new watcher

	// step-down (this is sync)
	watcher.SetEnabled(false, nil, "")
	require.Equal(t, 0, watcherCount())

	// allocation is now invalid
	index++
	err = srv.State().DeleteEval(index, []string{}, []string{alloc.ID})
	require.NoError(t, err)

	// emit a GC so that we have a volume change that's dropped
	claim := &structs.CSIVolumeClaim{
		AllocationID: alloc.ID,
		NodeID:       node.ID,
		Mode:         structs.CSIVolumeClaimGC,
		State:        structs.CSIVolumeClaimStateUnpublishing,
	}
	index++
	err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
	require.NoError(t, err)

	// create a new watcher and enable it to simulate the leadership
	// transition
	watcher = NewVolumesWatcher(testlog.HCLogger(t), srv, "")
	watcher.quiescentTimeout = 100 * time.Millisecond
	watcher.SetEnabled(true, srv.State(), "")

	// the new watcher should register the volume and then stop running
	require.Eventually(t, func() bool {
		watcher.wlock.RLock()
		defer watcher.wlock.RUnlock()
		return 1 == len(watcher.watchers) &&
			!watcher.watchers[vol.ID+vol.Namespace].isRunning()
	}, time.Second, 10*time.Millisecond)

	vol, err = srv.State().CSIVolumeByID(nil, vol.Namespace, vol.ID)
	require.NoError(t, err)
	require.NotNil(t, vol)
	require.Len(t, vol.PastClaims, 1, "expected to have 1 PastClaim")
	require.Equal(t, 1, srv.countCSIUnpublish, "expected CSI.Unpublish RPC to be called")
}
// TestVolumeWatch_StartStop tests the start and stop of the watcher when
// it receives notifications and has completed its work
func TestVolumeWatch_StartStop(t *testing.T) {
	ci.Parallel(t)

	srv := &MockStatefulRPCServer{}
	srv.state = state.TestStateStore(t)
	index := uint64(100)

	watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
	watcher.quiescentTimeout = 100 * time.Millisecond
	watcher.SetEnabled(true, srv.State(), "")

	// read the watcher map under the read lock, as the polling closures
	// below do, so the read cannot race with the watcher's own goroutines
	watcher.wlock.RLock()
	initialCount := len(watcher.watchers)
	watcher.wlock.RUnlock()
	require.Equal(t, 0, initialCount)

	plugin := mock.CSIPlugin()
	node := testNode(plugin, srv.State())
	alloc1 := mock.Alloc()
	alloc1.ClientStatus = structs.AllocClientStatusRunning
	alloc2 := mock.Alloc()
	alloc2.Job = alloc1.Job
	alloc2.ClientStatus = structs.AllocClientStatusRunning

	index++
	err := srv.State().UpsertJob(structs.MsgTypeTestSetup, index, alloc1.Job)
	require.NoError(t, err)
	index++
	err = srv.State().UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1, alloc2})
	require.NoError(t, err)

	// register a volume
	vol := testVolume(plugin, alloc1, node.ID)
	index++
	err = srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol})
	require.NoError(t, err)

	// assert we get a watcher; there are no claims so it should immediately stop
	require.Eventually(t, func() bool {
		watcher.wlock.RLock()
		defer watcher.wlock.RUnlock()
		return 1 == len(watcher.watchers) &&
			!watcher.watchers[vol.ID+vol.Namespace].isRunning()
	}, time.Second*2, 10*time.Millisecond)

	// claim the volume for both allocs
	claim := &structs.CSIVolumeClaim{
		AllocationID: alloc1.ID,
		NodeID:       node.ID,
		Mode:         structs.CSIVolumeClaimRead,
		AccessMode:   structs.CSIVolumeAccessModeMultiNodeReader,
	}

	index++
	err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
	require.NoError(t, err)
	claim.AllocationID = alloc2.ID
	index++
	err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
	require.NoError(t, err)

	// reap the volume and assert nothing has happened
	claim = &structs.CSIVolumeClaim{
		AllocationID: alloc1.ID,
		NodeID:       node.ID,
	}
	index++
	err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
	require.NoError(t, err)

	// check the lookup result before dereferencing it so a failed lookup
	// fails the test instead of panicking
	ws := memdb.NewWatchSet()
	vol, err = srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID)
	require.NoError(t, err)
	require.NotNil(t, vol)
	require.Equal(t, 2, len(vol.ReadAllocs))

	// alloc becomes terminal
	alloc1.ClientStatus = structs.AllocClientStatusComplete
	index++
	err = srv.State().UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1})
	require.NoError(t, err)
	index++
	claim.State = structs.CSIVolumeClaimStateReadyToFree
	err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
	require.NoError(t, err)

	// 1 claim has been released and watcher stops
	require.Eventually(t, func() bool {
		pollWS := memdb.NewWatchSet()
		current, lookupErr := srv.State().CSIVolumeByID(pollWS, vol.Namespace, vol.ID)
		if lookupErr != nil || current == nil {
			// keep polling rather than dereferencing a missing volume
			return false
		}
		return len(current.ReadAllocs) == 1 && len(current.PastClaims) == 0
	}, time.Second*2, 10*time.Millisecond)

	require.Eventually(t, func() bool {
		watcher.wlock.RLock()
		defer watcher.wlock.RUnlock()
		return !watcher.watchers[vol.ID+vol.Namespace].isRunning()
	}, time.Second*5, 10*time.Millisecond)
}
// TestVolumeWatch_RegisterDeregister tests the start and stop of
// watchers around registration: registering a volume without claims must
// create a watcher for it, and that watcher must then stop running.
func TestVolumeWatch_RegisterDeregister(t *testing.T) {
	ci.Parallel(t)

	srv := &MockStatefulRPCServer{}
	srv.state = state.TestStateStore(t)
	index := uint64(100)

	watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
	watcher.quiescentTimeout = 10 * time.Millisecond
	watcher.SetEnabled(true, srv.State(), "")

	// watcherCount reads the size of the watcher map under the read lock.
	// Every count check in this test goes through it so that the read
	// cannot race with the watcher's own goroutines.
	watcherCount := func() int {
		watcher.wlock.RLock()
		defer watcher.wlock.RUnlock()
		return len(watcher.watchers)
	}
	require.Equal(t, 0, watcherCount())

	plugin := mock.CSIPlugin()

	// register a volume without claims
	vol := mock.CSIVolume(plugin)
	index++
	err := srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol})
	require.NoError(t, err)

	// watcher should be started but immediately stopped
	require.Eventually(t, func() bool {
		return watcherCount() == 1
	}, time.Second, 10*time.Millisecond)

	require.Eventually(t, func() bool {
		watcher.wlock.RLock()
		defer watcher.wlock.RUnlock()
		return !watcher.watchers[vol.ID+vol.Namespace].isRunning()
	}, 1*time.Second, 10*time.Millisecond)
}