csi: reap unused volume claims at leadership transitions (#11776)

When `volumewatcher.Watcher` starts on the leader, it starts a watch
on every volume and triggers a reap of unused claims on any change to
that volume. But if a reaping is in-flight during leadership
transitions, it will fail and the event that triggered the reap will
be dropped. Perform one reap of unused claims at the start of the
watcher so that leadership transitions don't drop this event.
This commit is contained in:
Tim Gross 2022-01-05 11:40:20 -05:00 committed by GitHub
parent ffb174b596
commit 51f512a3e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 48 additions and 5 deletions

3
.changelog/11776.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
csi: Fixed a bug where volume claim releases that were not fully processed before a leadership transition would be ignored
```

View File

@ -104,6 +104,13 @@ func (vw *volumeWatcher) isRunning() bool {
// Each pass steps the volume's claims through the various states of reaping // Each pass steps the volume's claims through the various states of reaping
// until the volume has no more claims eligible to be reaped. // until the volume has no more claims eligible to be reaped.
func (vw *volumeWatcher) watch() { func (vw *volumeWatcher) watch() {
// always denormalize the volume and call reap when we first start
// the watcher so that we ensure we don't drop events that
// happened during leadership transitions and didn't get completed
// by the prior leader
vol := vw.getVolume(vw.v)
vw.volumeReap(vol)
for { for {
select { select {
// TODO(tgross): currently server->client RPC have no cancellation // TODO(tgross): currently server->client RPC have no cancellation

View File

@ -52,9 +52,9 @@ func TestVolumeWatch_EnableDisable(t *testing.T) {
require.Equal(0, len(watcher.watchers)) require.Equal(0, len(watcher.watchers))
} }
// TestVolumeWatch_Checkpoint tests the checkpointing of progress across // TestVolumeWatch_LeadershipTransition tests the correct behavior of
// leader leader step-up/step-down // claim reaping across leader step-up/step-down
func TestVolumeWatch_Checkpoint(t *testing.T) { func TestVolumeWatch_LeadershipTransition(t *testing.T) {
t.Parallel() t.Parallel()
require := require.New(t) require := require.New(t)
@ -84,18 +84,51 @@ func TestVolumeWatch_Checkpoint(t *testing.T) {
return 1 == len(watcher.watchers) return 1 == len(watcher.watchers)
}, time.Second, 10*time.Millisecond) }, time.Second, 10*time.Millisecond)
// step-down (this is sync, but step-up is async) vol, _ = srv.State().CSIVolumeByID(nil, vol.Namespace, vol.ID)
require.Len(vol.PastClaims, 0, "expected to have 0 PastClaims")
require.Equal(srv.countCSIUnpublish, 0, "expected no CSI.Unpublish RPC calls")
// trying to test a dropped watch is racy, so to reliably simulate
// this condition, step-down the watcher first and then perform
// the writes to the volume before starting the new watcher. no
// watches for that change will fire on the new watcher
// step-down (this is sync)
watcher.SetEnabled(false, nil) watcher.SetEnabled(false, nil)
require.Equal(0, len(watcher.watchers)) require.Equal(0, len(watcher.watchers))
// step-up again // allocation is now invalid
index++
err = srv.State().DeleteEval(index, []string{}, []string{alloc.ID})
require.NoError(err)
// emit a GC so that we have a volume change that's dropped
claim := &structs.CSIVolumeClaim{
AllocationID: alloc.ID,
NodeID: node.ID,
Mode: structs.CSIVolumeClaimGC,
State: structs.CSIVolumeClaimStateUnpublishing,
}
index++
err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
require.NoError(err)
// create a new watcher and enable it to simulate the leadership
// transition
watcher = NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.SetEnabled(true, srv.State()) watcher.SetEnabled(true, srv.State())
require.Eventually(func() bool { require.Eventually(func() bool {
watcher.wlock.RLock() watcher.wlock.RLock()
defer watcher.wlock.RUnlock() defer watcher.wlock.RUnlock()
return 1 == len(watcher.watchers) && return 1 == len(watcher.watchers) &&
!watcher.watchers[vol.ID+vol.Namespace].isRunning() !watcher.watchers[vol.ID+vol.Namespace].isRunning()
}, time.Second, 10*time.Millisecond) }, time.Second, 10*time.Millisecond)
vol, _ = srv.State().CSIVolumeByID(nil, vol.Namespace, vol.ID)
require.Len(vol.PastClaims, 1, "expected to have 1 PastClaim")
require.Equal(srv.countCSIUnpublish, 1, "expected CSI.Unpublish RPC to be called")
} }
// TestVolumeWatch_StartStop tests the start and stop of the watcher when // TestVolumeWatch_StartStop tests the start and stop of the watcher when