open-nomad/client/csi_endpoint.go

221 lines
7.2 KiB
Go

package client
import (
"context"
"errors"
"time"
metrics "github.com/armon/go-metrics"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/plugins/csi"
)
// CSI endpoint is used for interacting with CSI plugins on a client.
// TODO: Submit metrics with labels to allow debugging per plugin perf problems.
type CSI struct {
c *Client
}
const (
// CSIPluginRequestTimeout is the timeout that should be used when making reqs
// against CSI Plugins. It is copied from Kubernetes as an initial seed value.
// https://github.com/kubernetes/kubernetes/blob/e680ad7156f263a6d8129cc0117fda58602e50ad/pkg/volume/csi/csi_plugin.go#L52
CSIPluginRequestTimeout = 2 * time.Minute
)
var (
ErrPluginTypeError = errors.New("CSI Plugin loaded incorrectly")
)
// ControllerValidateVolume is used during volume registration to validate
// that a volume exists and that the capabilities it was registered with are
// supported by the CSI Plugin and external volume configuration.
func (c *CSI) ControllerValidateVolume(req *structs.ClientCSIControllerValidateVolumeRequest, resp *structs.ClientCSIControllerValidateVolumeResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "validate_volume"}, time.Now())
if req.VolumeID == "" {
return errors.New("VolumeID is required")
}
if req.PluginID == "" {
return errors.New("PluginID is required")
}
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
return err
}
defer plugin.Close()
caps, err := csi.VolumeCapabilityFromStructs(req.AttachmentMode, req.AccessMode)
if err != nil {
return err
}
ctx, cancelFn := c.requestContext()
defer cancelFn()
// CSI ValidateVolumeCapabilities errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
return plugin.ControllerValidateCapabilities(ctx, req.VolumeID, caps,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
}
// ControllerAttachVolume is used to attach a volume from a CSI Cluster to
// the storage node provided in the request.
//
// The controller attachment flow currently works as follows:
// 1. Validate the volume request
// 2. Call ControllerPublishVolume on the CSI Plugin to trigger a remote attachment
//
// In the future this may be expanded to request dynamic secrets for attachment.
func (c *CSI) ControllerAttachVolume(req *structs.ClientCSIControllerAttachVolumeRequest, resp *structs.ClientCSIControllerAttachVolumeResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "publish_volume"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
return err
}
defer plugin.Close()
// The following block of validation checks should not be reached on a
// real Nomad cluster as all of this data should be validated when registering
// volumes with the cluster. They serve as a defensive check before forwarding
// requests to plugins, and to aid with development.
if req.VolumeID == "" {
return errors.New("VolumeID is required")
}
if req.ClientCSINodeID == "" {
return errors.New("ClientCSINodeID is required")
}
csiReq, err := req.ToCSIRequest()
if err != nil {
return err
}
// Submit the request for a volume to the CSI Plugin.
ctx, cancelFn := c.requestContext()
defer cancelFn()
// CSI ControllerPublishVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
cresp, err := plugin.ControllerPublishVolume(ctx, csiReq,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if err != nil {
return err
}
resp.PublishContext = cresp.PublishContext
return nil
}
// ControllerDetachVolume is used to detach a volume from a CSI Cluster from
// the storage node provided in the request.
func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolumeRequest, resp *structs.ClientCSIControllerDetachVolumeResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "unpublish_volume"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
return err
}
defer plugin.Close()
// The following block of validation checks should not be reached on a
// real Nomad cluster as all of this data should be validated when registering
// volumes with the cluster. They serve as a defensive check before forwarding
// requests to plugins, and to aid with development.
if req.VolumeID == "" {
return errors.New("VolumeID is required")
}
if req.ClientCSINodeID == "" {
return errors.New("ClientCSINodeID is required")
}
csiReq := req.ToCSIRequest()
// Submit the request for a volume to the CSI Plugin.
ctx, cancelFn := c.requestContext()
defer cancelFn()
// CSI ControllerUnpublishVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
_, err = plugin.ControllerUnpublishVolume(ctx, csiReq,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if err != nil {
return err
}
return nil
}
// NodeDetachVolume is used to detach a volume from a CSI Cluster from
// the storage node provided in the request.
func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, resp *structs.ClientCSINodeDetachVolumeResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_node", "detach_volume"}, time.Now())
// The following block of validation checks should not be reached on a
// real Nomad cluster. They serve as a defensive check before forwarding
// requests to plugins, and to aid with development.
if req.PluginID == "" {
return errors.New("PluginID is required")
}
if req.VolumeID == "" {
return errors.New("VolumeID is required")
}
if req.AllocID == "" {
return errors.New("AllocID is required")
}
ctx, cancelFn := c.requestContext()
defer cancelFn()
mounter, err := c.c.csimanager.MounterForPlugin(ctx, req.PluginID)
if err != nil {
return err
}
usageOpts := &csimanager.UsageOptions{
ReadOnly: req.ReadOnly,
AttachmentMode: string(req.AttachmentMode),
AccessMode: string(req.AccessMode),
}
err = mounter.UnmountVolume(ctx, req.VolumeID, req.AllocID, usageOpts)
if err != nil {
return err
}
return nil
}
func (c *CSI) findControllerPlugin(name string) (csi.CSIPlugin, error) {
return c.findPlugin(dynamicplugins.PluginTypeCSIController, name)
}
func (c *CSI) findPlugin(ptype, name string) (csi.CSIPlugin, error) {
pIface, err := c.c.dynamicRegistry.DispensePlugin(ptype, name)
if err != nil {
return nil, err
}
plugin, ok := pIface.(csi.CSIPlugin)
if !ok {
return nil, ErrPluginTypeError
}
return plugin, nil
}
func (c *CSI) requestContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), CSIPluginRequestTimeout)
}