diff --git a/.changelog/12184.txt b/.changelog/12184.txt new file mode 100644 index 000000000..e885a9ad5 --- /dev/null +++ b/.changelog/12184.txt @@ -0,0 +1,3 @@ +```release-note:bug +api: Apply prefix filter when querying CSI volumes in all namespaces +``` diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 092fc16ff..66083452f 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -129,10 +129,12 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV iter, err = snap.CSIVolumesByNodeID(ws, prefix, args.NodeID) } else if args.PluginID != "" { iter, err = snap.CSIVolumesByPluginID(ws, ns, prefix, args.PluginID) - } else if ns == structs.AllNamespacesSentinel { - iter, err = snap.CSIVolumes(ws) - } else { + } else if prefix != "" { + iter, err = snap.CSIVolumesByIDPrefix(ws, ns, prefix) + } else if ns != structs.AllNamespacesSentinel { iter, err = snap.CSIVolumesByNamespace(ws, ns, prefix) + } else { + iter, err = snap.CSIVolumes(ws) } if err != nil { diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index e4235ee21..4b8275064 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -736,6 +736,22 @@ func TestCSIVolumeEndpoint_ListAllNamespaces(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(1001), resp.Index) require.Len(t, resp.Volumes, len(vols)) + + // Lookup volumes in all namespaces with prefix + get = &structs.CSIVolumeListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Prefix: id0[:4], + Namespace: "*", + }, + } + var resp2 structs.CSIVolumeListResponse + err = msgpackrpc.CallWithCodec(codec, "CSIVolume.List", get, &resp2) + require.NoError(t, err) + require.Equal(t, uint64(1001), resp.Index) + require.Len(t, resp2.Volumes, 1) + require.Equal(t, vols[0].ID, resp2.Volumes[0].ID) + require.Equal(t, structs.DefaultNamespace, resp2.Volumes[0].Namespace) } func TestCSIVolumeEndpoint_Create(t *testing.T) { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index b8215b4ba..4cc1902aa 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2265,8 +2265,13 @@ func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, namespace, prefix, } // CSIVolumesByIDPrefix supports search. Caller should snapshot if it wants to -// also denormalize the plugins. +// also denormalize the plugins. If using a prefix with the wildcard namespace, +// the results will not use the index prefix. func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID string) (memdb.ResultIterator, error) { + if namespace == structs.AllNamespacesSentinel { + return s.csiVolumeByIDPrefixAllNamespaces(ws, volumeID) + } + txn := s.db.ReadTxn() iter, err := txn.Get("csi_volumes", "id_prefix", namespace, volumeID) @@ -2279,6 +2284,30 @@ func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID return iter, nil } +func (s *StateStore) csiVolumeByIDPrefixAllNamespaces(ws memdb.WatchSet, prefix string) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + // Walk the entire csi_volumes table + iter, err := txn.Get("csi_volumes", "id") + + if err != nil { + return nil, err + } + + ws.Add(iter.WatchCh()) + + // Filter the iterator by ID prefix + f := func(raw interface{}) bool { + v, ok := raw.(*structs.CSIVolume) + if !ok { + return false + } + return !strings.HasPrefix(v.ID, prefix) + } + wrap := memdb.NewFilterIterator(iter, f) + return wrap, nil +} + // CSIVolumesByNodeID looks up CSIVolumes in use on a node. Caller should // snapshot if it wants to also denormalize the plugins. func (s *StateStore) CSIVolumesByNodeID(ws memdb.WatchSet, prefix, nodeID string) (memdb.ResultIterator, error) {