From 5a3b45864d90d7d9a510d2f26cf1b8900b7a2062 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Sat, 4 Apr 2020 11:03:44 -0400
Subject: [PATCH] csi: fix unpublish workflow ID mismatches

The CSI plugins use the external volume ID for all operations, but the
Client CSI RPCs use the Nomad volume ID (human-friendly) for the mount
paths. Pass the external ID as an arg in the RPC call so that the
unpublish workflows have it without calling back to the server to find
the external ID.

The controller CSI plugins need the CSI node ID (in other words, the
storage provider's view of the node ID, such as the EC2 instance ID),
not the Nomad node ID, to determine how to detach the external volume.

A simplified sketch of which ID is used where follows the diff.
---
 client/csi_endpoint.go                       |  2 +-
 client/pluginmanager/csimanager/interface.go |  2 +-
 client/pluginmanager/csimanager/volume.go    | 14 +++++------
 .../pluginmanager/csimanager/volume_test.go  |  8 ++++---
 client/structs/csi.go                        | 13 +++++-----
 nomad/core_sched.go                          | 24 ++++++++++++++++---
 nomad/core_sched_test.go                     |  2 +-
 7 files changed, 43 insertions(+), 22 deletions(-)

diff --git a/client/csi_endpoint.go b/client/csi_endpoint.go
index 43248daf8..a4251e473 100644
--- a/client/csi_endpoint.go
+++ b/client/csi_endpoint.go
@@ -190,7 +190,7 @@ func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, re
         AccessMode:     string(req.AccessMode),
     }

-    err = mounter.UnmountVolume(ctx, req.VolumeID, req.AllocID, usageOpts)
+    err = mounter.UnmountVolume(ctx, req.VolumeID, req.ExternalID, req.AllocID, usageOpts)
     if err != nil {
         return err
     }
diff --git a/client/pluginmanager/csimanager/interface.go b/client/pluginmanager/csimanager/interface.go
index 29f393c5a..343e8d977 100644
--- a/client/pluginmanager/csimanager/interface.go
+++ b/client/pluginmanager/csimanager/interface.go
@@ -42,7 +42,7 @@ func (u *UsageOptions) ToFS() string {

 type VolumeMounter interface {
     MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *UsageOptions, publishContext map[string]string) (*MountInfo, error)
-    UnmountVolume(ctx context.Context, volID, allocID string, usageOpts *UsageOptions) error
+    UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usageOpts *UsageOptions) error
 }

 type Manager interface {
diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go
index d300e805a..10c39fa8d 100644
--- a/client/pluginmanager/csimanager/volume.go
+++ b/client/pluginmanager/csimanager/volume.go
@@ -259,7 +259,7 @@ func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume,
 // once for each staging path that a volume has been staged under.
 // It is safe to call multiple times and a plugin is required to return OK if
 // the volume has been unstaged or was never staged on the node.
-func (v *volumeManager) unstageVolume(ctx context.Context, volID string, usage *UsageOptions) error {
+func (v *volumeManager) unstageVolume(ctx context.Context, volID, remoteID string, usage *UsageOptions) error {
     logger := hclog.FromContext(ctx)
     logger.Trace("Unstaging volume")
     stagingPath := v.stagingDirForVolume(v.containerMountPoint, volID, usage)
@@ -267,7 +267,7 @@ func (v *volumeManager) unstageVolume(ctx context.Context, volID string, usage *
     // CSI NodeUnstageVolume errors for timeout, codes.Unavailable and
     // codes.ResourceExhausted are retried; all other errors are fatal.
     return v.plugin.NodeUnstageVolume(ctx,
-        volID,
+        remoteID,
         stagingPath,
         grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
         grpc_retry.WithMax(3),
@@ -288,12 +288,12 @@ func combineErrors(maybeErrs ...error) error {
     return result.ErrorOrNil()
 }

-func (v *volumeManager) unpublishVolume(ctx context.Context, volID, allocID string, usage *UsageOptions) error {
+func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) error {
     pluginTargetPath := v.allocDirForVolume(v.containerMountPoint, volID, allocID, usage)

     // CSI NodeUnpublishVolume errors for timeout, codes.Unavailable and
     // codes.ResourceExhausted are retried; all other errors are fatal.
-    rpcErr := v.plugin.NodeUnpublishVolume(ctx, volID, pluginTargetPath,
+    rpcErr := v.plugin.NodeUnpublishVolume(ctx, remoteID, pluginTargetPath,
         grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
         grpc_retry.WithMax(3),
         grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
@@ -325,16 +325,16 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, allocID stri
     return rpcErr
 }

-func (v *volumeManager) UnmountVolume(ctx context.Context, volID, allocID string, usage *UsageOptions) (err error) {
+func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) (err error) {
     logger := v.logger.With("volume_id", volID, "alloc_id", allocID)
     ctx = hclog.WithContext(ctx, logger)

-    err = v.unpublishVolume(ctx, volID, allocID, usage)
+    err = v.unpublishVolume(ctx, volID, remoteID, allocID, usage)

     if err == nil {
         canRelease := v.usageTracker.Free(allocID, volID, usage)
         if v.requiresStaging && canRelease {
-            err = v.unstageVolume(ctx, volID, usage)
+            err = v.unstageVolume(ctx, volID, remoteID, usage)
         }
     }

diff --git a/client/pluginmanager/csimanager/volume_test.go b/client/pluginmanager/csimanager/volume_test.go
index 025c66739..4922cddbe 100644
--- a/client/pluginmanager/csimanager/volume_test.go
+++ b/client/pluginmanager/csimanager/volume_test.go
@@ -236,7 +236,8 @@ func TestVolumeManager_unstageVolume(t *testing.T) {
             manager := newVolumeManager(testlog.HCLogger(t), eventer, csiFake, tmpPath, tmpPath, true)
             ctx := context.Background()

-            err := manager.unstageVolume(ctx, tc.Volume.ID, tc.UsageOptions)
+            err := manager.unstageVolume(ctx,
+                tc.Volume.ID, tc.Volume.RemoteID(), tc.UsageOptions)

             if tc.ExpectedErr != nil {
                 require.EqualError(t, err, tc.ExpectedErr.Error())
@@ -416,7 +417,8 @@ func TestVolumeManager_unpublishVolume(t *testing.T) {
             manager := newVolumeManager(testlog.HCLogger(t), eventer, csiFake, tmpPath, tmpPath, true)
             ctx := context.Background()

-            err := manager.unpublishVolume(ctx, tc.Volume.ID, tc.Allocation.ID, tc.UsageOptions)
+            err := manager.unpublishVolume(ctx,
+                tc.Volume.ID, tc.Volume.RemoteID(), tc.Allocation.ID, tc.UsageOptions)

             if tc.ExpectedErr != nil {
                 require.EqualError(t, err, tc.ExpectedErr.Error())
@@ -476,7 +478,7 @@ func TestVolumeManager_MountVolumeEvents(t *testing.T) {
     require.Equal(t, "true", e.Details["success"])
     events = events[1:]

-    err = manager.UnmountVolume(ctx, vol.ID, alloc.ID, usage)
+    err = manager.UnmountVolume(ctx, vol.ID, vol.RemoteID(), alloc.ID, usage)
     require.NoError(t, err)

     require.Equal(t, 1, len(events))
diff --git a/client/structs/csi.go b/client/structs/csi.go
index f76ab4ebf..99f0b0773 100644
--- a/client/structs/csi.go
+++ b/client/structs/csi.go
@@ -31,7 +31,7 @@ type CSIControllerQuery struct {
 }

 type ClientCSIControllerValidateVolumeRequest struct {
-    VolumeID string
+    VolumeID string // note: this is the external ID

     AttachmentMode structs.CSIVolumeAttachmentMode
     AccessMode     structs.CSIVolumeAccessMode
@@ -43,7 +43,7 @@ type ClientCSIControllerValidateVolumeResponse struct {
 }

 type ClientCSIControllerAttachVolumeRequest struct {
-    // The ID of the volume to be used on a node.
+    // The external ID of the volume to be used on a node.
     // This field is REQUIRED.
     VolumeID string

@@ -137,10 +137,11 @@ type ClientCSIControllerDetachVolumeResponse struct{}
 // a Nomad client to tell a CSI node plugin on that client to perform
 // NodeUnpublish and NodeUnstage.
 type ClientCSINodeDetachVolumeRequest struct {
-    PluginID string // ID of the plugin that manages the volume (required)
-    VolumeID string // ID of the volume to be unpublished (required)
-    AllocID  string // ID of the allocation we're unpublishing for (required)
-    NodeID   string // ID of the Nomad client targeted
+    PluginID   string // ID of the plugin that manages the volume (required)
+    VolumeID   string // ID of the volume to be unpublished (required)
+    AllocID    string // ID of the allocation we're unpublishing for (required)
+    NodeID     string // ID of the Nomad client targeted
+    ExternalID string // External ID of the volume to be unpublished (required)

     // These fields should match the original volume request so that
     // we can find the mount points on the client
diff --git a/nomad/core_sched.go b/nomad/core_sched.go
index 55c765d10..fa64df8af 100644
--- a/nomad/core_sched.go
+++ b/nomad/core_sched.go
@@ -862,7 +862,8 @@ func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]i
         // operations or releasing the claim.
         nReq := &cstructs.ClientCSINodeDetachVolumeRequest{
             PluginID:       args.plug.ID,
-            VolumeID:       vol.RemoteID(),
+            VolumeID:       vol.ID,
+            ExternalID:     vol.RemoteID(),
             AllocID:        args.allocID,
             NodeID:         nodeID,
             AttachmentMode: vol.AttachmentMode,
@@ -880,13 +881,30 @@ func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]i
         // on the node need it, but we also only want to make this
         // call at most once per node
         if vol.ControllerRequired && args.nodeClaims[nodeID] < 1 {
+
+            // we need to get the CSI Node ID, which is not the same as
+            // the Nomad Node ID
+            ws := memdb.NewWatchSet()
+            targetNode, err := srv.State().NodeByID(ws, nodeID)
+            if err != nil {
+                return args.nodeClaims, err
+            }
+            if targetNode == nil {
+                return args.nodeClaims, fmt.Errorf("%s: %s",
+                    structs.ErrUnknownNodePrefix, nodeID)
+            }
+            targetCSIInfo, ok := targetNode.CSINodePlugins[args.plug.ID]
+            if !ok {
+                return args.nodeClaims, fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID)
+            }
+
             controllerNodeID, err := nodeForControllerPlugin(srv.State(), args.plug)
-            if err != nil || nodeID == "" {
+            if err != nil || controllerNodeID == "" {
                 return args.nodeClaims, err
             }
             cReq := &cstructs.ClientCSIControllerDetachVolumeRequest{
                 VolumeID:        vol.RemoteID(),
-                ClientCSINodeID: nodeID,
+                ClientCSINodeID: targetCSIInfo.NodeInfo.ID,
             }
             cReq.PluginID = args.plug.ID
             cReq.ControllerNodeID = controllerNodeID
diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go
index 32d1cad04..890999192 100644
--- a/nomad/core_sched_test.go
+++ b/nomad/core_sched_test.go
@@ -2364,7 +2364,7 @@ func TestCSI_GCVolumeClaims_Reap(t *testing.T) {
             ClaimsCount:        map[string]int{node.ID: 1},
             ControllerRequired: true,
             ExpectedErr: fmt.Sprintf(
-                "no controllers available for plugin %q", plugin.ID),
+                "Unknown node: %s", node.ID),
             ExpectedClaimsCount:           0,
             ExpectedNodeDetachVolumeCount: 1,
             ExpectedControllerDetachVolumeCount: 0,
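
For orientation, here is a minimal, self-contained Go sketch of which ID ends up
where after this change. It is not part of the patch: csiVolume, node, remoteID,
the plugin name "aws-ebs0", and the example ID values are simplified, hypothetical
stand-ins for Nomad's real structs.CSIVolume and structs.Node, and the remoteID
fallback behavior is an assumption based on how vol.RemoteID() is used in the diff
above.

// Illustrative only: simplified stand-ins showing which ID goes to which CSI RPC.
package main

import "fmt"

type csiVolume struct {
    ID         string // Nomad's human-friendly volume ID (names the mount paths)
    ExternalID string // storage provider's volume ID (sent to the plugin)
}

// remoteID approximates the behavior the patch relies on from vol.RemoteID():
// prefer the external ID and fall back to the Nomad ID (assumption).
func (v csiVolume) remoteID() string {
    if v.ExternalID != "" {
        return v.ExternalID
    }
    return v.ID
}

type node struct {
    ID         string            // Nomad node ID (used to look the node up in server state)
    csiNodeIDs map[string]string // plugin ID -> CSI node ID fingerprinted by the node plugin
}

func main() {
    vol := csiVolume{ID: "database", ExternalID: "vol-0123456789abcdef0"}
    n := node{
        ID:         "example-nomad-client-uuid",
        csiNodeIDs: map[string]string{"aws-ebs0": "i-0fedcba9876543210"},
    }

    // Node plugin unpublish/unstage: the Nomad volume ID keys the staging and
    // per-alloc mount paths on the client, while the external ID is what
    // NodeUnstageVolume/NodeUnpublishVolume pass to the plugin.
    fmt.Println("mount path key:             ", vol.ID)
    fmt.Println("NodeUnpublish volume_id:    ", vol.remoteID())

    // Controller detach: the controller plugin needs the CSI node ID that the
    // node plugin fingerprinted (e.g. the EC2 instance ID), not the Nomad node ID.
    fmt.Println("ControllerUnpublish node_id:", n.csiNodeIDs["aws-ebs0"])
}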