csi: implement controller detach RPCs (#7356)

This changeset implements the remaining controller detach RPCs: server-to-client and client-to-controller. The tests also uncovered a bug in our RPC for claims which is fixed here; the volume claim RPC is used for both claiming and releasing a claim on a volume. We should only submit a controller publish RPC when the claim is new and not when it's being released.
This commit is contained in:
Tim Gross 2020-03-16 15:59:42 -04:00 committed by Tim Gross
parent cd1c6173f4
commit 22e9f679c3
7 changed files with 366 additions and 11 deletions

View File

@ -3,7 +3,6 @@ package client
import (
"context"
"errors"
"fmt"
"time"
metrics "github.com/armon/go-metrics"
@ -66,7 +65,7 @@ func (c *CSIController) ValidateVolume(req *structs.ClientCSIControllerValidateV
// 1. Validate the volume request
// 2. Call ControllerPublishVolume on the CSI Plugin to trigger a remote attachment
//
// In the future this may be expanded to request dynamic secrets for attachement.
// In the future this may be expanded to request dynamic secrets for attachment.
func (c *CSIController) AttachVolume(req *structs.ClientCSIControllerAttachVolumeRequest, resp *structs.ClientCSIControllerAttachVolumeResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "publish_volume"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
@ -105,8 +104,40 @@ func (c *CSIController) AttachVolume(req *structs.ClientCSIControllerAttachVolum
return nil
}
// DetachVolume is used to detach a volume from a CSI Cluster from
// the storage node provided in the request.
func (c *CSIController) DetachVolume(req *structs.ClientCSIControllerDetachVolumeRequest, resp *structs.ClientCSIControllerDetachVolumeResponse) error {
return fmt.Errorf("Unimplemented")
defer metrics.MeasureSince([]string{"client", "csi_controller", "unpublish_volume"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
return err
}
defer plugin.Close()
// The following block of validation checks should not be reached on a
// real Nomad cluster as all of this data should be validated when registering
// volumes with the cluster. They serve as a defensive check before forwarding
// requests to plugins, and to aid with development.
if req.VolumeID == "" {
return errors.New("VolumeID is required")
}
if req.ClientCSINodeID == "" {
return errors.New("ClientCSINodeID is required")
}
csiReq := req.ToCSIRequest()
// Submit the request for a volume to the CSI Plugin.
ctx, cancelFn := c.requestContext()
defer cancelFn()
_, err = plugin.ControllerUnpublishVolume(ctx, csiReq)
if err != nil {
return err
}
return nil
}
func (c *CSIController) findControllerPlugin(name string) (csi.CSIPlugin, error) {

View File

@ -263,3 +263,86 @@ func TestCSIController_ValidateVolume(t *testing.T) {
})
}
}
func TestCSIController_DetachVolume(t *testing.T) {
t.Parallel()
cases := []struct {
Name string
ClientSetupFunc func(*fake.Client)
Request *structs.ClientCSIControllerDetachVolumeRequest
ExpectedErr error
ExpectedResponse *structs.ClientCSIControllerDetachVolumeResponse
}{
{
Name: "returns plugin not found errors",
Request: &structs.ClientCSIControllerDetachVolumeRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: "some-garbage",
},
},
ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"),
},
{
Name: "validates volumeid is not empty",
Request: &structs.ClientCSIControllerDetachVolumeRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
},
ExpectedErr: errors.New("VolumeID is required"),
},
{
Name: "validates nodeid is not empty",
Request: &structs.ClientCSIControllerDetachVolumeRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
VolumeID: "1234-4321-1234-4321",
},
ExpectedErr: errors.New("ClientCSINodeID is required"),
},
{
Name: "returns transitive errors",
ClientSetupFunc: func(fc *fake.Client) {
fc.NextControllerUnpublishVolumeErr = errors.New("hello")
},
Request: &structs.ClientCSIControllerDetachVolumeRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
VolumeID: "1234-4321-1234-4321",
ClientCSINodeID: "abcde",
},
ExpectedErr: errors.New("hello"),
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
require := require.New(t)
client, cleanup := TestClient(t, nil)
defer cleanup()
fakeClient := &fake.Client{}
if tc.ClientSetupFunc != nil {
tc.ClientSetupFunc(fakeClient)
}
dispenserFunc := func(*dynamicplugins.PluginInfo) (interface{}, error) {
return fakeClient, nil
}
client.dynamicRegistry.StubDispenserForType(dynamicplugins.PluginTypeCSIController, dispenserFunc)
err := client.dynamicRegistry.RegisterPlugin(fakePlugin)
require.Nil(err)
var resp structs.ClientCSIControllerDetachVolumeResponse
err = client.ClientRPC("CSIController.DetachVolume", tc.Request, &resp)
require.Equal(tc.ExpectedErr, err)
if tc.ExpectedResponse != nil {
require.Equal(tc.ExpectedResponse, &resp)
}
})
}
}

View File

@ -82,3 +82,37 @@ func (a *ClientCSIController) ValidateVolume(args *cstructs.ClientCSIControllerV
}
return nil
}
func (a *ClientCSIController) DetachVolume(args *cstructs.ClientCSIControllerDetachVolumeRequest, reply *cstructs.ClientCSIControllerDetachVolumeResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "detach_volume"}, time.Now())
// Verify the arguments.
if args.ControllerNodeID == "" {
return errors.New("missing ControllerNodeID")
}
// Make sure Node is valid and new enough to support RPC
snap, err := a.srv.State().Snapshot()
if err != nil {
return err
}
_, err = getNodeForRpc(snap, args.ControllerNodeID)
if err != nil {
return err
}
// Get the connection to the client
state, ok := a.srv.getNodeConn(args.ControllerNodeID)
if !ok {
return findNodeConnAndForward(a.srv, args.ControllerNodeID, "ClientCSIController.DetachVolume", args, reply)
}
// Make the RPC
err = NodeRpc(state.Session, "CSIController.DetachVolume", args, reply)
if err != nil {
return fmt.Errorf("detach volume: %v", err)
}
return nil
}

View File

@ -89,3 +89,81 @@ func TestClientCSIController_AttachVolume_Forwarded(t *testing.T) {
// Should recieve an error from the client endpoint
require.Contains(err.Error(), "must specify plugin name to dispense")
}
func TestClientCSIController_DetachVolume_Local(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server and client
s, cleanupS := TestServer(t, nil)
defer cleanupS()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
c, cleanupC := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s.config.RPCAddr.String()}
})
defer cleanupC()
testutil.WaitForResult(func() (bool, error) {
nodes := s.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
require.Fail("should have a client")
})
req := &cstructs.ClientCSIControllerDetachVolumeRequest{
CSIControllerQuery: cstructs.CSIControllerQuery{ControllerNodeID: c.NodeID()},
}
// Fetch the response
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientCSIController.DetachVolume", req, &resp)
require.NotNil(err)
// Should recieve an error from the client endpoint
require.Contains(err.Error(), "must specify plugin name to dispense")
}
func TestClientCSIController_DetachVolume_Forwarded(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server and client
s1, cleanupS1 := TestServer(t, func(c *Config) { c.BootstrapExpect = 2 })
defer cleanupS1()
s2, cleanupS2 := TestServer(t, func(c *Config) { c.BootstrapExpect = 2 })
defer cleanupS2()
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
codec := rpcClient(t, s2)
c, cleanupC := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.config.RPCAddr.String()}
c.GCDiskUsageThreshold = 100.0
})
defer cleanupC()
testutil.WaitForResult(func() (bool, error) {
nodes := s2.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
require.Fail("should have a client")
})
// Force remove the connection locally in case it exists
s1.nodeConnsLock.Lock()
delete(s1.nodeConns, c.NodeID())
s1.nodeConnsLock.Unlock()
req := &cstructs.ClientCSIControllerDetachVolumeRequest{
CSIControllerQuery: cstructs.CSIControllerQuery{ControllerNodeID: c.NodeID()},
}
// Fetch the response
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientCSIController.DetachVolume", req, &resp)
require.NotNil(err)
// Should recieve an error from the client endpoint
require.Contains(err.Error(), "must specify plugin name to dispense")
}

View File

@ -2200,7 +2200,6 @@ func TestCSI_GCVolumeClaims(t *testing.T) {
defer shutdown()
testutil.WaitForLeader(t, srv.RPC)
// codec := rpcClient(t, srv)
state := srv.fsm.State()
ws := memdb.NewWatchSet()
@ -2303,3 +2302,130 @@ func TestCSI_GCVolumeClaims(t *testing.T) {
require.Len(t, vol.ReadAllocs, 1)
require.Len(t, vol.WriteAllocs, 0)
}
func TestCSI_GCVolumeClaims_Controller(t *testing.T) {
t.Parallel()
srv, shutdown := TestServer(t, func(c *Config) { c.NumSchedulers = 0 })
defer shutdown()
testutil.WaitForLeader(t, srv.RPC)
state := srv.fsm.State()
ws := memdb.NewWatchSet()
// Create a client node, plugin, and volume
node := mock.Node()
node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early version
node.CSINodePlugins = map[string]*structs.CSIInfo{
"csi-plugin-example": {
PluginID: "csi-plugin-example",
Healthy: true,
RequiresControllerPlugin: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"csi-plugin-example": {
PluginID: "csi-plugin-example",
Healthy: true,
RequiresControllerPlugin: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsReadOnlyAttach: true,
SupportsAttachDetach: true,
SupportsListVolumes: true,
SupportsListVolumesAttachedNodes: false,
},
},
}
err := state.UpsertNode(99, node)
require.NoError(t, err)
volId0 := uuid.Generate()
vols := []*structs.CSIVolume{{
ID: volId0,
Namespace: "notTheNamespace",
PluginID: "csi-plugin-example",
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}}
err = state.CSIVolumeRegister(100, vols)
require.NoError(t, err)
vol, err := state.CSIVolumeByID(ws, volId0)
require.NoError(t, err)
require.True(t, vol.ControllerRequired)
require.Len(t, vol.ReadAllocs, 0)
require.Len(t, vol.WriteAllocs, 0)
// Create a job with 2 allocations
job := mock.Job()
job.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
"_": {
Name: "someVolume",
Type: structs.VolumeTypeCSI,
Source: volId0,
ReadOnly: false,
},
}
err = state.UpsertJob(101, job)
require.NoError(t, err)
alloc1 := mock.Alloc()
alloc1.JobID = job.ID
alloc1.NodeID = node.ID
err = state.UpsertJobSummary(102, mock.JobSummary(alloc1.JobID))
require.NoError(t, err)
alloc1.TaskGroup = job.TaskGroups[0].Name
alloc2 := mock.Alloc()
alloc2.JobID = job.ID
alloc2.NodeID = node.ID
err = state.UpsertJobSummary(103, mock.JobSummary(alloc2.JobID))
require.NoError(t, err)
alloc2.TaskGroup = job.TaskGroups[0].Name
err = state.UpsertAllocs(104, []*structs.Allocation{alloc1, alloc2})
require.NoError(t, err)
// Claim the volumes and verify the claims were set
err = state.CSIVolumeClaim(105, volId0, alloc1, structs.CSIVolumeClaimWrite)
require.NoError(t, err)
err = state.CSIVolumeClaim(106, volId0, alloc2, structs.CSIVolumeClaimRead)
require.NoError(t, err)
vol, err = state.CSIVolumeByID(ws, volId0)
require.NoError(t, err)
require.Len(t, vol.ReadAllocs, 1)
require.Len(t, vol.WriteAllocs, 1)
// Update both allocs as failed/terminated
alloc1.ClientStatus = structs.AllocClientStatusFailed
alloc2.ClientStatus = structs.AllocClientStatusFailed
err = state.UpdateAllocsFromClient(107, []*structs.Allocation{alloc1, alloc2})
require.NoError(t, err)
// Create the GC eval we'd get from Node.UpdateAlloc
now := time.Now().UTC()
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: job.Namespace,
Priority: structs.CoreJobPriority,
Type: structs.JobTypeCore,
TriggeredBy: structs.EvalTriggerAllocStop,
JobID: structs.CoreJobCSIVolumeClaimGC + ":" + job.ID,
LeaderACL: srv.getLeaderAcl(),
Status: structs.EvalStatusPending,
CreateTime: now.UTC().UnixNano(),
ModifyTime: now.UTC().UnixNano(),
}
// Process the eval
snap, err := state.Snapshot()
require.NoError(t, err)
core := NewCoreScheduler(srv, snap)
err = core.Process(eval)
require.NoError(t, err)
// Verify both claims were released
vol, err = state.CSIVolumeByID(ws, volId0)
require.NoError(t, err)
require.Len(t, vol.ReadAllocs, 0)
require.Len(t, vol.WriteAllocs, 0)
}

View File

@ -342,7 +342,7 @@ func (v *CSIVolume) Deregister(args *structs.CSIVolumeDeregisterRequest, reply *
return nil
}
// Claim claims a volume
// Claim submits a change to a volume claim
func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CSIVolumeClaimResponse) error {
if done, err := v.srv.forward("CSIVolume.Claim", args, args, reply); done {
return err
@ -361,10 +361,13 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
return structs.ErrPermissionDenied
}
// adds a Volume and PublishContext from the controller (if any) to the reply
err = v.srv.controllerPublishVolume(args, reply)
if err != nil {
return fmt.Errorf("controllerPublish: %v", err)
// if this is a new claim, add a Volume and PublishContext from the
// controller (if any) to the reply
if args.Claim != structs.CSIVolumeClaimRelease {
err = v.srv.controllerPublishVolume(args, reply)
if err != nil {
return fmt.Errorf("controller publish: %v", err)
}
}
resp, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, args)

View File

@ -220,7 +220,7 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
}
claimResp := &structs.CSIVolumeClaimResponse{}
err := msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
require.EqualError(t, err, fmt.Sprintf("controllerPublish: volume not found: %s", id0),
require.EqualError(t, err, fmt.Sprintf("controller publish: volume not found: %s", id0),
"expected 'volume not found' error because volume hasn't yet been created")
// Create a client node, plugin, alloc, and volume
@ -377,7 +377,7 @@ func TestCSIVolumeEndpoint_ClaimWithController(t *testing.T) {
claimResp := &structs.CSIVolumeClaimResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
// Because the node is not registered
require.EqualError(t, err, "controllerPublish: attach volume: No path to node")
require.EqualError(t, err, "controller publish: attach volume: No path to node")
}
func TestCSIVolumeEndpoint_List(t *testing.T) {