diff --git a/api/allocations.go b/api/allocations.go index 27a6d5b1f..cc2be5ea7 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -399,6 +399,36 @@ type NodeScoreMeta struct { NormScore float64 } +// Stub returns a list stub for the allocation +func (a *Allocation) Stub() *AllocationListStub { + return &AllocationListStub{ + ID: a.ID, + EvalID: a.EvalID, + Name: a.Name, + Namespace: a.Namespace, + NodeID: a.NodeID, + NodeName: a.NodeName, + JobID: a.JobID, + JobType: *a.Job.Type, + JobVersion: *a.Job.Version, + TaskGroup: a.TaskGroup, + DesiredStatus: a.DesiredStatus, + DesiredDescription: a.DesiredDescription, + ClientStatus: a.ClientStatus, + ClientDescription: a.ClientDescription, + TaskStates: a.TaskStates, + DeploymentStatus: a.DeploymentStatus, + FollowupEvalID: a.FollowupEvalID, + RescheduleTracker: a.RescheduleTracker, + PreemptedAllocations: a.PreemptedAllocations, + PreemptedByAllocation: a.PreemptedByAllocation, + CreateIndex: a.CreateIndex, + ModifyIndex: a.ModifyIndex, + CreateTime: a.CreateTime, + ModifyTime: a.ModifyTime, + } +} + // AllocationListStub is used to return a subset of an allocation // during list operations. type AllocationListStub struct { diff --git a/api/contexts/contexts.go b/api/contexts/contexts.go index 51b257c40..ae40db3f8 100644 --- a/api/contexts/contexts.go +++ b/api/contexts/contexts.go @@ -11,5 +11,7 @@ const ( Nodes Context = "nodes" Namespaces Context = "namespaces" Quotas Context = "quotas" + Plugins Context = "plugins" + Volumes Context = "volumes" All Context = "all" ) diff --git a/api/csi.go b/api/csi.go index 556867d4e..8d5883221 100644 --- a/api/csi.go +++ b/api/csi.go @@ -38,6 +38,10 @@ func (v *CSIVolumes) Info(id string, q *QueryOptions) (*CSIVolume, *QueryMeta, e if err != nil { return nil, nil, err } + + // Cleanup allocation representation for the ui + resp.allocs() + return &resp, qm, nil } @@ -79,19 +83,24 @@ const ( // CSIVolume is used for serialization, see also nomad/structs/csi.go type CSIVolume struct { - ID string - Namespace string - Name string - ExternalID string - Topologies []*CSITopology - AccessMode CSIVolumeAccessMode - AttachmentMode CSIVolumeAttachmentMode + ID string `hcl:"id"` + Name string `hcl:"name"` + ExternalID string `hcl:"external_id"` + Namespace string `hcl:"namespace"` + Topologies []*CSITopology `hcl:"topologies"` + AccessMode CSIVolumeAccessMode `hcl:"access_mode"` + AttachmentMode CSIVolumeAttachmentMode `hcl:"attachment_mode"` - // Combine structs.{Read,Write,Past}Allocs + // Allocations, tracking claim status + ReadAllocs map[string]*Allocation + WriteAllocs map[string]*Allocation + + // Combine structs.{Read,Write}Allocs Allocations []*AllocationListStub + // Schedulable is true if all the denormalized plugin health fields are true Schedulable bool - PluginID string + PluginID string `hcl:"plugin_id"` ControllerRequired bool ControllersHealthy int ControllersExpected int @@ -101,6 +110,20 @@ type CSIVolume struct { CreateIndex uint64 ModifyIndex uint64 + + // ExtraKeysHCL is used by the hcl parser to report unexpected keys + ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"` +} + +// allocs is called after we query the volume (creating this CSIVolume struct) to collapse +// allocations for the UI +func (v *CSIVolume) allocs() { + for _, a := range v.WriteAllocs { + v.Allocations = append(v.Allocations, a.Stub()) + } + for _, a := range v.ReadAllocs { + v.Allocations = append(v.Allocations, a.Stub()) + } } type CSIVolumeIndexSort []*CSIVolumeListStub @@ -160,6 +183,7 @@ type CSIPlugin struct { // Map Node.ID to CSIInfo fingerprint results Controllers map[string]*CSIInfo Nodes map[string]*CSIInfo + Allocations []*AllocationListStub ControllersHealthy int NodesHealthy int CreateIndex uint64 diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go index 34e192b74..6893abdaf 100644 --- a/command/agent/config_parse.go +++ b/command/agent/config_parse.go @@ -6,11 +6,10 @@ import ( "io" "os" "path/filepath" - "reflect" - "strings" "time" "github.com/hashicorp/hcl" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs/config" ) @@ -99,106 +98,44 @@ func durations(xs []td) error { return nil } -// removeEqualFold removes the first string that EqualFold matches -func removeEqualFold(xs *[]string, search string) { - sl := *xs - for i, x := range sl { - if strings.EqualFold(x, search) { - sl = append(sl[:i], sl[i+1:]...) - if len(sl) == 0 { - *xs = nil - } else { - *xs = sl - } - return - } - } -} - func extraKeys(c *Config) error { // hcl leaves behind extra keys when parsing JSON. These keys // are kept on the top level, taken from slices or the keys of // structs contained in slices. Clean up before looking for // extra keys. for range c.HTTPAPIResponseHeaders { - removeEqualFold(&c.ExtraKeysHCL, "http_api_response_headers") + helper.RemoveEqualFold(&c.ExtraKeysHCL, "http_api_response_headers") } for _, p := range c.Plugins { - removeEqualFold(&c.ExtraKeysHCL, p.Name) - removeEqualFold(&c.ExtraKeysHCL, "config") - removeEqualFold(&c.ExtraKeysHCL, "plugin") + helper.RemoveEqualFold(&c.ExtraKeysHCL, p.Name) + helper.RemoveEqualFold(&c.ExtraKeysHCL, "config") + helper.RemoveEqualFold(&c.ExtraKeysHCL, "plugin") } for _, k := range []string{"options", "meta", "chroot_env", "servers", "server_join"} { - removeEqualFold(&c.ExtraKeysHCL, k) - removeEqualFold(&c.ExtraKeysHCL, "client") + helper.RemoveEqualFold(&c.ExtraKeysHCL, k) + helper.RemoveEqualFold(&c.ExtraKeysHCL, "client") } // stats is an unused key, continue to silently ignore it - removeEqualFold(&c.Client.ExtraKeysHCL, "stats") + helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, "stats") // Remove HostVolume extra keys for _, hv := range c.Client.HostVolumes { - removeEqualFold(&c.Client.ExtraKeysHCL, hv.Name) - removeEqualFold(&c.Client.ExtraKeysHCL, "host_volume") + helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, hv.Name) + helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, "host_volume") } for _, k := range []string{"enabled_schedulers", "start_join", "retry_join", "server_join"} { - removeEqualFold(&c.ExtraKeysHCL, k) - removeEqualFold(&c.ExtraKeysHCL, "server") + helper.RemoveEqualFold(&c.ExtraKeysHCL, k) + helper.RemoveEqualFold(&c.ExtraKeysHCL, "server") } for _, k := range []string{"datadog_tags"} { - removeEqualFold(&c.ExtraKeysHCL, k) - removeEqualFold(&c.ExtraKeysHCL, "telemetry") + helper.RemoveEqualFold(&c.ExtraKeysHCL, k) + helper.RemoveEqualFold(&c.ExtraKeysHCL, "telemetry") } - return extraKeysImpl([]string{}, reflect.ValueOf(*c)) -} - -// extraKeysImpl returns an error if any extraKeys array is not empty -func extraKeysImpl(path []string, val reflect.Value) error { - stype := val.Type() - for i := 0; i < stype.NumField(); i++ { - ftype := stype.Field(i) - fval := val.Field(i) - - name := ftype.Name - prop := "" - tagSplit(ftype, "hcl", &name, &prop) - - if fval.Kind() == reflect.Ptr { - fval = reflect.Indirect(fval) - } - - // struct? recurse. add the struct's key to the path - if fval.Kind() == reflect.Struct { - err := extraKeysImpl(append([]string{name}, path...), fval) - if err != nil { - return err - } - } - - if "unusedKeys" == prop { - if ks, ok := fval.Interface().([]string); ok && len(ks) != 0 { - return fmt.Errorf("%s unexpected keys %s", - strings.Join(path, "."), - strings.Join(ks, ", ")) - } - } - } - return nil -} - -// tagSplit reads the named tag from the structfield and splits its values into strings -func tagSplit(field reflect.StructField, tagName string, vars ...*string) { - tag := strings.Split(field.Tag.Get(tagName), ",") - end := len(tag) - 1 - for i, s := range vars { - if i > end { - return - } - *s = tag[i] - } + return helper.UnusedKeys(c) } diff --git a/command/commands.go b/command/commands.go index ff2100a9d..dbbcec006 100644 --- a/command/commands.go +++ b/command/commands.go @@ -493,6 +493,17 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { }, nil }, + "plugin": func() (cli.Command, error) { + return &PluginCommand{ + Meta: meta, + }, nil + }, + "plugin status": func() (cli.Command, error) { + return &PluginStatusCommand{ + Meta: meta, + }, nil + }, + "quota": func() (cli.Command, error) { return &QuotaCommand{ Meta: meta, @@ -646,6 +657,26 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { Ui: meta.Ui, }, nil }, + "volume": func() (cli.Command, error) { + return &VolumeCommand{ + Meta: meta, + }, nil + }, + "volume status": func() (cli.Command, error) { + return &VolumeStatusCommand{ + Meta: meta, + }, nil + }, + "volume register": func() (cli.Command, error) { + return &VolumeRegisterCommand{ + Meta: meta, + }, nil + }, + "volume deregister": func() (cli.Command, error) { + return &VolumeDeregisterCommand{ + Meta: meta, + }, nil + }, } deprecated := map[string]cli.CommandFactory{ diff --git a/command/plugin.go b/command/plugin.go new file mode 100644 index 000000000..7128e7cbe --- /dev/null +++ b/command/plugin.go @@ -0,0 +1,26 @@ +package command + +import "github.com/mitchellh/cli" + +type PluginCommand struct { + Meta +} + +func (c *PluginCommand) Help() string { + helpText := ` +Usage nomad plugin status [options] [plugin] + + This command groups subcommands for interacting with plugins. +` + return helpText +} + +func (c *PluginCommand) Synopsis() string { + return "Inspect plugins" +} + +func (c *PluginCommand) Name() string { return "plugin" } + +func (c *PluginCommand) Run(args []string) int { + return cli.RunResultHelp +} diff --git a/command/plugin_status.go b/command/plugin_status.go new file mode 100644 index 000000000..95eeb5cf6 --- /dev/null +++ b/command/plugin_status.go @@ -0,0 +1,146 @@ +package command + +import ( + "fmt" + "strings" + + "github.com/hashicorp/nomad/api/contexts" + "github.com/posener/complete" +) + +type PluginStatusCommand struct { + Meta + length int + short bool + verbose bool + json bool + template string +} + +func (c *PluginStatusCommand) Help() string { + helpText := ` +Usage nomad plugin status [options] + + Display status information about a plugin. If no plugin id is given, + a list of all plugins will be displayed. + +General Options: + + ` + generalOptionsUsage() + ` + +Status Options: + + -type + List only plugins of type . + + -short + Display short output. + + -verbose + Display full information. + + -json + Output the allocation in its JSON format. + + -t + Format and display allocation using a Go template. +` + return helpText +} + +func (c *PluginStatusCommand) Synopsis() string { + return "Display status information about a plugin" +} + +// predictVolumeType is also used in volume_status +var predictVolumeType = complete.PredictFunc(func(a complete.Args) []string { + types := []string{"csi"} + for _, t := range types { + if strings.Contains(t, a.Last) { + return []string{t} + } + } + return nil +}) + +func (c *PluginStatusCommand) AutocompleteFlags() complete.Flags { + return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), + complete.Flags{ + "-type": predictVolumeType, + "-short": complete.PredictNothing, + "-verbose": complete.PredictNothing, + "-json": complete.PredictNothing, + "-t": complete.PredictAnything, + }) +} + +func (c *PluginStatusCommand) AutocompleteArgs() complete.Predictor { + return complete.PredictFunc(func(a complete.Args) []string { + client, err := c.Meta.Client() + if err != nil { + return nil + } + + resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Plugins, nil) + if err != nil { + return []string{} + } + return resp.Matches[contexts.Plugins] + }) +} + +func (c *PluginStatusCommand) Name() string { return "plugin status" } + +func (c *PluginStatusCommand) Run(args []string) int { + var typeArg string + + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.StringVar(&typeArg, "type", "", "") + flags.BoolVar(&c.short, "short", false, "") + flags.BoolVar(&c.verbose, "verbose", false, "") + flags.BoolVar(&c.json, "json", false, "") + flags.StringVar(&c.template, "t", "", "") + + if err := flags.Parse(args); err != nil { + c.Ui.Error(fmt.Sprintf("Error parsing arguments %s", err)) + return 1 + } + + typeArg = strings.ToLower(typeArg) + + // Check that we either got no arguments or exactly one. + args = flags.Args() + if len(args) > 1 { + c.Ui.Error("This command takes either no arguments or one: ") + c.Ui.Error(commandErrorText(c)) + return 1 + } + + // Truncate the id unless full length is requested + c.length = shortId + if c.verbose { + c.length = fullId + } + + // Get the HTTP client + client, err := c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + id := "" + if len(args) == 1 { + id = args[0] + } + + code := c.csiStatus(client, id) + if code != 0 { + return code + } + + // Extend this section with other plugin implementations + + return 0 +} diff --git a/command/plugin_status_csi.go b/command/plugin_status_csi.go new file mode 100644 index 000000000..ef7d28101 --- /dev/null +++ b/command/plugin_status_csi.go @@ -0,0 +1,111 @@ +package command + +import ( + "fmt" + "sort" + "strings" + + "github.com/hashicorp/nomad/api" +) + +func (c *PluginStatusCommand) csiBanner() { + if !(c.json || len(c.template) > 0) { + c.Ui.Output(c.Colorize().Color("[bold]Container Storage Interface[reset]")) + } +} + +func (c *PluginStatusCommand) csiStatus(client *api.Client, id string) int { + if id == "" { + c.csiBanner() + plugs, _, err := client.CSIPlugins().List(nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying CSI plugins: %s", err)) + return 1 + } + + if len(plugs) == 0 { + // No output if we have no plugins + c.Ui.Error("No CSI plugins") + } else { + str, err := c.csiFormatPlugins(plugs) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error formatting: %s", err)) + return 1 + } + c.Ui.Output(str) + } + return 0 + } + + // Lookup matched a single plugin + plug, _, err := client.CSIPlugins().Info(id, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying plugin: %s", err)) + return 1 + } + + str, err := c.csiFormatPlugin(plug) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error formatting plugin: %s", err)) + return 1 + } + + c.Ui.Output(str) + return 0 +} + +func (c *PluginStatusCommand) csiFormatPlugins(plugs []*api.CSIPluginListStub) (string, error) { + // Sort the output by quota name + sort.Slice(plugs, func(i, j int) bool { return plugs[i].ID < plugs[j].ID }) + + if c.json || len(c.template) > 0 { + out, err := Format(c.json, c.template, plugs) + if err != nil { + return "", fmt.Errorf("format error: %v", err) + } + return out, nil + } + + // TODO(langmartin) add Provider https://github.com/hashicorp/nomad/issues/7248 + rows := make([]string, len(plugs)+1) + rows[0] = "ID|Controllers Healthy/Expected|Nodes Healthy/Expected" + for i, p := range plugs { + rows[i+1] = fmt.Sprintf("%s|%d/%d|%d/%d", + limit(p.ID, c.length), + p.ControllersHealthy, + p.ControllersExpected, + p.NodesHealthy, + p.NodesExpected, + ) + } + return formatList(rows), nil +} + +func (c *PluginStatusCommand) csiFormatPlugin(plug *api.CSIPlugin) (string, error) { + if c.json || len(c.template) > 0 { + out, err := Format(c.json, c.template, plug) + if err != nil { + return "", fmt.Errorf("format error: %v", err) + } + return out, nil + } + + output := []string{ + fmt.Sprintf("ID|%s", plug.ID), + fmt.Sprintf("Controllers Healthy|%d", plug.ControllersHealthy), + fmt.Sprintf("Controllers Expected|%d", len(plug.Controllers)), + fmt.Sprintf("Nodes Healthy|%d", plug.NodesHealthy), + fmt.Sprintf("Nodes Expected|%d", len(plug.Nodes)), + } + + // Exit early + if c.short { + return formatKV(output), nil + } + + // Format the allocs + banner := c.Colorize().Color("\n[bold]Allocations[reset]") + allocs := formatAllocListStubs(plug.Allocations, c.verbose, c.length) + full := []string{formatKV(output), banner, allocs} + return strings.Join(full, "\n"), nil +} diff --git a/command/plugin_status_test.go b/command/plugin_status_test.go new file mode 100644 index 000000000..cdf38e67a --- /dev/null +++ b/command/plugin_status_test.go @@ -0,0 +1,57 @@ +package command + +import ( + "testing" + + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad" + "github.com/mitchellh/cli" + "github.com/posener/complete" + "github.com/stretchr/testify/require" +) + +func TestPluginStatusCommand_Implements(t *testing.T) { + t.Parallel() + var _ cli.Command = &PluginStatusCommand{} +} + +func TestPluginStatusCommand_Fails(t *testing.T) { + t.Parallel() + ui := new(cli.MockUi) + cmd := &PluginStatusCommand{Meta: Meta{Ui: ui}} + + // Fails on misuse + code := cmd.Run([]string{"some", "bad", "args"}) + require.Equal(t, 1, code) + + out := ui.ErrorWriter.String() + require.Contains(t, out, commandErrorText(cmd)) + ui.ErrorWriter.Reset() +} + +func TestPluginStatusCommand_AutocompleteArgs(t *testing.T) { + t.Parallel() + + srv, _, url := testServer(t, true, nil) + defer srv.Shutdown() + + ui := new(cli.MockUi) + cmd := &PluginStatusCommand{Meta: Meta{Ui: ui, flagAddress: url}} + + // Create a plugin + id := "long-plugin-id" + state := srv.Agent.Server().State() + cleanup := nomad.CreateTestCSIPlugin(state, id) + defer cleanup() + ws := memdb.NewWatchSet() + plug, err := state.CSIPluginByID(ws, id) + require.NoError(t, err) + + prefix := plug.ID[:len(plug.ID)-5] + args := complete.Args{Last: prefix} + predictor := cmd.AutocompleteArgs() + + res := predictor.Predict(args) + require.Equal(t, 1, len(res)) + require.Equal(t, plug.ID, res[0]) +} diff --git a/command/status.go b/command/status.go index 4cf028b39..c67a73f62 100644 --- a/command/status.go +++ b/command/status.go @@ -162,6 +162,10 @@ func (c *StatusCommand) Run(args []string) int { cmd = &NamespaceStatusCommand{Meta: c.Meta} case contexts.Quotas: cmd = &QuotaStatusCommand{Meta: c.Meta} + case contexts.Plugins: + cmd = &PluginStatusCommand{Meta: c.Meta} + case contexts.Volumes: + cmd = &VolumeStatusCommand{Meta: c.Meta} default: c.Ui.Error(fmt.Sprintf("Unable to resolve ID: %q", id)) return 1 diff --git a/command/volume.go b/command/volume.go new file mode 100644 index 000000000..83eb44093 --- /dev/null +++ b/command/volume.go @@ -0,0 +1,46 @@ +package command + +import ( + "strings" + + "github.com/mitchellh/cli" +) + +type VolumeCommand struct { + Meta +} + +func (c *VolumeCommand) Help() string { + helpText := ` +Usage: nomad volume [options] + + volume groups commands that interact with volumes. + + Register a new volume or update an existing volume: + + $ nomad volume register + + Examine the status of a volume: + + $ nomad volume status + + Deregister an unused volume: + + $ nomad volume deregister + + Please see the individual subcommand help for detailed usage information. +` + return strings.TrimSpace(helpText) +} + +func (c *VolumeCommand) Name() string { + return "volume" +} + +func (c *VolumeCommand) Synopsis() string { + return "Interact with volumes" +} + +func (c *VolumeCommand) Run(args []string) int { + return cli.RunResultHelp +} diff --git a/command/volume_deregister.go b/command/volume_deregister.go new file mode 100644 index 000000000..eafb14ea6 --- /dev/null +++ b/command/volume_deregister.go @@ -0,0 +1,88 @@ +package command + +import ( + "fmt" + "strings" + + "github.com/hashicorp/nomad/api/contexts" + "github.com/posener/complete" +) + +type VolumeDeregisterCommand struct { + Meta +} + +func (c *VolumeDeregisterCommand) Help() string { + helpText := ` +Usage: nomad volume deregister [options] + + Remove an unused volume from Nomad. + +General Options: + + ` + generalOptionsUsage() + + return strings.TrimSpace(helpText) +} + +func (c *VolumeDeregisterCommand) AutocompleteFlags() complete.Flags { + return c.Meta.AutocompleteFlags(FlagSetClient) +} + +func (c *VolumeDeregisterCommand) AutocompleteArgs() complete.Predictor { + return complete.PredictFunc(func(a complete.Args) []string { + client, err := c.Meta.Client() + if err != nil { + return nil + } + + // When multiple volume types are implemented, this search should merge contexts + resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Volumes, nil) + if err != nil { + return []string{} + } + return resp.Matches[contexts.Volumes] + }) +} + +func (c *VolumeDeregisterCommand) Synopsis() string { + return "Remove a volume" +} + +func (c *VolumeDeregisterCommand) Name() string { return "volume deregister" } + +func (c *VolumeDeregisterCommand) Run(args []string) int { + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + + if err := flags.Parse(args); err != nil { + c.Ui.Error(fmt.Sprintf("Error parsing arguments %s", err)) + return 1 + } + + // Check that we get exactly one argument + args = flags.Args() + if l := len(args); l != 1 { + c.Ui.Error("This command takes one argument: ") + c.Ui.Error(commandErrorText(c)) + return 1 + } + volID := args[0] + + // Get the HTTP client + client, err := c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + // Deregister only works on CSI volumes, but could be extended to support other + // network interfaces or host volumes + err = client.CSIVolumes().Deregister(volID, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error deregistering volume: %s", err)) + return 1 + } + + return 0 +} diff --git a/command/volume_register.go b/command/volume_register.go new file mode 100644 index 000000000..83d0307ba --- /dev/null +++ b/command/volume_register.go @@ -0,0 +1,130 @@ +package command + +import ( + "fmt" + "io/ioutil" + "os" + "strings" + + "github.com/hashicorp/hcl" + "github.com/hashicorp/hcl/hcl/ast" + "github.com/posener/complete" +) + +type VolumeRegisterCommand struct { + Meta +} + +func (c *VolumeRegisterCommand) Help() string { + helpText := ` +Usage: nomad volume register [options] + + Creates or updates a volume in Nomad. The volume must exist on the remote + storage provider before it can be used by a task. + + If the supplied path is "-" the volume file is read from stdin. Otherwise, it + is read from the file at the supplied path. + +General Options: + + ` + generalOptionsUsage() + + return strings.TrimSpace(helpText) +} + +func (c *VolumeRegisterCommand) AutocompleteFlags() complete.Flags { + return c.Meta.AutocompleteFlags(FlagSetClient) +} + +func (c *VolumeRegisterCommand) AutocompleteArgs() complete.Predictor { + return complete.PredictFiles("*") +} + +func (c *VolumeRegisterCommand) Synopsis() string { + return "Create or update a volume" +} + +func (c *VolumeRegisterCommand) Name() string { return "volume register" } + +func (c *VolumeRegisterCommand) Run(args []string) int { + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + + if err := flags.Parse(args); err != nil { + c.Ui.Error(fmt.Sprintf("Error parsing arguments %s", err)) + return 1 + } + + // Check that we get exactly one argument + args = flags.Args() + if l := len(args); l != 1 { + c.Ui.Error("This command takes one argument: ") + c.Ui.Error(commandErrorText(c)) + return 1 + } + + // Read the file contents + file := args[0] + var rawVolume []byte + var err error + if file == "-" { + rawVolume, err = ioutil.ReadAll(os.Stdin) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to read stdin: %v", err)) + return 1 + } + } else { + rawVolume, err = ioutil.ReadFile(file) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to read file: %v", err)) + return 1 + } + } + + ast, volType, err := parseVolumeType(string(rawVolume)) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error parsing the volume type: %s", err)) + return 1 + } + volType = strings.ToLower(volType) + + // Get the HTTP client + client, err := c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + switch volType { + case "csi": + code := c.csiRegister(client, ast) + if code != 0 { + return code + } + default: + c.Ui.Error(fmt.Sprintf("Error unknown volume type: %s", volType)) + return 1 + } + + return 0 +} + +// parseVolume is used to parse the quota specification from HCL +func parseVolumeType(input string) (*ast.File, string, error) { + // Parse the AST first + ast, err := hcl.Parse(input) + if err != nil { + return nil, "", fmt.Errorf("parse error: %v", err) + } + + // Decode the type, so we can dispatch on it + dispatch := &struct { + T string `hcl:"type"` + }{} + err = hcl.DecodeObject(dispatch, ast) + if err != nil { + return nil, "", fmt.Errorf("dispatch error: %v", err) + } + + return ast, dispatch.T, nil +} diff --git a/command/volume_register_csi.go b/command/volume_register_csi.go new file mode 100644 index 000000000..a7f74f2f3 --- /dev/null +++ b/command/volume_register_csi.go @@ -0,0 +1,44 @@ +package command + +import ( + "fmt" + + "github.com/hashicorp/hcl" + "github.com/hashicorp/hcl/hcl/ast" + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/helper" +) + +func (c *VolumeRegisterCommand) csiRegister(client *api.Client, ast *ast.File) int { + vol, err := csiDecodeVolume(ast) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error decoding the volume definition: %s", err)) + return 1 + } + _, err = client.CSIVolumes().Register(vol, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error registering volume: %s", err)) + return 1 + } + + return 0 +} + +// parseVolume is used to parse the quota specification from HCL +func csiDecodeVolume(input *ast.File) (*api.CSIVolume, error) { + output := &api.CSIVolume{} + err := hcl.DecodeObject(output, input) + if err != nil { + return nil, err + } + + // api.CSIVolume doesn't have the type field, it's used only for dispatch in + // parseVolumeType + helper.RemoveEqualFold(&output.ExtraKeysHCL, "type") + err = helper.UnusedKeys(output) + if err != nil { + return nil, err + } + + return output, nil +} diff --git a/command/volume_register_test.go b/command/volume_register_test.go new file mode 100644 index 000000000..d707f6171 --- /dev/null +++ b/command/volume_register_test.go @@ -0,0 +1,97 @@ +package command + +import ( + "testing" + + "github.com/hashicorp/hcl" + "github.com/hashicorp/nomad/api" + "github.com/stretchr/testify/require" +) + +func TestVolumeDispatchParse(t *testing.T) { + t.Parallel() + + cases := []struct { + hcl string + t string + err string + }{{ + hcl: ` +type = "foo" +rando = "bar" +`, + t: "foo", + err: "", + }, { + hcl: `{"id": "foo", "type": "foo", "other": "bar"}`, + t: "foo", + err: "", + }} + + for _, c := range cases { + t.Run(c.hcl, func(t *testing.T) { + _, s, err := parseVolumeType(c.hcl) + require.Equal(t, c.t, s) + if c.err == "" { + require.NoError(t, err) + } else { + require.Contains(t, err.Error(), c.err) + } + + }) + } +} + +func TestCSIVolumeParse(t *testing.T) { + t.Parallel() + + cases := []struct { + hcl string + q *api.CSIVolume + err string + }{{ + hcl: ` +id = "foo" +type = "csi" +namespace = "n" +access_mode = "single-node-writer" +attachment_mode = "file-system" +plugin_id = "p" +`, + q: &api.CSIVolume{ + ID: "foo", + Namespace: "n", + AccessMode: "single-node-writer", + AttachmentMode: "file-system", + PluginID: "p", + }, + err: "", + }, { + hcl: ` +{"id": "foo", "namespace": "n", "type": "csi", "access_mode": "single-node-writer", "attachment_mode": "file-system", +"plugin_id": "p"} +`, + q: &api.CSIVolume{ + ID: "foo", + Namespace: "n", + AccessMode: "single-node-writer", + AttachmentMode: "file-system", + PluginID: "p", + }, + err: "", + }} + + for _, c := range cases { + t.Run(c.hcl, func(t *testing.T) { + ast, err := hcl.ParseString(c.hcl) + require.NoError(t, err) + vol, err := csiDecodeVolume(ast) + require.Equal(t, c.q, vol) + if c.err == "" { + require.NoError(t, err) + } else { + require.Contains(t, err.Error(), c.err) + } + }) + } +} diff --git a/command/volume_status.go b/command/volume_status.go new file mode 100644 index 000000000..a13be974f --- /dev/null +++ b/command/volume_status.go @@ -0,0 +1,134 @@ +package command + +import ( + "fmt" + "strings" + + "github.com/hashicorp/nomad/api/contexts" + "github.com/posener/complete" +) + +type VolumeStatusCommand struct { + Meta + length int + short bool + verbose bool + json bool + template string +} + +func (c *VolumeStatusCommand) Help() string { + helpText := ` +Usage: nomad volume status [options] + + Display status information about a CSI volume. If no volume id is given, a + list of all volumes will be displayed. + +General Options: + + ` + generalOptionsUsage() + ` + +Status Options: + + -type + List only volumes of type . + + -short + Display short output. Used only when a single volume is being + queried, and drops verbose information about allocations. + + -verbose + Display full allocation information. + + -json + Output the allocation in its JSON format. + + -t + Format and display allocation using a Go template. +` + return strings.TrimSpace(helpText) +} + +func (c *VolumeStatusCommand) Synopsis() string { + return "Display status information about a volume" +} + +func (c *VolumeStatusCommand) AutocompleteFlags() complete.Flags { + return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), + complete.Flags{ + "-type": predictVolumeType, + "-short": complete.PredictNothing, + "-verbose": complete.PredictNothing, + "-json": complete.PredictNothing, + "-t": complete.PredictAnything, + }) +} + +func (c *VolumeStatusCommand) AutocompleteArgs() complete.Predictor { + return complete.PredictFunc(func(a complete.Args) []string { + client, err := c.Meta.Client() + if err != nil { + return nil + } + + resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Volumes, nil) + if err != nil { + return []string{} + } + return resp.Matches[contexts.Volumes] + }) +} + +func (c *VolumeStatusCommand) Name() string { return "volume status" } + +func (c *VolumeStatusCommand) Run(args []string) int { + var typeArg string + + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.StringVar(&typeArg, "type", "", "") + flags.BoolVar(&c.short, "short", false, "") + flags.BoolVar(&c.verbose, "verbose", false, "") + flags.BoolVar(&c.json, "json", false, "") + flags.StringVar(&c.template, "t", "", "") + + if err := flags.Parse(args); err != nil { + c.Ui.Error(fmt.Sprintf("Error parsing arguments %s", err)) + return 1 + } + + // Check that we either got no arguments or exactly one + args = flags.Args() + if len(args) > 1 { + c.Ui.Error("This command takes either no arguments or one: ") + c.Ui.Error(commandErrorText(c)) + return 1 + } + + // Truncate the id unless full length is requested + c.length = shortId + if c.verbose { + c.length = fullId + } + + // Get the HTTP client + client, err := c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + id := "" + if len(args) == 1 { + id = args[0] + } + + code := c.csiStatus(client, id) + if code != 0 { + return code + } + + // Extend this section with other volume implementations + + return 0 +} diff --git a/command/volume_status_csi.go b/command/volume_status_csi.go new file mode 100644 index 000000000..27b9dd566 --- /dev/null +++ b/command/volume_status_csi.go @@ -0,0 +1,152 @@ +package command + +import ( + "fmt" + "sort" + "strings" + + "github.com/hashicorp/nomad/api" +) + +func (c *VolumeStatusCommand) csiBanner() { + if !(c.json || len(c.template) > 0) { + c.Ui.Output(c.Colorize().Color("[bold]Container Storage Interface[reset]")) + } +} + +func (c *VolumeStatusCommand) csiStatus(client *api.Client, id string) int { + // Invoke list mode if no volume id + if id == "" { + c.csiBanner() + vols, _, err := client.CSIVolumes().List(nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying volumes: %s", err)) + return 1 + } + + if len(vols) == 0 { + // No output if we have no volumes + c.Ui.Error("No CSI volumes") + } else { + str, err := c.csiFormatVolumes(vols) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error formatting: %s", err)) + return 1 + } + c.Ui.Output(str) + } + return 0 + } + + // Try querying the volume + vol, _, err := client.CSIVolumes().Info(id, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying volume: %s", err)) + return 1 + } + + str, err := c.formatBasic(vol) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error formatting volume: %s", err)) + return 1 + } + c.Ui.Output(str) + + return 0 +} + +func (c *VolumeStatusCommand) csiFormatVolumes(vols []*api.CSIVolumeListStub) (string, error) { + // Sort the output by volume id + sort.Slice(vols, func(i, j int) bool { return vols[i].ID < vols[j].ID }) + + if c.json || len(c.template) > 0 { + out, err := Format(c.json, c.template, vols) + if err != nil { + return "", fmt.Errorf("format error: %v", err) + } + return out, nil + } + + rows := make([]string, len(vols)+1) + rows[0] = "ID|Name|Plugin ID|Schedulable|Access Mode" + for i, v := range vols { + rows[i+1] = fmt.Sprintf("%s|%s|%s|%t|%s", + limit(v.ID, c.length), + v.Name, + v.PluginID, + v.Schedulable, + v.AccessMode, + ) + } + return formatList(rows), nil +} + +func (c *VolumeStatusCommand) formatBasic(vol *api.CSIVolume) (string, error) { + if c.json || len(c.template) > 0 { + out, err := Format(c.json, c.template, vol) + if err != nil { + return "", fmt.Errorf("format error: %v", err) + } + return out, nil + } + + // TODO(langmartin) add Provider https://github.com/hashicorp/nomad/issues/7248 + output := []string{ + fmt.Sprintf("ID|%s", vol.ID), + fmt.Sprintf("Name|%s", vol.Name), + fmt.Sprintf("External ID|%s", vol.ExternalID), + + fmt.Sprintf("Schedulable|%t", vol.Schedulable), + fmt.Sprintf("Controllers Healthy|%d", vol.ControllersHealthy), + fmt.Sprintf("Controllers Expected|%d", vol.ControllersExpected), + fmt.Sprintf("Nodes Healthy|%d", vol.NodesHealthy), + fmt.Sprintf("Nodes Expected|%d", vol.NodesExpected), + + fmt.Sprintf("Access Mode|%s", vol.AccessMode), + fmt.Sprintf("Attachment Mode|%s", vol.AttachmentMode), + fmt.Sprintf("Namespace|%s", vol.Namespace), + } + + // Exit early + if c.short { + return formatKV(output), nil + } + + // Format the allocs + banner := c.Colorize().Color("\n[bold]Allocations[reset]") + allocs := formatAllocListStubs(vol.Allocations, c.verbose, c.length) + full := []string{formatKV(output), banner, allocs} + return strings.Join(full, "\n"), nil +} + +func (c *VolumeStatusCommand) formatTopologies(vol *api.CSIVolume) string { + var out []string + + // Find the union of all the keys + head := map[string]string{} + for _, t := range vol.Topologies { + for key := range t.Segments { + if _, ok := head[key]; !ok { + head[key] = "" + } + } + } + + // Append the header + var line []string + for key := range head { + line = append(line, key) + } + out = append(out, strings.Join(line, " ")) + + // Append each topology + for _, t := range vol.Topologies { + line = []string{} + for key := range head { + line = append(line, t.Segments[key]) + } + out = append(out, strings.Join(line, " ")) + } + + return strings.Join(out, "\n") +} diff --git a/command/volume_status_test.go b/command/volume_status_test.go new file mode 100644 index 000000000..a3c6a5b20 --- /dev/null +++ b/command/volume_status_test.go @@ -0,0 +1,58 @@ +package command + +import ( + "testing" + + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/mitchellh/cli" + "github.com/posener/complete" + "github.com/stretchr/testify/require" +) + +func TestCSIVolumeStatusCommand_Implements(t *testing.T) { + t.Parallel() + var _ cli.Command = &VolumeStatusCommand{} +} + +func TestCSIVolumeStatusCommand_Fails(t *testing.T) { + t.Parallel() + ui := new(cli.MockUi) + cmd := &VolumeStatusCommand{Meta: Meta{Ui: ui}} + + // Fails on misuse + code := cmd.Run([]string{"some", "bad", "args"}) + require.Equal(t, 1, code) + + out := ui.ErrorWriter.String() + require.Contains(t, out, commandErrorText(cmd)) + ui.ErrorWriter.Reset() +} + +func TestCSIVolumeStatusCommand_AutocompleteArgs(t *testing.T) { + t.Parallel() + + srv, _, url := testServer(t, true, nil) + defer srv.Shutdown() + + ui := new(cli.MockUi) + cmd := &VolumeStatusCommand{Meta: Meta{Ui: ui, flagAddress: url}} + + state := srv.Agent.Server().State() + + vol := &structs.CSIVolume{ + ID: uuid.Generate(), + Namespace: "default", + PluginID: "glade", + } + + require.NoError(t, state.CSIVolumeRegister(1000, []*structs.CSIVolume{vol})) + + prefix := vol.ID[:len(vol.ID)-5] + args := complete.Args{Last: prefix} + predictor := cmd.AutocompleteArgs() + + res := predictor.Predict(args) + require.Equal(t, 1, len(res)) + require.Equal(t, vol.ID, res[0]) +} diff --git a/helper/funcs.go b/helper/funcs.go index 7a6b4c151..c75294a1b 100644 --- a/helper/funcs.go +++ b/helper/funcs.go @@ -3,7 +3,9 @@ package helper import ( "crypto/sha512" "fmt" + "reflect" "regexp" + "strings" "time" multierror "github.com/hashicorp/go-multierror" @@ -387,3 +389,75 @@ func CheckHCLKeys(node ast.Node, valid []string) error { return result } + +// UnusedKeys returns a pretty-printed error if any `hcl:",unusedKeys"` is not empty +func UnusedKeys(obj interface{}) error { + val := reflect.ValueOf(obj) + if val.Kind() == reflect.Ptr { + val = reflect.Indirect(val) + } + return unusedKeysImpl([]string{}, val) +} + +func unusedKeysImpl(path []string, val reflect.Value) error { + stype := val.Type() + for i := 0; i < stype.NumField(); i++ { + ftype := stype.Field(i) + fval := val.Field(i) + tags := strings.Split(ftype.Tag.Get("hcl"), ",") + name := tags[0] + tags = tags[1:] + + if fval.Kind() == reflect.Ptr { + fval = reflect.Indirect(fval) + } + + // struct? recurse. Add the struct's key to the path + if fval.Kind() == reflect.Struct { + err := unusedKeysImpl(append([]string{name}, path...), fval) + if err != nil { + return err + } + continue + } + + // Search the hcl tags for "unusedKeys" + unusedKeys := false + for _, p := range tags { + if p == "unusedKeys" { + unusedKeys = true + break + } + } + + if unusedKeys { + ks, ok := fval.Interface().([]string) + if ok && len(ks) != 0 { + ps := "" + if len(path) > 0 { + ps = strings.Join(path, ".") + " " + } + return fmt.Errorf("%sunexpected keys %s", + ps, + strings.Join(ks, ", ")) + } + } + } + return nil +} + +// RemoveEqualFold removes the first string that EqualFold matches. It updates xs in place +func RemoveEqualFold(xs *[]string, search string) { + sl := *xs + for i, x := range sl { + if strings.EqualFold(x, search) { + sl = append(sl[:i], sl[i+1:]...) + if len(sl) == 0 { + *xs = nil + } else { + *xs = sl + } + return + } + } +} diff --git a/nomad/client_rpc.go b/nomad/client_rpc.go index ca8db2336..0c5d611e5 100644 --- a/nomad/client_rpc.go +++ b/nomad/client_rpc.go @@ -219,20 +219,20 @@ func NodeRpc(session *yamux.Session, method string, args, reply interface{}) err // Open a new session stream, err := session.Open() if err != nil { - return err + return fmt.Errorf("session open: %v", err) } defer stream.Close() // Write the RpcNomad byte to set the mode if _, err := stream.Write([]byte{byte(pool.RpcNomad)}); err != nil { stream.Close() - return err + return fmt.Errorf("set mode: %v", err) } // Make the RPC err = msgpackrpc.CallWithCodec(pool.NewClientCodec(stream), method, args, reply) if err != nil { - return err + return fmt.Errorf("rpc call: %v", err) } return nil diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index fedb37801..dbd33ad9a 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -9,7 +9,6 @@ import ( "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" - "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" @@ -483,7 +482,7 @@ func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) { ns := structs.DefaultNamespace - deleteNodes := CreateTestPlugin(srv.fsm.State(), "foo") + deleteNodes := CreateTestCSIPlugin(srv.fsm.State(), "foo") defer deleteNodes() state := srv.fsm.State() @@ -527,74 +526,6 @@ func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) { require.Nil(t, resp2.Plugin) } -// CreateTestPlugin is a helper that generates the node + fingerprint results necessary to -// create a CSIPlugin by directly inserting into the state store. It's exported for use in -// other test packages -func CreateTestPlugin(s *state.StateStore, id string) func() { - // Create some nodes - ns := make([]*structs.Node, 3) - for i := range ns { - n := mock.Node() - ns[i] = n - } - - // Install healthy plugin fingerprinting results - ns[0].CSIControllerPlugins = map[string]*structs.CSIInfo{ - id: { - PluginID: id, - AllocID: uuid.Generate(), - Healthy: true, - HealthDescription: "healthy", - RequiresControllerPlugin: true, - RequiresTopologies: false, - ControllerInfo: &structs.CSIControllerInfo{ - SupportsReadOnlyAttach: true, - SupportsAttachDetach: true, - SupportsListVolumes: true, - SupportsListVolumesAttachedNodes: false, - }, - }, - } - - // Install healthy plugin fingerprinting results - allocID := uuid.Generate() - for _, n := range ns[1:] { - n.CSINodePlugins = map[string]*structs.CSIInfo{ - id: { - PluginID: id, - AllocID: allocID, - Healthy: true, - HealthDescription: "healthy", - RequiresControllerPlugin: true, - RequiresTopologies: false, - NodeInfo: &structs.CSINodeInfo{ - ID: n.ID, - MaxVolumes: 64, - RequiresNodeStageVolume: true, - }, - }, - } - } - - // Insert them into the state store - index := uint64(999) - for _, n := range ns { - index++ - s.UpsertNode(index, n) - } - - // Return cleanup function that deletes the nodes - return func() { - ids := make([]string, len(ns)) - for i, n := range ns { - ids[i] = n.ID - } - - index++ - s.DeleteNode(index, ids) - } -} - func TestCSI_RPCVolumeAndPluginLookup(t *testing.T) { srv, shutdown := TestServer(t, func(c *Config) {}) defer shutdown() diff --git a/nomad/search_endpoint.go b/nomad/search_endpoint.go index 4e1b3acfa..3e5eea504 100644 --- a/nomad/search_endpoint.go +++ b/nomad/search_endpoint.go @@ -29,6 +29,8 @@ var ( structs.Nodes, structs.Evals, structs.Deployments, + structs.Plugins, + structs.Volumes, } ) @@ -52,15 +54,19 @@ func (s *Search) getMatches(iter memdb.ResultIterator, prefix string) ([]string, var id string switch t := raw.(type) { case *structs.Job: - id = raw.(*structs.Job).ID + id = t.ID case *structs.Evaluation: - id = raw.(*structs.Evaluation).ID + id = t.ID case *structs.Allocation: - id = raw.(*structs.Allocation).ID + id = t.ID case *structs.Node: - id = raw.(*structs.Node).ID + id = t.ID case *structs.Deployment: - id = raw.(*structs.Deployment).ID + id = t.ID + case *structs.CSIPlugin: + id = t.ID + case *structs.CSIVolume: + id = t.ID default: matchID, ok := getEnterpriseMatch(raw) if !ok { @@ -95,6 +101,10 @@ func getResourceIter(context structs.Context, aclObj *acl.ACL, namespace, prefix return state.NodesByIDPrefix(ws, prefix) case structs.Deployments: return state.DeploymentsByIDPrefix(ws, namespace, prefix) + case structs.Plugins: + return state.CSIPluginsByIDPrefix(ws, prefix) + case structs.Volumes: + return state.CSIVolumesByIDPrefix(ws, namespace, prefix) default: return getEnterpriseResourceIter(context, aclObj, namespace, prefix, ws, state) } diff --git a/nomad/search_endpoint_test.go b/nomad/search_endpoint_test.go index 31c3ae72c..3ee3d3afb 100644 --- a/nomad/search_endpoint_test.go +++ b/nomad/search_endpoint_test.go @@ -7,10 +7,12 @@ import ( msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const jobIndex = 1000 @@ -746,3 +748,77 @@ func TestSearch_PrefixSearch_MultiRegion(t *testing.T) { assert.Equal(job.ID, resp.Matches[structs.Jobs][0]) assert.Equal(uint64(jobIndex), resp.Index) } + +func TestSearch_PrefixSearch_CSIPlugin(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + s, cleanupS := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + defer cleanupS() + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + id := uuid.Generate() + CreateTestCSIPlugin(s.fsm.State(), id) + + prefix := id[:len(id)-2] + + req := &structs.SearchRequest{ + Prefix: prefix, + Context: structs.Plugins, + QueryOptions: structs.QueryOptions{ + Region: "global", + }, + } + + var resp structs.SearchResponse + if err := msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + assert.Equal(1, len(resp.Matches[structs.Plugins])) + assert.Equal(id, resp.Matches[structs.Plugins][0]) + assert.Equal(resp.Truncations[structs.Plugins], false) +} + +func TestSearch_PrefixSearch_CSIVolume(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + s, cleanupS := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + defer cleanupS() + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + id := uuid.Generate() + err := s.fsm.State().CSIVolumeRegister(1000, []*structs.CSIVolume{{ + ID: id, + Namespace: structs.DefaultNamespace, + PluginID: "glade", + }}) + require.NoError(t, err) + + prefix := id[:len(id)-2] + + req := &structs.SearchRequest{ + Prefix: prefix, + Context: structs.Volumes, + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: structs.DefaultNamespace, + }, + } + + var resp structs.SearchResponse + if err := msgpackrpc.CallWithCodec(codec, "Search.PrefixSearch", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + assert.Equal(1, len(resp.Matches[structs.Volumes])) + assert.Equal(id, resp.Matches[structs.Volumes][0]) + assert.Equal(resp.Truncations[structs.Volumes], false) +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 2b1567d7a..c52653dee 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1657,7 +1657,7 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, id string) (*structs.CSIVolume, error) { txn := s.db.Txn(false) - watchCh, obj, err := txn.FirstWatch("csi_volumes", "id", id) + watchCh, obj, err := txn.FirstWatch("csi_volumes", "id_prefix", id) if err != nil { return nil, fmt.Errorf("volume lookup failed: %s %v", id, err) } @@ -1684,6 +1684,30 @@ func (s *StateStore) CSIVolumesByPluginID(ws memdb.WatchSet, pluginID string) (m return iter, nil } +// CSIVolumesByIDPrefix supports search +func (s *StateStore) CSIVolumesByIDPrefix(ws memdb.WatchSet, namespace, volumeID string) (memdb.ResultIterator, error) { + txn := s.db.Txn(false) + + iter, err := txn.Get("csi_volumes", "id_prefix", volumeID) + if err != nil { + return nil, err + } + + ws.Add(iter.WatchCh()) + + // Filter the iterator by namespace + f := func(raw interface{}) bool { + v, ok := raw.(*structs.CSIVolume) + if !ok { + return false + } + return v.Namespace != namespace + } + + wrap := memdb.NewFilterIterator(iter, f) + return wrap, nil +} + // CSIVolumes looks up the entire csi_volumes table func (s *StateStore) CSIVolumes(ws memdb.WatchSet) (memdb.ResultIterator, error) { txn := s.db.Txn(false) @@ -1741,7 +1765,7 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, ids []string) error { defer txn.Abort() for _, id := range ids { - existing, err := txn.First("csi_volumes", "id", id) + existing, err := txn.First("csi_volumes", "id_prefix", id) if err != nil { return fmt.Errorf("volume lookup failed: %s: %v", id, err) } @@ -1837,12 +1861,26 @@ func (s *StateStore) CSIPlugins(ws memdb.WatchSet) (memdb.ResultIterator, error) return iter, nil } +// CSIPluginsByIDPrefix supports search +func (s *StateStore) CSIPluginsByIDPrefix(ws memdb.WatchSet, pluginID string) (memdb.ResultIterator, error) { + txn := s.db.Txn(false) + + iter, err := txn.Get("csi_plugins", "id_prefix", pluginID) + if err != nil { + return nil, err + } + + ws.Add(iter.WatchCh()) + + return iter, nil +} + // CSIPluginByID returns the one named CSIPlugin func (s *StateStore) CSIPluginByID(ws memdb.WatchSet, id string) (*structs.CSIPlugin, error) { txn := s.db.Txn(false) defer txn.Abort() - raw, err := txn.First("csi_plugins", "id", id) + raw, err := txn.First("csi_plugins", "id_prefix", id) if err != nil { return nil, fmt.Errorf("csi_plugin lookup failed: %s %v", id, err) } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7a758605a..3c1fa2b6c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -163,6 +163,8 @@ const ( Namespaces Context = "namespaces" Quotas Context = "quotas" All Context = "all" + Plugins Context = "plugins" + Volumes Context = "volumes" ) // NamespacedID is a tuple of an ID and a namespace diff --git a/nomad/testing.go b/nomad/testing.go index 3beeeb370..c5593eeaa 100644 --- a/nomad/testing.go +++ b/nomad/testing.go @@ -15,7 +15,9 @@ import ( "github.com/hashicorp/nomad/helper/pluginutils/catalog" "github.com/hashicorp/nomad/helper/pluginutils/singleton" "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/version" ) @@ -154,3 +156,71 @@ func TestJoin(t testing.T, s1 *Server, other ...*Server) { } } } + +// CreateTestPlugin is a helper that generates the node + fingerprint results necessary to +// create a CSIPlugin by directly inserting into the state store. It's exported for use in +// other test packages +func CreateTestCSIPlugin(s *state.StateStore, id string) func() { + // Create some nodes + ns := make([]*structs.Node, 3) + for i := range ns { + n := mock.Node() + ns[i] = n + } + + // Install healthy plugin fingerprinting results + ns[0].CSIControllerPlugins = map[string]*structs.CSIInfo{ + id: { + PluginID: id, + AllocID: uuid.Generate(), + Healthy: true, + HealthDescription: "healthy", + RequiresControllerPlugin: true, + RequiresTopologies: false, + ControllerInfo: &structs.CSIControllerInfo{ + SupportsReadOnlyAttach: true, + SupportsAttachDetach: true, + SupportsListVolumes: true, + SupportsListVolumesAttachedNodes: false, + }, + }, + } + + // Install healthy plugin fingerprinting results + allocID := uuid.Generate() + for _, n := range ns[1:] { + n.CSINodePlugins = map[string]*structs.CSIInfo{ + id: { + PluginID: id, + AllocID: allocID, + Healthy: true, + HealthDescription: "healthy", + RequiresControllerPlugin: true, + RequiresTopologies: false, + NodeInfo: &structs.CSINodeInfo{ + ID: n.ID, + MaxVolumes: 64, + RequiresNodeStageVolume: true, + }, + }, + } + } + + // Insert them into the state store + index := uint64(999) + for _, n := range ns { + index++ + s.UpsertNode(index, n) + } + + // Return cleanup function that deletes the nodes + return func() { + ids := make([]string, len(ns)) + for i, n := range ns { + ids[i] = n.ID + } + + index++ + s.DeleteNode(index, ids) + } +}