CSI: create/delete/list volume RPCs

This commit implements the RPC handlers on the client that talk to the CSI
plugins on that client for the Create/Delete/List RPC.
This commit is contained in:
Tim Gross 2021-03-22 09:43:30 -04:00
parent 43622680fa
commit d38008176e
10 changed files with 1452 additions and 130 deletions

View File

@ -39,24 +39,25 @@ func (c *CSI) ControllerValidateVolume(req *structs.ClientCSIControllerValidateV
defer metrics.MeasureSince([]string{"client", "csi_controller", "validate_volume"}, time.Now())
if req.VolumeID == "" {
return errors.New("VolumeID is required")
return errors.New("CSI.ControllerValidateVolume: VolumeID is required")
}
if req.PluginID == "" {
return errors.New("PluginID is required")
return errors.New("CSI.ControllerValidateVolume: PluginID is required")
}
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
// the server's view of the plugin health is stale, so let it know it
// should retry with another controller instance
return fmt.Errorf("%w: %v", nstructs.ErrCSIClientRPCRetryable, err)
return fmt.Errorf("CSI.ControllerValidateVolume: %w: %v",
nstructs.ErrCSIClientRPCRetryable, err)
}
defer plugin.Close()
csiReq, err := req.ToCSIRequest()
if err != nil {
return err
return fmt.Errorf("CSI.ControllerValidateVolume: %v", err)
}
ctx, cancelFn := c.requestContext()
@ -64,10 +65,14 @@ func (c *CSI) ControllerValidateVolume(req *structs.ClientCSIControllerValidateV
// CSI ValidateVolumeCapabilities errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
return plugin.ControllerValidateCapabilities(ctx, csiReq,
err = plugin.ControllerValidateCapabilities(ctx, csiReq,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if err != nil {
return fmt.Errorf("CSI.ControllerValidateVolume: %v", err)
}
return nil
}
// ControllerAttachVolume is used to attach a volume from a CSI Cluster to
@ -84,7 +89,8 @@ func (c *CSI) ControllerAttachVolume(req *structs.ClientCSIControllerAttachVolum
if err != nil {
// the server's view of the plugin health is stale, so let it know it
// should retry with another controller instance
return fmt.Errorf("%w: %v", nstructs.ErrCSIClientRPCRetryable, err)
return fmt.Errorf("CSI.ControllerAttachVolume: %w: %v",
nstructs.ErrCSIClientRPCRetryable, err)
}
defer plugin.Close()
@ -94,16 +100,16 @@ func (c *CSI) ControllerAttachVolume(req *structs.ClientCSIControllerAttachVolum
// requests to plugins, and to aid with development.
if req.VolumeID == "" {
return errors.New("VolumeID is required")
return errors.New("CSI.ControllerAttachVolume: VolumeID is required")
}
if req.ClientCSINodeID == "" {
return errors.New("ClientCSINodeID is required")
return errors.New("CSI.ControllerAttachVolume: ClientCSINodeID is required")
}
csiReq, err := req.ToCSIRequest()
if err != nil {
return err
return fmt.Errorf("CSI.ControllerAttachVolume: %v", err)
}
// Submit the request for a volume to the CSI Plugin.
@ -116,7 +122,7 @@ func (c *CSI) ControllerAttachVolume(req *structs.ClientCSIControllerAttachVolum
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if err != nil {
return err
return fmt.Errorf("CSI.ControllerAttachVolume: %v", err)
}
resp.PublishContext = cresp.PublishContext
@ -131,7 +137,8 @@ func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolum
if err != nil {
// the server's view of the plugin health is stale, so let it know it
// should retry with another controller instance
return fmt.Errorf("%w: %v", nstructs.ErrCSIClientRPCRetryable, err)
return fmt.Errorf("CSI.ControllerDetachVolume: %w: %v",
nstructs.ErrCSIClientRPCRetryable, err)
}
defer plugin.Close()
@ -141,11 +148,11 @@ func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolum
// requests to plugins, and to aid with development.
if req.VolumeID == "" {
return errors.New("VolumeID is required")
return errors.New("CSI.ControllerDetachVolume: VolumeID is required")
}
if req.ClientCSINodeID == "" {
return errors.New("ClientCSINodeID is required")
return errors.New("CSI.ControllerDetachVolume: ClientCSINodeID is required")
}
csiReq := req.ToCSIRequest()
@ -159,15 +166,148 @@ func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolum
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if err != nil {
if errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {
// if the controller detach previously happened but the server failed to
// checkpoint, we'll get an error from the plugin but can safely ignore it.
c.c.logger.Debug("could not unpublish volume: %v", err)
return nil
}
return err
if errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {
// if the controller detach previously happened but the server failed to
// checkpoint, we'll get an error from the plugin but can safely ignore it.
c.c.logger.Debug("could not unpublish volume: %v", err)
return nil
}
if err != nil {
return fmt.Errorf("CSI.ControllerDetachVolume: %v", err)
}
return err
}
func (c *CSI) ControllerCreateVolume(req *structs.ClientCSIControllerCreateVolumeRequest, resp *structs.ClientCSIControllerCreateVolumeResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "create_volume"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
// the server's view of the plugin health is stale, so let it know it
// should retry with another controller instance
return fmt.Errorf("CSI.ControllerCreateVolume: %w: %v",
nstructs.ErrCSIClientRPCRetryable, err)
}
defer plugin.Close()
csiReq, err := req.ToCSIRequest()
if err != nil {
return fmt.Errorf("CSI.ControllerCreateVolume: %v", err)
}
ctx, cancelFn := c.requestContext()
defer cancelFn()
// CSI ControllerCreateVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
cresp, err := plugin.ControllerCreateVolume(ctx, csiReq,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if err != nil {
return fmt.Errorf("CSI.ControllerCreateVolume: %v", err)
}
if cresp == nil || cresp.Volume == nil {
c.c.logger.Warn("plugin did not return error or volume; this is a bug in the plugin and should be reported to the plugin author")
return fmt.Errorf("CSI.ControllerCreateVolume: plugin did not return error or volume")
}
resp.ExternalVolumeID = cresp.Volume.ExternalVolumeID
resp.CapacityBytes = cresp.Volume.CapacityBytes
resp.VolumeContext = cresp.Volume.VolumeContext
return nil
}
func (c *CSI) ControllerDeleteVolume(req *structs.ClientCSIControllerDeleteVolumeRequest, resp *structs.ClientCSIControllerDeleteVolumeResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "delete_volume"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
// the server's view of the plugin health is stale, so let it know it
// should retry with another controller instance
return fmt.Errorf("CSI.ControllerDeleteVolume: %w: %v",
nstructs.ErrCSIClientRPCRetryable, err)
}
defer plugin.Close()
csiReq := req.ToCSIRequest()
ctx, cancelFn := c.requestContext()
defer cancelFn()
// CSI ControllerDeleteVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
err = plugin.ControllerDeleteVolume(ctx, csiReq,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {
// if the volume was deleted out-of-band, we'll get an error from
// the plugin but can safely ignore it
c.c.logger.Debug("could not delete volume: %v", err)
return nil
}
if err != nil {
return fmt.Errorf("CSI.ControllerDeleteVolume: %v", err)
}
return err
}
func (c *CSI) ControllerListVolumes(req *structs.ClientCSIControllerListVolumesRequest, resp *structs.ClientCSIControllerListVolumesResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "list_volumes"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
// the server's view of the plugin health is stale, so let it know it
// should retry with another controller instance
return fmt.Errorf("CSI.ControllerListVolumes: %w: %v",
nstructs.ErrCSIClientRPCRetryable, err)
}
defer plugin.Close()
csiReq := req.ToCSIRequest()
ctx, cancelFn := c.requestContext()
defer cancelFn()
// CSI ControllerListVolumes errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
cresp, err := plugin.ControllerListVolumes(ctx, csiReq,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if err != nil {
return fmt.Errorf("CSI.ControllerListVolumes: %v", err)
}
resp.NextToken = cresp.NextToken
resp.Entries = []*nstructs.CSIVolumeExternalStub{}
for _, entry := range cresp.Entries {
if entry.Volume == nil {
return fmt.Errorf("CSI.ControllerListVolumes: plugin returned an invalid entry")
}
vol := &nstructs.CSIVolumeExternalStub{
ExternalID: entry.Volume.ExternalVolumeID,
CapacityBytes: entry.Volume.CapacityBytes,
VolumeContext: entry.Volume.VolumeContext,
CloneID: entry.Volume.ContentSource.CloneID,
SnapshotID: entry.Volume.ContentSource.SnapshotID,
}
if entry.Status != nil {
vol.PublishedExternalNodeIDs = entry.Status.PublishedNodeIds
vol.IsAbnormal = entry.Status.VolumeCondition.Abnormal
if entry.Status.VolumeCondition != nil {
vol.Status = entry.Status.VolumeCondition.Message
}
}
resp.Entries = append(resp.Entries, vol)
if req.MaxEntries != 0 && int32(len(resp.Entries)) == req.MaxEntries {
break
}
}
return nil
}
@ -180,13 +320,13 @@ func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, re
// real Nomad cluster. They serve as a defensive check before forwarding
// requests to plugins, and to aid with development.
if req.PluginID == "" {
return errors.New("PluginID is required")
return errors.New("CSI.NodeDetachVolume: PluginID is required")
}
if req.VolumeID == "" {
return errors.New("VolumeID is required")
return errors.New("CSI.NodeDetachVolume: VolumeID is required")
}
if req.AllocID == "" {
return errors.New("AllocID is required")
return errors.New("CSI.NodeDetachVolume: AllocID is required")
}
ctx, cancelFn := c.requestContext()
@ -194,7 +334,7 @@ func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, re
mounter, err := c.c.csimanager.MounterForPlugin(ctx, req.PluginID)
if err != nil {
return err
return fmt.Errorf("CSI.NodeDetachVolume: %v", err)
}
usageOpts := &csimanager.UsageOptions{
@ -208,7 +348,7 @@ func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, re
// if the unmounting previously happened but the server failed to
// checkpoint, we'll get an error from Unmount but can safely
// ignore it.
return err
return fmt.Errorf("CSI.NodeDetachVolume: %v", err)
}
return nil
}

View File

@ -41,7 +41,7 @@ func TestCSIController_AttachVolume(t *testing.T) {
PluginID: "some-garbage",
},
},
ExpectedErr: errors.New("CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
ExpectedErr: errors.New("CSI.ControllerAttachVolume: CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
},
{
Name: "validates volumeid is not empty",
@ -50,7 +50,7 @@ func TestCSIController_AttachVolume(t *testing.T) {
PluginID: fakePlugin.Name,
},
},
ExpectedErr: errors.New("VolumeID is required"),
ExpectedErr: errors.New("CSI.ControllerAttachVolume: VolumeID is required"),
},
{
Name: "validates nodeid is not empty",
@ -60,7 +60,7 @@ func TestCSIController_AttachVolume(t *testing.T) {
},
VolumeID: "1234-4321-1234-4321",
},
ExpectedErr: errors.New("ClientCSINodeID is required"),
ExpectedErr: errors.New("CSI.ControllerAttachVolume: ClientCSINodeID is required"),
},
{
Name: "validates AccessMode",
@ -73,7 +73,7 @@ func TestCSIController_AttachVolume(t *testing.T) {
AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem,
AccessMode: nstructs.CSIVolumeAccessMode("foo"),
},
ExpectedErr: errors.New("Unknown volume access mode: foo"),
ExpectedErr: errors.New("CSI.ControllerAttachVolume: unknown volume access mode: foo"),
},
{
Name: "validates attachmentmode is not empty",
@ -86,7 +86,7 @@ func TestCSIController_AttachVolume(t *testing.T) {
AccessMode: nstructs.CSIVolumeAccessModeMultiNodeReader,
AttachmentMode: nstructs.CSIVolumeAttachmentMode("bar"),
},
ExpectedErr: errors.New("Unknown volume attachment mode: bar"),
ExpectedErr: errors.New("CSI.ControllerAttachVolume: unknown volume attachment mode: bar"),
},
{
Name: "returns transitive errors",
@ -102,7 +102,7 @@ func TestCSIController_AttachVolume(t *testing.T) {
AccessMode: nstructs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem,
},
ExpectedErr: errors.New("hello"),
ExpectedErr: errors.New("CSI.ControllerAttachVolume: hello"),
},
{
Name: "handles nil PublishContext",
@ -188,7 +188,7 @@ func TestCSIController_ValidateVolume(t *testing.T) {
PluginID: fakePlugin.Name,
},
},
ExpectedErr: errors.New("VolumeID is required"),
ExpectedErr: errors.New("CSI.ControllerValidateVolume: VolumeID is required"),
},
{
Name: "returns plugin not found errors",
@ -198,7 +198,7 @@ func TestCSIController_ValidateVolume(t *testing.T) {
},
VolumeID: "foo",
},
ExpectedErr: errors.New("CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
ExpectedErr: errors.New("CSI.ControllerValidateVolume: CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
},
{
Name: "validates attachmentmode",
@ -210,7 +210,7 @@ func TestCSIController_ValidateVolume(t *testing.T) {
AttachmentMode: nstructs.CSIVolumeAttachmentMode("bar"),
AccessMode: nstructs.CSIVolumeAccessModeMultiNodeReader,
},
ExpectedErr: errors.New("Unknown volume attachment mode: bar"),
ExpectedErr: errors.New("CSI.ControllerValidateVolume: unknown volume attachment mode: bar"),
},
{
Name: "validates AccessMode",
@ -222,7 +222,7 @@ func TestCSIController_ValidateVolume(t *testing.T) {
AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem,
AccessMode: nstructs.CSIVolumeAccessMode("foo"),
},
ExpectedErr: errors.New("Unknown volume access mode: foo"),
ExpectedErr: errors.New("CSI.ControllerValidateVolume: unknown volume access mode: foo"),
},
{
Name: "returns transitive errors",
@ -237,7 +237,7 @@ func TestCSIController_ValidateVolume(t *testing.T) {
AccessMode: nstructs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem,
},
ExpectedErr: errors.New("hello"),
ExpectedErr: errors.New("CSI.ControllerValidateVolume: hello"),
},
}
@ -287,7 +287,7 @@ func TestCSIController_DetachVolume(t *testing.T) {
PluginID: "some-garbage",
},
},
ExpectedErr: errors.New("CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
ExpectedErr: errors.New("CSI.ControllerDetachVolume: CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
},
{
Name: "validates volumeid is not empty",
@ -296,7 +296,7 @@ func TestCSIController_DetachVolume(t *testing.T) {
PluginID: fakePlugin.Name,
},
},
ExpectedErr: errors.New("VolumeID is required"),
ExpectedErr: errors.New("CSI.ControllerDetachVolume: VolumeID is required"),
},
{
Name: "validates nodeid is not empty",
@ -306,7 +306,7 @@ func TestCSIController_DetachVolume(t *testing.T) {
},
VolumeID: "1234-4321-1234-4321",
},
ExpectedErr: errors.New("ClientCSINodeID is required"),
ExpectedErr: errors.New("CSI.ControllerDetachVolume: ClientCSINodeID is required"),
},
{
Name: "returns transitive errors",
@ -320,7 +320,7 @@ func TestCSIController_DetachVolume(t *testing.T) {
VolumeID: "1234-4321-1234-4321",
ClientCSINodeID: "abcde",
},
ExpectedErr: errors.New("hello"),
ExpectedErr: errors.New("CSI.ControllerDetachVolume: hello"),
},
}
@ -353,6 +353,281 @@ func TestCSIController_DetachVolume(t *testing.T) {
}
}
func TestCSIController_CreateVolume(t *testing.T) {
t.Parallel()
cases := []struct {
Name string
ClientSetupFunc func(*fake.Client)
Request *structs.ClientCSIControllerCreateVolumeRequest
ExpectedErr error
ExpectedResponse *structs.ClientCSIControllerCreateVolumeResponse
}{
{
Name: "returns plugin not found errors",
Request: &structs.ClientCSIControllerCreateVolumeRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: "some-garbage",
},
},
ExpectedErr: errors.New("CSI.ControllerCreateVolume: CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
},
{
Name: "validates AccessMode",
Request: &structs.ClientCSIControllerCreateVolumeRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
Name: "1234-4321-1234-4321",
VolumeCapabilities: []*nstructs.CSIVolumeCapability{
{
AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem,
AccessMode: nstructs.CSIVolumeAccessMode("foo"),
},
},
},
ExpectedErr: errors.New("CSI.ControllerCreateVolume: unknown volume access mode: foo"),
},
{
Name: "validates attachmentmode is not empty",
Request: &structs.ClientCSIControllerCreateVolumeRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
Name: "1234-4321-1234-4321",
VolumeCapabilities: []*nstructs.CSIVolumeCapability{
{
AccessMode: nstructs.CSIVolumeAccessModeMultiNodeReader,
AttachmentMode: nstructs.CSIVolumeAttachmentMode("bar"),
},
},
},
ExpectedErr: errors.New("CSI.ControllerCreateVolume: unknown volume attachment mode: bar"),
},
{
Name: "returns transitive errors",
ClientSetupFunc: func(fc *fake.Client) {
fc.NextControllerCreateVolumeErr = errors.New("internal plugin error")
},
Request: &structs.ClientCSIControllerCreateVolumeRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
Name: "1234-4321-1234-4321",
VolumeCapabilities: []*nstructs.CSIVolumeCapability{
{
AccessMode: nstructs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem,
},
},
},
ExpectedErr: errors.New("CSI.ControllerCreateVolume: internal plugin error"),
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
require := require.New(t)
client, cleanup := TestClient(t, nil)
defer cleanup()
fakeClient := &fake.Client{}
if tc.ClientSetupFunc != nil {
tc.ClientSetupFunc(fakeClient)
}
dispenserFunc := func(*dynamicplugins.PluginInfo) (interface{}, error) {
return fakeClient, nil
}
client.dynamicRegistry.StubDispenserForType(
dynamicplugins.PluginTypeCSIController, dispenserFunc)
err := client.dynamicRegistry.RegisterPlugin(fakePlugin)
require.Nil(err)
var resp structs.ClientCSIControllerCreateVolumeResponse
err = client.ClientRPC("CSI.ControllerCreateVolume", tc.Request, &resp)
require.Equal(tc.ExpectedErr, err)
if tc.ExpectedResponse != nil {
require.Equal(tc.ExpectedResponse, &resp)
}
})
}
}
func TestCSIController_DeleteVolume(t *testing.T) {
t.Parallel()
cases := []struct {
Name string
ClientSetupFunc func(*fake.Client)
Request *structs.ClientCSIControllerDeleteVolumeRequest
ExpectedErr error
ExpectedResponse *structs.ClientCSIControllerDeleteVolumeResponse
}{
{
Name: "returns plugin not found errors",
Request: &structs.ClientCSIControllerDeleteVolumeRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: "some-garbage",
},
},
ExpectedErr: errors.New("CSI.ControllerDeleteVolume: CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
},
{
Name: "returns transitive errors",
ClientSetupFunc: func(fc *fake.Client) {
fc.NextControllerDeleteVolumeErr = errors.New("internal plugin error")
},
Request: &structs.ClientCSIControllerDeleteVolumeRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
ExternalVolumeID: "1234-4321-1234-4321",
},
ExpectedErr: errors.New("CSI.ControllerDeleteVolume: internal plugin error"),
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
require := require.New(t)
client, cleanup := TestClient(t, nil)
defer cleanup()
fakeClient := &fake.Client{}
if tc.ClientSetupFunc != nil {
tc.ClientSetupFunc(fakeClient)
}
dispenserFunc := func(*dynamicplugins.PluginInfo) (interface{}, error) {
return fakeClient, nil
}
client.dynamicRegistry.StubDispenserForType(
dynamicplugins.PluginTypeCSIController, dispenserFunc)
err := client.dynamicRegistry.RegisterPlugin(fakePlugin)
require.Nil(err)
var resp structs.ClientCSIControllerDeleteVolumeResponse
err = client.ClientRPC("CSI.ControllerDeleteVolume", tc.Request, &resp)
require.Equal(tc.ExpectedErr, err)
if tc.ExpectedResponse != nil {
require.Equal(tc.ExpectedResponse, &resp)
}
})
}
}
func TestCSIController_ListVolumes(t *testing.T) {
t.Parallel()
cases := []struct {
Name string
ClientSetupFunc func(*fake.Client)
Request *structs.ClientCSIControllerListVolumesRequest
ExpectedErr error
ExpectedResponse *structs.ClientCSIControllerListVolumesResponse
}{
{
Name: "returns plugin not found errors",
Request: &structs.ClientCSIControllerListVolumesRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: "some-garbage",
},
},
ExpectedErr: errors.New("CSI.ControllerListVolumes: CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
},
{
Name: "returns transitive errors",
ClientSetupFunc: func(fc *fake.Client) {
fc.NextControllerListVolumesErr = errors.New("internal plugin error")
},
Request: &structs.ClientCSIControllerListVolumesRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
},
ExpectedErr: errors.New("CSI.ControllerListVolumes: internal plugin error"),
},
{
Name: "returns volumes",
ClientSetupFunc: func(fc *fake.Client) {
fc.NextControllerListVolumesResponse = &csi.ControllerListVolumesResponse{
Entries: []*csi.ListVolumesResponse_Entry{
{
Volume: &csi.Volume{
CapacityBytes: 1000000,
ExternalVolumeID: "vol-1",
VolumeContext: map[string]string{"foo": "bar"},
ContentSource: &csi.VolumeContentSource{
SnapshotID: "snap-1",
},
},
Status: &csi.ListVolumesResponse_VolumeStatus{
PublishedNodeIds: []string{"i-1234", "i-5678"},
VolumeCondition: &csi.VolumeCondition{
Message: "ok",
},
},
},
},
NextToken: "2",
}
},
Request: &structs.ClientCSIControllerListVolumesRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
StartingToken: "1",
MaxEntries: 100,
},
ExpectedResponse: &structs.ClientCSIControllerListVolumesResponse{
Entries: []*nstructs.CSIVolumeExternalStub{
{
ExternalID: "vol-1",
CapacityBytes: 1000000,
VolumeContext: map[string]string{"foo": "bar"},
SnapshotID: "snap-1",
PublishedExternalNodeIDs: []string{"i-1234", "i-5678"},
Status: "ok",
},
},
NextToken: "2",
},
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
require := require.New(t)
client, cleanup := TestClient(t, nil)
defer cleanup()
fakeClient := &fake.Client{}
if tc.ClientSetupFunc != nil {
tc.ClientSetupFunc(fakeClient)
}
dispenserFunc := func(*dynamicplugins.PluginInfo) (interface{}, error) {
return fakeClient, nil
}
client.dynamicRegistry.StubDispenserForType(
dynamicplugins.PluginTypeCSIController, dispenserFunc)
err := client.dynamicRegistry.RegisterPlugin(fakePlugin)
require.Nil(err)
var resp structs.ClientCSIControllerListVolumesResponse
err = client.ClientRPC("CSI.ControllerListVolumes", tc.Request, &resp)
require.Equal(tc.ExpectedErr, err)
if tc.ExpectedResponse != nil {
require.Equal(tc.ExpectedResponse, &resp)
}
})
}
}
func TestCSINode_DetachVolume(t *testing.T) {
t.Parallel()
@ -374,14 +649,14 @@ func TestCSINode_DetachVolume(t *testing.T) {
AccessMode: nstructs.CSIVolumeAccessModeMultiNodeReader,
ReadOnly: true,
},
ExpectedErr: errors.New("plugin some-garbage for type csi-node not found"),
ExpectedErr: errors.New("CSI.NodeDetachVolume: plugin some-garbage for type csi-node not found"),
},
{
Name: "validates volumeid is not empty",
Request: &structs.ClientCSINodeDetachVolumeRequest{
PluginID: fakeNodePlugin.Name,
},
ExpectedErr: errors.New("VolumeID is required"),
ExpectedErr: errors.New("CSI.NodeDetachVolume: VolumeID is required"),
},
{
Name: "validates nodeid is not empty",
@ -389,7 +664,7 @@ func TestCSINode_DetachVolume(t *testing.T) {
PluginID: fakeNodePlugin.Name,
VolumeID: "1234-4321-1234-4321",
},
ExpectedErr: errors.New("AllocID is required"),
ExpectedErr: errors.New("CSI.NodeDetachVolume: AllocID is required"),
},
{
Name: "returns transitive errors",
@ -402,7 +677,7 @@ func TestCSINode_DetachVolume(t *testing.T) {
AllocID: "4321-1234-4321-1234",
},
// we don't have a csimanager in this context
ExpectedErr: errors.New("plugin test-plugin for type csi-node not found"),
ExpectedErr: errors.New("CSI.NodeDetachVolume: plugin test-plugin for type csi-node not found"),
},
}

View File

@ -152,7 +152,7 @@ func TestVolumeManager_stageVolume(t *testing.T) {
AttachmentMode: "nonsense",
},
UsageOptions: &UsageOptions{},
ExpectedErr: errors.New("Unknown volume attachment mode: nonsense"),
ExpectedErr: errors.New("unknown volume attachment mode: nonsense"),
},
{
Name: "Returns an error when an invalid AccessMode is provided",
@ -162,7 +162,7 @@ func TestVolumeManager_stageVolume(t *testing.T) {
AccessMode: "nonsense",
},
UsageOptions: &UsageOptions{},
ExpectedErr: errors.New("Unknown volume access mode: nonsense"),
ExpectedErr: errors.New("unknown volume access mode: nonsense"),
},
{
Name: "Returns an error when the plugin returns an error",
@ -490,14 +490,14 @@ func TestVolumeManager_MountVolumeEvents(t *testing.T) {
pubCtx := map[string]string{}
_, err := manager.MountVolume(ctx, vol, alloc, usage, pubCtx)
require.Error(t, err, "Unknown volume attachment mode: ")
require.Error(t, err, "unknown volume attachment mode: ")
require.Equal(t, 1, len(events))
e := events[0]
require.Equal(t, "Mount volume", e.Message)
require.Equal(t, "Storage", e.Subsystem)
require.Equal(t, "vol", e.Details["volume_id"])
require.Equal(t, "false", e.Details["success"])
require.Equal(t, "Unknown volume attachment mode: ", e.Details["error"])
require.Equal(t, "unknown volume attachment mode: ", e.Details["error"])
events = events[1:]
vol.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem

View File

@ -20,6 +20,12 @@ type CSIVolumeMountOptions struct {
MountFlags []string
}
// CSIControllerRequest interface lets us set embedded CSIControllerQuery
// fields in the server
type CSIControllerRequest interface {
SetControllerNodeID(string)
}
// CSIControllerQuery is used to specify various flags for queries against CSI
// Controllers
type CSIControllerQuery struct {
@ -30,6 +36,10 @@ type CSIControllerQuery struct {
PluginID string
}
func (c *CSIControllerQuery) SetControllerNodeID(nodeID string) {
c.ControllerNodeID = nodeID
}
type ClientCSIControllerValidateVolumeRequest struct {
VolumeID string // note: this is the external ID
@ -175,6 +185,102 @@ func (c *ClientCSIControllerDetachVolumeRequest) ToCSIRequest() *csi.ControllerU
type ClientCSIControllerDetachVolumeResponse struct{}
// ClientCSIControllerCreateVolumeRequest the RPC made from the server to a
// Nomad client to tell a CSI controller plugin on that client to perform
// CreateVolume
type ClientCSIControllerCreateVolumeRequest struct {
Name string
VolumeCapabilities []*structs.CSIVolumeCapability
Parameters map[string]string
Secrets structs.CSISecrets
CapacityMin int64
CapacityMax int64
SnapshotID string
CloneID string
// TODO: topology is not yet supported
// TopologyRequirement
CSIControllerQuery
}
func (req *ClientCSIControllerCreateVolumeRequest) ToCSIRequest() (*csi.ControllerCreateVolumeRequest, error) {
creq := &csi.ControllerCreateVolumeRequest{
Name: req.Name,
CapacityRange: &csi.CapacityRange{
RequiredBytes: req.CapacityMin,
LimitBytes: req.CapacityMax,
},
VolumeCapabilities: []*csi.VolumeCapability{},
Parameters: req.Parameters,
Secrets: req.Secrets,
ContentSource: &csi.VolumeContentSource{
CloneID: req.CloneID,
SnapshotID: req.SnapshotID,
},
// TODO: topology is not yet supported
AccessibilityRequirements: &csi.TopologyRequirement{},
}
for _, cap := range req.VolumeCapabilities {
ccap, err := csi.VolumeCapabilityFromStructs(cap.AttachmentMode, cap.AccessMode)
if err != nil {
return nil, err
}
creq.VolumeCapabilities = append(creq.VolumeCapabilities, ccap)
}
return creq, nil
}
type ClientCSIControllerCreateVolumeResponse struct {
ExternalVolumeID string
CapacityBytes int64
VolumeContext map[string]string
// TODO: topology is not yet supported
// AccessibleTopology []*Topology
}
// ClientCSIControllerDeleteVolumeRequest the RPC made from the server to a
// Nomad client to tell a CSI controller plugin on that client to perform
// DeleteVolume
type ClientCSIControllerDeleteVolumeRequest struct {
ExternalVolumeID string
Secrets structs.CSISecrets
CSIControllerQuery
}
func (req *ClientCSIControllerDeleteVolumeRequest) ToCSIRequest() *csi.ControllerDeleteVolumeRequest {
return &csi.ControllerDeleteVolumeRequest{
ExternalVolumeID: req.ExternalVolumeID,
Secrets: req.Secrets,
}
}
type ClientCSIControllerDeleteVolumeResponse struct{}
// ClientCSIControllerListVolumesVolumeRequest the RPC made from the server to
// a Nomad client to tell a CSI controller plugin on that client to perform
// ListVolumes
type ClientCSIControllerListVolumesRequest struct {
MaxEntries int32
StartingToken string
CSIControllerQuery
}
func (req *ClientCSIControllerListVolumesRequest) ToCSIRequest() *csi.ControllerListVolumesRequest {
return &csi.ControllerListVolumesRequest{
MaxEntries: req.MaxEntries,
StartingToken: req.StartingToken,
}
}
type ClientCSIControllerListVolumesResponse struct {
Entries []*structs.CSIVolumeExternalStub
NextToken string
}
// ClientCSINodeDetachVolumeRequest is the RPC made from the server to
// a Nomad client to tell a CSI node plugin on that client to perform
// NodeUnpublish and NodeUnstage.

View File

@ -22,80 +22,98 @@ type ClientCSI struct {
func (a *ClientCSI) ControllerAttachVolume(args *cstructs.ClientCSIControllerAttachVolumeRequest, reply *cstructs.ClientCSIControllerAttachVolumeResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "attach_volume"}, time.Now())
clientIDs, err := a.clientIDsForController(args.PluginID)
err := a.sendCSIControllerRPC(args.PluginID,
"CSI.ControllerAttachVolume",
"ClientCSI.ControllerAttachVolume",
args, reply)
if err != nil {
return fmt.Errorf("controller attach volume: %v", err)
}
for _, clientID := range clientIDs {
args.ControllerNodeID = clientID
state, ok := a.srv.getNodeConn(clientID)
if !ok {
return findNodeConnAndForward(a.srv,
clientID, "ClientCSI.ControllerAttachVolume", args, reply)
}
err = NodeRpc(state.Session, "CSI.ControllerAttachVolume", args, reply)
if err == nil {
return nil
}
if a.isRetryable(err) {
a.logger.Debug("failed to reach controller on client",
"nodeID", clientID, "err", err)
continue
}
return fmt.Errorf("controller attach volume: %v", err)
}
return fmt.Errorf("controller attach volume: %v", err)
return nil
}
func (a *ClientCSI) ControllerValidateVolume(args *cstructs.ClientCSIControllerValidateVolumeRequest, reply *cstructs.ClientCSIControllerValidateVolumeResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "validate_volume"}, time.Now())
clientIDs, err := a.clientIDsForController(args.PluginID)
err := a.sendCSIControllerRPC(args.PluginID,
"CSI.ControllerValidateVolume",
"ClientCSI.ControllerValidateVolume",
args, reply)
if err != nil {
return fmt.Errorf("validate volume: %v", err)
return fmt.Errorf("controller validate volume: %v", err)
}
for _, clientID := range clientIDs {
args.ControllerNodeID = clientID
state, ok := a.srv.getNodeConn(clientID)
if !ok {
return findNodeConnAndForward(a.srv,
clientID, "ClientCSI.ControllerValidateVolume", args, reply)
}
err = NodeRpc(state.Session, "CSI.ControllerValidateVolume", args, reply)
if err == nil {
return nil
}
if a.isRetryable(err) {
a.logger.Debug("failed to reach controller on client",
"nodeID", clientID, "err", err)
continue
}
return fmt.Errorf("validate volume: %v", err)
}
return fmt.Errorf("validate volume: %v", err)
return nil
}
func (a *ClientCSI) ControllerDetachVolume(args *cstructs.ClientCSIControllerDetachVolumeRequest, reply *cstructs.ClientCSIControllerDetachVolumeResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "detach_volume"}, time.Now())
clientIDs, err := a.clientIDsForController(args.PluginID)
err := a.sendCSIControllerRPC(args.PluginID,
"CSI.ControllerDetachVolume",
"ClientCSI.ControllerDetachVolume",
args, reply)
if err != nil {
return fmt.Errorf("controller detach volume: %v", err)
}
return nil
}
func (a *ClientCSI) ControllerCreateVolume(args *cstructs.ClientCSIControllerCreateVolumeRequest, reply *cstructs.ClientCSIControllerCreateVolumeResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "create_volume"}, time.Now())
err := a.sendCSIControllerRPC(args.PluginID,
"CSI.ControllerCreateVolume",
"ClientCSI.ControllerCreateVolume",
args, reply)
if err != nil {
return fmt.Errorf("controller create volume: %v", err)
}
return nil
}
func (a *ClientCSI) ControllerDeleteVolume(args *cstructs.ClientCSIControllerDeleteVolumeRequest, reply *cstructs.ClientCSIControllerDeleteVolumeResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "delete_volume"}, time.Now())
err := a.sendCSIControllerRPC(args.PluginID,
"CSI.ControllerDeleteVolume",
"ClientCSI.ControllerDeleteVolume",
args, reply)
if err != nil {
return fmt.Errorf("controller delete volume: %v", err)
}
return nil
}
func (a *ClientCSI) ControllerListVolumes(args *cstructs.ClientCSIControllerListVolumesRequest, reply *cstructs.ClientCSIControllerListVolumesResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "list_volumes"}, time.Now())
err := a.sendCSIControllerRPC(args.PluginID,
"CSI.ControllerListVolumes",
"ClientCSI.ControllerListVolumes",
args, reply)
if err != nil {
return fmt.Errorf("controller list volumes: %v", err)
}
return nil
}
func (a *ClientCSI) sendCSIControllerRPC(pluginID, method, fwdMethod string, args cstructs.CSIControllerRequest, reply interface{}) error {
clientIDs, err := a.clientIDsForController(pluginID)
if err != nil {
return err
}
for _, clientID := range clientIDs {
args.ControllerNodeID = clientID
args.SetControllerNodeID(clientID)
state, ok := a.srv.getNodeConn(clientID)
if !ok {
return findNodeConnAndForward(a.srv,
clientID, "ClientCSI.ControllerDetachVolume", args, reply)
clientID, fwdMethod, args, reply)
}
err = NodeRpc(state.Session, "CSI.ControllerDetachVolume", args, reply)
err = NodeRpc(state.Session, method, args, reply)
if err == nil {
return nil
}
@ -104,9 +122,9 @@ func (a *ClientCSI) ControllerDetachVolume(args *cstructs.ClientCSIControllerDet
"nodeID", clientID, "err", err)
continue
}
return fmt.Errorf("controller detach volume: %v", err)
return err
}
return fmt.Errorf("controller detach volume: %v", err)
return err
}
// we can retry the same RPC on a different controller in the cases where the

View File

@ -24,16 +24,23 @@ import (
// responses that have no bodies have no "Next*Response" field and will always
// return an empty response body.
type MockClientCSI struct {
NextValidateError error
NextAttachError error
NextAttachResponse *cstructs.ClientCSIControllerAttachVolumeResponse
NextDetachError error
NextNodeDetachError error
NextValidateError error
NextAttachError error
NextAttachResponse *cstructs.ClientCSIControllerAttachVolumeResponse
NextDetachError error
NextCreateError error
NextCreateResponse *cstructs.ClientCSIControllerCreateVolumeResponse
NextDeleteError error
NextListExternalError error
NextListExternalResponse *cstructs.ClientCSIControllerListVolumesResponse
NextNodeDetachError error
}
func newMockClientCSI() *MockClientCSI {
return &MockClientCSI{
NextAttachResponse: &cstructs.ClientCSIControllerAttachVolumeResponse{},
NextAttachResponse: &cstructs.ClientCSIControllerAttachVolumeResponse{},
NextCreateResponse: &cstructs.ClientCSIControllerCreateVolumeResponse{},
NextListExternalResponse: &cstructs.ClientCSIControllerListVolumesResponse{},
}
}
@ -50,6 +57,20 @@ func (c *MockClientCSI) ControllerDetachVolume(req *cstructs.ClientCSIController
return c.NextDetachError
}
func (c *MockClientCSI) ControllerCreateVolume(req *cstructs.ClientCSIControllerCreateVolumeRequest, resp *cstructs.ClientCSIControllerCreateVolumeResponse) error {
*resp = *c.NextCreateResponse
return c.NextCreateError
}
func (c *MockClientCSI) ControllerDeleteVolume(req *cstructs.ClientCSIControllerDeleteVolumeRequest, resp *cstructs.ClientCSIControllerDeleteVolumeResponse) error {
return c.NextDeleteError
}
func (c *MockClientCSI) ControllerListVolumes(req *cstructs.ClientCSIControllerListVolumesRequest, resp *cstructs.ClientCSIControllerListVolumesResponse) error {
*resp = *c.NextListExternalResponse
return c.NextListExternalError
}
func (c *MockClientCSI) NodeDetachVolume(req *cstructs.ClientCSINodeDetachVolumeRequest, resp *cstructs.ClientCSINodeDetachVolumeResponse) error {
return c.NextNodeDetachError
}
@ -152,6 +173,104 @@ func TestClientCSIController_ValidateVolume_Forwarded(t *testing.T) {
require.Contains(err.Error(), "no plugins registered for type")
}
func TestClientCSIController_CreateVolume_Local(t *testing.T) {
t.Parallel()
require := require.New(t)
codec, cleanup := setupLocal(t)
defer cleanup()
req := &cstructs.ClientCSIControllerCreateVolumeRequest{
CSIControllerQuery: cstructs.CSIControllerQuery{PluginID: "minnie"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientCSI.ControllerCreateVolume", req, &resp)
require.Error(err)
require.Contains(err.Error(), "no plugins registered for type")
}
func TestClientCSIController_CreateVolume_Forwarded(t *testing.T) {
t.Parallel()
require := require.New(t)
codec, cleanup := setupForward(t)
defer cleanup()
req := &cstructs.ClientCSIControllerCreateVolumeRequest{
CSIControllerQuery: cstructs.CSIControllerQuery{PluginID: "minnie"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientCSI.ControllerCreateVolume", req, &resp)
require.Error(err)
require.Contains(err.Error(), "no plugins registered for type")
}
func TestClientCSIController_DeleteVolume_Local(t *testing.T) {
t.Parallel()
require := require.New(t)
codec, cleanup := setupLocal(t)
defer cleanup()
req := &cstructs.ClientCSIControllerDeleteVolumeRequest{
ExternalVolumeID: "test",
CSIControllerQuery: cstructs.CSIControllerQuery{PluginID: "minnie"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientCSI.ControllerDeleteVolume", req, &resp)
require.Error(err)
require.Contains(err.Error(), "no plugins registered for type")
}
func TestClientCSIController_DeleteVolume_Forwarded(t *testing.T) {
t.Parallel()
require := require.New(t)
codec, cleanup := setupForward(t)
defer cleanup()
req := &cstructs.ClientCSIControllerDeleteVolumeRequest{
ExternalVolumeID: "test",
CSIControllerQuery: cstructs.CSIControllerQuery{PluginID: "minnie"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientCSI.ControllerDeleteVolume", req, &resp)
require.Error(err)
require.Contains(err.Error(), "no plugins registered for type")
}
func TestClientCSIController_ListVolumes_Local(t *testing.T) {
t.Parallel()
require := require.New(t)
codec, cleanup := setupLocal(t)
defer cleanup()
req := &cstructs.ClientCSIControllerListVolumesRequest{
CSIControllerQuery: cstructs.CSIControllerQuery{PluginID: "minnie"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientCSI.ControllerListVolumes", req, &resp)
require.Error(err)
require.Contains(err.Error(), "no plugins registered for type")
}
func TestClientCSIController_ListVolumes_Forwarded(t *testing.T) {
t.Parallel()
require := require.New(t)
codec, cleanup := setupForward(t)
defer cleanup()
req := &cstructs.ClientCSIControllerListVolumesRequest{
CSIControllerQuery: cstructs.CSIControllerQuery{PluginID: "minnie"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientCSI.ControllerListVolumes", req, &resp)
require.Error(err)
require.Contains(err.Error(), "no plugins registered for type")
}
func TestClientCSI_NodeForControllerPlugin(t *testing.T) {
t.Parallel()
srv, shutdown := TestServer(t, func(c *Config) {})
@ -282,6 +401,9 @@ func setupLocal(t *testing.T) (rpc.ClientCodec, func()) {
mockCSI.NextValidateError = fmt.Errorf("no plugins registered for type")
mockCSI.NextAttachError = fmt.Errorf("no plugins registered for type")
mockCSI.NextDetachError = fmt.Errorf("no plugins registered for type")
mockCSI.NextCreateError = fmt.Errorf("no plugins registered for type")
mockCSI.NextDeleteError = fmt.Errorf("no plugins registered for type")
mockCSI.NextListExternalError = fmt.Errorf("no plugins registered for type")
c1, cleanupC1 := client.TestClientWithRPCs(t,
func(c *config.Config) {

View File

@ -108,8 +108,7 @@ func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIV
return structs.ErrPermissionDenied
}
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "volume", "list"}, metricsStart)
defer metrics.MeasureSince([]string{"nomad", "volume", "list"}, time.Now())
ns := args.RequestNamespace()
opts := blockingOptions{
@ -192,8 +191,7 @@ func (v *CSIVolume) Get(args *structs.CSIVolumeGetRequest, reply *structs.CSIVol
return structs.ErrPermissionDenied
}
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "volume", "get"}, metricsStart)
defer metrics.MeasureSince([]string{"nomad", "volume", "get"}, time.Now())
if args.ID == "" {
return fmt.Errorf("missing volume ID")
@ -276,14 +274,13 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru
return err
}
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "volume", "register"}, metricsStart)
defer metrics.MeasureSince([]string{"nomad", "volume", "register"}, time.Now())
if !allowVolume(aclObj, args.RequestNamespace()) || !aclObj.AllowPluginRead() {
return structs.ErrPermissionDenied
}
if args.Volumes == nil || len(args.Volumes) == 0 {
if len(args.Volumes) == 0 {
return fmt.Errorf("missing volume definition")
}
@ -331,8 +328,7 @@ func (v *CSIVolume) Deregister(args *structs.CSIVolumeDeregisterRequest, reply *
return err
}
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "volume", "deregister"}, metricsStart)
defer metrics.MeasureSince([]string{"nomad", "volume", "deregister"}, time.Now())
ns := args.RequestNamespace()
if !allowVolume(aclObj, ns) {
@ -369,8 +365,7 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
return err
}
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "volume", "claim"}, metricsStart)
defer metrics.MeasureSince([]string{"nomad", "volume", "claim"}, time.Now())
if !allowVolume(aclObj, args.RequestNamespace()) || !aclObj.AllowPluginRead() {
return structs.ErrPermissionDenied
@ -531,8 +526,7 @@ func (v *CSIVolume) Unpublish(args *structs.CSIVolumeUnpublishRequest, reply *st
return err
}
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "volume", "unpublish"}, metricsStart)
defer metrics.MeasureSince([]string{"nomad", "volume", "unpublish"}, time.Now())
allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIMountVolume)
aclObj, err := v.srv.WriteACLObj(&args.WriteRequest, true)
@ -795,6 +789,256 @@ func (v *CSIVolume) checkpointClaim(vol *structs.CSIVolume, claim *structs.CSIVo
return nil
}
func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs.CSIVolumeCreateResponse) error {
if done, err := v.srv.forward("CSIVolume.Create", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "volume", "create"}, time.Now())
allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIWriteVolume)
aclObj, err := v.srv.WriteACLObj(&args.WriteRequest, false)
if err != nil {
return err
}
if !allowVolume(aclObj, args.RequestNamespace()) || !aclObj.AllowPluginRead() {
return structs.ErrPermissionDenied
}
if len(args.Volumes) == 0 {
return fmt.Errorf("missing volume definition")
}
regArgs := &structs.CSIVolumeRegisterRequest{WriteRequest: args.WriteRequest}
type validated struct {
vol *structs.CSIVolume
plugin *structs.CSIPlugin
}
validatedVols := []validated{}
// This is the only namespace we ACL checked, force all the volumes to use it.
// We also validate that the plugin exists for each plugin, and validate the
// capabilities when the plugin has a controller.
for _, vol := range args.Volumes {
vol.Namespace = args.RequestNamespace()
if err = vol.Validate(); err != nil {
return err
}
plugin, err := v.pluginValidateVolume(regArgs, vol)
if err != nil {
return err
}
if !plugin.ControllerRequired {
return fmt.Errorf("plugin has no controller")
}
if err := v.controllerValidateVolume(regArgs, vol, plugin); err != nil {
return err
}
validatedVols = append(validatedVols, validated{vol, plugin})
}
// Attempt to create all the validated volumes and write only successfully
// created volumes to raft. And we'll report errors for any failed volumes
//
// NOTE: creating the volume in the external storage provider can't be
// made atomic with the registration, and creating the volume provides
// values we want to write on the CSIVolume in raft anyways. For now
// we'll block the RPC on the external storage provider so that we can
// easily return meaningful errors to the user, but in the future we
// should consider creating registering first and creating a "volume
// eval" that can do the plugin RPCs async.
var mErr multierror.Error
for _, valid := range validatedVols {
err = v.createVolume(valid.vol, valid.plugin)
if err != nil {
multierror.Append(&mErr, err)
} else {
regArgs.Volumes = append(regArgs.Volumes, valid.vol)
}
}
resp, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, regArgs)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "register")
return err
}
if respErr, ok := resp.(error); ok {
multierror.Append(&mErr, respErr)
}
err = mErr.ErrorOrNil()
if err != nil {
return err
}
reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
return nil
}
func (v *CSIVolume) createVolume(vol *structs.CSIVolume, plugin *structs.CSIPlugin) error {
method := "ClientCSI.ControllerCreateVolume"
cReq := &cstructs.ClientCSIControllerCreateVolumeRequest{
Name: vol.Name,
VolumeCapabilities: vol.RequestedCapabilities,
Parameters: vol.Parameters,
Secrets: vol.Secrets,
CapacityMin: vol.RequestedCapacityMin,
CapacityMax: vol.RequestedCapacityMax,
SnapshotID: vol.SnapshotID,
CloneID: vol.CloneID,
}
cReq.PluginID = plugin.ID
cResp := &cstructs.ClientCSIControllerCreateVolumeResponse{}
err := v.srv.RPC(method, cReq, cResp)
if err != nil {
return err
}
vol.ExternalID = cResp.ExternalVolumeID
vol.Capacity = cResp.CapacityBytes
vol.Context = cResp.VolumeContext
return nil
}
func (v *CSIVolume) Delete(args *structs.CSIVolumeDeleteRequest, reply *structs.CSIVolumeDeleteResponse) error {
if done, err := v.srv.forward("CSIVolume.Delete", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "volume", "delete"}, time.Now())
allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIWriteVolume)
aclObj, err := v.srv.WriteACLObj(&args.WriteRequest, false)
if err != nil {
return err
}
ns := args.RequestNamespace()
if !allowVolume(aclObj, ns) {
return structs.ErrPermissionDenied
}
if len(args.VolumeIDs) == 0 {
return fmt.Errorf("missing volume IDs")
}
for _, volID := range args.VolumeIDs {
plugin, vol, err := v.volAndPluginLookup(args.Namespace, volID)
if err != nil {
if err == fmt.Errorf("volume not found: %s", volID) {
v.logger.Warn("volume %q to be deleted was already deregistered")
continue
} else {
return err
}
}
// NOTE: deleting the volume in the external storage provider can't be
// made atomic with deregistration. We can't delete a volume that's
// not registered because we need to be able to lookup its plugin.
err = v.deleteVolume(vol, plugin)
if err != nil {
return err
}
}
deregArgs := &structs.CSIVolumeDeregisterRequest{
VolumeIDs: args.VolumeIDs,
WriteRequest: args.WriteRequest,
}
resp, index, err := v.srv.raftApply(structs.CSIVolumeDeregisterRequestType, deregArgs)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "deregister")
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
return nil
}
func (v *CSIVolume) deleteVolume(vol *structs.CSIVolume, plugin *structs.CSIPlugin) error {
method := "ClientCSI.ControllerDeleteVolume"
cReq := &cstructs.ClientCSIControllerDeleteVolumeRequest{
ExternalVolumeID: vol.ExternalID,
Secrets: vol.Secrets,
}
cReq.PluginID = plugin.ID
cResp := &cstructs.ClientCSIControllerDeleteVolumeResponse{}
return v.srv.RPC(method, cReq, cResp)
}
func (v *CSIVolume) ListExternal(args *structs.CSIVolumeExternalListRequest, reply *structs.CSIVolumeExternalListResponse) error {
if done, err := v.srv.forward("CSIVolume.ListExternal", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "volume", "list_external"}, time.Now())
allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIListVolume,
acl.NamespaceCapabilityCSIReadVolume,
acl.NamespaceCapabilityCSIMountVolume,
acl.NamespaceCapabilityListJobs)
aclObj, err := v.srv.QueryACLObj(&args.QueryOptions, false)
if err != nil {
return err
}
// NOTE: this is the plugin's namespace, not the volume(s) because they
// might not even be registered
if !allowVolume(aclObj, args.RequestNamespace()) {
return structs.ErrPermissionDenied
}
snap, err := v.srv.fsm.State().Snapshot()
if err != nil {
return err
}
plugin, err := snap.CSIPluginByID(nil, args.PluginID)
if err != nil {
return err
}
if plugin == nil {
return fmt.Errorf("no such plugin")
}
method := "ClientCSI.ControllerListVolumes"
cReq := &cstructs.ClientCSIControllerListVolumesRequest{
MaxEntries: args.MaxEntries,
StartingToken: args.StartingToken,
}
cReq.PluginID = plugin.ID
cResp := &cstructs.ClientCSIControllerListVolumesResponse{}
err = v.srv.RPC(method, cReq, cResp)
if err != nil {
return err
}
if args.MaxEntries > 0 {
reply.Volumes = cResp.Entries[:args.MaxEntries]
} else {
reply.Volumes = cResp.Entries
}
reply.NextToken = cResp.NextToken
return nil
}
// CSIPlugin wraps the structs.CSIPlugin with request data and server context
type CSIPlugin struct {
srv *Server
@ -816,8 +1060,7 @@ func (v *CSIPlugin) List(args *structs.CSIPluginListRequest, reply *structs.CSIP
return structs.ErrPermissionDenied
}
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "plugin", "list"}, metricsStart)
defer metrics.MeasureSince([]string{"nomad", "plugin", "list"}, time.Now())
opts := blockingOptions{
queryOpts: &args.QueryOptions,
@ -865,8 +1108,7 @@ func (v *CSIPlugin) Get(args *structs.CSIPluginGetRequest, reply *structs.CSIPlu
withAllocs := aclObj == nil ||
aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob)
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "plugin", "get"}, metricsStart)
defer metrics.MeasureSince([]string{"nomad", "plugin", "get"}, time.Now())
if args.ID == "" {
return fmt.Errorf("missing plugin ID")
@ -926,8 +1168,7 @@ func (v *CSIPlugin) Delete(args *structs.CSIPluginDeleteRequest, reply *structs.
return structs.ErrPermissionDenied
}
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "plugin", "delete"}, metricsStart)
defer metrics.MeasureSince([]string{"nomad", "plugin", "delete"}, time.Now())
if args.ID == "" {
return fmt.Errorf("missing plugin ID")

View File

@ -7,6 +7,9 @@ import (
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/client"
cconfig "github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
@ -435,14 +438,14 @@ func TestCSIVolumeEndpoint_ClaimWithController(t *testing.T) {
claimResp := &structs.CSIVolumeClaimResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
// Because the node is not registered
require.EqualError(t, err, "controller publish: attach volume: No path to node")
require.EqualError(t, err, "controller publish: attach volume: controller attach volume: No path to node")
// The node SecretID is authorized for all policies
claimReq.AuthToken = node.SecretID
claimReq.Namespace = ""
claimResp = &structs.CSIVolumeClaimResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
require.EqualError(t, err, "controller publish: attach volume: No path to node")
require.EqualError(t, err, "controller publish: attach volume: controller attach volume: No path to node")
}
func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
@ -496,7 +499,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
{
name: "unpublish previously detached node",
startingState: structs.CSIVolumeClaimStateNodeDetached,
expectedErrMsg: "could not detach from controller: No path to node",
expectedErrMsg: "could not detach from controller: controller detach volume: No path to node",
},
{
name: "first unpublish",
@ -653,6 +656,352 @@ func TestCSIVolumeEndpoint_List(t *testing.T) {
require.Equal(t, vols[1].ID, resp.Volumes[0].ID)
}
func TestCSIVolumeEndpoint_Create(t *testing.T) {
t.Parallel()
var err error
srv, shutdown := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer shutdown()
testutil.WaitForLeader(t, srv.RPC)
fake := newMockClientCSI()
fake.NextValidateError = nil
fake.NextCreateError = nil
fake.NextCreateResponse = &cstructs.ClientCSIControllerCreateVolumeResponse{
ExternalVolumeID: "vol-12345",
CapacityBytes: 42,
VolumeContext: map[string]string{"plugincontext": "bar"},
}
client, cleanup := client.TestClientWithRPCs(t,
func(c *cconfig.Config) {
c.Servers = []string{srv.config.RPCAddr.String()}
},
map[string]interface{}{"CSI": fake},
)
defer cleanup()
node := client.Node()
node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early versions
req0 := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp0 structs.NodeUpdateResponse
err = client.RPC("Node.Register", req0, &resp0)
require.NoError(t, err)
testutil.WaitForResult(func() (bool, error) {
nodes := srv.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
t.Fatalf("should have a client")
})
ns := structs.DefaultNamespace
state := srv.fsm.State()
codec := rpcClient(t, srv)
index := uint64(1000)
node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsAttachDetach: true,
},
RequiresControllerPlugin: true,
},
}
node.CSINodePlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
index++
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, index, node))
// Create the volume
volID := uuid.Generate()
vols := []*structs.CSIVolume{{
ID: volID,
Name: "vol",
Namespace: "notTheNamespace", // overriden by WriteRequest
PluginID: "minnie",
AccessMode: structs.CSIVolumeAccessModeMultiNodeReader, // ignored in create
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, // ignored in create
MountOptions: &structs.CSIMountOptions{
FSType: "ext4", MountFlags: []string{"sensitive"}}, // ignored in create
Secrets: structs.CSISecrets{"mysecret": "secretvalue"},
Parameters: map[string]string{"myparam": "paramvalue"},
Context: map[string]string{"mycontext": "contextvalue"}, // dropped by create
}}
// Create the create request
req1 := &structs.CSIVolumeCreateRequest{
Volumes: vols,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: ns,
},
}
resp1 := &structs.CSIVolumeCreateResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Create", req1, resp1)
require.NoError(t, err)
// Get the volume back out
req2 := &structs.CSIVolumeGetRequest{
ID: volID,
QueryOptions: structs.QueryOptions{
Region: "global",
},
}
resp2 := &structs.CSIVolumeGetResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", req2, resp2)
require.NoError(t, err)
require.Equal(t, resp1.Index, resp2.Index)
vol := resp2.Volume
require.NotNil(t, vol)
require.Equal(t, volID, vol.ID)
// these fields are set from the args
require.Equal(t, "csi.CSISecrets(map[mysecret:[REDACTED]])",
vol.Secrets.String())
require.Equal(t, "csi.CSIOptions(FSType: ext4, MountFlags: [REDACTED])",
vol.MountOptions.String())
require.Equal(t, ns, vol.Namespace)
// these fields are set from the plugin and should have been written to raft
require.Equal(t, "vol-12345", vol.ExternalID)
require.Equal(t, int64(42), vol.Capacity)
require.Equal(t, "bar", vol.Context["plugincontext"])
require.Equal(t, "", vol.Context["mycontext"])
}
func TestCSIVolumeEndpoint_Delete(t *testing.T) {
t.Parallel()
var err error
srv, shutdown := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer shutdown()
testutil.WaitForLeader(t, srv.RPC)
fake := newMockClientCSI()
fake.NextDeleteError = fmt.Errorf("should not see this")
client, cleanup := client.TestClientWithRPCs(t,
func(c *cconfig.Config) {
c.Servers = []string{srv.config.RPCAddr.String()}
},
map[string]interface{}{"CSI": fake},
)
defer cleanup()
node := client.Node()
node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early versions
req0 := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp0 structs.NodeUpdateResponse
err = client.RPC("Node.Register", req0, &resp0)
require.NoError(t, err)
testutil.WaitForResult(func() (bool, error) {
nodes := srv.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
t.Fatalf("should have a client")
})
ns := structs.DefaultNamespace
state := srv.fsm.State()
codec := rpcClient(t, srv)
index := uint64(1000)
node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsAttachDetach: true,
},
RequiresControllerPlugin: true,
},
}
node.CSINodePlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
index++
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, index, node))
volID := uuid.Generate()
vols := []*structs.CSIVolume{{
ID: volID,
Namespace: structs.DefaultNamespace,
PluginID: "minnie",
Secrets: structs.CSISecrets{"mysecret": "secretvalue"},
}}
index++
err = state.CSIVolumeRegister(index, vols)
require.NoError(t, err)
// Delete volumes
// Create an invalid delete request, ensure it doesn't hit the plugin
req1 := &structs.CSIVolumeDeleteRequest{
VolumeIDs: []string{"bad", volID},
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: ns,
},
}
resp1 := &structs.CSIVolumeCreateResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Delete", req1, resp1)
require.EqualError(t, err, "volume not found: bad")
// Make sure the valid volume wasn't deleted
req2 := &structs.CSIVolumeGetRequest{
ID: volID,
QueryOptions: structs.QueryOptions{
Region: "global",
},
}
resp2 := &structs.CSIVolumeGetResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", req2, resp2)
require.NoError(t, err)
require.NotNil(t, resp2.Volume)
// Fix the delete request
fake.NextDeleteError = nil
req1.VolumeIDs = []string{volID}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Delete", req1, resp1)
require.NoError(t, err)
// Make sure it was deregistered
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", req2, resp2)
require.NoError(t, err)
require.Nil(t, resp2.Volume)
}
func TestCSIVolumeEndpoint_ListExternal(t *testing.T) {
t.Parallel()
var err error
srv, shutdown := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer shutdown()
testutil.WaitForLeader(t, srv.RPC)
fake := newMockClientCSI()
fake.NextDeleteError = fmt.Errorf("should not see this")
fake.NextListExternalResponse = &cstructs.ClientCSIControllerListVolumesResponse{
Entries: []*structs.CSIVolumeExternalStub{
{
ExternalID: "vol-12345",
CapacityBytes: 70000,
PublishedExternalNodeIDs: []string{"i-12345"},
},
{
ExternalID: "vol-abcde",
CapacityBytes: 50000,
IsAbnormal: true,
Status: "something went wrong",
},
{
ExternalID: "vol-00000",
Status: "you should not see me",
},
},
NextToken: "page2",
}
client, cleanup := client.TestClientWithRPCs(t,
func(c *cconfig.Config) {
c.Servers = []string{srv.config.RPCAddr.String()}
},
map[string]interface{}{"CSI": fake},
)
defer cleanup()
node := client.Node()
node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early versions
req0 := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp0 structs.NodeUpdateResponse
err = client.RPC("Node.Register", req0, &resp0)
require.NoError(t, err)
testutil.WaitForResult(func() (bool, error) {
nodes := srv.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
t.Fatalf("should have a client")
})
state := srv.fsm.State()
codec := rpcClient(t, srv)
index := uint64(1000)
node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsAttachDetach: true,
},
RequiresControllerPlugin: true,
},
}
node.CSINodePlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
index++
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, index, node))
// List external volumes; note that none of these exist in the state store
req := &structs.CSIVolumeExternalListRequest{
MaxEntries: 2,
StartingToken: "page1",
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
resp := &structs.CSIVolumeExternalListResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.ListExternal", req, resp)
require.NoError(t, err)
require.Len(t, resp.Volumes, 2)
require.Equal(t, "vol-12345", resp.Volumes[0].ExternalID)
require.Equal(t, "vol-abcde", resp.Volumes[1].ExternalID)
require.True(t, resp.Volumes[1].IsAbnormal)
require.Equal(t, "page2", resp.NextToken)
}
func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) {
t.Parallel()
srv, shutdown := TestServer(t, func(c *Config) {

View File

@ -79,6 +79,13 @@ func (t *TaskCSIPluginConfig) Copy() *TaskCSIPluginConfig {
return nt
}
// CSIVolumeCapability is the requested attachment and access mode for a
// volume
type CSIVolumeCapability struct {
AttachmentMode CSIVolumeAttachmentMode
AccessMode CSIVolumeAccessMode
}
// CSIVolumeAttachmentMode chooses the type of storage api that will be used to
// interact with the device.
type CSIVolumeAttachmentMode string
@ -246,6 +253,15 @@ type CSIVolume struct {
Secrets CSISecrets
Parameters map[string]string
Context map[string]string
Capacity int64 // bytes
// These values are used only on volume creation but we record them
// so that we can diff the volume later
RequestedCapacityMin int64 // bytes
RequestedCapacityMax int64 // bytes
RequestedCapabilities []*CSIVolumeCapability
CloneID string
SnapshotID string
// Allocations, tracking claim status
ReadAllocs map[string]*Allocation // AllocID -> Allocation
@ -590,6 +606,9 @@ func (v *CSIVolume) Validate() error {
}
}
}
if v.SnapshotID != "" && v.CloneID != "" {
errs = append(errs, "only one of snapshot_id and clone_id is allowed")
}
// TODO: Volume Topologies are optional - We should check to see if the plugin
// the volume is being registered with requires them.
@ -630,6 +649,25 @@ type CSIVolumeDeregisterResponse struct {
QueryMeta
}
type CSIVolumeCreateRequest struct {
Volumes []*CSIVolume
WriteRequest
}
type CSIVolumeCreateResponse struct {
Volumes []*CSIVolume
QueryMeta
}
type CSIVolumeDeleteRequest struct {
VolumeIDs []string
WriteRequest
}
type CSIVolumeDeleteResponse struct {
QueryMeta
}
type CSIVolumeClaimMode int
const (
@ -700,6 +738,39 @@ type CSIVolumeListResponse struct {
QueryMeta
}
// CSIVolumeExternalListRequest is a request to a controller plugin to list
// all the volumes known to the the storage provider. This request is
// paginated by the plugin.
type CSIVolumeExternalListRequest struct {
PluginID string
MaxEntries int32
StartingToken string
QueryOptions
}
type CSIVolumeExternalListResponse struct {
Volumes []*CSIVolumeExternalStub
NextToken string
QueryMeta
}
// CSIVolumeExternalStub is the storage provider's view of a volume, as
// returned from the controller plugin; all IDs are for external resources
type CSIVolumeExternalStub struct {
ExternalID string
CapacityBytes int64
VolumeContext map[string]string
CloneID string
SnapshotID string
// TODO: topology support
// AccessibleTopology []*Topology
PublishedExternalNodeIDs []string
IsAbnormal bool
Status string
}
type CSIVolumeGetRequest struct {
ID string
QueryOptions

View File

@ -743,7 +743,7 @@ func VolumeCapabilityFromStructs(sAccessType structs.CSIVolumeAttachmentMode, sA
// final check during transformation into the requisite CSI Data type to
// defend against development bugs and corrupted state - and incompatible
// nomad versions in the future.
return nil, fmt.Errorf("Unknown volume attachment mode: %s", sAccessType)
return nil, fmt.Errorf("unknown volume attachment mode: %s", sAccessType)
}
var accessMode VolumeAccessMode
@ -763,7 +763,7 @@ func VolumeCapabilityFromStructs(sAccessType structs.CSIVolumeAttachmentMode, sA
// final check during transformation into the requisite CSI Data type to
// defend against development bugs and corrupted state - and incompatible
// nomad versions in the future.
return nil, fmt.Errorf("Unknown volume access mode: %v", sAccessMode)
return nil, fmt.Errorf("unknown volume access mode: %v", sAccessMode)
}
return &VolumeCapability{