block deleting namespaces if the namespace contains a volume (#13880)

When we delete a namespace, we check to ensure that there are no non-terminal
jobs, which effectively covers evals, allocs, etc. CSI volumes are also
namespaced, so extend this check to cover CSI volumes.
This commit is contained in:
Tim Gross 2022-07-21 16:13:52 -04:00 committed by GitHub
parent 0d1c9a53a4
commit c7a11a86c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 54 additions and 0 deletions

3
.changelog/13880.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
namespaces: Fixed a bug that allowed deleting a namespace that contained a CSI volume
```

View File

@ -2440,6 +2440,11 @@ func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, prefix, nodeID string
func (s *StateStore) CSIVolumesByNamespace(ws memdb.WatchSet, namespace, prefix string) (memdb.ResultIterator, error) {
txn := s.db.ReadTxn()
return s.csiVolumesByNamespaceImpl(txn, ws, namespace, prefix)
}
func (s *StateStore) csiVolumesByNamespaceImpl(txn *txn, ws memdb.WatchSet, namespace, prefix string) (memdb.ResultIterator, error) {
iter, err := txn.Get("csi_volumes", "id_prefix", namespace, prefix)
if err != nil {
return nil, fmt.Errorf("volume lookup failed: %v", err)
@ -6336,6 +6341,17 @@ func (s *StateStore) DeleteNamespaces(index uint64, names []string) error {
}
}
vIter, err := s.csiVolumesByNamespaceImpl(txn, nil, name, "")
if err != nil {
return err
}
rawVol := vIter.Next()
if rawVol != nil {
vol := rawVol.(*structs.CSIVolume)
return fmt.Errorf("namespace %q contains at least one CSI volume %q. "+
"All CSI volumes in namespace must be deleted before it can be deleted", name, vol.ID)
}
// Delete the namespace
if err := txn.Delete(TableNamespaces, existing); err != nil {
return fmt.Errorf("namespace deletion failed: %v", err)

View File

@ -1009,6 +1009,41 @@ func TestStateStore_DeleteNamespaces_NonTerminalJobs(t *testing.T) {
require.False(t, watchFired(ws))
}
func TestStateStore_DeleteNamespaces_CSIVolumes(t *testing.T) {
ci.Parallel(t)
state := testStateStore(t)
ns := mock.Namespace()
require.NoError(t, state.UpsertNamespaces(1000, []*structs.Namespace{ns}))
plugin := mock.CSIPlugin()
vol := mock.CSIVolume(plugin)
vol.Namespace = ns.Name
require.NoError(t, state.UpsertCSIVolume(1001, []*structs.CSIVolume{vol}))
// Create a watchset so we can test that delete fires the watch
ws := memdb.NewWatchSet()
_, err := state.NamespaceByName(ws, ns.Name)
require.NoError(t, err)
err = state.DeleteNamespaces(1002, []string{ns.Name})
require.Error(t, err)
require.Contains(t, err.Error(), "one CSI volume")
require.False(t, watchFired(ws))
ws = memdb.NewWatchSet()
out, err := state.NamespaceByName(ws, ns.Name)
require.NoError(t, err)
require.NotNil(t, out)
index, err := state.Index(TableNamespaces)
require.NoError(t, err)
require.EqualValues(t, 1000, index)
require.False(t, watchFired(ws))
}
func TestStateStore_Namespaces(t *testing.T) {
ci.Parallel(t)