CSI: implement support for topology (#12129)

This commit is contained in:
Tim Gross 2022-03-01 10:15:46 -05:00 committed by GitHub
parent c90e674918
commit f2a4ad0949
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 598 additions and 118 deletions

3
.changelog/12129.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
csi: Added support for storage topology
```

View File

@ -252,11 +252,21 @@ func (w *WriteOptions) SetHeadersFromCSISecrets(secrets CSISecrets) {
// CSIVolume is used for serialization, see also nomad/structs/csi.go
type CSIVolume struct {
ID string
Name string
ExternalID string `mapstructure:"external_id" hcl:"external_id"`
Namespace string
Topologies []*CSITopology
ID string
Name string
ExternalID string `mapstructure:"external_id" hcl:"external_id"`
Namespace string
// RequestedTopologies are the topologies submitted as options to
// the storage provider at the time the volume was created. After
// volumes are created, this field is ignored.
RequestedTopologies *CSITopologyRequest `hcl:"topology_request"`
// Topologies are the topologies returned by the storage provider,
// based on the RequestedTopologies and what the storage provider
// could support. This value cannot be set by the user.
Topologies []*CSITopology
AccessMode CSIVolumeAccessMode `hcl:"access_mode"`
AttachmentMode CSIVolumeAttachmentMode `hcl:"attachment_mode"`
MountOptions *CSIMountOptions `hcl:"mount_options"`

View File

@ -614,8 +614,13 @@ type NodeReservedNetworkResources struct {
ReservedHostPorts string
}
type CSITopologyRequest struct {
Required []*CSITopology `hcl:"required"`
Preferred []*CSITopology `hcl:"preferred"`
}
type CSITopology struct {
Segments map[string]string
Segments map[string]string `hcl:"segments"`
}
// CSINodeInfo is the fingerprinted data from a CSI Plugin that is specific to

View File

@ -216,6 +216,12 @@ func (c *CSI) ControllerCreateVolume(req *structs.ClientCSIControllerCreateVolum
resp.CapacityBytes = cresp.Volume.CapacityBytes
resp.VolumeContext = cresp.Volume.VolumeContext
resp.Topologies = make([]*nstructs.CSITopology, len(cresp.Volume.AccessibleTopology))
for _, topo := range cresp.Volume.AccessibleTopology {
resp.Topologies = append(resp.Topologies,
&nstructs.CSITopology{Segments: topo.Segments})
}
return nil
}

View File

@ -211,17 +211,16 @@ type ClientCSIControllerDetachVolumeResponse struct{}
// Nomad client to tell a CSI controller plugin on that client to perform
// CreateVolume
type ClientCSIControllerCreateVolumeRequest struct {
Name string
VolumeCapabilities []*structs.CSIVolumeCapability
MountOptions *structs.CSIMountOptions
Parameters map[string]string
Secrets structs.CSISecrets
CapacityMin int64
CapacityMax int64
SnapshotID string
CloneID string
// TODO: topology is not yet supported
// TopologyRequirement
Name string
VolumeCapabilities []*structs.CSIVolumeCapability
MountOptions *structs.CSIMountOptions
Parameters map[string]string
Secrets structs.CSISecrets
CapacityMin int64
CapacityMax int64
SnapshotID string
CloneID string
RequestedTopologies *structs.CSITopologyRequest
CSIControllerQuery
}
@ -237,8 +236,10 @@ func (req *ClientCSIControllerCreateVolumeRequest) ToCSIRequest() (*csi.Controll
CloneID: req.CloneID,
SnapshotID: req.SnapshotID,
},
// TODO: topology is not yet supported
AccessibilityRequirements: &csi.TopologyRequirement{},
AccessibilityRequirements: &csi.TopologyRequirement{
Requisite: []*csi.Topology{},
Preferred: []*csi.Topology{},
},
}
// The CSI spec requires that at least one of the fields in CapacityRange
@ -258,6 +259,21 @@ func (req *ClientCSIControllerCreateVolumeRequest) ToCSIRequest() (*csi.Controll
}
creq.VolumeCapabilities = append(creq.VolumeCapabilities, ccap)
}
if req.RequestedTopologies != nil {
for _, topo := range req.RequestedTopologies.Required {
creq.AccessibilityRequirements.Requisite = append(
creq.AccessibilityRequirements.Requisite, &csi.Topology{
Segments: topo.Segments,
})
}
for _, topo := range req.RequestedTopologies.Preferred {
creq.AccessibilityRequirements.Preferred = append(
creq.AccessibilityRequirements.Preferred, &csi.Topology{
Segments: topo.Segments,
})
}
}
return creq, nil
}
@ -265,9 +281,7 @@ type ClientCSIControllerCreateVolumeResponse struct {
ExternalVolumeID string
CapacityBytes int64
VolumeContext map[string]string
// TODO: topology is not yet supported
// AccessibleTopology []*Topology
Topologies []*structs.CSITopology
}
// ClientCSIControllerDeleteVolumeRequest the RPC made from the server to a

View File

@ -508,6 +508,13 @@ func structsCSIVolumeToApi(vol *structs.CSIVolume) *api.CSIVolume {
ModifyIndex: vol.ModifyIndex,
}
if vol.RequestedTopologies != nil {
out.RequestedTopologies = &api.CSITopologyRequest{
Preferred: structsCSITopolgiesToApi(vol.RequestedTopologies.Preferred),
Required: structsCSITopolgiesToApi(vol.RequestedTopologies.Required),
}
}
// WriteAllocs and ReadAllocs will only ever contain the Allocation ID,
// with a null value for the Allocation; these IDs are mapped to
// allocation stubs in the Allocations field. This indirection is so the
@ -725,9 +732,11 @@ func structsTaskEventToApi(te *structs.TaskEvent) *api.TaskEvent {
func structsCSITopolgiesToApi(tops []*structs.CSITopology) []*api.CSITopology {
out := make([]*api.CSITopology, 0, len(tops))
for _, t := range tops {
out = append(out, &api.CSITopology{
Segments: t.Segments,
})
if t != nil {
out = append(out, &api.CSITopology{
Segments: t.Segments,
})
}
}
return out

View File

@ -118,6 +118,12 @@ func (c *PluginStatusCommand) csiFormatPlugin(plug *api.CSIPlugin) (string, erro
full = append(full, c.Colorize().Color("\n[bold]Node Capabilities[reset]"))
full = append(full, nodeCaps)
}
topos := c.formatTopology(plug.Nodes)
if topos != "" {
full = append(full, c.Colorize().Color("\n[bold]Accessible Topologies[reset]"))
full = append(full, topos)
}
}
// Format the allocs
@ -183,6 +189,9 @@ func (c *PluginStatusCommand) formatControllerCaps(controllers map[string]*api.C
func (c *PluginStatusCommand) formatNodeCaps(nodes map[string]*api.CSIInfo) string {
caps := []string{}
for _, node := range nodes {
if node.RequiresTopologies {
caps = append(caps, "VOLUME_ACCESSIBILITY_CONSTRAINTS")
}
switch info := node.NodeInfo; {
case info.RequiresNodeStageVolume:
caps = append(caps, "STAGE_UNSTAGE_VOLUME")
@ -207,3 +216,21 @@ func (c *PluginStatusCommand) formatNodeCaps(nodes map[string]*api.CSIInfo) stri
return " " + strings.Join(sort.StringSlice(caps), "\n ")
}
func (c *PluginStatusCommand) formatTopology(nodes map[string]*api.CSIInfo) string {
rows := []string{"Node ID|Accessible Topology"}
for nodeID, node := range nodes {
if node.NodeInfo.AccessibleTopology != nil {
segments := node.NodeInfo.AccessibleTopology.Segments
segmentPairs := make([]string, 0, len(segments))
for k, v := range segments {
segmentPairs = append(segmentPairs, fmt.Sprintf("%s=%s", k, v))
}
rows = append(rows, fmt.Sprintf("%s|%s", nodeID[:8], strings.Join(segmentPairs, ",")))
}
}
if len(rows) == 1 {
return ""
}
return formatList(rows)
}

View File

@ -133,7 +133,7 @@ capacity_max = "20G"
# capabilities to validate. Registering an existing volume will record but
# ignore these fields.
capability {
access_mode = "single-node-writer"
access_mode = "single-node-writer"
attachment_mode = "file-system"
}
@ -150,6 +150,18 @@ mount_options {
mount_flags = ["ro"]
}
# Optional: specify one or more locations where the volume must be accessible
# from. Refer to the plugin documentation for what segment values are supported.
topology_request {
preferred {
topology { segments { rack = "R1" } }
}
required {
topology { segments { rack = "R1" } }
topology { segments { rack = "R2", zone = "us-east-1a" } }
}
}
# Optional: provide any secrets specified by the plugin.
secrets {
example_secret = "xyzzy"
@ -201,6 +213,34 @@ var defaultJsonVolumeSpec = strings.TrimSpace(`
]
}
],
"topology_request": {
"preferred": [
{
"topology": {
"segments": {
"rack": "R1"
}
}
}
],
"required": [
{
"topology": {
"segments": {
"rack": "R1"
}
}
},
{
"topology": {
"segments": {
"rack": "R2",
"zone": "us-east-1a"
}
}
}
]
},
"parameters": [
{
"skuname": "Premium_LRS"

View File

@ -48,6 +48,7 @@ func csiDecodeVolume(input *ast.File) (*api.CSIVolume, error) {
delete(m, "mount_options")
delete(m, "capacity_max")
delete(m, "capacity_min")
delete(m, "topology_request")
delete(m, "type")
// Decode the rest
@ -116,6 +117,48 @@ func csiDecodeVolume(input *ast.File) (*api.CSIVolume, error) {
}
}
requestedTopos := list.Filter("topology_request")
if len(requestedTopos.Items) > 0 {
vol.RequestedTopologies = &api.CSITopologyRequest{}
for _, o := range requestedTopos.Elem().Items {
if err := helper.CheckHCLKeys(o.Val, []string{"preferred", "required"}); err != nil {
return nil, err
}
ot, ok := o.Val.(*ast.ObjectType)
if !ok {
break
}
// topology_request -> required|preferred -> []topology -> []segments (kv)
decoded := map[string][]map[string][]map[string][]map[string]string{}
if err := hcl.DecodeObject(&decoded, ot.List); err != nil {
return nil, err
}
getTopologies := func(topKey string) []*api.CSITopology {
for _, topo := range decoded[topKey] {
var topos []*api.CSITopology
for _, segments := range topo["topology"] {
for _, segment := range segments["segments"] {
if len(segment) > 0 {
topos = append(topos, &api.CSITopology{Segments: segment})
}
}
}
if len(topos) > 0 {
return topos
}
}
return nil
}
vol.RequestedTopologies.Required = getTopologies("required")
vol.RequestedTopologies.Preferred = getTopologies("preferred")
}
}
return vol, nil
}

View File

@ -84,6 +84,17 @@ capability {
access_mode = "single-node-reader-only"
attachment_mode = "block-device"
}
topology_request {
preferred {
topology { segments {rack = "R1"} }
}
required {
topology { segments {rack = "R1"} }
topology { segments {rack = "R2", zone = "us-east-1a"} }
}
}
`,
expected: &api.CSIVolume{
ID: "testvolume",
@ -108,6 +119,16 @@ capability {
},
Parameters: map[string]string{"skuname": "Premium_LRS"},
Secrets: map[string]string{"password": "xyzzy"},
RequestedTopologies: &api.CSITopologyRequest{
Required: []*api.CSITopology{
{Segments: map[string]string{"rack": "R1"}},
{Segments: map[string]string{"rack": "R2", "zone": "us-east-1a"}},
},
Preferred: []*api.CSITopology{
{Segments: map[string]string{"rack": "R1"}},
},
},
Topologies: nil, // this is left empty
},
err: "",
}, {
@ -124,6 +145,19 @@ capability {
access_mode = "single-node-writer"
attachment_mode = "file-system"
}
topology_request {
# make sure we safely handle empty blocks even
# if they're invalid
preferred {
topology {}
topology { segments {} }
}
required {
topology { segments { rack = "R2", zone = "us-east-1a"} }
}
}
`,
expected: &api.CSIVolume{
ID: "testvolume",
@ -136,6 +170,13 @@ capability {
AttachmentMode: api.CSIVolumeAttachmentModeFilesystem,
},
},
RequestedTopologies: &api.CSITopologyRequest{
Required: []*api.CSITopology{
{Segments: map[string]string{"rack": "R2", "zone": "us-east-1a"}},
},
Preferred: nil,
},
Topologies: nil,
},
err: "",
},

View File

@ -212,43 +212,42 @@ func (c *VolumeStatusCommand) formatBasic(vol *api.CSIVolume) (string, error) {
return formatKV(output), nil
}
full := []string{formatKV(output)}
if len(vol.Topologies) > 0 {
topoBanner := c.Colorize().Color("\n[bold]Topologies[reset]")
topo := c.formatTopology(vol)
full = append(full, topoBanner)
full = append(full, topo)
}
// Format the allocs
banner := c.Colorize().Color("\n[bold]Allocations[reset]")
allocs := formatAllocListStubs(vol.Allocations, c.verbose, c.length)
full := []string{formatKV(output), banner, allocs}
full = append(full, banner)
full = append(full, allocs)
return strings.Join(full, "\n"), nil
}
func (c *VolumeStatusCommand) formatTopologies(vol *api.CSIVolume) string {
var out []string
// Find the union of all the keys
head := map[string]string{}
for _, t := range vol.Topologies {
for key := range t.Segments {
if _, ok := head[key]; !ok {
head[key] = ""
}
func (c *VolumeStatusCommand) formatTopology(vol *api.CSIVolume) string {
rows := []string{"Topology|Segments"}
for i, t := range vol.Topologies {
segmentPairs := make([]string, 0, len(t.Segments))
for k, v := range t.Segments {
segmentPairs = append(segmentPairs, fmt.Sprintf("%s=%s", k, v))
}
// note: this looks awkward because we don't have any other
// place where we list collections of arbitrary k/v's like
// this without just dumping JSON formatted outputs. It's likely
// the spec will expand to add extra fields, in which case we'll
// add them here and drop the first column
rows = append(rows, fmt.Sprintf("%02d|%v", i, strings.Join(segmentPairs, ", ")))
}
// Append the header
var line []string
for key := range head {
line = append(line, key)
if len(rows) == 1 {
return ""
}
out = append(out, strings.Join(line, " "))
// Append each topology
for _, t := range vol.Topologies {
line = []string{}
for key := range head {
line = append(line, t.Segments[key])
}
out = append(out, strings.Join(line, " "))
}
return strings.Join(out, "\n")
return formatList(rows)
}
func csiVolMountOption(volume, request *api.CSIMountOptions) string {

View File

@ -226,9 +226,13 @@ func volumeRegister(volID, volFilePath, createOrRegister string) error {
}
// hack off the first line to replace with our unique ID
var re = regexp.MustCompile(`(?m)^id ".*"`)
volspec := re.ReplaceAllString(string(content),
fmt.Sprintf("id = \"%s\"", volID))
var idRegex = regexp.MustCompile(`(?m)^id ".*"`)
volspec := idRegex.ReplaceAllString(string(content),
fmt.Sprintf("id = %q", volID))
var nameRegex = regexp.MustCompile(`(?m)^name ".*"`)
volspec = nameRegex.ReplaceAllString(volspec,
fmt.Sprintf("name = %q", volID))
go func() {
defer stdin.Close()

View File

@ -1,5 +1,5 @@
id = "ebs-vol[0]"
name = "this-is-a-test-0" # CSIVolumeName tag
name = "idempotency-token" # CSIVolumeName tag, must be idempotent
type = "csi"
plugin_id = "aws-ebs0"
@ -19,3 +19,14 @@ capability {
parameters {
type = "gp2"
}
topology_request {
required {
topology {
segments {
# this zone should match the one set in e2e/terraform/variables.tf
"topology.ebs.csi.aws.com/zone" = "us-east-1b"
}
}
}
}

View File

@ -1,5 +1,5 @@
id = "ebs-vol[1]"
name = "this-is-a-test-1" # CSIVolumeName tag
name = "idempotency-token" # CSIVolumeName tag
type = "csi"
plugin_id = "aws-ebs0"
@ -19,3 +19,14 @@ capability {
parameters {
type = "gp2"
}
topology_request {
required {
topology {
segments {
# this zone should match the one set in e2e/terraform/variables.tf
"topology.ebs.csi.aws.com/zone" = "us-east-1b"
}
}
}
}

View File

@ -22,7 +22,7 @@ job "plugin-aws-ebs-controller" {
driver = "docker"
config {
image = "amazon/aws-ebs-csi-driver:v0.9.0"
image = "public.ecr.aws/ebs-csi-driver/aws-ebs-csi-driver:v1.5.1"
args = [
"controller",

View File

@ -19,7 +19,7 @@ job "plugin-aws-ebs-nodes" {
driver = "docker"
config {
image = "amazon/aws-ebs-csi-driver:v0.9.0"
image = "public.ecr.aws/ebs-csi-driver/aws-ebs-csi-driver:v1.5.1"
args = [
"node",

View File

@ -10,7 +10,7 @@ variable "region" {
variable "availability_zone" {
description = "The AWS availability zone to deploy to."
default = "us-east-1a"
default = "us-east-1b"
}
variable "instance_type" {

View File

@ -301,6 +301,14 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru
if err := v.controllerValidateVolume(args, vol, plugin); err != nil {
return err
}
// The topologies for the volume have already been set when it was
// created, so we accept the user's description of that topology
if vol.Topologies == nil || len(vol.Topologies) == 0 {
if vol.RequestedTopologies != nil {
vol.Topologies = vol.RequestedTopologies.Required
}
}
}
resp, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, args)
@ -898,15 +906,16 @@ func (v *CSIVolume) createVolume(vol *structs.CSIVolume, plugin *structs.CSIPlug
method := "ClientCSI.ControllerCreateVolume"
cReq := &cstructs.ClientCSIControllerCreateVolumeRequest{
Name: vol.Name,
VolumeCapabilities: vol.RequestedCapabilities,
MountOptions: vol.MountOptions,
Parameters: vol.Parameters,
Secrets: vol.Secrets,
CapacityMin: vol.RequestedCapacityMin,
CapacityMax: vol.RequestedCapacityMax,
SnapshotID: vol.SnapshotID,
CloneID: vol.CloneID,
Name: vol.Name,
VolumeCapabilities: vol.RequestedCapabilities,
MountOptions: vol.MountOptions,
Parameters: vol.Parameters,
Secrets: vol.Secrets,
CapacityMin: vol.RequestedCapacityMin,
CapacityMax: vol.RequestedCapacityMax,
SnapshotID: vol.SnapshotID,
CloneID: vol.CloneID,
RequestedTopologies: vol.RequestedTopologies,
}
cReq.PluginID = plugin.ID
cResp := &cstructs.ClientCSIControllerCreateVolumeResponse{}
@ -918,6 +927,7 @@ func (v *CSIVolume) createVolume(vol *structs.CSIVolume, plugin *structs.CSIPlug
vol.ExternalID = cResp.ExternalVolumeID
vol.Capacity = cResp.CapacityBytes
vol.Context = cResp.VolumeContext
vol.Topologies = cResp.Topologies
return nil
}

View File

@ -275,9 +275,10 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
ID: id0,
Namespace: structs.DefaultNamespace,
PluginID: "minnie",
Topologies: []*structs.CSITopology{{
Segments: map[string]string{"foo": "bar"},
}},
RequestedTopologies: &structs.CSITopologyRequest{
Required: []*structs.CSITopology{
{Segments: map[string]string{"foo": "bar"}}},
},
Secrets: structs.CSISecrets{"mysecret": "secretvalue"},
RequestedCapabilities: []*structs.CSIVolumeCapability{{
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
@ -754,6 +755,9 @@ func TestCSIVolumeEndpoint_Create(t *testing.T) {
ExternalVolumeID: "vol-12345",
CapacityBytes: 42,
VolumeContext: map[string]string{"plugincontext": "bar"},
Topologies: []*structs.CSITopology{
{Segments: map[string]string{"rack": "R1"}},
},
}
client, cleanup := client.TestClientWithRPCs(t,
@ -829,6 +833,10 @@ func TestCSIVolumeEndpoint_Create(t *testing.T) {
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
},
},
Topologies: []*structs.CSITopology{
{Segments: map[string]string{"rack": "R1"}},
{Segments: map[string]string{"zone": "Z2"}},
},
}}
// Create the create request
@ -872,6 +880,7 @@ func TestCSIVolumeEndpoint_Create(t *testing.T) {
require.Equal(t, int64(42), vol.Capacity)
require.Equal(t, "bar", vol.Context["plugincontext"])
require.Equal(t, "", vol.Context["mycontext"])
require.Equal(t, map[string]string{"rack": "R1"}, vol.Topologies[0].Segments)
}
func TestCSIVolumeEndpoint_Delete(t *testing.T) {

View File

@ -246,9 +246,19 @@ type CSIVolume struct {
// Name is a display name for the volume, not required to be unique
Name string
// ExternalID identifies the volume for the CSI interface, may be URL unsafe
ExternalID string
Namespace string
Topologies []*CSITopology
ExternalID string
Namespace string
// RequestedTopologies are the topologies submitted as options to
// the storage provider at the time the volume was created. After
// volumes are created, this field is ignored.
RequestedTopologies *CSITopologyRequest
// Topologies are the topologies returned by the storage provider,
// based on the RequestedTopologies and what the storage provider
// could support. This value cannot be set by the user.
Topologies []*CSITopology
AccessMode CSIVolumeAccessMode // *current* access mode
AttachmentMode CSIVolumeAttachmentMode // *current* attachment mode
MountOptions *CSIMountOptions
@ -679,20 +689,18 @@ func (v *CSIVolume) Validate() error {
if len(v.RequestedCapabilities) == 0 {
errs = append(errs, "must include at least one capability block")
}
// TODO: Volume Topologies are optional - We should check to see if the plugin
// the volume is being registered with requires them.
// var ok bool
// for _, t := range v.Topologies {
// if t != nil && len(t.Segments) > 0 {
// ok = true
// break
// }
// }
// if !ok {
// errs = append(errs, "missing topology")
// }
if v.RequestedTopologies != nil {
for _, t := range v.RequestedTopologies.Required {
if t != nil && len(t.Segments) == 0 {
errs = append(errs, "required topology is missing segments field")
}
}
for _, t := range v.RequestedTopologies.Preferred {
if t != nil && len(t.Segments) == 0 {
errs = append(errs, "preferred topology is missing segments field")
}
}
}
if len(errs) > 0 {
return fmt.Errorf("validation: %s", strings.Join(errs, ", "))
}
@ -836,9 +844,6 @@ type CSIVolumeExternalStub struct {
CloneID string
SnapshotID string
// TODO: topology support
// AccessibleTopology []*Topology
PublishedExternalNodeIDs []string
IsAbnormal bool
Status string

View File

@ -554,6 +554,21 @@ func TestVolume_Copy(t *testing.T) {
}
func TestCSIVolume_Validate(t *testing.T) {
vol := &CSIVolume{
ID: "test",
PluginID: "test",
SnapshotID: "test-snapshot",
CloneID: "test-clone",
RequestedTopologies: &CSITopologyRequest{
Required: []*CSITopology{{}, {}},
},
}
err := vol.Validate()
require.EqualError(t, err, "validation: missing namespace, only one of snapshot_id and clone_id is allowed, must include at least one capability block, required topology is missing segments field, required topology is missing segments field")
}
func TestCSIPluginJobs(t *testing.T) {
plug := NewCSIPlugin("foo", 1000)
controller := &Job{

View File

@ -62,6 +62,14 @@ func (t *CSITopology) Equal(o *CSITopology) bool {
return helper.CompareMapStringString(t.Segments, o.Segments)
}
// CSITopologyRequest are the topologies submitted as options to the
// storage provider at the time the volume was created. The storage
// provider will return a single topology.
type CSITopologyRequest struct {
Required []*CSITopology
Preferred []*CSITopology
}
// CSINodeInfo is the fingerprinted data from a CSI Plugin that is specific to
// the Node API.
type CSINodeInfo struct {

View File

@ -740,6 +740,11 @@ func (c *client) NodeGetInfo(ctx context.Context) (*NodeGetInfoResponse, error)
result.MaxVolumes = math.MaxInt64
}
topo := resp.GetAccessibleTopology()
if topo != nil {
result.AccessibleTopology = &Topology{Segments: topo.Segments}
}
return result, nil
}

View File

@ -746,7 +746,7 @@ func TestClient_RPC_ControllerCreateVolume(t *testing.T) {
},
{
Name: "handles success with capacity range and source",
Name: "handles success with capacity range, source, and topology",
CapacityRange: &CapacityRange{
RequiredBytes: 500,
LimitBytes: 1000,
@ -764,6 +764,9 @@ func TestClient_RPC_ControllerCreateVolume(t *testing.T) {
},
},
},
AccessibleTopology: []*csipbv1.Topology{
{Segments: map[string]string{"rack": "R1"}},
},
},
},
},
@ -782,10 +785,19 @@ func TestClient_RPC_ControllerCreateVolume(t *testing.T) {
AccessMode: VolumeAccessModeMultiNodeMultiWriter,
},
},
Parameters: map[string]string{},
Secrets: structs.CSISecrets{},
ContentSource: tc.ContentSource,
AccessibilityRequirements: &TopologyRequirement{},
Parameters: map[string]string{},
Secrets: structs.CSISecrets{},
ContentSource: tc.ContentSource,
AccessibilityRequirements: &TopologyRequirement{
Requisite: []*Topology{
{
Segments: map[string]string{"rack": "R1"},
},
{
Segments: map[string]string{"rack": "R2"},
},
},
},
}
cc.NextCreateVolumeResponse = tc.Response
@ -808,6 +820,14 @@ func TestClient_RPC_ControllerCreateVolume(t *testing.T) {
require.Equal(t, tc.ContentSource.CloneID, resp.Volume.ContentSource.CloneID)
require.Equal(t, tc.ContentSource.SnapshotID, resp.Volume.ContentSource.SnapshotID)
}
if tc.Response != nil && tc.Response.Volume != nil {
require.Len(t, resp.Volume.AccessibleTopology, 1)
require.Equal(t,
req.AccessibilityRequirements.Requisite[0].Segments,
resp.Volume.AccessibleTopology[0].Segments,
)
}
})
}
}

View File

@ -581,10 +581,11 @@ type ControllerCreateVolumeResponse struct {
func NewCreateVolumeResponse(resp *csipbv1.CreateVolumeResponse) *ControllerCreateVolumeResponse {
vol := resp.GetVolume()
return &ControllerCreateVolumeResponse{Volume: &Volume{
CapacityBytes: vol.GetCapacityBytes(),
ExternalVolumeID: vol.GetVolumeId(),
VolumeContext: vol.GetVolumeContext(),
ContentSource: newVolumeContentSource(vol.GetContentSource()),
CapacityBytes: vol.GetCapacityBytes(),
ExternalVolumeID: vol.GetVolumeId(),
VolumeContext: vol.GetVolumeContext(),
ContentSource: newVolumeContentSource(vol.GetContentSource()),
AccessibleTopology: newTopologies(vol.GetAccessibleTopology()),
}}
}

View File

@ -27,6 +27,7 @@ const (
FilterConstraintCSIVolumeGCdAllocationTemplate = "CSI volume %s has exhausted its available writer claims and is claimed by a garbage collected allocation %s; waiting for claim to be released"
FilterConstraintDrivers = "missing drivers"
FilterConstraintDevices = "missing devices"
FilterConstraintsCSIPluginTopology = "did not meet topology requirement"
)
var (
@ -313,6 +314,22 @@ func (c *CSIVolumeChecker) isFeasible(n *structs.Node) (bool, string) {
return false, fmt.Sprintf(FilterConstraintCSIPluginMaxVolumesTemplate, vol.PluginID, n.ID)
}
// CSI spec: "If requisite is specified, the provisioned
// volume MUST be accessible from at least one of the
// requisite topologies."
if len(vol.Topologies) > 0 {
var ok bool
for _, requiredTopo := range vol.Topologies {
if requiredTopo.Equal(plugin.NodeInfo.AccessibleTopology) {
ok = true
break
}
}
if !ok {
return false, FilterConstraintsCSIPluginTopology
}
}
if req.ReadOnly {
if !vol.ReadSchedulable() {
return false, fmt.Sprintf(FilterConstraintCSIVolumeNoReadTemplate, vol.ID)

View File

@ -10,6 +10,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
psstructs "github.com/hashicorp/nomad/plugins/shared/structs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -240,6 +241,8 @@ func TestCSIVolumeChecker(t *testing.T) {
mock.Node(),
mock.Node(),
mock.Node(),
mock.Node(),
mock.Node(),
}
// Register running plugins on some nodes
@ -254,28 +257,69 @@ func TestCSIVolumeChecker(t *testing.T) {
"foo": {
PluginID: "foo",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{MaxVolumes: 1},
NodeInfo: &structs.CSINodeInfo{
MaxVolumes: 1,
AccessibleTopology: &structs.CSITopology{
Segments: map[string]string{"rack": "R1"},
},
},
},
}
nodes[1].CSINodePlugins = map[string]*structs.CSIInfo{
"foo": {
PluginID: "foo",
Healthy: false,
NodeInfo: &structs.CSINodeInfo{MaxVolumes: 1},
NodeInfo: &structs.CSINodeInfo{
MaxVolumes: 1,
AccessibleTopology: &structs.CSITopology{
Segments: map[string]string{"rack": "R1"},
},
},
},
}
nodes[2].CSINodePlugins = map[string]*structs.CSIInfo{
"bar": {
PluginID: "bar",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{MaxVolumes: 1},
NodeInfo: &structs.CSINodeInfo{
MaxVolumes: 1,
AccessibleTopology: &structs.CSITopology{
Segments: map[string]string{"rack": "R1"},
},
},
},
}
nodes[4].CSINodePlugins = map[string]*structs.CSIInfo{
"foo": {
PluginID: "foo",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{MaxVolumes: 1},
NodeInfo: &structs.CSINodeInfo{
MaxVolumes: 1,
AccessibleTopology: &structs.CSITopology{
Segments: map[string]string{"rack": "R1"},
},
},
},
}
nodes[5].CSINodePlugins = map[string]*structs.CSIInfo{
"foo": {
PluginID: "foo",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{
MaxVolumes: 1,
AccessibleTopology: &structs.CSITopology{
Segments: map[string]string{"rack": "R4"},
},
},
},
}
nodes[6].CSINodePlugins = map[string]*structs.CSIInfo{
"foo": {
PluginID: "foo",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{
MaxVolumes: 1,
},
},
}
@ -294,6 +338,10 @@ func TestCSIVolumeChecker(t *testing.T) {
vol.Namespace = structs.DefaultNamespace
vol.AccessMode = structs.CSIVolumeAccessModeMultiNodeMultiWriter
vol.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
vol.Topologies = []*structs.CSITopology{
{Segments: map[string]string{"rack": "R1"}},
{Segments: map[string]string{"rack": "R2"}},
}
err := state.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
require.NoError(t, err)
index++
@ -361,52 +409,70 @@ func TestCSIVolumeChecker(t *testing.T) {
checker.SetNamespace(structs.DefaultNamespace)
cases := []struct {
Name string
Node *structs.Node
RequestedVolumes map[string]*structs.VolumeRequest
Result bool
}{
{ // Get it
{
Name: "ok",
Node: nodes[0],
RequestedVolumes: volumes,
Result: true,
},
{ // Unhealthy
{
Name: "unhealthy node",
Node: nodes[1],
RequestedVolumes: volumes,
Result: false,
},
{ // Wrong id
{
Name: "wrong id",
Node: nodes[2],
RequestedVolumes: volumes,
Result: false,
},
{ // No Volumes requested or available
{
Name: "no volumes requested or available",
Node: nodes[3],
RequestedVolumes: noVolumes,
Result: true,
},
{ // No Volumes requested, some available
{
Name: "no volumes requested, some available",
Node: nodes[0],
RequestedVolumes: noVolumes,
Result: true,
},
{ // Volumes requested, none available
{
Name: "volumes requested, none available",
Node: nodes[3],
RequestedVolumes: volumes,
Result: false,
},
{ // Volumes requested, MaxVolumes exceeded
{
Name: "volumes requested, max volumes exceeded",
Node: nodes[4],
RequestedVolumes: volumes,
Result: false,
},
{
Name: "no matching topology",
Node: nodes[5],
RequestedVolumes: volumes,
Result: false,
},
{
Name: "nil topology",
Node: nodes[6],
RequestedVolumes: volumes,
Result: false,
},
}
for i, c := range cases {
for _, c := range cases {
checker.SetVolumes(alloc.Name, c.RequestedVolumes)
if act := checker.Feasible(c.Node); act != c.Result {
t.Fatalf("case(%d) failed: got %v; want %v", i, act, c.Result)
}
assert.Equal(t, c.Result, checker.Feasible(c.Node), c.Name)
}
// add a missing volume

View File

@ -59,6 +59,16 @@ mount_options {
mount_flags = ["noatime"]
}
topology_request {
required {
topology { segments { "rack" = "R2" } }
topology { segments { "rack" = "R1", "zone" = "us-east-1a"} }
}
preferred {
topology { segments { "rack" = "R1", "zone" = "us-east-1a"} }
}
}
secrets {
example_secret = "xyzzy"
}
@ -134,6 +144,15 @@ parameters {
- `mount_flags` `([]string: <optional>)` - The flags passed to `mount`
(ex. `["ro", "noatime"]`)
- `topology_request` <code>([TopologyRequest](#topology_request-parameters): nil)</code> -
Specify locations (region, zone, rack, etc.) where the provisioned
volume must be accessible from. Consult the documentation for your
storage provider and CSI plugin as to whether it supports defining
topology and what values it expects for topology
segments. Specifying topology segments that aren't supported by the
storage provider may return an error or may be silently removed by
the plugin.
- `secrets` <code>(map<string|string>:nil)</code> - An optional
key-value map of strings used as credentials for publishing and
unpublishing volumes.
@ -144,6 +163,40 @@ parameters {
to each storage provider, so please see the specific plugin
documentation for more information.
### `topology_request` Parameters
For the `topology_request` field, you may specify a list of either
`required` or `preferred` topologies (or both). The `required`
topologies indicate that the volume must be created in a location
accessible from at least one of the listed topologies. The `preferred`
topologies indicate that you would prefer the storage provider to
create the volume in one of the provided topologies.
Each topology listed has a single field:
- `segments` `(map[string]string)` - A map of location types to their
values. The specific fields required are defined by the CSI
plugin. For example, a plugin might require defining both a rack and
a zone: `segments {rack = "R2", zone = "us-east-1a"}`.
For example:
```hcl
topology_request {
required {
topology { segments { "rack" = "R1", "zone" = "us-east-1a" } }
topology { segments { "rack" = "R2", "zone" = "us-east-1a" } }
}
preferred {
topology { segments { "rack" = "R1", "zone" = "us-east-1a"} }
}
}
```
This configuration indicates you require the volume to be created
within racks `R1` or `R2`, but that you prefer the volume to be
created within `R1`.
### Unused Fields
Note that several fields used in the [`volume register`] command are set

View File

@ -61,6 +61,13 @@ mount_options {
mount_flags = ["noatime"]
}
topology_request {
required {
topology { segments { "rack" = "R2" } }
topology { segments { "rack" = "R1", "zone" = "us-east-1a"} }
}
}
secrets {
example_secret = "xyzzy"
}
@ -120,6 +127,15 @@ context {
- `fs_type`: file system type (ex. `"ext4"`)
- `mount_flags`: the flags passed to `mount` (ex. `"ro,noatime"`)
- `topology_request` <code>([TopologyRequest](#topology_request-parameters): nil)</code> -
Specify locations (region, zone, rack, etc.) where the provisioned
volume is accessible from. Consult the documentation for your
storage provider and CSI plugin as to whether it supports defining
topology and what values it expects for topology
segments. Specifying topology segments that aren't supported by the
storage provider may return an error or may be silently removed by
the plugin.
- `secrets` <code>(map<string|string>:nil)</code> - An optional
key-value map of strings used as credentials for publishing and
unpublishing volumes.
@ -136,6 +152,38 @@ context {
each storage provider, so please see the specific plugin
documentation for more information.
### `topology_request` Parameters
For the `topology_request` field, you may specify a list of `required`
topologies. The `required` topologies indicate that the volume was
created in a location accessible from all the listed topologies.
Note this behavior is different from the `nomad volume create`
command, because the volume has already been created and you are
defining the topology for Nomad. The `register` command does not
support setting `preferred` topologies.
Each topology listed has a single field:
- `segments` `(map[string]string)` - A map of location types to their
values. The specific fields required are defined by the CSI
plugin. For example, a plugin might require defining both a rack and
a zone: `segments {rack = "R2", zone = "us-east-1a"}`.
For example:
```hcl
topology_request {
required {
topology { segments { "rack" = "R1", "zone" = "us-east-1a" } }
topology { segments { "rack" = "R2", "zone" = "us-east-1a" } }
}
}
```
This configuration indicates that the volume is accessible from both
racks `R1` or `R2`.
### Unused Fields
Note that several fields used in the [`volume create`] command are set