csi: fix prefix queries for plugin list RPC (#12194)

The `CSIPlugin.List` RPC was intended to accept a prefix to filter the
list of plugins being listed. This was being accidentally being done
in the state store instead, which contributed to incorrect filtering
behavior for plugins in the `volume plugin status` command.

Move the prefix matching into the RPC so that it calls the
prefix-matching method in the state store if we're looking for a
prefix.

Update the `plugin status command` to accept a prefix for the plugin
ID argument so that it matches the expected behavior of other commands.
This commit is contained in:
Tim Gross 2022-03-04 16:44:09 -05:00 committed by GitHub
parent 5c3d2315cc
commit b776c1c196
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 43 additions and 6 deletions

3
.changelog/12194.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
csi: Fixed a bug where `plugin status` commands could choose the incorrect plugin if a plugin with a name that matched the same prefix existed.
```

View File

@ -37,6 +37,29 @@ func (c *PluginStatusCommand) csiStatus(client *api.Client, id string) int {
return 0
}
// filter by plugin if a plugin ID was passed
plugs, _, err := client.CSIPlugins().List(&api.QueryOptions{Prefix: id})
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying CSI plugins: %s", err))
return 1
}
if len(plugs) == 0 {
c.Ui.Error(fmt.Sprintf("No plugins(s) with prefix or ID %q found", id))
return 1
}
if len(plugs) > 1 {
if id != plugs[0].ID {
out, err := c.csiFormatPlugins(plugs)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error formatting: %s", err))
return 1
}
c.Ui.Error(fmt.Sprintf("Prefix matched multiple plugins\n\n%s", out))
return 1
}
}
id = plugs[0].ID
// Lookup matched a single plugin
plug, _, err := client.CSIPlugins().Info(id, nil)
if err != nil {

View File

@ -1294,10 +1294,20 @@ func (v *CSIPlugin) List(args *structs.CSIPluginListRequest, reply *structs.CSIP
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Query all plugins
iter, err := state.CSIPlugins(ws)
if err != nil {
return err
var iter memdb.ResultIterator
var err error
if args.Prefix != "" {
iter, err = state.CSIPluginsByIDPrefix(ws, args.Prefix)
if err != nil {
return err
}
} else {
// Query all plugins
iter, err = state.CSIPlugins(ws)
if err != nil {
return err
}
}
// Collect results

View File

@ -1100,6 +1100,7 @@ func TestCSIVolumeEndpoint_ListExternal(t *testing.T) {
// List external volumes; note that none of these exist in the state store
req := &structs.CSIVolumeExternalListRequest{
PluginID: "minnie",
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
@ -1371,8 +1372,8 @@ func TestCSIVolumeEndpoint_ListSnapshots(t *testing.T) {
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, index, node))
// List snapshots
req := &structs.CSISnapshotListRequest{
PluginID: "minnie",
Secrets: structs.CSISecrets{
"secret-key-1": "secret-val-1",
},

View File

@ -2696,7 +2696,7 @@ func (s *StateStore) CSIPluginByID(ws memdb.WatchSet, id string) (*structs.CSIPl
// CSIPluginByIDTxn returns a named CSIPlugin
func (s *StateStore) CSIPluginByIDTxn(txn Txn, ws memdb.WatchSet, id string) (*structs.CSIPlugin, error) {
watchCh, obj, err := txn.FirstWatch("csi_plugins", "id_prefix", id)
watchCh, obj, err := txn.FirstWatch("csi_plugins", "id", id)
if err != nil {
return nil, fmt.Errorf("csi_plugin lookup failed: %s %v", id, err)
}