From 3bff9fefae53840617ed476c180d2c2792aad36e Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Thu, 6 Feb 2020 15:26:29 +0100 Subject: [PATCH] csi: Provide plugin-scoped paths during RPCs When providing paths to plugins, the path needs to be in the scope of the plugins container, rather than that of the host. Here we enable that by providing the mount point through the plugin registration and then use it when constructing request target paths. --- .../taskrunner/plugin_supervisor_hook.go | 3 +- client/pluginmanager/csimanager/instance.go | 7 +- client/pluginmanager/csimanager/volume.go | 76 ++++++++++--------- .../pluginmanager/csimanager/volume_test.go | 14 ++-- 4 files changed, 57 insertions(+), 43 deletions(-) diff --git a/client/allocrunner/taskrunner/plugin_supervisor_hook.go b/client/allocrunner/taskrunner/plugin_supervisor_hook.go index 6c63c4c96..e66cb95e4 100644 --- a/client/allocrunner/taskrunner/plugin_supervisor_hook.go +++ b/client/allocrunner/taskrunner/plugin_supervisor_hook.go @@ -258,7 +258,8 @@ func (h *csiPluginSupervisorHook) registerPlugin(socketPath string) (func(), err SocketPath: socketPath, }, Options: map[string]string{ - "MountPoint": h.mountPoint, + "MountPoint": h.mountPoint, + "ContainerMountPoint": h.task.CSIPluginConfig.MountDir, }, } } diff --git a/client/pluginmanager/csimanager/instance.go b/client/pluginmanager/csimanager/instance.go index 736c76c56..bc6c63f0f 100644 --- a/client/pluginmanager/csimanager/instance.go +++ b/client/pluginmanager/csimanager/instance.go @@ -27,6 +27,10 @@ type instanceManager struct { // stored and where mount points will be created mountPoint string + // containerMountPoint is the location _inside_ the plugin container that the + // `mountPoint` is bound in to. + containerMountPoint string + fp *pluginFingerprinter volumeManager *volumeManager @@ -51,7 +55,8 @@ func newInstanceManager(logger hclog.Logger, updater UpdateNodeCSIInfoFunc, p *d hadFirstSuccessfulFingerprintCh: make(chan struct{}), }, - mountPoint: p.Options["MountPoint"], + mountPoint: p.Options["MountPoint"], + containerMountPoint: p.Options["ContainerMountPoint"], volumeManagerSetupCh: make(chan struct{}), diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go index f5a107ec1..2650e2fa0 100644 --- a/client/pluginmanager/csimanager/volume.go +++ b/client/pluginmanager/csimanager/volume.go @@ -40,27 +40,32 @@ type volumeManager struct { // e.g /opt/nomad.d/statedir/csi/my-csi-plugin/ mountRoot string + // containerMountPoint is the location _inside_ the plugin container that the + // `mountRoot` is bound in to. + containerMountPoint string + // requiresStaging shows whether the plugin requires that the volume manager // calls NodeStageVolume and NodeUnstageVolume RPCs during setup and teardown requiresStaging bool } -func newVolumeManager(logger hclog.Logger, plugin csi.CSIPlugin, rootDir string, requiresStaging bool) *volumeManager { +func newVolumeManager(logger hclog.Logger, plugin csi.CSIPlugin, rootDir, containerRootDir string, requiresStaging bool) *volumeManager { return &volumeManager{ - logger: logger.Named("volume_manager"), - plugin: plugin, - mountRoot: rootDir, - requiresStaging: requiresStaging, - volumes: make(map[string]interface{}), + logger: logger.Named("volume_manager"), + plugin: plugin, + mountRoot: rootDir, + containerMountPoint: containerRootDir, + requiresStaging: requiresStaging, + volumes: make(map[string]interface{}), } } -func (v *volumeManager) stagingDirForVolume(vol *structs.CSIVolume) string { - return filepath.Join(v.mountRoot, StagingDirName, vol.ID, "todo-provide-usage-options") +func (v *volumeManager) stagingDirForVolume(root string, vol *structs.CSIVolume) string { + return filepath.Join(root, StagingDirName, vol.ID, "todo-provide-usage-options") } -func (v *volumeManager) allocDirForVolume(vol *structs.CSIVolume, alloc *structs.Allocation) string { - return filepath.Join(v.mountRoot, AllocSpecificDirName, alloc.ID, vol.ID, "todo-provide-usage-options") +func (v *volumeManager) allocDirForVolume(root string, vol *structs.CSIVolume, alloc *structs.Allocation) string { + return filepath.Join(root, AllocSpecificDirName, alloc.ID, vol.ID, "todo-provide-usage-options") } // ensureStagingDir attempts to create a directory for use when staging a volume @@ -70,7 +75,7 @@ func (v *volumeManager) allocDirForVolume(vol *structs.CSIVolume, alloc *structs // Returns whether the directory is a pre-existing mountpoint, the staging path, // and any errors that occurred. func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume) (string, bool, error) { - stagingPath := v.stagingDirForVolume(vol) + stagingPath := v.stagingDirForVolume(v.mountRoot, vol) // Make the staging path, owned by the Nomad User if err := os.MkdirAll(stagingPath, 0700); err != nil && !os.IsExist(err) { @@ -95,7 +100,7 @@ func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume) (string, bool, // Returns whether the directory is a pre-existing mountpoint, the publish path, // and any errors that occurred. func (v *volumeManager) ensureAllocDir(vol *structs.CSIVolume, alloc *structs.Allocation) (string, bool, error) { - allocPath := v.allocDirForVolume(vol, alloc) + allocPath := v.allocDirForVolume(v.mountRoot, vol, alloc) // Make the alloc path, owned by the Nomad User if err := os.MkdirAll(allocPath, 0700); err != nil && !os.IsExist(err) { @@ -159,23 +164,25 @@ func capabilitiesFromVolume(vol *structs.CSIVolume) (*csi.VolumeCapability, erro // stageVolume prepares a volume for use by allocations. When a plugin exposes // the STAGE_UNSTAGE_VOLUME capability it MUST be called once-per-volume for a // given usage mode before the volume can be NodePublish-ed. -func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume) (string, error) { +func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume) error { logger := hclog.FromContext(ctx) logger.Trace("Preparing volume staging environment") - stagingPath, isMount, err := v.ensureStagingDir(vol) + hostStagingPath, isMount, err := v.ensureStagingDir(vol) if err != nil { - return "", err + return err } - logger.Trace("Volume staging environment", "pre-existing_mount", isMount, "staging_path", stagingPath) + pluginStagingPath := v.stagingDirForVolume(v.containerMountPoint, vol) + + logger.Trace("Volume staging environment", "pre-existing_mount", isMount, "host_staging_path", hostStagingPath, "plugin_staging_path", pluginStagingPath) if isMount { - logger.Debug("re-using existing staging mount for volume", "staging_path", stagingPath) - return stagingPath, nil + logger.Debug("re-using existing staging mount for volume", "staging_path", hostStagingPath) + return nil } capability, err := capabilitiesFromVolume(vol) if err != nil { - return "", err + return err } // We currently treat all explicit CSI NodeStageVolume errors (aside from timeouts, codes.ResourceExhausted, and codes.Unavailable) @@ -183,10 +190,10 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume) // In the future, we can provide more useful error messages based on // different types of error. For error documentation see: // https://github.com/container-storage-interface/spec/blob/4731db0e0bc53238b93850f43ab05d9355df0fd9/spec.md#nodestagevolume-errors - return stagingPath, v.plugin.NodeStageVolume(ctx, + return v.plugin.NodeStageVolume(ctx, vol.ID, nil, /* TODO: Get publishContext from Server */ - stagingPath, + pluginStagingPath, capability, grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout), grpc_retry.WithMax(3), @@ -194,17 +201,22 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume) ) } -func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, stagingPath string) (*MountInfo, error) { +func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation) (*MountInfo, error) { logger := hclog.FromContext(ctx) + var pluginStagingPath string + if v.requiresStaging { + pluginStagingPath = v.stagingDirForVolume(v.containerMountPoint, vol) + } - targetPath, isMount, err := v.ensureAllocDir(vol, alloc) + hostTargetPath, isMount, err := v.ensureAllocDir(vol, alloc) if err != nil { return nil, err } + pluginTargetPath := v.allocDirForVolume(v.containerMountPoint, vol, alloc) if isMount { logger.Debug("Re-using existing published volume for allocation") - return &MountInfo{Source: targetPath}, nil + return &MountInfo{Source: hostTargetPath}, nil } capabilities, err := capabilitiesFromVolume(vol) @@ -215,8 +227,8 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum err = v.plugin.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{ VolumeID: vol.ID, PublishContext: nil, // TODO: get publishcontext from server - StagingTargetPath: stagingPath, - TargetPath: targetPath, + StagingTargetPath: pluginStagingPath, + TargetPath: pluginTargetPath, VolumeCapability: capabilities, }, grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout), @@ -224,7 +236,7 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)), ) - return &MountInfo{Source: targetPath}, err + return &MountInfo{Source: hostTargetPath}, err } // MountVolume performs the steps required for using a given volume @@ -235,17 +247,13 @@ func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, logger := v.logger.With("volume_id", vol.ID, "alloc_id", alloc.ID) ctx = hclog.WithContext(ctx, logger) - var stagingPath string - var err error - if v.requiresStaging { - stagingPath, err = v.stageVolume(ctx, vol) - if err != nil { + if err := v.stageVolume(ctx, vol); err != nil { return nil, err } } - return v.publishVolume(ctx, vol, alloc, stagingPath) + return v.publishVolume(ctx, vol, alloc) } // unstageVolume is the inverse operation of `stageVolume` and must be called @@ -255,7 +263,7 @@ func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, func (v *volumeManager) unstageVolume(ctx context.Context, vol *structs.CSIVolume) error { logger := hclog.FromContext(ctx) logger.Trace("Unstaging volume") - stagingPath := v.stagingDirForVolume(vol) + stagingPath := v.stagingDirForVolume(v.containerMountPoint, vol) return v.plugin.NodeUnstageVolume(ctx, vol.ID, stagingPath, diff --git a/client/pluginmanager/csimanager/volume_test.go b/client/pluginmanager/csimanager/volume_test.go index 10c828ca7..b5b74c142 100644 --- a/client/pluginmanager/csimanager/volume_test.go +++ b/client/pluginmanager/csimanager/volume_test.go @@ -74,8 +74,8 @@ func TestVolumeManager_ensureStagingDir(t *testing.T) { defer os.RemoveAll(tmpPath) csiFake := &csifake.Client{} - manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, true) - expectedStagingPath := manager.stagingDirForVolume(tc.Volume) + manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true) + expectedStagingPath := manager.stagingDirForVolume(tmpPath, tc.Volume) if tc.CreateDirAheadOfTime { err := os.MkdirAll(expectedStagingPath, 0700) @@ -164,10 +164,10 @@ func TestVolumeManager_stageVolume(t *testing.T) { csiFake := &csifake.Client{} csiFake.NextNodeStageVolumeErr = tc.PluginErr - manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, true) + manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true) ctx := context.Background() - _, err := manager.stageVolume(ctx, tc.Volume) + err := manager.stageVolume(ctx, tc.Volume) if tc.ExpectedErr != nil { require.EqualError(t, err, tc.ExpectedErr.Error()) @@ -215,7 +215,7 @@ func TestVolumeManager_unstageVolume(t *testing.T) { csiFake := &csifake.Client{} csiFake.NextNodeUnstageVolumeErr = tc.PluginErr - manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, true) + manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true) ctx := context.Background() err := manager.unstageVolume(ctx, tc.Volume) @@ -275,10 +275,10 @@ func TestVolumeManager_publishVolume(t *testing.T) { csiFake := &csifake.Client{} csiFake.NextNodePublishVolumeErr = tc.PluginErr - manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, true) + manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true) ctx := context.Background() - _, err := manager.publishVolume(ctx, tc.Volume, tc.Allocation, "") + _, err := manager.publishVolume(ctx, tc.Volume, tc.Allocation) if tc.ExpectedErr != nil { require.EqualError(t, err, tc.ExpectedErr.Error())