CSI: fix potential state store corruptions (#16256)
The `CSIVolume` struct has references to allocations that are "denormalized"; we don't store them on the `CSIVolume` struct but hydrate them on read. Tests detecting potential state store corruptions found two locations where we're not copying the volume before denormalizing: * When garbage collecting CSI volume claims. * When checking if it's safe to force-deregister the volume. There are no known user-visible problems associated with these bugs but both have the potential of mutating volume claims outside of a FSM transaction. This changeset also cleans up state mutations in some CSI tests so as to avoid having working tests cover up potential future bugs.
This commit is contained in:
parent
62ac8c561d
commit
4c9688271a
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
csi: Fixed potential state store corruption when garbage collecting CSI volume claims or checking whether it's safe to force-deregister a volume
|
||||||
|
```
|
|
@ -728,7 +728,7 @@ func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
|
||||||
// we only call the claim release RPC if the volume has claims
|
// we only call the claim release RPC if the volume has claims
|
||||||
// that no longer have valid allocations. otherwise we'd send
|
// that no longer have valid allocations. otherwise we'd send
|
||||||
// out a lot of do-nothing RPCs.
|
// out a lot of do-nothing RPCs.
|
||||||
vol, err := c.snap.CSIVolumeDenormalize(ws, vol)
|
vol, err := c.snap.CSIVolumeDenormalize(ws, vol.Copy())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -2218,6 +2218,7 @@ func TestCoreScheduler_CSIPluginGC(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Empty the plugin
|
// Empty the plugin
|
||||||
|
plug = plug.Copy()
|
||||||
plug.Controllers = map[string]*structs.CSIInfo{}
|
plug.Controllers = map[string]*structs.CSIInfo{}
|
||||||
plug.Nodes = map[string]*structs.CSIInfo{}
|
plug.Nodes = map[string]*structs.CSIInfo{}
|
||||||
|
|
||||||
|
|
|
@ -1458,14 +1458,13 @@ func deleteNodeCSIPlugins(txn *txn, node *structs.Node, index uint64) error {
|
||||||
|
|
||||||
// updateOrGCPlugin updates a plugin but will delete it if the plugin is empty
|
// updateOrGCPlugin updates a plugin but will delete it if the plugin is empty
|
||||||
func updateOrGCPlugin(index uint64, txn Txn, plug *structs.CSIPlugin) error {
|
func updateOrGCPlugin(index uint64, txn Txn, plug *structs.CSIPlugin) error {
|
||||||
plug.ModifyIndex = index
|
|
||||||
|
|
||||||
if plug.IsEmpty() {
|
if plug.IsEmpty() {
|
||||||
err := txn.Delete("csi_plugins", plug)
|
err := txn.Delete("csi_plugins", plug)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("csi_plugins delete error: %v", err)
|
return fmt.Errorf("csi_plugins delete error: %v", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
plug.ModifyIndex = index
|
||||||
err := txn.Insert("csi_plugins", plug)
|
err := txn.Insert("csi_plugins", plug)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("csi_plugins update error %s: %v", plug.ID, err)
|
return fmt.Errorf("csi_plugins update error %s: %v", plug.ID, err)
|
||||||
|
@ -2601,6 +2600,7 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []s
|
||||||
// volSafeToForce checks if the any of the remaining allocations
|
// volSafeToForce checks if the any of the remaining allocations
|
||||||
// are in a non-terminal state.
|
// are in a non-terminal state.
|
||||||
func (s *StateStore) volSafeToForce(txn Txn, v *structs.CSIVolume) bool {
|
func (s *StateStore) volSafeToForce(txn Txn, v *structs.CSIVolume) bool {
|
||||||
|
v = v.Copy()
|
||||||
vol, err := s.csiVolumeDenormalizeTxn(txn, nil, v)
|
vol, err := s.csiVolumeDenormalizeTxn(txn, nil, v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -3490,8 +3490,10 @@ func TestStateStore_CSIVolume(t *testing.T) {
|
||||||
vs = slurp(iter)
|
vs = slurp(iter)
|
||||||
require.False(t, vs[0].HasFreeWriteClaims())
|
require.False(t, vs[0].HasFreeWriteClaims())
|
||||||
|
|
||||||
claim0.Mode = u
|
claim2 := new(structs.CSIVolumeClaim)
|
||||||
err = state.CSIVolumeClaim(2, ns, vol0, claim0)
|
*claim2 = *claim0
|
||||||
|
claim2.Mode = u
|
||||||
|
err = state.CSIVolumeClaim(2, ns, vol0, claim2)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
ws = memdb.NewWatchSet()
|
ws = memdb.NewWatchSet()
|
||||||
iter, err = state.CSIVolumesByPluginID(ws, ns, "", "minnie")
|
iter, err = state.CSIVolumesByPluginID(ws, ns, "", "minnie")
|
||||||
|
@ -3520,8 +3522,10 @@ func TestStateStore_CSIVolume(t *testing.T) {
|
||||||
|
|
||||||
// release claims to unblock deregister
|
// release claims to unblock deregister
|
||||||
index++
|
index++
|
||||||
claim0.State = structs.CSIVolumeClaimStateReadyToFree
|
claim3 := new(structs.CSIVolumeClaim)
|
||||||
err = state.CSIVolumeClaim(index, ns, vol0, claim0)
|
*claim3 = *claim2
|
||||||
|
claim3.State = structs.CSIVolumeClaimStateReadyToFree
|
||||||
|
err = state.CSIVolumeClaim(index, ns, vol0, claim3)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
index++
|
index++
|
||||||
claim1.Mode = u
|
claim1.Mode = u
|
||||||
|
@ -3577,7 +3581,7 @@ func TestStateStore_CSIPlugin_Lifecycle(t *testing.T) {
|
||||||
require.Equal(t, counts.nodesHealthy, plug.NodesHealthy, "nodes healthy")
|
require.Equal(t, counts.nodesHealthy, plug.NodesHealthy, "nodes healthy")
|
||||||
require.Equal(t, counts.controllersExpected, plug.ControllersExpected, "controllers expected")
|
require.Equal(t, counts.controllersExpected, plug.ControllersExpected, "controllers expected")
|
||||||
require.Equal(t, counts.nodesExpected, plug.NodesExpected, "nodes expected")
|
require.Equal(t, counts.nodesExpected, plug.NodesExpected, "nodes expected")
|
||||||
return plug
|
return plug.Copy()
|
||||||
}
|
}
|
||||||
|
|
||||||
type allocUpdateKind int
|
type allocUpdateKind int
|
||||||
|
|
Loading…
Reference in New Issue