open-nomad/nomad/volumewatcher/volumes_watcher_test.go
Tim Gross 23be116da0
csi: add -force flag to volume deregister (#8295)
The `nomad volume deregister` command currently returns an error if the volume
has any claims, but in cases where the claims can't be dropped because of
plugin errors, providing a `-force` flag gives the operator an escape hatch.

If the volume has no allocations or if they are all terminal, this flag
deletes the volume from the state store, immediately and implicitly dropping
all claims without further CSI RPCs. Note that this will not also
unmount/detach the volume, which we'll make the responsibility of a separate
`nomad volume detach` command.
2020-07-01 12:17:51 -04:00
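
To make the force path concrete, here is a minimal, hypothetical sketch of the guard the commit message describes (this is illustrative only, not the code added by this commit; the helper name `canForceDeregister` and its exact checks are assumptions): a volume may only be force-deregistered via `nomad volume deregister -force` when every allocation still holding a claim on it is terminal, at which point the server can drop the volume and its claims from the state store without issuing further CSI RPCs.

// Hypothetical sketch, not the actual Nomad implementation.
package volumewatcher

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

// canForceDeregister (hypothetical helper) returns an error if any read or
// write claim on the volume belongs to a non-terminal allocation.
func canForceDeregister(vol *structs.CSIVolume) error {
	for id, alloc := range vol.ReadAllocs {
		if alloc != nil && !alloc.TerminalStatus() {
			return fmt.Errorf("volume %q still has a live read claim from alloc %q", vol.ID, id)
		}
	}
	for id, alloc := range vol.WriteAllocs {
		if alloc != nil && !alloc.TerminalStatus() {
			return fmt.Errorf("volume %q still has a live write claim from alloc %q", vol.ID, id)
		}
	}
	// With no live claims, the volume can be deleted from the state store,
	// implicitly dropping all claims without further CSI RPCs.
	return nil
}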

package volumewatcher
import (
"context"
"testing"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
// TestVolumeWatch_EnableDisable tests the watcher registration logic that needs
// to happen during leader step-up/step-down
func TestVolumeWatch_EnableDisable(t *testing.T) {
t.Parallel()
require := require.New(t)
srv := &MockRPCServer{}
srv.state = state.TestStateStore(t)
index := uint64(100)
watcher := NewVolumesWatcher(testlog.HCLogger(t),
srv, srv,
LimitStateQueriesPerSecond,
CrossVolumeUpdateBatchDuration)
watcher.SetEnabled(true, srv.State())
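// set up a plugin, a node, and a volume claimed by a terminal alloc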
plugin := mock.CSIPlugin()
node := testNode(nil, plugin, srv.State())
alloc := mock.Alloc()
alloc.ClientStatus = structs.AllocClientStatusComplete
vol := testVolume(nil, plugin, alloc, node.ID)
index++
err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol})
require.NoError(err)
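// write a release claim for the volume; the watcher should pick up the update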
claim := &structs.CSIVolumeClaim{Mode: structs.CSIVolumeClaimRelease}
index++
err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
require.NoError(err)
require.Eventually(func() bool {
return 1 == len(watcher.watchers)
}, time.Second, 10*time.Millisecond)
watcher.SetEnabled(false, srv.State())
require.Equal(0, len(watcher.watchers))
}
// TestVolumeWatch_Checkpoint tests the checkpointing of progress across
// leader step-up/step-down
func TestVolumeWatch_Checkpoint(t *testing.T) {
t.Parallel()
require := require.New(t)
srv := &MockRPCServer{}
srv.state = state.TestStateStore(t)
index := uint64(100)
watcher := NewVolumesWatcher(testlog.HCLogger(t),
srv, srv,
LimitStateQueriesPerSecond,
CrossVolumeUpdateBatchDuration)
plugin := mock.CSIPlugin()
node := testNode(nil, plugin, srv.State())
alloc := mock.Alloc()
alloc.ClientStatus = structs.AllocClientStatusComplete
vol := testVolume(nil, plugin, alloc, node.ID)
watcher.SetEnabled(true, srv.State())
index++
err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol})
require.NoError(err)
// we should get or start up a watcher when we get an update for
// the volume from the state store
require.Eventually(func() bool {
return 1 == len(watcher.watchers)
}, time.Second, 10*time.Millisecond)
// step-down (this is sync, but step-up is async)
watcher.SetEnabled(false, srv.State())
require.Equal(0, len(watcher.watchers))
// step-up again
watcher.SetEnabled(true, srv.State())
require.Eventually(func() bool {
return 1 == len(watcher.watchers) &&
!watcher.watchers[vol.ID+vol.Namespace].isRunning()
}, time.Second, 10*time.Millisecond)
}
// TestVolumeWatch_StartStop tests the start and stop of the watcher when
// it receives notifications and has completed its work
func TestVolumeWatch_StartStop(t *testing.T) {
t.Parallel()
require := require.New(t)
ctx, exitFn := context.WithCancel(context.Background())
defer exitFn()
srv := &MockStatefulRPCServer{}
srv.state = state.TestStateStore(t)
index := uint64(100)
srv.volumeUpdateBatcher = NewVolumeUpdateBatcher(
ctx, CrossVolumeUpdateBatchDuration, srv)
watcher := NewVolumesWatcher(testlog.HCLogger(t),
srv, srv,
LimitStateQueriesPerSecond,
CrossVolumeUpdateBatchDuration)
watcher.SetEnabled(true, srv.State())
require.Equal(0, len(watcher.watchers))
plugin := mock.CSIPlugin()
node := testNode(nil, plugin, srv.State())
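// two running allocations from the same job will claim the volume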
alloc1 := mock.Alloc()
alloc1.ClientStatus = structs.AllocClientStatusRunning
alloc2 := mock.Alloc()
alloc2.Job = alloc1.Job
alloc2.ClientStatus = structs.AllocClientStatusRunning
index++
err := srv.State().UpsertJob(index, alloc1.Job)
require.NoError(err)
index++
err = srv.State().UpsertAllocs(index, []*structs.Allocation{alloc1, alloc2})
require.NoError(err)
// register a volume
vol := testVolume(nil, plugin, alloc1, node.ID)
index++
err = srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol})
require.NoError(err)
// assert we get a watcher; there are no claims so it should immediately stop
require.Eventually(func() bool {
return 1 == len(watcher.watchers) &&
!watcher.watchers[vol.ID+vol.Namespace].isRunning()
}, time.Second*2, 10*time.Millisecond)
// claim the volume for both allocs
claim := &structs.CSIVolumeClaim{
AllocationID: alloc1.ID,
NodeID: node.ID,
Mode: structs.CSIVolumeClaimRead,
}
index++
err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
require.NoError(err)
claim.AllocationID = alloc2.ID
index++
err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
require.NoError(err)
// reap the volume and assert nothing has happened
claim = &structs.CSIVolumeClaim{
AllocationID: alloc1.ID,
NodeID: node.ID,
Mode: structs.CSIVolumeClaimRelease,
}
index++
err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
require.NoError(err)
ws := memdb.NewWatchSet()
vol, _ = srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID)
require.Equal(2, len(vol.ReadAllocs))
// alloc becomes terminal
alloc1.ClientStatus = structs.AllocClientStatusComplete
index++
err = srv.State().UpsertAllocs(index, []*structs.Allocation{alloc1})
require.NoError(err)
index++
claim.State = structs.CSIVolumeClaimStateReadyToFree
err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
require.NoError(err)
// 1 claim has been released and watcher stops
require.Eventually(func() bool {
ws := memdb.NewWatchSet()
vol, _ := srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID)
return len(vol.ReadAllocs) == 1 && len(vol.PastClaims) == 0
}, time.Second*2, 10*time.Millisecond)
require.Eventually(func() bool {
return !watcher.watchers[vol.ID+vol.Namespace].isRunning()
}, time.Second*5, 10*time.Millisecond)
// the watcher will have incremented the index so we need to make sure
// our inserts will trigger new events
index, _ = srv.State().LatestIndex()
// remaining alloc's job is stopped (alloc is not marked terminal)
alloc2.Job.Stop = true
index++
err = srv.State().UpsertJob(index, alloc2.Job)
require.NoError(err)
// job deregistration writes a claim with no allocation or node
claim = &structs.CSIVolumeClaim{
Mode: structs.CSIVolumeClaimRelease,
}
index++
err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
require.NoError(err)
// all claims have been released and watcher has stopped again
require.Eventually(func() bool {
ws := memdb.NewWatchSet()
vol, _ := srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID)
return len(vol.ReadAllocs) == 0 && len(vol.PastClaims) == 0
}, time.Second*2, 10*time.Millisecond)
require.Eventually(func() bool {
return !watcher.watchers[vol.ID+vol.Namespace].isRunning()
}, time.Second*5, 10*time.Millisecond)
}
// TestVolumeWatch_RegisterDeregister tests the start and stop of
// watchers around registration
func TestVolumeWatch_RegisterDeregister(t *testing.T) {
t.Parallel()
require := require.New(t)
ctx, exitFn := context.WithCancel(context.Background())
defer exitFn()
srv := &MockStatefulRPCServer{}
srv.state = state.TestStateStore(t)
srv.volumeUpdateBatcher = NewVolumeUpdateBatcher(
ctx, CrossVolumeUpdateBatchDuration, srv)
index := uint64(100)
watcher := NewVolumesWatcher(testlog.HCLogger(t),
srv, srv,
LimitStateQueriesPerSecond,
CrossVolumeUpdateBatchDuration)
watcher.SetEnabled(true, srv.State())
require.Equal(0, len(watcher.watchers))
plugin := mock.CSIPlugin()
node := testNode(nil, plugin, srv.State())
alloc := mock.Alloc()
alloc.ClientStatus = structs.AllocClientStatusComplete
// register a volume
vol := testVolume(nil, plugin, alloc, node.ID)
index++
err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol})
require.NoError(err)
require.Eventually(func() bool {
return 1 == len(watcher.watchers)
}, time.Second, 10*time.Millisecond)
// reap the volume and assert we've cleaned up
w := watcher.watchers[vol.ID+vol.Namespace]
w.Notify(vol)
require.Eventually(func() bool {
ws := memdb.NewWatchSet()
vol, _ := srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID)
return len(vol.ReadAllocs) == 0 && len(vol.PastClaims) == 0
}, time.Second*2, 10*time.Millisecond)
require.Eventually(func() bool {
return !watcher.watchers[vol.ID+vol.Namespace].isRunning()
}, time.Second*1, 10*time.Millisecond)
require.Equal(1, srv.countCSINodeDetachVolume, "node detach RPC count")
require.Equal(1, srv.countCSIControllerDetachVolume, "controller detach RPC count")
require.Equal(2, srv.countUpsertVolumeClaims, "upsert claims count")
// deregistering the volume doesn't cause an update that triggers
// a watcher; we'll clean up this watcher in a GC later
err = srv.State().CSIVolumeDeregister(index, vol.Namespace, []string{vol.ID}, false)
require.NoError(err)
require.Equal(1, len(watcher.watchers))
require.False(watcher.watchers[vol.ID+vol.Namespace].isRunning())
}