2020-01-08 12:47:07 +00:00
|
|
|
package csimanager
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
2020-01-28 12:19:56 +00:00
|
|
|
"os"
|
|
|
|
"path/filepath"
|
2020-01-08 12:47:07 +00:00
|
|
|
"time"
|
|
|
|
|
2020-01-29 12:20:41 +00:00
|
|
|
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
|
2020-01-08 12:47:07 +00:00
|
|
|
"github.com/hashicorp/go-hclog"
|
2020-02-14 12:34:41 +00:00
|
|
|
"github.com/hashicorp/go-multierror"
|
2020-01-28 12:19:56 +00:00
|
|
|
"github.com/hashicorp/nomad/helper/mount"
|
2020-01-08 12:47:07 +00:00
|
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
|
|
"github.com/hashicorp/nomad/plugins/csi"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Compile-time check that volumeManager satisfies the VolumeMounter interface.
var _ VolumeMounter = &volumeManager{}
|
|
|
|
|
|
|
|
const (
	// DefaultMountActionTimeout is the per-attempt timeout applied to each
	// staging/publishing RPC made against the plugin.
	DefaultMountActionTimeout = 2 * time.Minute

	// StagingDirName is the directory name under the plugin mount root where
	// volumes are staged.
	StagingDirName = "staging"

	// AllocSpecificDirName is the directory name under the plugin mount root
	// where per-allocation publish targets are created.
	AllocSpecificDirName = "per-alloc"
)
|
|
|
|
|
|
|
|
// volumeManager handles the state of attached volumes for a given CSI Plugin.
//
// volumeManagers outlive the lifetime of a given allocation as volumes may be
// shared by multiple allocations on the same node.
//
// volumes are stored by an enriched volume usage struct as the CSI Spec requires
// slightly different usage based on the given usage model.
type volumeManager struct {
	logger hclog.Logger

	// plugin is the CSI client used to issue the Node*Volume RPCs.
	plugin csi.CSIPlugin

	// volumes holds per-volume state. NOTE(review): not referenced by any
	// method in this file yet, and accessed without synchronization — the
	// mutex below is commented out; confirm single-goroutine access before
	// adding readers/writers.
	volumes map[string]interface{}
	// volumesMu sync.Mutex

	// mountRoot is the root of where plugin directories and mounts may be created
	// e.g /opt/nomad.d/statedir/csi/my-csi-plugin/
	mountRoot string

	// containerMountPoint is the location _inside_ the plugin container that the
	// `mountRoot` is bound in to.
	containerMountPoint string

	// requiresStaging shows whether the plugin requires that the volume manager
	// calls NodeStageVolume and NodeUnstageVolume RPCs during setup and teardown
	requiresStaging bool
}
|
|
|
|
|
2020-02-06 14:26:29 +00:00
|
|
|
func newVolumeManager(logger hclog.Logger, plugin csi.CSIPlugin, rootDir, containerRootDir string, requiresStaging bool) *volumeManager {
|
2020-01-08 12:47:07 +00:00
|
|
|
return &volumeManager{
|
2020-02-06 14:26:29 +00:00
|
|
|
logger: logger.Named("volume_manager"),
|
|
|
|
plugin: plugin,
|
|
|
|
mountRoot: rootDir,
|
|
|
|
containerMountPoint: containerRootDir,
|
|
|
|
requiresStaging: requiresStaging,
|
|
|
|
volumes: make(map[string]interface{}),
|
2020-01-08 12:47:07 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-02-17 11:10:12 +00:00
|
|
|
func (v *volumeManager) stagingDirForVolume(root string, vol *structs.CSIVolume, usage *UsageOptions) string {
|
|
|
|
return filepath.Join(root, StagingDirName, vol.ID, usage.ToFS())
|
2020-01-28 12:19:56 +00:00
|
|
|
}
|
|
|
|
|
2020-02-17 11:10:12 +00:00
|
|
|
func (v *volumeManager) allocDirForVolume(root string, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) string {
|
|
|
|
return filepath.Join(root, AllocSpecificDirName, alloc.ID, vol.ID, usage.ToFS())
|
2020-01-31 13:45:48 +00:00
|
|
|
}
|
|
|
|
|
2020-01-28 12:19:56 +00:00
|
|
|
// ensureStagingDir attempts to create a directory for use when staging a volume
|
|
|
|
// and then validates that the path is not already a mount point for e.g an
|
|
|
|
// existing volume stage.
|
|
|
|
//
|
|
|
|
// Returns whether the directory is a pre-existing mountpoint, the staging path,
|
2020-01-29 12:20:41 +00:00
|
|
|
// and any errors that occurred.
|
2020-02-17 11:10:12 +00:00
|
|
|
func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume, usage *UsageOptions) (string, bool, error) {
|
|
|
|
stagingPath := v.stagingDirForVolume(v.mountRoot, vol, usage)
|
2020-01-28 12:19:56 +00:00
|
|
|
|
|
|
|
// Make the staging path, owned by the Nomad User
|
|
|
|
if err := os.MkdirAll(stagingPath, 0700); err != nil && !os.IsExist(err) {
|
2020-02-02 09:13:51 +00:00
|
|
|
return "", false, fmt.Errorf("failed to create staging directory for volume (%s): %v", vol.ID, err)
|
|
|
|
|
2020-01-28 12:19:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Validate that it is not already a mount point
|
|
|
|
m := mount.New()
|
|
|
|
isNotMount, err := m.IsNotAMountPoint(stagingPath)
|
|
|
|
if err != nil {
|
2020-02-02 09:13:51 +00:00
|
|
|
return "", false, fmt.Errorf("mount point detection failed for volume (%s): %v", vol.ID, err)
|
2020-01-28 12:19:56 +00:00
|
|
|
}
|
|
|
|
|
2020-02-02 09:13:51 +00:00
|
|
|
return stagingPath, !isNotMount, nil
|
2020-01-28 12:19:56 +00:00
|
|
|
}
|
|
|
|
|
2020-01-31 13:45:48 +00:00
|
|
|
// ensureAllocDir attempts to create a directory for use when publishing a volume
|
|
|
|
// and then validates that the path is not already a mount point (e.g when reattaching
|
|
|
|
// to existing allocs).
|
|
|
|
//
|
|
|
|
// Returns whether the directory is a pre-existing mountpoint, the publish path,
|
|
|
|
// and any errors that occurred.
|
2020-02-17 11:10:12 +00:00
|
|
|
func (v *volumeManager) ensureAllocDir(vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) (string, bool, error) {
|
|
|
|
allocPath := v.allocDirForVolume(v.mountRoot, vol, alloc, usage)
|
2020-01-31 13:45:48 +00:00
|
|
|
|
|
|
|
// Make the alloc path, owned by the Nomad User
|
|
|
|
if err := os.MkdirAll(allocPath, 0700); err != nil && !os.IsExist(err) {
|
2020-02-02 09:13:51 +00:00
|
|
|
return "", false, fmt.Errorf("failed to create allocation directory for volume (%s): %v", vol.ID, err)
|
2020-01-29 12:20:41 +00:00
|
|
|
}
|
|
|
|
|
2020-01-31 13:45:48 +00:00
|
|
|
// Validate that it is not already a mount point
|
|
|
|
m := mount.New()
|
|
|
|
isNotMount, err := m.IsNotAMountPoint(allocPath)
|
|
|
|
if err != nil {
|
2020-02-02 09:13:51 +00:00
|
|
|
return "", false, fmt.Errorf("mount point detection failed for volume (%s): %v", vol.ID, err)
|
2020-01-29 12:20:41 +00:00
|
|
|
}
|
|
|
|
|
2020-02-02 09:13:51 +00:00
|
|
|
return allocPath, !isNotMount, nil
|
2020-01-31 13:45:48 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// stageVolume prepares a volume for use by allocations. When a plugin exposes
|
|
|
|
// the STAGE_UNSTAGE_VOLUME capability it MUST be called once-per-volume for a
|
|
|
|
// given usage mode before the volume can be NodePublish-ed.
|
2020-02-17 11:10:12 +00:00
|
|
|
func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume, usage *UsageOptions) error {
|
2020-01-31 13:45:48 +00:00
|
|
|
logger := hclog.FromContext(ctx)
|
|
|
|
logger.Trace("Preparing volume staging environment")
|
2020-02-17 11:10:12 +00:00
|
|
|
hostStagingPath, isMount, err := v.ensureStagingDir(vol, usage)
|
2020-01-31 13:45:48 +00:00
|
|
|
if err != nil {
|
2020-02-06 14:26:29 +00:00
|
|
|
return err
|
2020-01-31 13:45:48 +00:00
|
|
|
}
|
2020-02-17 11:10:12 +00:00
|
|
|
pluginStagingPath := v.stagingDirForVolume(v.containerMountPoint, vol, usage)
|
2020-02-06 14:26:29 +00:00
|
|
|
|
|
|
|
logger.Trace("Volume staging environment", "pre-existing_mount", isMount, "host_staging_path", hostStagingPath, "plugin_staging_path", pluginStagingPath)
|
2020-01-31 13:45:48 +00:00
|
|
|
|
2020-02-02 09:13:51 +00:00
|
|
|
if isMount {
|
2020-02-06 14:26:29 +00:00
|
|
|
logger.Debug("re-using existing staging mount for volume", "staging_path", hostStagingPath)
|
|
|
|
return nil
|
2020-01-31 13:45:48 +00:00
|
|
|
}
|
|
|
|
|
2020-02-18 16:07:27 +00:00
|
|
|
capability, err := csi.VolumeCapabilityFromStructs(vol.AttachmentMode, vol.AccessMode)
|
2020-01-31 13:45:48 +00:00
|
|
|
if err != nil {
|
2020-02-06 14:26:29 +00:00
|
|
|
return err
|
2020-01-29 12:20:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// We currently treat all explicit CSI NodeStageVolume errors (aside from timeouts, codes.ResourceExhausted, and codes.Unavailable)
|
|
|
|
// as fatal.
|
|
|
|
// In the future, we can provide more useful error messages based on
|
|
|
|
// different types of error. For error documentation see:
|
|
|
|
// https://github.com/container-storage-interface/spec/blob/4731db0e0bc53238b93850f43ab05d9355df0fd9/spec.md#nodestagevolume-errors
|
2020-02-06 14:26:29 +00:00
|
|
|
return v.plugin.NodeStageVolume(ctx,
|
2020-01-29 12:20:41 +00:00
|
|
|
vol.ID,
|
|
|
|
nil, /* TODO: Get publishContext from Server */
|
2020-02-06 14:26:29 +00:00
|
|
|
pluginStagingPath,
|
2020-01-31 13:45:48 +00:00
|
|
|
capability,
|
|
|
|
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
|
|
|
|
grpc_retry.WithMax(3),
|
|
|
|
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
2020-02-17 11:10:12 +00:00
|
|
|
func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) (*MountInfo, error) {
|
2020-01-31 13:45:48 +00:00
|
|
|
logger := hclog.FromContext(ctx)
|
2020-02-06 14:26:29 +00:00
|
|
|
var pluginStagingPath string
|
|
|
|
if v.requiresStaging {
|
2020-02-17 11:10:12 +00:00
|
|
|
pluginStagingPath = v.stagingDirForVolume(v.containerMountPoint, vol, usage)
|
2020-02-06 14:26:29 +00:00
|
|
|
}
|
2020-01-31 13:45:48 +00:00
|
|
|
|
2020-02-17 11:10:12 +00:00
|
|
|
hostTargetPath, isMount, err := v.ensureAllocDir(vol, alloc, usage)
|
2020-01-31 13:45:48 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-02-17 11:10:12 +00:00
|
|
|
pluginTargetPath := v.allocDirForVolume(v.containerMountPoint, vol, alloc, usage)
|
2020-01-31 13:45:48 +00:00
|
|
|
|
2020-02-02 09:13:51 +00:00
|
|
|
if isMount {
|
2020-01-31 13:45:48 +00:00
|
|
|
logger.Debug("Re-using existing published volume for allocation")
|
2020-02-06 14:26:29 +00:00
|
|
|
return &MountInfo{Source: hostTargetPath}, nil
|
2020-01-31 13:45:48 +00:00
|
|
|
}
|
|
|
|
|
2020-02-18 16:07:27 +00:00
|
|
|
capabilities, err := csi.VolumeCapabilityFromStructs(vol.AttachmentMode, vol.AccessMode)
|
2020-01-31 13:45:48 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
err = v.plugin.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
|
|
|
|
VolumeID: vol.ID,
|
|
|
|
PublishContext: nil, // TODO: get publishcontext from server
|
2020-02-06 14:26:29 +00:00
|
|
|
StagingTargetPath: pluginStagingPath,
|
|
|
|
TargetPath: pluginTargetPath,
|
2020-01-31 13:45:48 +00:00
|
|
|
VolumeCapability: capabilities,
|
2020-02-17 11:10:12 +00:00
|
|
|
Readonly: usage.ReadOnly,
|
2020-01-31 13:45:48 +00:00
|
|
|
},
|
2020-01-29 12:20:41 +00:00
|
|
|
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
|
|
|
|
grpc_retry.WithMax(3),
|
|
|
|
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
|
|
|
|
)
|
2020-01-31 13:45:48 +00:00
|
|
|
|
2020-02-06 14:26:29 +00:00
|
|
|
return &MountInfo{Source: hostTargetPath}, err
|
2020-01-29 12:20:41 +00:00
|
|
|
}
|
|
|
|
|
2020-01-08 12:47:07 +00:00
|
|
|
// MountVolume performs the steps required for using a given volume
|
|
|
|
// configuration for the provided allocation.
|
|
|
|
//
|
|
|
|
// TODO: Validate remote volume attachment and implement.
|
2020-02-17 11:10:12 +00:00
|
|
|
func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) (*MountInfo, error) {
|
2020-01-31 11:11:40 +00:00
|
|
|
logger := v.logger.With("volume_id", vol.ID, "alloc_id", alloc.ID)
|
2020-01-29 12:20:41 +00:00
|
|
|
ctx = hclog.WithContext(ctx, logger)
|
|
|
|
|
|
|
|
if v.requiresStaging {
|
2020-02-17 11:10:12 +00:00
|
|
|
if err := v.stageVolume(ctx, vol, usage); err != nil {
|
2020-01-29 12:20:41 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-02-17 11:10:12 +00:00
|
|
|
return v.publishVolume(ctx, vol, alloc, usage)
|
2020-01-08 12:47:07 +00:00
|
|
|
}
|
|
|
|
|
2020-01-31 11:11:40 +00:00
|
|
|
// unstageVolume is the inverse operation of `stageVolume` and must be called
|
|
|
|
// once for each staging path that a volume has been staged under.
|
|
|
|
// It is safe to call multiple times and a plugin is required to return OK if
|
|
|
|
// the volume has been unstaged or was never staged on the node.
|
2020-02-17 11:10:12 +00:00
|
|
|
func (v *volumeManager) unstageVolume(ctx context.Context, vol *structs.CSIVolume, usage *UsageOptions) error {
|
2020-01-31 11:11:40 +00:00
|
|
|
logger := hclog.FromContext(ctx)
|
|
|
|
logger.Trace("Unstaging volume")
|
2020-02-17 11:10:12 +00:00
|
|
|
stagingPath := v.stagingDirForVolume(v.containerMountPoint, vol, usage)
|
2020-01-31 11:11:40 +00:00
|
|
|
return v.plugin.NodeUnstageVolume(ctx,
|
|
|
|
vol.ID,
|
|
|
|
stagingPath,
|
|
|
|
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
|
|
|
|
grpc_retry.WithMax(3),
|
|
|
|
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
2020-02-14 12:34:41 +00:00
|
|
|
func combineErrors(maybeErrs ...error) error {
|
|
|
|
var result *multierror.Error
|
|
|
|
for _, err := range maybeErrs {
|
|
|
|
if err == nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
result = multierror.Append(result, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return result.ErrorOrNil()
|
|
|
|
}
|
|
|
|
|
2020-02-17 11:10:12 +00:00
|
|
|
// unpublishVolume is the inverse operation of `publishVolume`. It issues the
// NodeUnpublishVolume RPC for the per-alloc target path and then attempts to
// clean up the corresponding host-side directory, combining any errors from
// both steps.
func (v *volumeManager) unpublishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) error {
	// Target path as the plugin sees it, inside its container.
	pluginTargetPath := v.allocDirForVolume(v.containerMountPoint, vol, alloc, usage)

	rpcErr := v.plugin.NodeUnpublishVolume(ctx, vol.ID, pluginTargetPath,
		grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
		grpc_retry.WithMax(3),
		grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
	)

	// Same directory as seen from the host, used for local cleanup.
	hostTargetPath := v.allocDirForVolume(v.mountRoot, vol, alloc, usage)
	if _, err := os.Stat(hostTargetPath); os.IsNotExist(err) {
		// Host Target Path already got destroyed, just return any rpcErr
		return rpcErr
	}

	// Host Target Path was not cleaned up, attempt to do so here. If it's still
	// a mount then removing the dir will fail and we'll return any rpcErr and the
	// file error.
	rmErr := os.Remove(hostTargetPath)
	if rmErr != nil {
		return combineErrors(rpcErr, rmErr)
	}

	// We successfully removed the directory, return any rpcErrors that were
	// encountered, but because we got here, they were probably flaky or was
	// cleaned up externally. We might want to just return `nil` here in the
	// future.
	return rpcErr
}
|
|
|
|
|
2020-02-17 11:10:12 +00:00
|
|
|
func (v *volumeManager) UnmountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) error {
|
2020-01-31 11:11:40 +00:00
|
|
|
logger := v.logger.With("volume_id", vol.ID, "alloc_id", alloc.ID)
|
|
|
|
ctx = hclog.WithContext(ctx, logger)
|
|
|
|
|
2020-02-17 11:10:12 +00:00
|
|
|
err := v.unpublishVolume(ctx, vol, alloc, usage)
|
2020-02-14 12:34:41 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-01-31 11:11:40 +00:00
|
|
|
|
|
|
|
if !v.requiresStaging {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO(GH-7029): Implement volume usage tracking and only unstage volumes
|
|
|
|
// when the last alloc stops using it.
|
2020-02-17 11:10:12 +00:00
|
|
|
return v.unstageVolume(ctx, vol, usage)
|
2020-01-08 12:47:07 +00:00
|
|
|
}
|