Fix CSI volume list with prefix and `*` namespace (#12184)
When using a prefix value and the * wildcard for namespace, the endpoint would not take the prefix value into consideration due to the order in which the checks were executed but also the logic for retrieving volumes from the state store. This commit changes the order to check for a prefix first and wraps the result iterator of the state store query in a filter to apply the prefix.
This commit is contained in:
parent
b8b08fb32d
commit
b1809eb48c
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:bug
|
||||||
|
api: Apply prefix filter when querying CSI volumes in all namespaces
|
||||||
|
```
|
|
@ -129,10 +129,12 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV
|
||||||
iter, err = snap.CSIVolumesByNodeID(ws, prefix, args.NodeID)
|
iter, err = snap.CSIVolumesByNodeID(ws, prefix, args.NodeID)
|
||||||
} else if args.PluginID != "" {
|
} else if args.PluginID != "" {
|
||||||
iter, err = snap.CSIVolumesByPluginID(ws, ns, prefix, args.PluginID)
|
iter, err = snap.CSIVolumesByPluginID(ws, ns, prefix, args.PluginID)
|
||||||
} else if ns == structs.AllNamespacesSentinel {
|
} else if prefix != "" {
|
||||||
iter, err = snap.CSIVolumes(ws)
|
iter, err = snap.CSIVolumesByIDPrefix(ws, ns, prefix)
|
||||||
} else {
|
} else if ns != structs.AllNamespacesSentinel {
|
||||||
iter, err = snap.CSIVolumesByNamespace(ws, ns, prefix)
|
iter, err = snap.CSIVolumesByNamespace(ws, ns, prefix)
|
||||||
|
} else {
|
||||||
|
iter, err = snap.CSIVolumes(ws)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -736,6 +736,22 @@ func TestCSIVolumeEndpoint_ListAllNamespaces(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(1001), resp.Index)
|
require.Equal(t, uint64(1001), resp.Index)
|
||||||
require.Len(t, resp.Volumes, len(vols))
|
require.Len(t, resp.Volumes, len(vols))
|
||||||
|
|
||||||
|
// Lookup volumes in all namespaces with prefix
|
||||||
|
get = &structs.CSIVolumeListRequest{
|
||||||
|
QueryOptions: structs.QueryOptions{
|
||||||
|
Region: "global",
|
||||||
|
Prefix: id0[:4],
|
||||||
|
Namespace: "*",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var resp2 structs.CSIVolumeListResponse
|
||||||
|
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.List", get, &resp2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, uint64(1001), resp.Index)
|
||||||
|
require.Len(t, resp2.Volumes, 1)
|
||||||
|
require.Equal(t, vols[0].ID, resp2.Volumes[0].ID)
|
||||||
|
require.Equal(t, structs.DefaultNamespace, resp2.Volumes[0].Namespace)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCSIVolumeEndpoint_Create(t *testing.T) {
|
func TestCSIVolumeEndpoint_Create(t *testing.T) {
|
||||||
|
|
|
@ -2265,8 +2265,13 @@ func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, prefix,
|
||||||
}
|
}
|
||||||
|
|
||||||
// CSIVolumesByIDPrefix supports search. Caller should snapshot if it wants to
|
// CSIVolumesByIDPrefix supports search. Caller should snapshot if it wants to
|
||||||
// also denormalize the plugins.
|
// also denormalize the plugins. If using a prefix with the wildcard namespace,
|
||||||
|
// the results will not use the index prefix.
|
||||||
func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID string) (memdb.ResultIterator, error) {
|
func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID string) (memdb.ResultIterator, error) {
|
||||||
|
if namespace == structs.AllNamespacesSentinel {
|
||||||
|
return s.csiVolumeByIDPrefixAllNamespaces(ws, volumeID)
|
||||||
|
}
|
||||||
|
|
||||||
txn := s.db.ReadTxn()
|
txn := s.db.ReadTxn()
|
||||||
|
|
||||||
iter, err := txn.Get("csi_volumes", "id_prefix", namespace, volumeID)
|
iter, err := txn.Get("csi_volumes", "id_prefix", namespace, volumeID)
|
||||||
|
@ -2279,6 +2284,30 @@ func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID
|
||||||
return iter, nil
|
return iter, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *StateStore) csiVolumeByIDPrefixAllNamespaces(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) {
|
||||||
|
txn := s.db.ReadTxn()
|
||||||
|
|
||||||
|
// Walk the entire csi_volumes table
|
||||||
|
iter, err := txn.Get("csi_volumes", "id")
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ws.Add(iter.WatchCh())
|
||||||
|
|
||||||
|
// Filter the iterator by ID prefix
|
||||||
|
f := func(raw interface{}) bool {
|
||||||
|
v, ok := raw.(*structs.CSIVolume)
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return !strings.HasPrefix(v.ID, prefix)
|
||||||
|
}
|
||||||
|
wrap := memdb.NewFilterIterator(iter, f)
|
||||||
|
return wrap, nil
|
||||||
|
}
|
||||||
|
|
||||||
// CSIVolumesByNodeID looks up CSIVolumes in use on a node. Caller should
|
// CSIVolumesByNodeID looks up CSIVolumes in use on a node. Caller should
|
||||||
// snapshot if it wants to also denormalize the plugins.
|
// snapshot if it wants to also denormalize the plugins.
|
||||||
func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, prefix, nodeID string) (memdb.ResultIterator, error) {
|
func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, prefix, nodeID string) (memdb.ResultIterator, error) {
|
||||||
|
|
Loading…
Reference in New Issue