diff --git a/client/allocrunner/taskrunner/plugin_supervisor_hook.go b/client/allocrunner/taskrunner/plugin_supervisor_hook.go index 6c63c4c96..e66cb95e4 100644 --- a/client/allocrunner/taskrunner/plugin_supervisor_hook.go +++ b/client/allocrunner/taskrunner/plugin_supervisor_hook.go @@ -258,7 +258,8 @@ func (h *csiPluginSupervisorHook) registerPlugin(socketPath string) (func(), err SocketPath: socketPath, }, Options: map[string]string{ - "MountPoint": h.mountPoint, + "MountPoint": h.mountPoint, + "ContainerMountPoint": h.task.CSIPluginConfig.MountDir, }, } } diff --git a/client/pluginmanager/csimanager/instance.go b/client/pluginmanager/csimanager/instance.go index 736c76c56..bc6c63f0f 100644 --- a/client/pluginmanager/csimanager/instance.go +++ b/client/pluginmanager/csimanager/instance.go @@ -27,6 +27,10 @@ type instanceManager struct { // stored and where mount points will be created mountPoint string + // containerMountPoint is the location _inside_ the plugin container that the + // `mountPoint` is bound in to. + containerMountPoint string + fp *pluginFingerprinter volumeManager *volumeManager @@ -51,7 +55,8 @@ func newInstanceManager(logger hclog.Logger, updater UpdateNodeCSIInfoFunc, p *d hadFirstSuccessfulFingerprintCh: make(chan struct{}), }, - mountPoint: p.Options["MountPoint"], + mountPoint: p.Options["MountPoint"], + containerMountPoint: p.Options["ContainerMountPoint"], volumeManagerSetupCh: make(chan struct{}), diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go index f5a107ec1..2650e2fa0 100644 --- a/client/pluginmanager/csimanager/volume.go +++ b/client/pluginmanager/csimanager/volume.go @@ -40,27 +40,32 @@ type volumeManager struct { // e.g /opt/nomad.d/statedir/csi/my-csi-plugin/ mountRoot string + // containerMountPoint is the location _inside_ the plugin container that the + // `mountRoot` is bound in to. + containerMountPoint string + // requiresStaging shows whether the plugin requires that the volume manager // calls NodeStageVolume and NodeUnstageVolume RPCs during setup and teardown requiresStaging bool } -func newVolumeManager(logger hclog.Logger, plugin csi.CSIPlugin, rootDir string, requiresStaging bool) *volumeManager { +func newVolumeManager(logger hclog.Logger, plugin csi.CSIPlugin, rootDir, containerRootDir string, requiresStaging bool) *volumeManager { return &volumeManager{ - logger: logger.Named("volume_manager"), - plugin: plugin, - mountRoot: rootDir, - requiresStaging: requiresStaging, - volumes: make(map[string]interface{}), + logger: logger.Named("volume_manager"), + plugin: plugin, + mountRoot: rootDir, + containerMountPoint: containerRootDir, + requiresStaging: requiresStaging, + volumes: make(map[string]interface{}), } } -func (v *volumeManager) stagingDirForVolume(vol *structs.CSIVolume) string { - return filepath.Join(v.mountRoot, StagingDirName, vol.ID, "todo-provide-usage-options") +func (v *volumeManager) stagingDirForVolume(root string, vol *structs.CSIVolume) string { + return filepath.Join(root, StagingDirName, vol.ID, "todo-provide-usage-options") } -func (v *volumeManager) allocDirForVolume(vol *structs.CSIVolume, alloc *structs.Allocation) string { - return filepath.Join(v.mountRoot, AllocSpecificDirName, alloc.ID, vol.ID, "todo-provide-usage-options") +func (v *volumeManager) allocDirForVolume(root string, vol *structs.CSIVolume, alloc *structs.Allocation) string { + return filepath.Join(root, AllocSpecificDirName, alloc.ID, vol.ID, "todo-provide-usage-options") } // ensureStagingDir attempts to create a directory for use when staging a volume @@ -70,7 +75,7 @@ func (v *volumeManager) allocDirForVolume(vol *structs.CSIVolume, alloc *structs // Returns whether the directory is a pre-existing mountpoint, the staging path, // and any errors that occurred. func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume) (string, bool, error) { - stagingPath := v.stagingDirForVolume(vol) + stagingPath := v.stagingDirForVolume(v.mountRoot, vol) // Make the staging path, owned by the Nomad User if err := os.MkdirAll(stagingPath, 0700); err != nil && !os.IsExist(err) { @@ -95,7 +100,7 @@ func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume) (string, bool, // Returns whether the directory is a pre-existing mountpoint, the publish path, // and any errors that occurred. func (v *volumeManager) ensureAllocDir(vol *structs.CSIVolume, alloc *structs.Allocation) (string, bool, error) { - allocPath := v.allocDirForVolume(vol, alloc) + allocPath := v.allocDirForVolume(v.mountRoot, vol, alloc) // Make the alloc path, owned by the Nomad User if err := os.MkdirAll(allocPath, 0700); err != nil && !os.IsExist(err) { @@ -159,23 +164,25 @@ func capabilitiesFromVolume(vol *structs.CSIVolume) (*csi.VolumeCapability, erro // stageVolume prepares a volume for use by allocations. When a plugin exposes // the STAGE_UNSTAGE_VOLUME capability it MUST be called once-per-volume for a // given usage mode before the volume can be NodePublish-ed. -func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume) (string, error) { +func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume) error { logger := hclog.FromContext(ctx) logger.Trace("Preparing volume staging environment") - stagingPath, isMount, err := v.ensureStagingDir(vol) + hostStagingPath, isMount, err := v.ensureStagingDir(vol) if err != nil { - return "", err + return err } - logger.Trace("Volume staging environment", "pre-existing_mount", isMount, "staging_path", stagingPath) + pluginStagingPath := v.stagingDirForVolume(v.containerMountPoint, vol) + + logger.Trace("Volume staging environment", "pre-existing_mount", isMount, "host_staging_path", hostStagingPath, "plugin_staging_path", pluginStagingPath) if isMount { - logger.Debug("re-using existing staging mount for volume", "staging_path", stagingPath) - return stagingPath, nil + logger.Debug("re-using existing staging mount for volume", "staging_path", hostStagingPath) + return nil } capability, err := capabilitiesFromVolume(vol) if err != nil { - return "", err + return err } // We currently treat all explicit CSI NodeStageVolume errors (aside from timeouts, codes.ResourceExhausted, and codes.Unavailable) @@ -183,10 +190,10 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume) // In the future, we can provide more useful error messages based on // different types of error. For error documentation see: // https://github.com/container-storage-interface/spec/blob/4731db0e0bc53238b93850f43ab05d9355df0fd9/spec.md#nodestagevolume-errors - return stagingPath, v.plugin.NodeStageVolume(ctx, + return v.plugin.NodeStageVolume(ctx, vol.ID, nil, /* TODO: Get publishContext from Server */ - stagingPath, + pluginStagingPath, capability, grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout), grpc_retry.WithMax(3), @@ -194,17 +201,22 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume) ) } -func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, stagingPath string) (*MountInfo, error) { +func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation) (*MountInfo, error) { logger := hclog.FromContext(ctx) + var pluginStagingPath string + if v.requiresStaging { + pluginStagingPath = v.stagingDirForVolume(v.containerMountPoint, vol) + } - targetPath, isMount, err := v.ensureAllocDir(vol, alloc) + hostTargetPath, isMount, err := v.ensureAllocDir(vol, alloc) if err != nil { return nil, err } + pluginTargetPath := v.allocDirForVolume(v.containerMountPoint, vol, alloc) if isMount { logger.Debug("Re-using existing published volume for allocation") - return &MountInfo{Source: targetPath}, nil + return &MountInfo{Source: hostTargetPath}, nil } capabilities, err := capabilitiesFromVolume(vol) @@ -215,8 +227,8 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum err = v.plugin.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{ VolumeID: vol.ID, PublishContext: nil, // TODO: get publishcontext from server - StagingTargetPath: stagingPath, - TargetPath: targetPath, + StagingTargetPath: pluginStagingPath, + TargetPath: pluginTargetPath, VolumeCapability: capabilities, }, grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout), @@ -224,7 +236,7 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)), ) - return &MountInfo{Source: targetPath}, err + return &MountInfo{Source: hostTargetPath}, err } // MountVolume performs the steps required for using a given volume @@ -235,17 +247,13 @@ func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, logger := v.logger.With("volume_id", vol.ID, "alloc_id", alloc.ID) ctx = hclog.WithContext(ctx, logger) - var stagingPath string - var err error - if v.requiresStaging { - stagingPath, err = v.stageVolume(ctx, vol) - if err != nil { + if err := v.stageVolume(ctx, vol); err != nil { return nil, err } } - return v.publishVolume(ctx, vol, alloc, stagingPath) + return v.publishVolume(ctx, vol, alloc) } // unstageVolume is the inverse operation of `stageVolume` and must be called @@ -255,7 +263,7 @@ func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, func (v *volumeManager) unstageVolume(ctx context.Context, vol *structs.CSIVolume) error { logger := hclog.FromContext(ctx) logger.Trace("Unstaging volume") - stagingPath := v.stagingDirForVolume(vol) + stagingPath := v.stagingDirForVolume(v.containerMountPoint, vol) return v.plugin.NodeUnstageVolume(ctx, vol.ID, stagingPath, diff --git a/client/pluginmanager/csimanager/volume_test.go b/client/pluginmanager/csimanager/volume_test.go index 10c828ca7..b5b74c142 100644 --- a/client/pluginmanager/csimanager/volume_test.go +++ b/client/pluginmanager/csimanager/volume_test.go @@ -74,8 +74,8 @@ func TestVolumeManager_ensureStagingDir(t *testing.T) { defer os.RemoveAll(tmpPath) csiFake := &csifake.Client{} - manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, true) - expectedStagingPath := manager.stagingDirForVolume(tc.Volume) + manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true) + expectedStagingPath := manager.stagingDirForVolume(tmpPath, tc.Volume) if tc.CreateDirAheadOfTime { err := os.MkdirAll(expectedStagingPath, 0700) @@ -164,10 +164,10 @@ func TestVolumeManager_stageVolume(t *testing.T) { csiFake := &csifake.Client{} csiFake.NextNodeStageVolumeErr = tc.PluginErr - manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, true) + manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true) ctx := context.Background() - _, err := manager.stageVolume(ctx, tc.Volume) + err := manager.stageVolume(ctx, tc.Volume) if tc.ExpectedErr != nil { require.EqualError(t, err, tc.ExpectedErr.Error()) @@ -215,7 +215,7 @@ func TestVolumeManager_unstageVolume(t *testing.T) { csiFake := &csifake.Client{} csiFake.NextNodeUnstageVolumeErr = tc.PluginErr - manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, true) + manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true) ctx := context.Background() err := manager.unstageVolume(ctx, tc.Volume) @@ -275,10 +275,10 @@ func TestVolumeManager_publishVolume(t *testing.T) { csiFake := &csifake.Client{} csiFake.NextNodePublishVolumeErr = tc.PluginErr - manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, true) + manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true) ctx := context.Background() - _, err := manager.publishVolume(ctx, tc.Volume, tc.Allocation, "") + _, err := manager.publishVolume(ctx, tc.Volume, tc.Allocation) if tc.ExpectedErr != nil { require.EqualError(t, err, tc.ExpectedErr.Error())