6ffd36c4e5
The CSI Specification defines various gRPC Errors and how they may be retried. After auditing all our CSI RPC calls in #6863, this changeset: * adds retries and backoffs to the where they were needed but not implemented * annotates those CSI RPCs that do not need retries so that we don't wonder whether it's been left off accidentally * added a timeout and cancellation context to the `Probe` call, which didn't have one.
182 lines
6 KiB
Go
182 lines
6 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"time"
|
|
|
|
metrics "github.com/armon/go-metrics"
|
|
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
|
|
"github.com/hashicorp/nomad/client/dynamicplugins"
|
|
"github.com/hashicorp/nomad/client/structs"
|
|
"github.com/hashicorp/nomad/plugins/csi"
|
|
)
|
|
|
|
// CSIController endpoint is used for interacting with CSI plugins on a client.
|
|
// TODO: Submit metrics with labels to allow debugging per plugin perf problems.
|
|
type CSIController struct {
|
|
c *Client
|
|
}
|
|
|
|
const (
|
|
// CSIPluginRequestTimeout is the timeout that should be used when making reqs
|
|
// against CSI Plugins. It is copied from Kubernetes as an initial seed value.
|
|
// https://github.com/kubernetes/kubernetes/blob/e680ad7156f263a6d8129cc0117fda58602e50ad/pkg/volume/csi/csi_plugin.go#L52
|
|
CSIPluginRequestTimeout = 2 * time.Minute
|
|
)
|
|
|
|
var (
|
|
ErrPluginTypeError = errors.New("CSI Plugin loaded incorrectly")
|
|
)
|
|
|
|
// ValidateVolume is used during volume registration to validate
|
|
// that a volume exists and that the capabilities it was registered with are
|
|
// supported by the CSI Plugin and external volume configuration.
|
|
func (c *CSIController) ValidateVolume(req *structs.ClientCSIControllerValidateVolumeRequest, resp *structs.ClientCSIControllerValidateVolumeResponse) error {
|
|
defer metrics.MeasureSince([]string{"client", "csi_controller", "validate_volume"}, time.Now())
|
|
|
|
if req.VolumeID == "" {
|
|
return errors.New("VolumeID is required")
|
|
}
|
|
|
|
if req.PluginID == "" {
|
|
return errors.New("PluginID is required")
|
|
}
|
|
|
|
plugin, err := c.findControllerPlugin(req.PluginID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer plugin.Close()
|
|
|
|
caps, err := csi.VolumeCapabilityFromStructs(req.AttachmentMode, req.AccessMode)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ctx, cancelFn := c.requestContext()
|
|
defer cancelFn()
|
|
|
|
// CSI ValidateVolumeCapabilities errors for timeout, codes.Unavailable and
|
|
// codes.ResourceExhausted are retried; all other errors are fatal.
|
|
return plugin.ControllerValidateCapabilities(ctx, req.VolumeID, caps,
|
|
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
|
|
grpc_retry.WithMax(3),
|
|
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
|
|
}
|
|
|
|
// AttachVolume is used to attach a volume from a CSI Cluster to
|
|
// the storage node provided in the request.
|
|
//
|
|
// The controller attachment flow currently works as follows:
|
|
// 1. Validate the volume request
|
|
// 2. Call ControllerPublishVolume on the CSI Plugin to trigger a remote attachment
|
|
//
|
|
// In the future this may be expanded to request dynamic secrets for attachment.
|
|
func (c *CSIController) AttachVolume(req *structs.ClientCSIControllerAttachVolumeRequest, resp *structs.ClientCSIControllerAttachVolumeResponse) error {
|
|
defer metrics.MeasureSince([]string{"client", "csi_controller", "publish_volume"}, time.Now())
|
|
plugin, err := c.findControllerPlugin(req.PluginID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer plugin.Close()
|
|
|
|
// The following block of validation checks should not be reached on a
|
|
// real Nomad cluster as all of this data should be validated when registering
|
|
// volumes with the cluster. They serve as a defensive check before forwarding
|
|
// requests to plugins, and to aid with development.
|
|
|
|
if req.VolumeID == "" {
|
|
return errors.New("VolumeID is required")
|
|
}
|
|
|
|
if req.ClientCSINodeID == "" {
|
|
return errors.New("ClientCSINodeID is required")
|
|
}
|
|
|
|
csiReq, err := req.ToCSIRequest()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Submit the request for a volume to the CSI Plugin.
|
|
ctx, cancelFn := c.requestContext()
|
|
defer cancelFn()
|
|
// CSI ControllerPublishVolume errors for timeout, codes.Unavailable and
|
|
// codes.ResourceExhausted are retried; all other errors are fatal.
|
|
cresp, err := plugin.ControllerPublishVolume(ctx, csiReq,
|
|
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
|
|
grpc_retry.WithMax(3),
|
|
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
resp.PublishContext = cresp.PublishContext
|
|
return nil
|
|
}
|
|
|
|
// DetachVolume is used to detach a volume from a CSI Cluster from
|
|
// the storage node provided in the request.
|
|
func (c *CSIController) DetachVolume(req *structs.ClientCSIControllerDetachVolumeRequest, resp *structs.ClientCSIControllerDetachVolumeResponse) error {
|
|
defer metrics.MeasureSince([]string{"client", "csi_controller", "unpublish_volume"}, time.Now())
|
|
plugin, err := c.findControllerPlugin(req.PluginID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer plugin.Close()
|
|
|
|
// The following block of validation checks should not be reached on a
|
|
// real Nomad cluster as all of this data should be validated when registering
|
|
// volumes with the cluster. They serve as a defensive check before forwarding
|
|
// requests to plugins, and to aid with development.
|
|
|
|
if req.VolumeID == "" {
|
|
return errors.New("VolumeID is required")
|
|
}
|
|
|
|
if req.ClientCSINodeID == "" {
|
|
return errors.New("ClientCSINodeID is required")
|
|
}
|
|
|
|
csiReq := req.ToCSIRequest()
|
|
|
|
// Submit the request for a volume to the CSI Plugin.
|
|
ctx, cancelFn := c.requestContext()
|
|
defer cancelFn()
|
|
// CSI ControllerUnpublishVolume errors for timeout, codes.Unavailable and
|
|
// codes.ResourceExhausted are retried; all other errors are fatal.
|
|
_, err = plugin.ControllerUnpublishVolume(ctx, csiReq,
|
|
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
|
|
grpc_retry.WithMax(3),
|
|
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *CSIController) findControllerPlugin(name string) (csi.CSIPlugin, error) {
|
|
return c.findPlugin(dynamicplugins.PluginTypeCSIController, name)
|
|
}
|
|
|
|
// TODO: Cache Plugin Clients?
|
|
func (c *CSIController) findPlugin(ptype, name string) (csi.CSIPlugin, error) {
|
|
pIface, err := c.c.dynamicRegistry.DispensePlugin(ptype, name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
plugin, ok := pIface.(csi.CSIPlugin)
|
|
if !ok {
|
|
return nil, ErrPluginTypeError
|
|
}
|
|
|
|
return plugin, nil
|
|
}
|
|
|
|
func (c *CSIController) requestContext() (context.Context, context.CancelFunc) {
|
|
return context.WithTimeout(context.Background(), CSIPluginRequestTimeout)
|
|
}
|