csi: CLI for volume status, registration/deregistration and plugin status (#7193)

* command/csi: csi, csi_plugin, csi_volume

* helper/funcs: move ExtraKeys from parse_config to UnusedKeys

* command/agent/config_parse: use helper.UnusedKeys

* api/csi: annotate CSIVolumes with hcl fields

* command/csi_plugin: add Synopsis

* command/csi_volume_register: use hcl.Decode style parsing

* command/csi_volume_list

* command/csi_volume_status: list format, cleanup

* command/csi_plugin_list

* command/csi_plugin_status

* command/csi_volume_deregister

* command/csi_volume: add Synopsis

* api/contexts/contexts: add csi search contexts to the constants

* command/commands: register csi commands

* api/csi: fix struct tag for linter

* command/csi_plugin_list: unused struct vars

* command/csi_plugin_status: unused struct vars

* command/csi_volume_list: unused struct vars

* api/csi: add allocs to CSIPlugin

* command/csi_plugin_status: format the allocs

* api/allocations: copy Allocation.Stub in from structs

* nomad/client_rpc: add some error context with Errorf

* api/csi: collapse read & write alloc maps to a stub list

* command/csi_volume_status: cleanup allocation display

* command/csi_volume_list: use Schedulable instead of Healthy

* command/csi_volume_status: use Schedulable instead of Healthy

* command/csi_volume_list: sprintf string

* command/csi: delete csi.go, csi_plugin.go

* command/plugin: refactor csi components to sub-command plugin status

* command/plugin: remove csi

* command/plugin_status: remove csi

* command/volume: remove csi

* command/volume_status: split out csi specific

* helper/funcs: add RemoveEqualFold

* command/agent/config_parse: use helper.RemoveEqualFold

* api/csi: do ,unusedKeys right

* command/volume: refactor csi components to `nomad volume`

* command/volume_register: split out csi specific

* command/commands: use the new top level commands

* command/volume_deregister: hardwired type csi for now

* command/volume_status: csiFormatVolumes rescued from volume_list

* command/plugin_status: avoid a panic on no args

* command/volume_status: avoid a panic on no args

* command/plugin_status: predictVolumeType

* command/volume_status: predictVolumeType

* nomad/csi_endpoint_test: move CreateTestPlugin to testing

* command/plugin_status_test: use CreateTestCSIPlugin

* nomad/structs/structs: add CSIPlugins and CSIVolumes search consts

* nomad/state/state_store: add CSIPlugins and CSIVolumesByIDPrefix

* nomad/search_endpoint: add CSIPlugins and CSIVolumes

* command/plugin_status: move the header to the csi specific

* command/volume_status: move the header to the csi specific

* nomad/state/state_store: CSIPluginByID prefix

* command/status: rename the search context to just Plugins/Volumes

* command/plugin,volume_status: test return ids now

* command/status: rename the search context to just Plugins/Volumes

* command/plugin_status: support -json and -t

* command/volume_status: support -json and -t

* command/plugin_status_csi: comments

* command/*_status: clean up text

* api/csi: fix stale comments

* command/volume: make deregister sound less fearsome

* command/plugin_status: set the id length

* command/plugin_status_csi: more compact plugin health

* command/volume: better error message, comment
This commit is contained in:
Lang Martin 2020-03-06 10:09:10 -05:00 committed by Tim Gross
parent 016281135c
commit 887e1f28c9
26 changed files with 1486 additions and 168 deletions

View File

@ -399,6 +399,36 @@ type NodeScoreMeta struct {
NormScore float64
}
// Stub returns a list stub for the allocation
func (a *Allocation) Stub() *AllocationListStub {
return &AllocationListStub{
ID: a.ID,
EvalID: a.EvalID,
Name: a.Name,
Namespace: a.Namespace,
NodeID: a.NodeID,
NodeName: a.NodeName,
JobID: a.JobID,
JobType: *a.Job.Type,
JobVersion: *a.Job.Version,
TaskGroup: a.TaskGroup,
DesiredStatus: a.DesiredStatus,
DesiredDescription: a.DesiredDescription,
ClientStatus: a.ClientStatus,
ClientDescription: a.ClientDescription,
TaskStates: a.TaskStates,
DeploymentStatus: a.DeploymentStatus,
FollowupEvalID: a.FollowupEvalID,
RescheduleTracker: a.RescheduleTracker,
PreemptedAllocations: a.PreemptedAllocations,
PreemptedByAllocation: a.PreemptedByAllocation,
CreateIndex: a.CreateIndex,
ModifyIndex: a.ModifyIndex,
CreateTime: a.CreateTime,
ModifyTime: a.ModifyTime,
}
}
// AllocationListStub is used to return a subset of an allocation
// during list operations.
type AllocationListStub struct {

View File

@ -11,5 +11,7 @@ const (
Nodes Context = "nodes"
Namespaces Context = "namespaces"
Quotas Context = "quotas"
Plugins Context = "plugins"
Volumes Context = "volumes"
All Context = "all"
)

View File

@ -38,6 +38,10 @@ func (v *CSIVolumes) Info(id string, q *QueryOptions) (*CSIVolume, *QueryMeta, e
if err != nil {
return nil, nil, err
}
// Cleanup allocation representation for the ui
resp.allocs()
return &resp, qm, nil
}
@ -79,19 +83,24 @@ const (
// CSIVolume is used for serialization, see also nomad/structs/csi.go
type CSIVolume struct {
ID string
Namespace string
Name string
ExternalID string
Topologies []*CSITopology
AccessMode CSIVolumeAccessMode
AttachmentMode CSIVolumeAttachmentMode
ID string `hcl:"id"`
Name string `hcl:"name"`
ExternalID string `hcl:"external_id"`
Namespace string `hcl:"namespace"`
Topologies []*CSITopology `hcl:"topologies"`
AccessMode CSIVolumeAccessMode `hcl:"access_mode"`
AttachmentMode CSIVolumeAttachmentMode `hcl:"attachment_mode"`
// Combine structs.{Read,Write,Past}Allocs
// Allocations, tracking claim status
ReadAllocs map[string]*Allocation
WriteAllocs map[string]*Allocation
// Combine structs.{Read,Write}Allocs
Allocations []*AllocationListStub
// Schedulable is true if all the denormalized plugin health fields are true
Schedulable bool
PluginID string
PluginID string `hcl:"plugin_id"`
ControllerRequired bool
ControllersHealthy int
ControllersExpected int
@ -101,6 +110,20 @@ type CSIVolume struct {
CreateIndex uint64
ModifyIndex uint64
// ExtraKeysHCL is used by the hcl parser to report unexpected keys
ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"`
}
// allocs is called after we query the volume (creating this CSIVolume struct) to collapse
// allocations for the UI
func (v *CSIVolume) allocs() {
for _, a := range v.WriteAllocs {
v.Allocations = append(v.Allocations, a.Stub())
}
for _, a := range v.ReadAllocs {
v.Allocations = append(v.Allocations, a.Stub())
}
}
type CSIVolumeIndexSort []*CSIVolumeListStub
@ -160,6 +183,7 @@ type CSIPlugin struct {
// Map Node.ID to CSIInfo fingerprint results
Controllers map[string]*CSIInfo
Nodes map[string]*CSIInfo
Allocations []*AllocationListStub
ControllersHealthy int
NodesHealthy int
CreateIndex uint64

View File

@ -6,11 +6,10 @@ import (
"io"
"os"
"path/filepath"
"reflect"
"strings"
"time"
"github.com/hashicorp/hcl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs/config"
)
@ -99,106 +98,44 @@ func durations(xs []td) error {
return nil
}
// removeEqualFold removes the first string that EqualFold matches
func removeEqualFold(xs *[]string, search string) {
sl := *xs
for i, x := range sl {
if strings.EqualFold(x, search) {
sl = append(sl[:i], sl[i+1:]...)
if len(sl) == 0 {
*xs = nil
} else {
*xs = sl
}
return
}
}
}
func extraKeys(c *Config) error {
// hcl leaves behind extra keys when parsing JSON. These keys
// are kept on the top level, taken from slices or the keys of
// structs contained in slices. Clean up before looking for
// extra keys.
for range c.HTTPAPIResponseHeaders {
removeEqualFold(&c.ExtraKeysHCL, "http_api_response_headers")
helper.RemoveEqualFold(&c.ExtraKeysHCL, "http_api_response_headers")
}
for _, p := range c.Plugins {
removeEqualFold(&c.ExtraKeysHCL, p.Name)
removeEqualFold(&c.ExtraKeysHCL, "config")
removeEqualFold(&c.ExtraKeysHCL, "plugin")
helper.RemoveEqualFold(&c.ExtraKeysHCL, p.Name)
helper.RemoveEqualFold(&c.ExtraKeysHCL, "config")
helper.RemoveEqualFold(&c.ExtraKeysHCL, "plugin")
}
for _, k := range []string{"options", "meta", "chroot_env", "servers", "server_join"} {
removeEqualFold(&c.ExtraKeysHCL, k)
removeEqualFold(&c.ExtraKeysHCL, "client")
helper.RemoveEqualFold(&c.ExtraKeysHCL, k)
helper.RemoveEqualFold(&c.ExtraKeysHCL, "client")
}
// stats is an unused key, continue to silently ignore it
removeEqualFold(&c.Client.ExtraKeysHCL, "stats")
helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, "stats")
// Remove HostVolume extra keys
for _, hv := range c.Client.HostVolumes {
removeEqualFold(&c.Client.ExtraKeysHCL, hv.Name)
removeEqualFold(&c.Client.ExtraKeysHCL, "host_volume")
helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, hv.Name)
helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, "host_volume")
}
for _, k := range []string{"enabled_schedulers", "start_join", "retry_join", "server_join"} {
removeEqualFold(&c.ExtraKeysHCL, k)
removeEqualFold(&c.ExtraKeysHCL, "server")
helper.RemoveEqualFold(&c.ExtraKeysHCL, k)
helper.RemoveEqualFold(&c.ExtraKeysHCL, "server")
}
for _, k := range []string{"datadog_tags"} {
removeEqualFold(&c.ExtraKeysHCL, k)
removeEqualFold(&c.ExtraKeysHCL, "telemetry")
helper.RemoveEqualFold(&c.ExtraKeysHCL, k)
helper.RemoveEqualFold(&c.ExtraKeysHCL, "telemetry")
}
return extraKeysImpl([]string{}, reflect.ValueOf(*c))
}
// extraKeysImpl returns an error if any extraKeys array is not empty
func extraKeysImpl(path []string, val reflect.Value) error {
stype := val.Type()
for i := 0; i < stype.NumField(); i++ {
ftype := stype.Field(i)
fval := val.Field(i)
name := ftype.Name
prop := ""
tagSplit(ftype, "hcl", &name, &prop)
if fval.Kind() == reflect.Ptr {
fval = reflect.Indirect(fval)
}
// struct? recurse. add the struct's key to the path
if fval.Kind() == reflect.Struct {
err := extraKeysImpl(append([]string{name}, path...), fval)
if err != nil {
return err
}
}
if "unusedKeys" == prop {
if ks, ok := fval.Interface().([]string); ok && len(ks) != 0 {
return fmt.Errorf("%s unexpected keys %s",
strings.Join(path, "."),
strings.Join(ks, ", "))
}
}
}
return nil
}
// tagSplit reads the named tag from the structfield and splits its values into strings
func tagSplit(field reflect.StructField, tagName string, vars ...*string) {
tag := strings.Split(field.Tag.Get(tagName), ",")
end := len(tag) - 1
for i, s := range vars {
if i > end {
return
}
*s = tag[i]
}
return helper.UnusedKeys(c)
}

View File

@ -493,6 +493,17 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
}, nil
},
"plugin": func() (cli.Command, error) {
return &PluginCommand{
Meta: meta,
}, nil
},
"plugin status": func() (cli.Command, error) {
return &PluginStatusCommand{
Meta: meta,
}, nil
},
"quota": func() (cli.Command, error) {
return &QuotaCommand{
Meta: meta,
@ -646,6 +657,26 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Ui: meta.Ui,
}, nil
},
"volume": func() (cli.Command, error) {
return &VolumeCommand{
Meta: meta,
}, nil
},
"volume status": func() (cli.Command, error) {
return &VolumeStatusCommand{
Meta: meta,
}, nil
},
"volume register": func() (cli.Command, error) {
return &VolumeRegisterCommand{
Meta: meta,
}, nil
},
"volume deregister": func() (cli.Command, error) {
return &VolumeDeregisterCommand{
Meta: meta,
}, nil
},
}
deprecated := map[string]cli.CommandFactory{

26
command/plugin.go Normal file
View File

@ -0,0 +1,26 @@
package command
import "github.com/mitchellh/cli"
type PluginCommand struct {
Meta
}
func (c *PluginCommand) Help() string {
helpText := `
Usage nomad plugin status [options] [plugin]
This command groups subcommands for interacting with plugins.
`
return helpText
}
func (c *PluginCommand) Synopsis() string {
return "Inspect plugins"
}
func (c *PluginCommand) Name() string { return "plugin" }
func (c *PluginCommand) Run(args []string) int {
return cli.RunResultHelp
}

146
command/plugin_status.go Normal file
View File

@ -0,0 +1,146 @@
package command
import (
"fmt"
"strings"
"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)
type PluginStatusCommand struct {
Meta
length int
short bool
verbose bool
json bool
template string
}
func (c *PluginStatusCommand) Help() string {
helpText := `
Usage nomad plugin status [options] <plugin>
Display status information about a plugin. If no plugin id is given,
a list of all plugins will be displayed.
General Options:
` + generalOptionsUsage() + `
Status Options:
-type <type>
List only plugins of type <type>.
-short
Display short output.
-verbose
Display full information.
-json
Output the allocation in its JSON format.
-t
Format and display allocation using a Go template.
`
return helpText
}
func (c *PluginStatusCommand) Synopsis() string {
return "Display status information about a plugin"
}
// predictVolumeType is also used in volume_status
var predictVolumeType = complete.PredictFunc(func(a complete.Args) []string {
types := []string{"csi"}
for _, t := range types {
if strings.Contains(t, a.Last) {
return []string{t}
}
}
return nil
})
func (c *PluginStatusCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-type": predictVolumeType,
"-short": complete.PredictNothing,
"-verbose": complete.PredictNothing,
"-json": complete.PredictNothing,
"-t": complete.PredictAnything,
})
}
func (c *PluginStatusCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := c.Meta.Client()
if err != nil {
return nil
}
resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Plugins, nil)
if err != nil {
return []string{}
}
return resp.Matches[contexts.Plugins]
})
}
func (c *PluginStatusCommand) Name() string { return "plugin status" }
func (c *PluginStatusCommand) Run(args []string) int {
var typeArg string
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.StringVar(&typeArg, "type", "", "")
flags.BoolVar(&c.short, "short", false, "")
flags.BoolVar(&c.verbose, "verbose", false, "")
flags.BoolVar(&c.json, "json", false, "")
flags.StringVar(&c.template, "t", "", "")
if err := flags.Parse(args); err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing arguments %s", err))
return 1
}
typeArg = strings.ToLower(typeArg)
// Check that we either got no arguments or exactly one.
args = flags.Args()
if len(args) > 1 {
c.Ui.Error("This command takes either no arguments or one: <plugin>")
c.Ui.Error(commandErrorText(c))
return 1
}
// Truncate the id unless full length is requested
c.length = shortId
if c.verbose {
c.length = fullId
}
// Get the HTTP client
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
id := ""
if len(args) == 1 {
id = args[0]
}
code := c.csiStatus(client, id)
if code != 0 {
return code
}
// Extend this section with other plugin implementations
return 0
}

View File

@ -0,0 +1,111 @@
package command
import (
"fmt"
"sort"
"strings"
"github.com/hashicorp/nomad/api"
)
func (c *PluginStatusCommand) csiBanner() {
if !(c.json || len(c.template) > 0) {
c.Ui.Output(c.Colorize().Color("[bold]Container Storage Interface[reset]"))
}
}
func (c *PluginStatusCommand) csiStatus(client *api.Client, id string) int {
if id == "" {
c.csiBanner()
plugs, _, err := client.CSIPlugins().List(nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying CSI plugins: %s", err))
return 1
}
if len(plugs) == 0 {
// No output if we have no plugins
c.Ui.Error("No CSI plugins")
} else {
str, err := c.csiFormatPlugins(plugs)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error formatting: %s", err))
return 1
}
c.Ui.Output(str)
}
return 0
}
// Lookup matched a single plugin
plug, _, err := client.CSIPlugins().Info(id, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying plugin: %s", err))
return 1
}
str, err := c.csiFormatPlugin(plug)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error formatting plugin: %s", err))
return 1
}
c.Ui.Output(str)
return 0
}
func (c *PluginStatusCommand) csiFormatPlugins(plugs []*api.CSIPluginListStub) (string, error) {
// Sort the output by quota name
sort.Slice(plugs, func(i, j int) bool { return plugs[i].ID < plugs[j].ID })
if c.json || len(c.template) > 0 {
out, err := Format(c.json, c.template, plugs)
if err != nil {
return "", fmt.Errorf("format error: %v", err)
}
return out, nil
}
// TODO(langmartin) add Provider https://github.com/hashicorp/nomad/issues/7248
rows := make([]string, len(plugs)+1)
rows[0] = "ID|Controllers Healthy/Expected|Nodes Healthy/Expected"
for i, p := range plugs {
rows[i+1] = fmt.Sprintf("%s|%d/%d|%d/%d",
limit(p.ID, c.length),
p.ControllersHealthy,
p.ControllersExpected,
p.NodesHealthy,
p.NodesExpected,
)
}
return formatList(rows), nil
}
func (c *PluginStatusCommand) csiFormatPlugin(plug *api.CSIPlugin) (string, error) {
if c.json || len(c.template) > 0 {
out, err := Format(c.json, c.template, plug)
if err != nil {
return "", fmt.Errorf("format error: %v", err)
}
return out, nil
}
output := []string{
fmt.Sprintf("ID|%s", plug.ID),
fmt.Sprintf("Controllers Healthy|%d", plug.ControllersHealthy),
fmt.Sprintf("Controllers Expected|%d", len(plug.Controllers)),
fmt.Sprintf("Nodes Healthy|%d", plug.NodesHealthy),
fmt.Sprintf("Nodes Expected|%d", len(plug.Nodes)),
}
// Exit early
if c.short {
return formatKV(output), nil
}
// Format the allocs
banner := c.Colorize().Color("\n[bold]Allocations[reset]")
allocs := formatAllocListStubs(plug.Allocations, c.verbose, c.length)
full := []string{formatKV(output), banner, allocs}
return strings.Join(full, "\n"), nil
}

View File

@ -0,0 +1,57 @@
package command
import (
"testing"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad"
"github.com/mitchellh/cli"
"github.com/posener/complete"
"github.com/stretchr/testify/require"
)
func TestPluginStatusCommand_Implements(t *testing.T) {
t.Parallel()
var _ cli.Command = &PluginStatusCommand{}
}
func TestPluginStatusCommand_Fails(t *testing.T) {
t.Parallel()
ui := new(cli.MockUi)
cmd := &PluginStatusCommand{Meta: Meta{Ui: ui}}
// Fails on misuse
code := cmd.Run([]string{"some", "bad", "args"})
require.Equal(t, 1, code)
out := ui.ErrorWriter.String()
require.Contains(t, out, commandErrorText(cmd))
ui.ErrorWriter.Reset()
}
func TestPluginStatusCommand_AutocompleteArgs(t *testing.T) {
t.Parallel()
srv, _, url := testServer(t, true, nil)
defer srv.Shutdown()
ui := new(cli.MockUi)
cmd := &PluginStatusCommand{Meta: Meta{Ui: ui, flagAddress: url}}
// Create a plugin
id := "long-plugin-id"
state := srv.Agent.Server().State()
cleanup := nomad.CreateTestCSIPlugin(state, id)
defer cleanup()
ws := memdb.NewWatchSet()
plug, err := state.CSIPluginByID(ws, id)
require.NoError(t, err)
prefix := plug.ID[:len(plug.ID)-5]
args := complete.Args{Last: prefix}
predictor := cmd.AutocompleteArgs()
res := predictor.Predict(args)
require.Equal(t, 1, len(res))
require.Equal(t, plug.ID, res[0])
}

View File

@ -162,6 +162,10 @@ func (c *StatusCommand) Run(args []string) int {
cmd = &NamespaceStatusCommand{Meta: c.Meta}
case contexts.Quotas:
cmd = &QuotaStatusCommand{Meta: c.Meta}
case contexts.Plugins:
cmd = &PluginStatusCommand{Meta: c.Meta}
case contexts.Volumes:
cmd = &VolumeStatusCommand{Meta: c.Meta}
default:
c.Ui.Error(fmt.Sprintf("Unable to resolve ID: %q", id))
return 1

46
command/volume.go Normal file
View File

@ -0,0 +1,46 @@
package command
import (
"strings"
"github.com/mitchellh/cli"
)
type VolumeCommand struct {
Meta
}
func (c *VolumeCommand) Help() string {
helpText := `
Usage: nomad volume <subcommand> [options]
volume groups commands that interact with volumes.
Register a new volume or update an existing volume:
$ nomad volume register <input>
Examine the status of a volume:
$ nomad volume status <id>
Deregister an unused volume:
$ nomad volume deregister <id>
Please see the individual subcommand help for detailed usage information.
`
return strings.TrimSpace(helpText)
}
func (c *VolumeCommand) Name() string {
return "volume"
}
func (c *VolumeCommand) Synopsis() string {
return "Interact with volumes"
}
func (c *VolumeCommand) Run(args []string) int {
return cli.RunResultHelp
}

View File

@ -0,0 +1,88 @@
package command
import (
"fmt"
"strings"
"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)
type VolumeDeregisterCommand struct {
Meta
}
func (c *VolumeDeregisterCommand) Help() string {
helpText := `
Usage: nomad volume deregister [options] <id>
Remove an unused volume from Nomad.
General Options:
` + generalOptionsUsage()
return strings.TrimSpace(helpText)
}
func (c *VolumeDeregisterCommand) AutocompleteFlags() complete.Flags {
return c.Meta.AutocompleteFlags(FlagSetClient)
}
func (c *VolumeDeregisterCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := c.Meta.Client()
if err != nil {
return nil
}
// When multiple volume types are implemented, this search should merge contexts
resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Volumes, nil)
if err != nil {
return []string{}
}
return resp.Matches[contexts.Volumes]
})
}
func (c *VolumeDeregisterCommand) Synopsis() string {
return "Remove a volume"
}
func (c *VolumeDeregisterCommand) Name() string { return "volume deregister" }
func (c *VolumeDeregisterCommand) Run(args []string) int {
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
if err := flags.Parse(args); err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing arguments %s", err))
return 1
}
// Check that we get exactly one argument
args = flags.Args()
if l := len(args); l != 1 {
c.Ui.Error("This command takes one argument: <id>")
c.Ui.Error(commandErrorText(c))
return 1
}
volID := args[0]
// Get the HTTP client
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
// Deregister only works on CSI volumes, but could be extended to support other
// network interfaces or host volumes
err = client.CSIVolumes().Deregister(volID, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error deregistering volume: %s", err))
return 1
}
return 0
}

130
command/volume_register.go Normal file
View File

@ -0,0 +1,130 @@
package command
import (
"fmt"
"io/ioutil"
"os"
"strings"
"github.com/hashicorp/hcl"
"github.com/hashicorp/hcl/hcl/ast"
"github.com/posener/complete"
)
type VolumeRegisterCommand struct {
Meta
}
func (c *VolumeRegisterCommand) Help() string {
helpText := `
Usage: nomad volume register [options] <input>
Creates or updates a volume in Nomad. The volume must exist on the remote
storage provider before it can be used by a task.
If the supplied path is "-" the volume file is read from stdin. Otherwise, it
is read from the file at the supplied path.
General Options:
` + generalOptionsUsage()
return strings.TrimSpace(helpText)
}
func (c *VolumeRegisterCommand) AutocompleteFlags() complete.Flags {
return c.Meta.AutocompleteFlags(FlagSetClient)
}
func (c *VolumeRegisterCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictFiles("*")
}
func (c *VolumeRegisterCommand) Synopsis() string {
return "Create or update a volume"
}
func (c *VolumeRegisterCommand) Name() string { return "volume register" }
func (c *VolumeRegisterCommand) Run(args []string) int {
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
if err := flags.Parse(args); err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing arguments %s", err))
return 1
}
// Check that we get exactly one argument
args = flags.Args()
if l := len(args); l != 1 {
c.Ui.Error("This command takes one argument: <input>")
c.Ui.Error(commandErrorText(c))
return 1
}
// Read the file contents
file := args[0]
var rawVolume []byte
var err error
if file == "-" {
rawVolume, err = ioutil.ReadAll(os.Stdin)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to read stdin: %v", err))
return 1
}
} else {
rawVolume, err = ioutil.ReadFile(file)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to read file: %v", err))
return 1
}
}
ast, volType, err := parseVolumeType(string(rawVolume))
if err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing the volume type: %s", err))
return 1
}
volType = strings.ToLower(volType)
// Get the HTTP client
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
switch volType {
case "csi":
code := c.csiRegister(client, ast)
if code != 0 {
return code
}
default:
c.Ui.Error(fmt.Sprintf("Error unknown volume type: %s", volType))
return 1
}
return 0
}
// parseVolume is used to parse the quota specification from HCL
func parseVolumeType(input string) (*ast.File, string, error) {
// Parse the AST first
ast, err := hcl.Parse(input)
if err != nil {
return nil, "", fmt.Errorf("parse error: %v", err)
}
// Decode the type, so we can dispatch on it
dispatch := &struct {
T string `hcl:"type"`
}{}
err = hcl.DecodeObject(dispatch, ast)
if err != nil {
return nil, "", fmt.Errorf("dispatch error: %v", err)
}
return ast, dispatch.T, nil
}

View File

@ -0,0 +1,44 @@
package command
import (
"fmt"
"github.com/hashicorp/hcl"
"github.com/hashicorp/hcl/hcl/ast"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/helper"
)
func (c *VolumeRegisterCommand) csiRegister(client *api.Client, ast *ast.File) int {
vol, err := csiDecodeVolume(ast)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error decoding the volume definition: %s", err))
return 1
}
_, err = client.CSIVolumes().Register(vol, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error registering volume: %s", err))
return 1
}
return 0
}
// parseVolume is used to parse the quota specification from HCL
func csiDecodeVolume(input *ast.File) (*api.CSIVolume, error) {
output := &api.CSIVolume{}
err := hcl.DecodeObject(output, input)
if err != nil {
return nil, err
}
// api.CSIVolume doesn't have the type field, it's used only for dispatch in
// parseVolumeType
helper.RemoveEqualFold(&output.ExtraKeysHCL, "type")
err = helper.UnusedKeys(output)
if err != nil {
return nil, err
}
return output, nil
}

View File

@ -0,0 +1,97 @@
package command
import (
"testing"
"github.com/hashicorp/hcl"
"github.com/hashicorp/nomad/api"
"github.com/stretchr/testify/require"
)
func TestVolumeDispatchParse(t *testing.T) {
t.Parallel()
cases := []struct {
hcl string
t string
err string
}{{
hcl: `
type = "foo"
rando = "bar"
`,
t: "foo",
err: "",
}, {
hcl: `{"id": "foo", "type": "foo", "other": "bar"}`,
t: "foo",
err: "",
}}
for _, c := range cases {
t.Run(c.hcl, func(t *testing.T) {
_, s, err := parseVolumeType(c.hcl)
require.Equal(t, c.t, s)
if c.err == "" {
require.NoError(t, err)
} else {
require.Contains(t, err.Error(), c.err)
}
})
}
}
func TestCSIVolumeParse(t *testing.T) {
t.Parallel()
cases := []struct {
hcl string
q *api.CSIVolume
err string
}{{
hcl: `
id = "foo"
type = "csi"
namespace = "n"
access_mode = "single-node-writer"
attachment_mode = "file-system"
plugin_id = "p"
`,
q: &api.CSIVolume{
ID: "foo",
Namespace: "n",
AccessMode: "single-node-writer",
AttachmentMode: "file-system",
PluginID: "p",
},
err: "",
}, {
hcl: `
{"id": "foo", "namespace": "n", "type": "csi", "access_mode": "single-node-writer", "attachment_mode": "file-system",
"plugin_id": "p"}
`,
q: &api.CSIVolume{
ID: "foo",
Namespace: "n",
AccessMode: "single-node-writer",
AttachmentMode: "file-system",
PluginID: "p",
},
err: "",
}}
for _, c := range cases {
t.Run(c.hcl, func(t *testing.T) {
ast, err := hcl.ParseString(c.hcl)
require.NoError(t, err)
vol, err := csiDecodeVolume(ast)
require.Equal(t, c.q, vol)
if c.err == "" {
require.NoError(t, err)
} else {
require.Contains(t, err.Error(), c.err)
}
})
}
}

134
command/volume_status.go Normal file
View File

@ -0,0 +1,134 @@
package command
import (
"fmt"
"strings"
"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)
type VolumeStatusCommand struct {
Meta
length int
short bool
verbose bool
json bool
template string
}
func (c *VolumeStatusCommand) Help() string {
helpText := `
Usage: nomad volume status [options] <id>
Display status information about a CSI volume. If no volume id is given, a
list of all volumes will be displayed.
General Options:
` + generalOptionsUsage() + `
Status Options:
-type <type>
List only volumes of type <type>.
-short
Display short output. Used only when a single volume is being
queried, and drops verbose information about allocations.
-verbose
Display full allocation information.
-json
Output the allocation in its JSON format.
-t
Format and display allocation using a Go template.
`
return strings.TrimSpace(helpText)
}
func (c *VolumeStatusCommand) Synopsis() string {
return "Display status information about a volume"
}
func (c *VolumeStatusCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-type": predictVolumeType,
"-short": complete.PredictNothing,
"-verbose": complete.PredictNothing,
"-json": complete.PredictNothing,
"-t": complete.PredictAnything,
})
}
func (c *VolumeStatusCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := c.Meta.Client()
if err != nil {
return nil
}
resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Volumes, nil)
if err != nil {
return []string{}
}
return resp.Matches[contexts.Volumes]
})
}
func (c *VolumeStatusCommand) Name() string { return "volume status" }
func (c *VolumeStatusCommand) Run(args []string) int {
var typeArg string
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.StringVar(&typeArg, "type", "", "")
flags.BoolVar(&c.short, "short", false, "")
flags.BoolVar(&c.verbose, "verbose", false, "")
flags.BoolVar(&c.json, "json", false, "")
flags.StringVar(&c.template, "t", "", "")
if err := flags.Parse(args); err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing arguments %s", err))
return 1
}
// Check that we either got no arguments or exactly one
args = flags.Args()
if len(args) > 1 {
c.Ui.Error("This command takes either no arguments or one: <id>")
c.Ui.Error(commandErrorText(c))
return 1
}
// Truncate the id unless full length is requested
c.length = shortId
if c.verbose {
c.length = fullId
}
// Get the HTTP client
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
id := ""
if len(args) == 1 {
id = args[0]
}
code := c.csiStatus(client, id)
if code != 0 {
return code
}
// Extend this section with other volume implementations
return 0
}

View File

@ -0,0 +1,152 @@
package command
import (
"fmt"
"sort"
"strings"
"github.com/hashicorp/nomad/api"
)
func (c *VolumeStatusCommand) csiBanner() {
if !(c.json || len(c.template) > 0) {
c.Ui.Output(c.Colorize().Color("[bold]Container Storage Interface[reset]"))
}
}
func (c *VolumeStatusCommand) csiStatus(client *api.Client, id string) int {
// Invoke list mode if no volume id
if id == "" {
c.csiBanner()
vols, _, err := client.CSIVolumes().List(nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying volumes: %s", err))
return 1
}
if len(vols) == 0 {
// No output if we have no volumes
c.Ui.Error("No CSI volumes")
} else {
str, err := c.csiFormatVolumes(vols)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error formatting: %s", err))
return 1
}
c.Ui.Output(str)
}
return 0
}
// Try querying the volume
vol, _, err := client.CSIVolumes().Info(id, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying volume: %s", err))
return 1
}
str, err := c.formatBasic(vol)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error formatting volume: %s", err))
return 1
}
c.Ui.Output(str)
return 0
}
func (c *VolumeStatusCommand) csiFormatVolumes(vols []*api.CSIVolumeListStub) (string, error) {
// Sort the output by volume id
sort.Slice(vols, func(i, j int) bool { return vols[i].ID < vols[j].ID })
if c.json || len(c.template) > 0 {
out, err := Format(c.json, c.template, vols)
if err != nil {
return "", fmt.Errorf("format error: %v", err)
}
return out, nil
}
rows := make([]string, len(vols)+1)
rows[0] = "ID|Name|Plugin ID|Schedulable|Access Mode"
for i, v := range vols {
rows[i+1] = fmt.Sprintf("%s|%s|%s|%t|%s",
limit(v.ID, c.length),
v.Name,
v.PluginID,
v.Schedulable,
v.AccessMode,
)
}
return formatList(rows), nil
}
func (c *VolumeStatusCommand) formatBasic(vol *api.CSIVolume) (string, error) {
if c.json || len(c.template) > 0 {
out, err := Format(c.json, c.template, vol)
if err != nil {
return "", fmt.Errorf("format error: %v", err)
}
return out, nil
}
// TODO(langmartin) add Provider https://github.com/hashicorp/nomad/issues/7248
output := []string{
fmt.Sprintf("ID|%s", vol.ID),
fmt.Sprintf("Name|%s", vol.Name),
fmt.Sprintf("External ID|%s", vol.ExternalID),
fmt.Sprintf("Schedulable|%t", vol.Schedulable),
fmt.Sprintf("Controllers Healthy|%d", vol.ControllersHealthy),
fmt.Sprintf("Controllers Expected|%d", vol.ControllersExpected),
fmt.Sprintf("Nodes Healthy|%d", vol.NodesHealthy),
fmt.Sprintf("Nodes Expected|%d", vol.NodesExpected),
fmt.Sprintf("Access Mode|%s", vol.AccessMode),
fmt.Sprintf("Attachment Mode|%s", vol.AttachmentMode),
fmt.Sprintf("Namespace|%s", vol.Namespace),
}
// Exit early
if c.short {
return formatKV(output), nil
}
// Format the allocs
banner := c.Colorize().Color("\n[bold]Allocations[reset]")
allocs := formatAllocListStubs(vol.Allocations, c.verbose, c.length)
full := []string{formatKV(output), banner, allocs}
return strings.Join(full, "\n"), nil
}
func (c *VolumeStatusCommand) formatTopologies(vol *api.CSIVolume) string {
var out []string
// Find the union of all the keys
head := map[string]string{}
for _, t := range vol.Topologies {
for key := range t.Segments {
if _, ok := head[key]; !ok {
head[key] = ""
}
}
}
// Append the header
var line []string
for key := range head {
line = append(line, key)
}
out = append(out, strings.Join(line, " "))
// Append each topology
for _, t := range vol.Topologies {
line = []string{}
for key := range head {
line = append(line, t.Segments[key])
}
out = append(out, strings.Join(line, " "))
}
return strings.Join(out, "\n")
}

View File

@ -0,0 +1,58 @@
package command
import (
"testing"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/cli"
"github.com/posener/complete"
"github.com/stretchr/testify/require"
)
func TestCSIVolumeStatusCommand_Implements(t *testing.T) {
t.Parallel()
var _ cli.Command = &VolumeStatusCommand{}
}
func TestCSIVolumeStatusCommand_Fails(t *testing.T) {
t.Parallel()
ui := new(cli.MockUi)
cmd := &VolumeStatusCommand{Meta: Meta{Ui: ui}}
// Fails on misuse
code := cmd.Run([]string{"some", "bad", "args"})
require.Equal(t, 1, code)
out := ui.ErrorWriter.String()
require.Contains(t, out, commandErrorText(cmd))
ui.ErrorWriter.Reset()
}
func TestCSIVolumeStatusCommand_AutocompleteArgs(t *testing.T) {
t.Parallel()
srv, _, url := testServer(t, true, nil)
defer srv.Shutdown()
ui := new(cli.MockUi)
cmd := &VolumeStatusCommand{Meta: Meta{Ui: ui, flagAddress: url}}
state := srv.Agent.Server().State()
vol := &structs.CSIVolume{
ID: uuid.Generate(),
Namespace: "default",
PluginID: "glade",
}
require.NoError(t, state.CSIVolumeRegister(1000, []*structs.CSIVolume{vol}))
prefix := vol.ID[:len(vol.ID)-5]
args := complete.Args{Last: prefix}
predictor := cmd.AutocompleteArgs()
res := predictor.Predict(args)
require.Equal(t, 1, len(res))
require.Equal(t, vol.ID, res[0])
}

View File

@ -3,7 +3,9 @@ package helper
import (
"crypto/sha512"
"fmt"
"reflect"
"regexp"
"strings"
"time"
multierror "github.com/hashicorp/go-multierror"
@ -387,3 +389,75 @@ func CheckHCLKeys(node ast.Node, valid []string) error {
return result
}
// UnusedKeys returns a pretty-printed error if any `hcl:",unusedKeys"` is not empty
func UnusedKeys(obj interface{}) error {
val := reflect.ValueOf(obj)
if val.Kind() == reflect.Ptr {
val = reflect.Indirect(val)
}
return unusedKeysImpl([]string{}, val)
}
func unusedKeysImpl(path []string, val reflect.Value) error {
stype := val.Type()
for i := 0; i < stype.NumField(); i++ {
ftype := stype.Field(i)
fval := val.Field(i)
tags := strings.Split(ftype.Tag.Get("hcl"), ",")
name := tags[0]
tags = tags[1:]
if fval.Kind() == reflect.Ptr {
fval = reflect.Indirect(fval)
}
// struct? recurse. Add the struct's key to the path
if fval.Kind() == reflect.Struct {
err := unusedKeysImpl(append([]string{name}, path...), fval)
if err != nil {
return err
}
continue
}
// Search the hcl tags for "unusedKeys"
unusedKeys := false
for _, p := range tags {
if p == "unusedKeys" {
unusedKeys = true
break
}
}
if unusedKeys {
ks, ok := fval.Interface().([]string)
if ok && len(ks) != 0 {
ps := ""
if len(path) > 0 {
ps = strings.Join(path, ".") + " "
}
return fmt.Errorf("%sunexpected keys %s",
ps,
strings.Join(ks, ", "))
}
}
}
return nil
}
// RemoveEqualFold removes the first string that EqualFold matches. It updates xs in place
func RemoveEqualFold(xs *[]string, search string) {
sl := *xs
for i, x := range sl {
if strings.EqualFold(x, search) {
sl = append(sl[:i], sl[i+1:]...)
if len(sl) == 0 {
*xs = nil
} else {
*xs = sl
}
return
}
}
}

View File

@ -219,20 +219,20 @@ func NodeRpc(session *yamux.Session, method string, args, reply interface{}) err
// Open a new session
stream, err := session.Open()
if err != nil {
return err
return fmt.Errorf("session open: %v", err)
}
defer stream.Close()
// Write the RpcNomad byte to set the mode
if _, err := stream.Write([]byte{byte(pool.RpcNomad)}); err != nil {
stream.Close()
return err
return fmt.Errorf("set mode: %v", err)
}
// Make the RPC
err = msgpackrpc.CallWithCodec(pool.NewClientCodec(stream), method, args, reply)
if err != nil {
return err
return fmt.Errorf("rpc call: %v", err)
}
return nil

View File

@ -9,7 +9,6 @@ import (
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
@ -483,7 +482,7 @@ func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) {
ns := structs.DefaultNamespace
deleteNodes := CreateTestPlugin(srv.fsm.State(), "foo")
deleteNodes := CreateTestCSIPlugin(srv.fsm.State(), "foo")
defer deleteNodes()
state := srv.fsm.State()
@ -527,74 +526,6 @@ func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) {
require.Nil(t, resp2.Plugin)
}
// CreateTestPlugin is a helper that generates the node + fingerprint results necessary to
// create a CSIPlugin by directly inserting into the state store. It's exported for use in
// other test packages
func CreateTestPlugin(s *state.StateStore, id string) func() {
// Create some nodes
ns := make([]*structs.Node, 3)
for i := range ns {
n := mock.Node()
ns[i] = n
}
// Install healthy plugin fingerprinting results
ns[0].CSIControllerPlugins = map[string]*structs.CSIInfo{
id: {
PluginID: id,
AllocID: uuid.Generate(),
Healthy: true,
HealthDescription: "healthy",
RequiresControllerPlugin: true,
RequiresTopologies: false,
ControllerInfo: &structs.CSIControllerInfo{
SupportsReadOnlyAttach: true,
SupportsAttachDetach: true,
SupportsListVolumes: true,
SupportsListVolumesAttachedNodes: false,
},
},
}
// Install healthy plugin fingerprinting results
allocID := uuid.Generate()
for _, n := range ns[1:] {
n.CSINodePlugins = map[string]*structs.CSIInfo{
id: {
PluginID: id,
AllocID: allocID,
Healthy: true,
HealthDescription: "healthy",
RequiresControllerPlugin: true,
RequiresTopologies: false,
NodeInfo: &structs.CSINodeInfo{
ID: n.ID,
MaxVolumes: 64,
RequiresNodeStageVolume: true,
},
},
}
}
// Insert them into the state store
index := uint64(999)
for _, n := range ns {
index++
s.UpsertNode(index, n)
}
// Return cleanup function that deletes the nodes
return func() {
ids := make([]string, len(ns))
for i, n := range ns {
ids[i] = n.ID
}
index++
s.DeleteNode(index, ids)
}
}
func TestCSI_RPCVolumeAndPluginLookup(t *testing.T) {
srv, shutdown := TestServer(t, func(c *Config) {})
defer shutdown()

View File

@ -29,6 +29,8 @@ var (
structs.Nodes,
structs.Evals,
structs.Deployments,
structs.Plugins,
structs.Volumes,
}
)
@ -52,15 +54,19 @@ func (s *Search) getMatches(iter memdb.ResultIterator, prefix string) ([]string,
var id string
switch t := raw.(type) {
case *structs.Job:
id = raw.(*structs.Job).ID
id = t.ID
case *structs.Evaluation:
id = raw.(*structs.Evaluation).ID
id = t.ID
case *structs.Allocation:
id = raw.(*structs.Allocation).ID
id = t.ID
case *structs.Node:
id = raw.(*structs.Node).ID
id = t.ID
case *structs.Deployment:
id = raw.(*structs.Deployment).ID
id = t.ID
case *structs.CSIPlugin:
id = t.ID
case *structs.CSIVolume:
id = t.ID
default:
matchID, ok := getEnterpriseMatch(raw)
if !ok {
@ -95,6 +101,10 @@ func getResourceIter(context structs.Context, aclObj *acl.ACL, namespace, prefix
return state.NodesByIDPrefix(ws, prefix)
case structs.Deployments:
return state.DeploymentsByIDPrefix(ws, namespace, prefix)
case structs.Plugins:
return state.CSIPluginsByIDPrefix(ws, prefix)
case structs.Volumes:
return state.CSIVolumesByIDPrefix(ws, namespace, prefix)
default:
return getEnterpriseResourceIter(context, aclObj, namespace, prefix, ws, state)
}

View File

@ -7,10 +7,12 @@ import (
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const jobIndex = 1000
@ -746,3 +748,77 @@ func TestSearch_PrefixSearch_MultiRegion(t *testing.T) {
assert.Equal(job.ID, resp.Matches[structs.Jobs][0])
assert.Equal(uint64(jobIndex), resp.Index)
}
func TestSearch_PrefixSearch_CSIPlugin(t *testing.T) {
t.Parallel()
assert := assert.New(t)
s, cleanupS := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
id := uuid.Generate()
CreateTestCSIPlugin(s.fsm.State(), id)
prefix := id[:len(id)-2]
req := &structs.SearchRequest{
Prefix: prefix,
Context: structs.Plugins,
QueryOptions: structs.QueryOptions{
Region: "global",
},
}
var resp structs.SearchResponse
if err := msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
assert.Equal(1, len(resp.Matches[structs.Plugins]))
assert.Equal(id, resp.Matches[structs.Plugins][0])
assert.Equal(resp.Truncations[structs.Plugins], false)
}
func TestSearch_PrefixSearch_CSIVolume(t *testing.T) {
t.Parallel()
assert := assert.New(t)
s, cleanupS := TestServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer cleanupS()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
id := uuid.Generate()
err := s.fsm.State().CSIVolumeRegister(1000, []*structs.CSIVolume{{
ID: id,
Namespace: structs.DefaultNamespace,
PluginID: "glade",
}})
require.NoError(t, err)
prefix := id[:len(id)-2]
req := &structs.SearchRequest{
Prefix: prefix,
Context: structs.Volumes,
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var resp structs.SearchResponse
if err := msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
assert.Equal(1, len(resp.Matches[structs.Volumes]))
assert.Equal(id, resp.Matches[structs.Volumes][0])
assert.Equal(resp.Truncations[structs.Volumes], false)
}

View File

@ -1657,7 +1657,7 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum
func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, id string) (*structs.CSIVolume, error) {
txn := s.db.Txn(false)
watchCh, obj, err := txn.FirstWatch("csi_volumes", "id", id)
watchCh, obj, err := txn.FirstWatch("csi_volumes", "id_prefix", id)
if err != nil {
return nil, fmt.Errorf("volume lookup failed: %s %v", id, err)
}
@ -1684,6 +1684,30 @@ func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, pluginID string) (m
return iter, nil
}
// CSIVolumesByIDPrefix supports search
func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
iter, err := txn.Get("csi_volumes", "id_prefix", volumeID)
if err != nil {
return nil, err
}
ws.Add(iter.WatchCh())
// Filter the iterator by namespace
f := func(raw interface{}) bool {
v, ok := raw.(*structs.CSIVolume)
if !ok {
return false
}
return v.Namespace != namespace
}
wrap := memdb.NewFilterIterator(iter, f)
return wrap, nil
}
// CSIVolumes looks up the entire csi_volumes table
func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
@ -1741,7 +1765,7 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, ids []string) error {
defer txn.Abort()
for _, id := range ids {
existing, err := txn.First("csi_volumes", "id", id)
existing, err := txn.First("csi_volumes", "id_prefix", id)
if err != nil {
return fmt.Errorf("volume lookup failed: %s: %v", id, err)
}
@ -1837,12 +1861,26 @@ func (s *StateStore) CSIPlugins(ws memdb.WatchSet) (memdb.ResultIterator, error)
return iter, nil
}
// CSIPluginsByIDPrefix supports search
func (s *StateStore) CSIPluginsByIDPrefix(ws memdb.WatchSet, pluginID string) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
iter, err := txn.Get("csi_plugins", "id_prefix", pluginID)
if err != nil {
return nil, err
}
ws.Add(iter.WatchCh())
return iter, nil
}
// CSIPluginByID returns the one named CSIPlugin
func (s *StateStore) CSIPluginByID(ws memdb.WatchSet, id string) (*structs.CSIPlugin, error) {
txn := s.db.Txn(false)
defer txn.Abort()
raw, err := txn.First("csi_plugins", "id", id)
raw, err := txn.First("csi_plugins", "id_prefix", id)
if err != nil {
return nil, fmt.Errorf("csi_plugin lookup failed: %s %v", id, err)
}

View File

@ -163,6 +163,8 @@ const (
Namespaces Context = "namespaces"
Quotas Context = "quotas"
All Context = "all"
Plugins Context = "plugins"
Volumes Context = "volumes"
)
// NamespacedID is a tuple of an ID and a namespace

View File

@ -15,7 +15,9 @@ import (
"github.com/hashicorp/nomad/helper/pluginutils/catalog"
"github.com/hashicorp/nomad/helper/pluginutils/singleton"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/version"
)
@ -154,3 +156,71 @@ func TestJoin(t testing.T, s1 *Server, other ...*Server) {
}
}
}
// CreateTestPlugin is a helper that generates the node + fingerprint results necessary to
// create a CSIPlugin by directly inserting into the state store. It's exported for use in
// other test packages
func CreateTestCSIPlugin(s *state.StateStore, id string) func() {
// Create some nodes
ns := make([]*structs.Node, 3)
for i := range ns {
n := mock.Node()
ns[i] = n
}
// Install healthy plugin fingerprinting results
ns[0].CSIControllerPlugins = map[string]*structs.CSIInfo{
id: {
PluginID: id,
AllocID: uuid.Generate(),
Healthy: true,
HealthDescription: "healthy",
RequiresControllerPlugin: true,
RequiresTopologies: false,
ControllerInfo: &structs.CSIControllerInfo{
SupportsReadOnlyAttach: true,
SupportsAttachDetach: true,
SupportsListVolumes: true,
SupportsListVolumesAttachedNodes: false,
},
},
}
// Install healthy plugin fingerprinting results
allocID := uuid.Generate()
for _, n := range ns[1:] {
n.CSINodePlugins = map[string]*structs.CSIInfo{
id: {
PluginID: id,
AllocID: allocID,
Healthy: true,
HealthDescription: "healthy",
RequiresControllerPlugin: true,
RequiresTopologies: false,
NodeInfo: &structs.CSINodeInfo{
ID: n.ID,
MaxVolumes: 64,
RequiresNodeStageVolume: true,
},
},
}
}
// Insert them into the state store
index := uint64(999)
for _, n := range ns {
index++
s.UpsertNode(index, n)
}
// Return cleanup function that deletes the nodes
return func() {
ids := make([]string, len(ns))
for i, n := range ns {
ids[i] = n.ID
}
index++
s.DeleteNode(index, ids)
}
}