volume_manager: NodeStageVolume Support
This commit introduces support for staging volumes when a plugin
implements the STAGE_UNSTAGE_VOLUME capability.
See the following for further reference material:
4731db0e0b/spec.md (nodestagevolume)
This commit is contained in:
parent
65d9ddc9af
commit
6e71baa77d
|
@ -7,6 +7,7 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
"github.com/hashicorp/nomad/helper/mount"
|
"github.com/hashicorp/nomad/helper/mount"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
|
@ -63,7 +64,7 @@ func (v *volumeManager) stagingDirForVolume(vol *structs.CSIVolume) string {
|
||||||
// existing volume stage.
|
// existing volume stage.
|
||||||
//
|
//
|
||||||
// Returns whether the directory is a pre-existing mountpoint, the staging path,
|
// Returns whether the directory is a pre-existing mountpoint, the staging path,
|
||||||
// and any errors that occured.
|
// and any errors that occurred.
|
||||||
func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume) (bool, string, error) {
|
func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume) (bool, string, error) {
|
||||||
stagingPath := v.stagingDirForVolume(vol)
|
stagingPath := v.stagingDirForVolume(vol)
|
||||||
|
|
||||||
|
@ -82,11 +83,94 @@ func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume) (bool, string,
|
||||||
return !isNotMount, stagingPath, nil
|
return !isNotMount, stagingPath, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// stageVolume prepares a volume for use by allocations. When a plugin exposes
|
||||||
|
// the STAGE_UNSTAGE_VOLUME capability it MUST be called once-per-volume for a
|
||||||
|
// given usage mode before the volume can be NodePublish-ed.
|
||||||
|
func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume) error {
|
||||||
|
logger := hclog.FromContext(ctx)
|
||||||
|
logger.Trace("Preparing volume staging environment")
|
||||||
|
existingMount, stagingPath, err := v.ensureStagingDir(vol)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
logger.Trace("Volume staging environment", "pre-existing_mount", existingMount, "staging_path", stagingPath)
|
||||||
|
|
||||||
|
if existingMount {
|
||||||
|
logger.Debug("re-using existing staging mount for volume", "staging_path", stagingPath)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var accessType csi.VolumeAccessType
|
||||||
|
switch vol.AttachmentMode {
|
||||||
|
case structs.CSIVolumeAttachmentModeBlockDevice:
|
||||||
|
accessType = csi.VolumeAccessTypeBlock
|
||||||
|
case structs.CSIVolumeAttachmentModeFilesystem:
|
||||||
|
accessType = csi.VolumeAccessTypeMount
|
||||||
|
default:
|
||||||
|
// These fields are validated during job submission, but here we perform a
|
||||||
|
// final check during transformation into the requisite CSI Data type to
|
||||||
|
// defend against development bugs and corrupted state - and incompatible
|
||||||
|
// nomad versions in the future.
|
||||||
|
return fmt.Errorf("Unknown volume attachment mode: %s", vol.AttachmentMode)
|
||||||
|
}
|
||||||
|
|
||||||
|
var accessMode csi.VolumeAccessMode
|
||||||
|
switch vol.AccessMode {
|
||||||
|
case structs.CSIVolumeAccessModeSingleNodeReader:
|
||||||
|
accessMode = csi.VolumeAccessModeSingleNodeReaderOnly
|
||||||
|
case structs.CSIVolumeAccessModeSingleNodeWriter:
|
||||||
|
accessMode = csi.VolumeAccessModeSingleNodeWriter
|
||||||
|
case structs.CSIVolumeAccessModeMultiNodeMultiWriter:
|
||||||
|
accessMode = csi.VolumeAccessModeMultiNodeMultiWriter
|
||||||
|
case structs.CSIVolumeAccessModeMultiNodeSingleWriter:
|
||||||
|
accessMode = csi.VolumeAccessModeMultiNodeSingleWriter
|
||||||
|
case structs.CSIVolumeAccessModeMultiNodeReader:
|
||||||
|
accessMode = csi.VolumeAccessModeMultiNodeReaderOnly
|
||||||
|
default:
|
||||||
|
// These fields are validated during job submission, but here we perform a
|
||||||
|
// final check during transformation into the requisite CSI Data type to
|
||||||
|
// defend against development bugs and corrupted state - and incompatible
|
||||||
|
// nomad versions in the future.
|
||||||
|
return fmt.Errorf("Unknown volume access mode: %v", vol.AccessMode)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We currently treat all explicit CSI NodeStageVolume errors (aside from timeouts, codes.ResourceExhausted, and codes.Unavailable)
|
||||||
|
// as fatal.
|
||||||
|
// In the future, we can provide more useful error messages based on
|
||||||
|
// different types of error. For error documentation see:
|
||||||
|
// https://github.com/container-storage-interface/spec/blob/4731db0e0bc53238b93850f43ab05d9355df0fd9/spec.md#nodestagevolume-errors
|
||||||
|
return v.plugin.NodeStageVolume(ctx,
|
||||||
|
vol.ID,
|
||||||
|
nil, /* TODO: Get publishContext from Server */
|
||||||
|
stagingPath,
|
||||||
|
&csi.VolumeCapability{
|
||||||
|
AccessType: accessType,
|
||||||
|
AccessMode: accessMode,
|
||||||
|
VolumeMountOptions: &csi.VolumeMountOptions{
|
||||||
|
// GH-7007: Currently we have no way to provide these
|
||||||
|
},
|
||||||
|
},
|
||||||
|
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
|
||||||
|
grpc_retry.WithMax(3),
|
||||||
|
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
// MountVolume performs the steps required for using a given volume
|
// MountVolume performs the steps required for using a given volume
|
||||||
// configuration for the provided allocation.
|
// configuration for the provided allocation.
|
||||||
//
|
//
|
||||||
// TODO: Validate remote volume attachment and implement.
|
// TODO: Validate remote volume attachment and implement.
|
||||||
func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation) (*MountInfo, error) {
|
func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation) (*MountInfo, error) {
|
||||||
|
logger := v.logger.With("volume_id", vol.ID)
|
||||||
|
ctx = hclog.WithContext(ctx, logger)
|
||||||
|
|
||||||
|
if v.requiresStaging {
|
||||||
|
err := v.stageVolume(ctx, vol)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil, fmt.Errorf("Unimplemented")
|
return nil, fmt.Errorf("Unimplemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package csimanager
|
package csimanager
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
@ -106,3 +108,72 @@ func TestVolumeManager_ensureStagingDir(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestVolumeManager_stageVolume(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
cases := []struct {
|
||||||
|
Name string
|
||||||
|
Volume *structs.CSIVolume
|
||||||
|
PluginErr error
|
||||||
|
ExpectedErr error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
Name: "Returns an error when an invalid AttachmentMode is provided",
|
||||||
|
Volume: &structs.CSIVolume{
|
||||||
|
ID: "foo",
|
||||||
|
AttachmentMode: "nonsense",
|
||||||
|
},
|
||||||
|
ExpectedErr: errors.New("Unknown volume attachment mode: nonsense"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "Returns an error when an invalid AccessMode is provided",
|
||||||
|
Volume: &structs.CSIVolume{
|
||||||
|
ID: "foo",
|
||||||
|
AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
|
||||||
|
AccessMode: "nonsense",
|
||||||
|
},
|
||||||
|
ExpectedErr: errors.New("Unknown volume access mode: nonsense"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "Returns an error when the plugin returns an error",
|
||||||
|
Volume: &structs.CSIVolume{
|
||||||
|
ID: "foo",
|
||||||
|
AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
|
||||||
|
AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
|
||||||
|
},
|
||||||
|
PluginErr: errors.New("Some Unknown Error"),
|
||||||
|
ExpectedErr: errors.New("Some Unknown Error"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "Happy Path",
|
||||||
|
Volume: &structs.CSIVolume{
|
||||||
|
ID: "foo",
|
||||||
|
AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
|
||||||
|
AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
|
||||||
|
},
|
||||||
|
PluginErr: nil,
|
||||||
|
ExpectedErr: nil,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(tc.Name, func(t *testing.T) {
|
||||||
|
tmpPath := tmpDir(t)
|
||||||
|
defer os.RemoveAll(tmpPath)
|
||||||
|
|
||||||
|
csiFake := &csifake.Client{}
|
||||||
|
csiFake.NextNodeStageVolumeErr = tc.PluginErr
|
||||||
|
|
||||||
|
manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, true)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
err := manager.stageVolume(ctx, tc.Volume)
|
||||||
|
|
||||||
|
if tc.ExpectedErr != nil {
|
||||||
|
require.EqualError(t, err, tc.ExpectedErr.Error())
|
||||||
|
} else {
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue