diff --git a/client/csi_controller_endpoint.go b/client/csi_controller_endpoint.go
index d724f72bd..d1c25c3f0 100644
--- a/client/csi_controller_endpoint.go
+++ b/client/csi_controller_endpoint.go
@@ -3,7 +3,6 @@ package client
 import (
 	"context"
 	"errors"
-	"fmt"
 	"time"
 
 	metrics "github.com/armon/go-metrics"
@@ -66,7 +65,7 @@ func (c *CSIController) ValidateVolume(req *structs.ClientCSIControllerValidateV
 // 1. Validate the volume request
 // 2. Call ControllerPublishVolume on the CSI Plugin to trigger a remote attachment
 //
-// In the future this may be expanded to request dynamic secrets for attachement.
+// In the future this may be expanded to request dynamic secrets for attachment.
 func (c *CSIController) AttachVolume(req *structs.ClientCSIControllerAttachVolumeRequest, resp *structs.ClientCSIControllerAttachVolumeResponse) error {
 	defer metrics.MeasureSince([]string{"client", "csi_controller", "publish_volume"}, time.Now())
 	plugin, err := c.findControllerPlugin(req.PluginID)
@@ -105,8 +104,40 @@ func (c *CSIController) AttachVolume(req *structs.ClientCSIControllerAttachVolum
 	return nil
 }
 
+// DetachVolume is used to detach a volume in a CSI Cluster from
+// the storage node provided in the request.
 func (c *CSIController) DetachVolume(req *structs.ClientCSIControllerDetachVolumeRequest, resp *structs.ClientCSIControllerDetachVolumeResponse) error {
-	return fmt.Errorf("Unimplemented")
+	defer metrics.MeasureSince([]string{"client", "csi_controller", "unpublish_volume"}, time.Now())
+	plugin, err := c.findControllerPlugin(req.PluginID)
+	if err != nil {
+		return err
+	}
+	defer plugin.Close()
+
+	// The following block of validation checks should not be reached on a
+	// real Nomad cluster, as all of this data should be validated when
+	// registering volumes with the cluster. They serve as a defensive check
+	// before forwarding requests to plugins, and to aid with development.
+
+	if req.VolumeID == "" {
+		return errors.New("VolumeID is required")
+	}
+
+	if req.ClientCSINodeID == "" {
+		return errors.New("ClientCSINodeID is required")
+	}
+
+	csiReq := req.ToCSIRequest()
+
+	// Submit the request for a volume to the CSI Plugin.
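+	// The CSI ControllerUnpublishVolume response carries no fields we use,
+	// so only the returned error is checked; the deferred cancel releases
+	// the request context as soon as this RPC returns.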
+	ctx, cancelFn := c.requestContext()
+	defer cancelFn()
+	_, err = plugin.ControllerUnpublishVolume(ctx, csiReq)
+	if err != nil {
+		return err
+	}
+
+	return nil
 }
 
 func (c *CSIController) findControllerPlugin(name string) (csi.CSIPlugin, error) {
diff --git a/client/csi_controller_endpoint_test.go b/client/csi_controller_endpoint_test.go
index 90795ba0d..777a87e4b 100644
--- a/client/csi_controller_endpoint_test.go
+++ b/client/csi_controller_endpoint_test.go
@@ -263,3 +263,86 @@ func TestCSIController_ValidateVolume(t *testing.T) {
 		})
 	}
 }
+
+func TestCSIController_DetachVolume(t *testing.T) {
+	t.Parallel()
+
+	cases := []struct {
+		Name             string
+		ClientSetupFunc  func(*fake.Client)
+		Request          *structs.ClientCSIControllerDetachVolumeRequest
+		ExpectedErr      error
+		ExpectedResponse *structs.ClientCSIControllerDetachVolumeResponse
+	}{
+		{
+			Name: "returns plugin not found errors",
+			Request: &structs.ClientCSIControllerDetachVolumeRequest{
+				CSIControllerQuery: structs.CSIControllerQuery{
+					PluginID: "some-garbage",
+				},
+			},
+			ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"),
+		},
+		{
+			Name: "validates volumeid is not empty",
+			Request: &structs.ClientCSIControllerDetachVolumeRequest{
+				CSIControllerQuery: structs.CSIControllerQuery{
+					PluginID: fakePlugin.Name,
+				},
+			},
+			ExpectedErr: errors.New("VolumeID is required"),
+		},
+		{
+			Name: "validates nodeid is not empty",
+			Request: &structs.ClientCSIControllerDetachVolumeRequest{
+				CSIControllerQuery: structs.CSIControllerQuery{
+					PluginID: fakePlugin.Name,
+				},
+				VolumeID: "1234-4321-1234-4321",
+			},
+			ExpectedErr: errors.New("ClientCSINodeID is required"),
+		},
+		{
+			Name: "returns transitive errors",
+			ClientSetupFunc: func(fc *fake.Client) {
+				fc.NextControllerUnpublishVolumeErr = errors.New("hello")
+			},
+			Request: &structs.ClientCSIControllerDetachVolumeRequest{
+				CSIControllerQuery: structs.CSIControllerQuery{
+					PluginID: fakePlugin.Name,
+				},
+				VolumeID:        "1234-4321-1234-4321",
+				ClientCSINodeID: "abcde",
+			},
+			ExpectedErr: errors.New("hello"),
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.Name, func(t *testing.T) {
+			require := require.New(t)
+			client, cleanup := TestClient(t, nil)
+			defer cleanup()
+
+			fakeClient := &fake.Client{}
+			if tc.ClientSetupFunc != nil {
+				tc.ClientSetupFunc(fakeClient)
+			}
+
+			dispenserFunc := func(*dynamicplugins.PluginInfo) (interface{}, error) {
+				return fakeClient, nil
+			}
+			client.dynamicRegistry.StubDispenserForType(dynamicplugins.PluginTypeCSIController, dispenserFunc)
+
+			err := client.dynamicRegistry.RegisterPlugin(fakePlugin)
+			require.Nil(err)
+
+			var resp structs.ClientCSIControllerDetachVolumeResponse
+			err = client.ClientRPC("CSIController.DetachVolume", tc.Request, &resp)
+			require.Equal(tc.ExpectedErr, err)
+			if tc.ExpectedResponse != nil {
+				require.Equal(tc.ExpectedResponse, &resp)
+			}
+		})
+	}
+}
diff --git a/nomad/client_csi_endpoint.go b/nomad/client_csi_endpoint.go
index 36c230ea7..18f60b361 100644
--- a/nomad/client_csi_endpoint.go
+++ b/nomad/client_csi_endpoint.go
@@ -82,3 +82,37 @@ func (a *ClientCSIController) ValidateVolume(args *cstructs.ClientCSIControllerV
 	}
 	return nil
 }
+
+func (a *ClientCSIController) DetachVolume(args *cstructs.ClientCSIControllerDetachVolumeRequest, reply *cstructs.ClientCSIControllerDetachVolumeResponse) error {
+	defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "detach_volume"}, time.Now())
+
+	// Verify the arguments.
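+	// ControllerNodeID identifies the client node running the controller
+	// plugin; if this server does not hold an open connection to that node,
+	// the request is forwarded below.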
+ if args.ControllerNodeID == "" { + return errors.New("missing ControllerNodeID") + } + + // Make sure Node is valid and new enough to support RPC + snap, err := a.srv.State().Snapshot() + if err != nil { + return err + } + + _, err = getNodeForRpc(snap, args.ControllerNodeID) + if err != nil { + return err + } + + // Get the connection to the client + state, ok := a.srv.getNodeConn(args.ControllerNodeID) + if !ok { + return findNodeConnAndForward(a.srv, args.ControllerNodeID, "ClientCSIController.DetachVolume", args, reply) + } + + // Make the RPC + err = NodeRpc(state.Session, "CSIController.DetachVolume", args, reply) + if err != nil { + return fmt.Errorf("detach volume: %v", err) + } + return nil + +} diff --git a/nomad/client_csi_endpoint_test.go b/nomad/client_csi_endpoint_test.go index 4be3ce99d..56e52cd06 100644 --- a/nomad/client_csi_endpoint_test.go +++ b/nomad/client_csi_endpoint_test.go @@ -89,3 +89,81 @@ func TestClientCSIController_AttachVolume_Forwarded(t *testing.T) { // Should recieve an error from the client endpoint require.Contains(err.Error(), "must specify plugin name to dispense") } + +func TestClientCSIController_DetachVolume_Local(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Start a server and client + s, cleanupS := TestServer(t, nil) + defer cleanupS() + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + c, cleanupC := client.TestClient(t, func(c *config.Config) { + c.Servers = []string{s.config.RPCAddr.String()} + }) + defer cleanupC() + + testutil.WaitForResult(func() (bool, error) { + nodes := s.connectedNodes() + return len(nodes) == 1, nil + }, func(err error) { + require.Fail("should have a client") + }) + + req := &cstructs.ClientCSIControllerDetachVolumeRequest{ + CSIControllerQuery: cstructs.CSIControllerQuery{ControllerNodeID: c.NodeID()}, + } + + // Fetch the response + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "ClientCSIController.DetachVolume", req, &resp) + require.NotNil(err) + // Should recieve an error from the client endpoint + require.Contains(err.Error(), "must specify plugin name to dispense") +} + +func TestClientCSIController_DetachVolume_Forwarded(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Start a server and client + s1, cleanupS1 := TestServer(t, func(c *Config) { c.BootstrapExpect = 2 }) + defer cleanupS1() + s2, cleanupS2 := TestServer(t, func(c *Config) { c.BootstrapExpect = 2 }) + defer cleanupS2() + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + codec := rpcClient(t, s2) + + c, cleanupC := client.TestClient(t, func(c *config.Config) { + c.Servers = []string{s2.config.RPCAddr.String()} + c.GCDiskUsageThreshold = 100.0 + }) + defer cleanupC() + + testutil.WaitForResult(func() (bool, error) { + nodes := s2.connectedNodes() + return len(nodes) == 1, nil + }, func(err error) { + require.Fail("should have a client") + }) + + // Force remove the connection locally in case it exists + s1.nodeConnsLock.Lock() + delete(s1.nodeConns, c.NodeID()) + s1.nodeConnsLock.Unlock() + + req := &cstructs.ClientCSIControllerDetachVolumeRequest{ + CSIControllerQuery: cstructs.CSIControllerQuery{ControllerNodeID: c.NodeID()}, + } + + // Fetch the response + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "ClientCSIController.DetachVolume", req, &resp) + require.NotNil(err) + // Should recieve an error from the client endpoint + require.Contains(err.Error(), "must specify plugin name to dispense") 
+}
diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go
index 99e534565..f00533a79 100644
--- a/nomad/core_sched_test.go
+++ b/nomad/core_sched_test.go
@@ -2200,7 +2200,6 @@ func TestCSI_GCVolumeClaims(t *testing.T) {
 	defer shutdown()
 	testutil.WaitForLeader(t, srv.RPC)
 
-	// codec := rpcClient(t, srv)
 	state := srv.fsm.State()
 	ws := memdb.NewWatchSet()
 
@@ -2303,3 +2302,130 @@ func TestCSI_GCVolumeClaims(t *testing.T) {
 	require.Len(t, vol.ReadAllocs, 1)
 	require.Len(t, vol.WriteAllocs, 0)
 }
+
+func TestCSI_GCVolumeClaims_Controller(t *testing.T) {
+	t.Parallel()
+	srv, shutdown := TestServer(t, func(c *Config) { c.NumSchedulers = 0 })
+	defer shutdown()
+	testutil.WaitForLeader(t, srv.RPC)
+
+	state := srv.fsm.State()
+	ws := memdb.NewWatchSet()
+
+	// Create a client node, plugin, and volume
+	node := mock.Node()
+	node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on earlier versions
+	node.CSINodePlugins = map[string]*structs.CSIInfo{
+		"csi-plugin-example": {
+			PluginID:                 "csi-plugin-example",
+			Healthy:                  true,
+			RequiresControllerPlugin: true,
+			NodeInfo:                 &structs.CSINodeInfo{},
+		},
+	}
+	node.CSIControllerPlugins = map[string]*structs.CSIInfo{
+		"csi-plugin-example": {
+			PluginID:                 "csi-plugin-example",
+			Healthy:                  true,
+			RequiresControllerPlugin: true,
+			ControllerInfo: &structs.CSIControllerInfo{
+				SupportsReadOnlyAttach:           true,
+				SupportsAttachDetach:             true,
+				SupportsListVolumes:              true,
+				SupportsListVolumesAttachedNodes: false,
+			},
+		},
+	}
+	err := state.UpsertNode(99, node)
+	require.NoError(t, err)
+	volId0 := uuid.Generate()
+	vols := []*structs.CSIVolume{{
+		ID:             volId0,
+		Namespace:      "notTheNamespace",
+		PluginID:       "csi-plugin-example",
+		AccessMode:     structs.CSIVolumeAccessModeMultiNodeSingleWriter,
+		AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
+	}}
+	err = state.CSIVolumeRegister(100, vols)
+	require.NoError(t, err)
+	vol, err := state.CSIVolumeByID(ws, volId0)
+
+	require.NoError(t, err)
+	require.True(t, vol.ControllerRequired)
+	require.Len(t, vol.ReadAllocs, 0)
+	require.Len(t, vol.WriteAllocs, 0)
+
+	// Create a job with 2 allocations
+	job := mock.Job()
+	job.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
+		"_": {
+			Name:     "someVolume",
+			Type:     structs.VolumeTypeCSI,
+			Source:   volId0,
+			ReadOnly: false,
+		},
+	}
+	err = state.UpsertJob(101, job)
+	require.NoError(t, err)
+
+	alloc1 := mock.Alloc()
+	alloc1.JobID = job.ID
+	alloc1.NodeID = node.ID
+	err = state.UpsertJobSummary(102, mock.JobSummary(alloc1.JobID))
+	require.NoError(t, err)
+	alloc1.TaskGroup = job.TaskGroups[0].Name
+
+	alloc2 := mock.Alloc()
+	alloc2.JobID = job.ID
+	alloc2.NodeID = node.ID
+	err = state.UpsertJobSummary(103, mock.JobSummary(alloc2.JobID))
+	require.NoError(t, err)
+	alloc2.TaskGroup = job.TaskGroups[0].Name
+
+	err = state.UpsertAllocs(104, []*structs.Allocation{alloc1, alloc2})
+	require.NoError(t, err)
+
+	// Claim the volume and verify the claims were set
+	err = state.CSIVolumeClaim(105, volId0, alloc1, structs.CSIVolumeClaimWrite)
+	require.NoError(t, err)
+	err = state.CSIVolumeClaim(106, volId0, alloc2, structs.CSIVolumeClaimRead)
+	require.NoError(t, err)
+	vol, err = state.CSIVolumeByID(ws, volId0)
+	require.NoError(t, err)
+	require.Len(t, vol.ReadAllocs, 1)
+	require.Len(t, vol.WriteAllocs, 1)
+
+	// Update both allocs as failed/terminated
+	alloc1.ClientStatus = structs.AllocClientStatusFailed
+	alloc2.ClientStatus = structs.AllocClientStatusFailed
+	err = state.UpdateAllocsFromClient(107, []*structs.Allocation{alloc1, alloc2})
+	require.NoError(t, err)
+
+	// Create the GC eval we'd get from Node.UpdateAlloc
+	now := time.Now().UTC()
+	eval := &structs.Evaluation{
+		ID:          uuid.Generate(),
+		Namespace:   job.Namespace,
+		Priority:    structs.CoreJobPriority,
+		Type:        structs.JobTypeCore,
+		TriggeredBy: structs.EvalTriggerAllocStop,
+		JobID:       structs.CoreJobCSIVolumeClaimGC + ":" + job.ID,
+		LeaderACL:   srv.getLeaderAcl(),
+		Status:      structs.EvalStatusPending,
+		CreateTime:  now.UTC().UnixNano(),
+		ModifyTime:  now.UTC().UnixNano(),
+	}
+
+	// Process the eval
+	snap, err := state.Snapshot()
+	require.NoError(t, err)
+	core := NewCoreScheduler(srv, snap)
+	err = core.Process(eval)
+	require.NoError(t, err)
+
+	// Verify both claims were released
+	vol, err = state.CSIVolumeByID(ws, volId0)
+	require.NoError(t, err)
+	require.Len(t, vol.ReadAllocs, 0)
+	require.Len(t, vol.WriteAllocs, 0)
+}
diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go
index 79632e436..09dc46c21 100644
--- a/nomad/csi_endpoint.go
+++ b/nomad/csi_endpoint.go
@@ -342,7 +342,7 @@ func (v *CSIVolume) Deregister(args *structs.CSIVolumeDeregisterRequest, reply *
 	return nil
 }
 
-// Claim claims a volume
+// Claim submits a change to a volume claim
 func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CSIVolumeClaimResponse) error {
 	if done, err := v.srv.forward("CSIVolume.Claim", args, args, reply); done {
 		return err
@@ -361,10 +361,13 @@ func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CS
 		return structs.ErrPermissionDenied
 	}
 
-	// adds a Volume and PublishContext from the controller (if any) to the reply
-	err = v.srv.controllerPublishVolume(args, reply)
-	if err != nil {
-		return fmt.Errorf("controllerPublish: %v", err)
+	// if this is a new claim, add a Volume and PublishContext from the
+	// controller (if any) to the reply
+	if args.Claim != structs.CSIVolumeClaimRelease {
+		err = v.srv.controllerPublishVolume(args, reply)
+		if err != nil {
+			return fmt.Errorf("controller publish: %v", err)
+		}
 	}
 
 	resp, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, args)
diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go
index 0bbd5684b..25d954152 100644
--- a/nomad/csi_endpoint_test.go
+++ b/nomad/csi_endpoint_test.go
@@ -220,7 +220,7 @@ func TestCSIVolumeEndpoint_Claim(t *testing.T) {
 	}
 	claimResp := &structs.CSIVolumeClaimResponse{}
 	err := msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
-	require.EqualError(t, err, fmt.Sprintf("controllerPublish: volume not found: %s", id0),
+	require.EqualError(t, err, fmt.Sprintf("controller publish: volume not found: %s", id0),
 		"expected 'volume not found' error because volume hasn't yet been created")
 
 	// Create a client node, plugin, alloc, and volume
@@ -377,7 +377,7 @@ func TestCSIVolumeEndpoint_ClaimWithController(t *testing.T) {
 	claimResp := &structs.CSIVolumeClaimResponse{}
 	err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
 	// Because the node is not registered
-	require.EqualError(t, err, "controllerPublish: attach volume: No path to node")
+	require.EqualError(t, err, "controller publish: attach volume: No path to node")
 }
 
 func TestCSIVolumeEndpoint_List(t *testing.T) {