csi: fix unpublish workflow ID mismatches
The CSI plugins uses the external volume ID for all operations, but the Client CSI RPCs uses the Nomad volume ID (human-friendly) for the mount paths. Pass the External ID as an arg in the RPC call so that the unpublish workflows have it without calling back to the server to find the external ID. The controller CSI plugins need the CSI node ID (or in other words, the storage provider's view of node ID like the EC2 instance ID), not the Nomad node ID, to determine how to detach the external volume.
This commit is contained in:
parent
161f9aedc3
commit
5a3b45864d
|
@ -190,7 +190,7 @@ func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, re
|
|||
AccessMode: string(req.AccessMode),
|
||||
}
|
||||
|
||||
err = mounter.UnmountVolume(ctx, req.VolumeID, req.AllocID, usageOpts)
|
||||
err = mounter.UnmountVolume(ctx, req.VolumeID, req.ExternalID, req.AllocID, usageOpts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ func (u *UsageOptions) ToFS() string {
|
|||
|
||||
type VolumeMounter interface {
|
||||
MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *UsageOptions, publishContext map[string]string) (*MountInfo, error)
|
||||
UnmountVolume(ctx context.Context, volID, allocID string, usageOpts *UsageOptions) error
|
||||
UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usageOpts *UsageOptions) error
|
||||
}
|
||||
|
||||
type Manager interface {
|
||||
|
|
|
@ -259,7 +259,7 @@ func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume,
|
|||
// once for each staging path that a volume has been staged under.
|
||||
// It is safe to call multiple times and a plugin is required to return OK if
|
||||
// the volume has been unstaged or was never staged on the node.
|
||||
func (v *volumeManager) unstageVolume(ctx context.Context, volID string, usage *UsageOptions) error {
|
||||
func (v *volumeManager) unstageVolume(ctx context.Context, volID, remoteID string, usage *UsageOptions) error {
|
||||
logger := hclog.FromContext(ctx)
|
||||
logger.Trace("Unstaging volume")
|
||||
stagingPath := v.stagingDirForVolume(v.containerMountPoint, volID, usage)
|
||||
|
@ -267,7 +267,7 @@ func (v *volumeManager) unstageVolume(ctx context.Context, volID string, usage *
|
|||
// CSI NodeUnstageVolume errors for timeout, codes.Unavailable and
|
||||
// codes.ResourceExhausted are retried; all other errors are fatal.
|
||||
return v.plugin.NodeUnstageVolume(ctx,
|
||||
volID,
|
||||
remoteID,
|
||||
stagingPath,
|
||||
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
|
||||
grpc_retry.WithMax(3),
|
||||
|
@ -288,12 +288,12 @@ func combineErrors(maybeErrs ...error) error {
|
|||
return result.ErrorOrNil()
|
||||
}
|
||||
|
||||
func (v *volumeManager) unpublishVolume(ctx context.Context, volID, allocID string, usage *UsageOptions) error {
|
||||
func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) error {
|
||||
pluginTargetPath := v.allocDirForVolume(v.containerMountPoint, volID, allocID, usage)
|
||||
|
||||
// CSI NodeUnpublishVolume errors for timeout, codes.Unavailable and
|
||||
// codes.ResourceExhausted are retried; all other errors are fatal.
|
||||
rpcErr := v.plugin.NodeUnpublishVolume(ctx, volID, pluginTargetPath,
|
||||
rpcErr := v.plugin.NodeUnpublishVolume(ctx, remoteID, pluginTargetPath,
|
||||
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
|
||||
grpc_retry.WithMax(3),
|
||||
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
|
||||
|
@ -325,16 +325,16 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, allocID stri
|
|||
return rpcErr
|
||||
}
|
||||
|
||||
func (v *volumeManager) UnmountVolume(ctx context.Context, volID, allocID string, usage *UsageOptions) (err error) {
|
||||
func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) (err error) {
|
||||
logger := v.logger.With("volume_id", volID, "alloc_id", allocID)
|
||||
ctx = hclog.WithContext(ctx, logger)
|
||||
|
||||
err = v.unpublishVolume(ctx, volID, allocID, usage)
|
||||
err = v.unpublishVolume(ctx, volID, remoteID, allocID, usage)
|
||||
|
||||
if err == nil {
|
||||
canRelease := v.usageTracker.Free(allocID, volID, usage)
|
||||
if v.requiresStaging && canRelease {
|
||||
err = v.unstageVolume(ctx, volID, usage)
|
||||
err = v.unstageVolume(ctx, volID, remoteID, usage)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -236,7 +236,8 @@ func TestVolumeManager_unstageVolume(t *testing.T) {
|
|||
manager := newVolumeManager(testlog.HCLogger(t), eventer, csiFake, tmpPath, tmpPath, true)
|
||||
ctx := context.Background()
|
||||
|
||||
err := manager.unstageVolume(ctx, tc.Volume.ID, tc.UsageOptions)
|
||||
err := manager.unstageVolume(ctx,
|
||||
tc.Volume.ID, tc.Volume.RemoteID(), tc.UsageOptions)
|
||||
|
||||
if tc.ExpectedErr != nil {
|
||||
require.EqualError(t, err, tc.ExpectedErr.Error())
|
||||
|
@ -416,7 +417,8 @@ func TestVolumeManager_unpublishVolume(t *testing.T) {
|
|||
manager := newVolumeManager(testlog.HCLogger(t), eventer, csiFake, tmpPath, tmpPath, true)
|
||||
ctx := context.Background()
|
||||
|
||||
err := manager.unpublishVolume(ctx, tc.Volume.ID, tc.Allocation.ID, tc.UsageOptions)
|
||||
err := manager.unpublishVolume(ctx,
|
||||
tc.Volume.ID, tc.Volume.RemoteID(), tc.Allocation.ID, tc.UsageOptions)
|
||||
|
||||
if tc.ExpectedErr != nil {
|
||||
require.EqualError(t, err, tc.ExpectedErr.Error())
|
||||
|
@ -476,7 +478,7 @@ func TestVolumeManager_MountVolumeEvents(t *testing.T) {
|
|||
require.Equal(t, "true", e.Details["success"])
|
||||
events = events[1:]
|
||||
|
||||
err = manager.UnmountVolume(ctx, vol.ID, alloc.ID, usage)
|
||||
err = manager.UnmountVolume(ctx, vol.ID, vol.RemoteID(), alloc.ID, usage)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, 1, len(events))
|
||||
|
|
|
@ -31,7 +31,7 @@ type CSIControllerQuery struct {
|
|||
}
|
||||
|
||||
type ClientCSIControllerValidateVolumeRequest struct {
|
||||
VolumeID string
|
||||
VolumeID string // note: this is the external ID
|
||||
|
||||
AttachmentMode structs.CSIVolumeAttachmentMode
|
||||
AccessMode structs.CSIVolumeAccessMode
|
||||
|
@ -43,7 +43,7 @@ type ClientCSIControllerValidateVolumeResponse struct {
|
|||
}
|
||||
|
||||
type ClientCSIControllerAttachVolumeRequest struct {
|
||||
// The ID of the volume to be used on a node.
|
||||
// The external ID of the volume to be used on a node.
|
||||
// This field is REQUIRED.
|
||||
VolumeID string
|
||||
|
||||
|
@ -141,6 +141,7 @@ type ClientCSINodeDetachVolumeRequest struct {
|
|||
VolumeID string // ID of the volume to be unpublished (required)
|
||||
AllocID string // ID of the allocation we're unpublishing for (required)
|
||||
NodeID string // ID of the Nomad client targeted
|
||||
ExternalID string // External ID of the volume to be unpublished (required)
|
||||
|
||||
// These fields should match the original volume request so that
|
||||
// we can find the mount points on the client
|
||||
|
|
|
@ -862,7 +862,8 @@ func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]i
|
|||
// operations or releasing the claim.
|
||||
nReq := &cstructs.ClientCSINodeDetachVolumeRequest{
|
||||
PluginID: args.plug.ID,
|
||||
VolumeID: vol.RemoteID(),
|
||||
VolumeID: vol.ID,
|
||||
ExternalID: vol.RemoteID(),
|
||||
AllocID: args.allocID,
|
||||
NodeID: nodeID,
|
||||
AttachmentMode: vol.AttachmentMode,
|
||||
|
@ -880,13 +881,30 @@ func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]i
|
|||
// on the node need it, but we also only want to make this
|
||||
// call at most once per node
|
||||
if vol.ControllerRequired && args.nodeClaims[nodeID] < 1 {
|
||||
|
||||
// we need to get the CSI Node ID, which is not the same as
|
||||
// the Nomad Node ID
|
||||
ws := memdb.NewWatchSet()
|
||||
targetNode, err := srv.State().NodeByID(ws, nodeID)
|
||||
if err != nil {
|
||||
return args.nodeClaims, err
|
||||
}
|
||||
if targetNode == nil {
|
||||
return args.nodeClaims, fmt.Errorf("%s: %s",
|
||||
structs.ErrUnknownNodePrefix, nodeID)
|
||||
}
|
||||
targetCSIInfo, ok := targetNode.CSINodePlugins[args.plug.ID]
|
||||
if !ok {
|
||||
return args.nodeClaims, fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID)
|
||||
}
|
||||
|
||||
controllerNodeID, err := nodeForControllerPlugin(srv.State(), args.plug)
|
||||
if err != nil || nodeID == "" {
|
||||
if err != nil || controllerNodeID == "" {
|
||||
return args.nodeClaims, err
|
||||
}
|
||||
cReq := &cstructs.ClientCSIControllerDetachVolumeRequest{
|
||||
VolumeID: vol.RemoteID(),
|
||||
ClientCSINodeID: nodeID,
|
||||
ClientCSINodeID: targetCSIInfo.NodeInfo.ID,
|
||||
}
|
||||
cReq.PluginID = args.plug.ID
|
||||
cReq.ControllerNodeID = controllerNodeID
|
||||
|
|
|
@ -2364,7 +2364,7 @@ func TestCSI_GCVolumeClaims_Reap(t *testing.T) {
|
|||
ClaimsCount: map[string]int{node.ID: 1},
|
||||
ControllerRequired: true,
|
||||
ExpectedErr: fmt.Sprintf(
|
||||
"no controllers available for plugin %q", plugin.ID),
|
||||
"Unknown node: %s", node.ID),
|
||||
ExpectedClaimsCount: 0,
|
||||
ExpectedNodeDetachVolumeCount: 1,
|
||||
ExpectedControllerDetachVolumeCount: 0,
|
||||
|
|
Loading…
Reference in New Issue