From 6e71baa77d658d07db09c42099a3baed35ee452f Mon Sep 17 00:00:00 2001
From: Danielle Lancashire
Date: Wed, 29 Jan 2020 13:20:41 +0100
Subject: [PATCH] volume_manager: NodeStageVolume Support

This commit introduces support for staging volumes when a plugin
implements the STAGE_UNSTAGE_VOLUME capability. See the following for
further reference material:
https://github.com/container-storage-interface/spec/blob/4731db0e0bc53238b93850f43ab05d9355df0fd9/spec.md#nodestagevolume
---
 client/pluginmanager/csimanager/volume.go    | 86 ++++++++++++++++++-
 .../pluginmanager/csimanager/volume_test.go  | 71 +++++++++++++++
 2 files changed, 156 insertions(+), 1 deletion(-)

diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go
index 966caaff3..f7f53d969 100644
--- a/client/pluginmanager/csimanager/volume.go
+++ b/client/pluginmanager/csimanager/volume.go
@@ -7,6 +7,7 @@ import (
 	"path/filepath"
 	"time"

+	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/helper/mount"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -63,7 +64,7 @@ func (v *volumeManager) stagingDirForVolume(vol *structs.CSIVolume) string {
 // existing volume stage.
 //
 // Returns whether the directory is a pre-existing mountpoint, the staging path,
-// and any errors that occured.
+// and any errors that occurred.
 func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume) (bool, string, error) {
 	stagingPath := v.stagingDirForVolume(vol)

@@ -82,11 +83,94 @@ func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume,
 	return !isNotMount, stagingPath, nil
 }

+// stageVolume prepares a volume for use by allocations. When a plugin exposes
+// the STAGE_UNSTAGE_VOLUME capability it MUST be called once-per-volume for a
+// given usage mode before the volume can be NodePublish-ed.
+func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume) error {
+	logger := hclog.FromContext(ctx)
+	logger.Trace("Preparing volume staging environment")
+	existingMount, stagingPath, err := v.ensureStagingDir(vol)
+	if err != nil {
+		return err
+	}
+	logger.Trace("Volume staging environment", "pre-existing_mount", existingMount, "staging_path", stagingPath)
+
+	if existingMount {
+		logger.Debug("re-using existing staging mount for volume", "staging_path", stagingPath)
+		return nil
+	}
+
+	var accessType csi.VolumeAccessType
+	switch vol.AttachmentMode {
+	case structs.CSIVolumeAttachmentModeBlockDevice:
+		accessType = csi.VolumeAccessTypeBlock
+	case structs.CSIVolumeAttachmentModeFilesystem:
+		accessType = csi.VolumeAccessTypeMount
+	default:
+		// These fields are validated during job submission, but here we perform a
+		// final check during transformation into the requisite CSI Data type to
+		// defend against development bugs and corrupted state - and incompatible
+		// nomad versions in the future.
+		return fmt.Errorf("Unknown volume attachment mode: %s", vol.AttachmentMode)
+	}
+
+	var accessMode csi.VolumeAccessMode
+	switch vol.AccessMode {
+	case structs.CSIVolumeAccessModeSingleNodeReader:
+		accessMode = csi.VolumeAccessModeSingleNodeReaderOnly
+	case structs.CSIVolumeAccessModeSingleNodeWriter:
+		accessMode = csi.VolumeAccessModeSingleNodeWriter
+	case structs.CSIVolumeAccessModeMultiNodeMultiWriter:
+		accessMode = csi.VolumeAccessModeMultiNodeMultiWriter
+	case structs.CSIVolumeAccessModeMultiNodeSingleWriter:
+		accessMode = csi.VolumeAccessModeMultiNodeSingleWriter
+	case structs.CSIVolumeAccessModeMultiNodeReader:
+		accessMode = csi.VolumeAccessModeMultiNodeReaderOnly
+	default:
+		// These fields are validated during job submission, but here we perform a
+		// final check during transformation into the requisite CSI Data type to
+		// defend against development bugs and corrupted state - and incompatible
+		// nomad versions in the future.
+		return fmt.Errorf("Unknown volume access mode: %v", vol.AccessMode)
+	}
+
+	// We currently treat all explicit CSI NodeStageVolume errors (aside from timeouts, codes.ResourceExhausted, and codes.Unavailable)
+	// as fatal.
+	// In the future, we can provide more useful error messages based on
+	// different types of error. For error documentation see:
+	// https://github.com/container-storage-interface/spec/blob/4731db0e0bc53238b93850f43ab05d9355df0fd9/spec.md#nodestagevolume-errors
+	return v.plugin.NodeStageVolume(ctx,
+		vol.ID,
+		nil, /* TODO: Get publishContext from Server */
+		stagingPath,
+		&csi.VolumeCapability{
+			AccessType: accessType,
+			AccessMode: accessMode,
+			VolumeMountOptions: &csi.VolumeMountOptions{
+				// GH-7007: Currently we have no way to provide these
+			},
+		},
+		grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
+		grpc_retry.WithMax(3),
+		grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
+	)
+}
+
 // MountVolume performs the steps required for using a given volume
 // configuration for the provided allocation.
 //
 // TODO: Validate remote volume attachment and implement.
 func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation) (*MountInfo, error) {
+	logger := v.logger.With("volume_id", vol.ID)
+	ctx = hclog.WithContext(ctx, logger)
+
+	if v.requiresStaging {
+		err := v.stageVolume(ctx, vol)
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	return nil, fmt.Errorf("Unimplemented")
 }

diff --git a/client/pluginmanager/csimanager/volume_test.go b/client/pluginmanager/csimanager/volume_test.go
index f69fc5d1c..cdd70920b 100644
--- a/client/pluginmanager/csimanager/volume_test.go
+++ b/client/pluginmanager/csimanager/volume_test.go
@@ -1,6 +1,8 @@
 package csimanager

 import (
+	"context"
+	"errors"
 	"io/ioutil"
 	"os"
 	"runtime"
@@ -106,3 +108,72 @@ func TestVolumeManager_ensureStagingDir(t *testing.T) {
 		})
 	}
 }
+
+func TestVolumeManager_stageVolume(t *testing.T) {
+	t.Parallel()
+	cases := []struct {
+		Name        string
+		Volume      *structs.CSIVolume
+		PluginErr   error
+		ExpectedErr error
+	}{
+		{
+			Name: "Returns an error when an invalid AttachmentMode is provided",
+			Volume: &structs.CSIVolume{
+				ID:             "foo",
+				AttachmentMode: "nonsense",
+			},
+			ExpectedErr: errors.New("Unknown volume attachment mode: nonsense"),
+		},
+		{
+			Name: "Returns an error when an invalid AccessMode is provided",
+			Volume: &structs.CSIVolume{
+				ID:             "foo",
+				AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
+				AccessMode:     "nonsense",
+			},
+			ExpectedErr: errors.New("Unknown volume access mode: nonsense"),
+		},
+		{
+			Name: "Returns an error when the plugin returns an error",
+			Volume: &structs.CSIVolume{
+				ID:             "foo",
+				AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
+				AccessMode:     structs.CSIVolumeAccessModeMultiNodeMultiWriter,
+			},
+			PluginErr:   errors.New("Some Unknown Error"),
+			ExpectedErr: errors.New("Some Unknown Error"),
+		},
+		{
+			Name: "Happy Path",
+			Volume: &structs.CSIVolume{
+				ID:             "foo",
+				AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
+				AccessMode:     structs.CSIVolumeAccessModeMultiNodeMultiWriter,
+			},
+			PluginErr:   nil,
+			ExpectedErr: nil,
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.Name, func(t *testing.T) {
+			tmpPath := tmpDir(t)
+			defer os.RemoveAll(tmpPath)
+
+			csiFake := &csifake.Client{}
+			csiFake.NextNodeStageVolumeErr = tc.PluginErr
+
+			manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, true)
+			ctx := context.Background()
+
+			err := manager.stageVolume(ctx, tc.Volume)
+
+			if tc.ExpectedErr != nil {
+				require.EqualError(t, err, tc.ExpectedErr.Error())
+			} else {
+				require.NoError(t, err)
+			}
+		})
+	}
+}
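
Note on the staging contract (illustrative sketch, not part of the patch): when a plugin advertises STAGE_UNSTAGE_VOLUME, NodeStageVolume must succeed once for a given volume and usage mode before the volume can be published to an allocation, and a pre-existing staging mount is simply reused. The self-contained Go sketch below models only that contract; the types and names (fakePlugin, stage, publish) are invented for the example and do not come from the Nomad or CSI packages touched by this diff.

package main

import (
	"errors"
	"fmt"
)

// fakePlugin is a stand-in for a CSI node plugin; it is not part of Nomad.
type fakePlugin struct {
	requiresStaging bool            // true when the plugin reports STAGE_UNSTAGE_VOLUME
	staged          map[string]bool // volume ID -> already staged on this node
}

// stage models the once-per-volume behaviour of stageVolume in the diff:
// plugins without the capability skip staging, and a volume that is already
// staged re-uses its existing staging mount.
func (p *fakePlugin) stage(volID string) error {
	if !p.requiresStaging || p.staged[volID] {
		return nil
	}
	p.staged[volID] = true // in the real code this is the NodeStageVolume RPC
	return nil
}

// publish may only run after staging has succeeded for capable plugins.
func (p *fakePlugin) publish(volID string) error {
	if p.requiresStaging && !p.staged[volID] {
		return errors.New("volume must be staged before it can be published")
	}
	return nil
}

func main() {
	p := &fakePlugin{requiresStaging: true, staged: map[string]bool{}}
	_ = p.stage("foo")
	fmt.Println(p.publish("foo")) // <nil>: staged, so publishing is allowed
	fmt.Println(p.publish("bar")) // error: never staged
}

In the diff above, the requiresStaging flag plays this gating role in MountVolume, and the existingMount check in stageVolume provides the re-use behaviour.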