CSI: make gRPC client creation more robust (#12057)
Nomad communicates with CSI plugin tasks via gRPC. The plugin supervisor hook uses this to ping the plugin for health checks, which it emits as task events. After the first successful health check, the plugin supervisor registers the plugin in the client's dynamic plugin registry, which in turn creates a CSI plugin manager instance that has its own gRPC client for fingerprinting the plugin and sending mount requests.

If the plugin manager instance fails to connect to the plugin on its first attempt, it exits. The plugin supervisor hook is unaware that the connection failed so long as its own pings continue to work, so a transient failure during plugin startup can mislead the supervisor into thinking the plugin is up (and that there is no need to restart the allocation) while no fingerprinter is ever started.

* Refactor the gRPC client to connect on first use. This gives the plugin manager instance the ability to retry the gRPC client connection until it succeeds.
* Add a 30s timeout to the plugin supervisor so that we don't poll forever waiting for a plugin that will never come back up.

Minor improvements:

* The plugin supervisor hook creates a new gRPC client for every probe and then throws it away. Instead, reuse the client as we do for the plugin manager.
* The gRPC client constructor has a 1 second timeout. Clarify that this timeout applies to the initial connection and not the rest of the client's lifetime.
This commit is contained in: parent ac3cd73d00, commit 27bb2da5ee
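The core of the change is moving the gRPC dial out of the client constructor and into a lazy connect guard that retries until the caller's context expires. Below is a minimal, self-contained sketch of that connect-on-first-use pattern; the `LazyClient` type and `Ping` method are hypothetical illustrations of the shape of the fix, not the Nomad code itself (which follows in the diff).

```go
package lazyconn

import (
	"context"
	"fmt"
	"os"
	"time"

	"google.golang.org/grpc"
)

// LazyClient stores only its target address at construction time
// and dials on first use, so construction can never fail.
type LazyClient struct {
	addr string
	conn *grpc.ClientConn
}

func NewLazyClient(addr string) *LazyClient {
	return &LazyClient{addr: addr}
}

// ensureConnected dials the socket, retrying every 5 seconds until
// the caller's context expires. A transient failure at plugin
// startup therefore no longer poisons the client for its lifetime.
func (c *LazyClient) ensureConnected(ctx context.Context) error {
	if c.conn != nil {
		return nil
	}
	var err error
	t := time.NewTimer(0)
	for {
		select {
		case <-ctx.Done():
			return fmt.Errorf("timeout while connecting: %v", err)
		case <-t.C:
			if _, err = os.Stat(c.addr); err != nil {
				t.Reset(5 * time.Second)
				continue
			}
			// Block up to 1s on the initial dial only; the
			// connection outlives this inner timeout.
			dialCtx, cancel := context.WithTimeout(ctx, time.Second)
			c.conn, err = grpc.DialContext(dialCtx, "unix://"+c.addr,
				grpc.WithBlock(), grpc.WithInsecure())
			cancel()
			if err != nil {
				t.Reset(5 * time.Second)
				continue
			}
			return nil
		}
	}
}

// Ping stands in for any RPC method: every method starts with the
// connect guard, so the first caller pays for (and retries) the dial.
func (c *LazyClient) Ping(ctx context.Context) error {
	if err := c.ensureConnected(ctx); err != nil {
		return err
	}
	return nil // a real method would now issue a gRPC call on c.conn
}
```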
@@ -0,0 +1,7 @@
+```release-note:bug
+csi: Fixed a bug where the plugin instance manager would not retry the initial gRPC connection to plugins
+```
+
+```release-note:bug
+csi: Fixed a bug where the plugin supervisor would not restart the task if it failed to connect to the plugin
+```
@@ -38,6 +38,7 @@ type csiPluginSupervisorHook struct {

 	// eventEmitter is used to emit events to the task
 	eventEmitter ti.EventEmitter
+	lifecycle    ti.TaskLifecycle

 	shutdownCtx      context.Context
 	shutdownCancelFn context.CancelFunc
@@ -54,6 +55,7 @@ type csiPluginSupervisorHookConfig struct {
 	clientStateDirPath string
 	events             ti.EventEmitter
 	runner             *TaskRunner
+	lifecycle          ti.TaskLifecycle
 	capabilities       *drivers.Capabilities
 	logger             hclog.Logger
 }
@@ -90,6 +92,7 @@ func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPluginSupervisorHook {
 	hook := &csiPluginSupervisorHook{
 		alloc:      config.runner.Alloc(),
 		runner:     config.runner,
+		lifecycle:  config.lifecycle,
 		logger:     config.logger,
 		task:       task,
 		mountPoint: pluginRoot,
@@ -201,27 +204,41 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) {
 	}()

 	socketPath := filepath.Join(h.mountPoint, structs.CSISocketName)

+	client := csi.NewClient(socketPath, h.logger.Named("csi_client").With(
+		"plugin.name", h.task.CSIPluginConfig.ID,
+		"plugin.type", h.task.CSIPluginConfig.Type))
+	defer client.Close()
+
 	t := time.NewTimer(0)

+	// We're in Poststart at this point, so if we can't connect within
+	// this deadline, assume it's broken so we can restart the task
+	startCtx, startCancelFn := context.WithTimeout(ctx, 30*time.Second)
+	defer startCancelFn()
+
+	var err error
+	var pluginHealthy bool
+
 	// Step 1: Wait for the plugin to initially become available.
 WAITFORREADY:
 	for {
 		select {
-		case <-ctx.Done():
+		case <-startCtx.Done():
+			h.kill(ctx, fmt.Errorf("CSI plugin failed probe: %v", err))
 			return
 		case <-t.C:
-			pluginHealthy, err := h.supervisorLoopOnce(ctx, socketPath)
+			pluginHealthy, err = h.supervisorLoopOnce(startCtx, client)
 			if err != nil || !pluginHealthy {
-				h.logger.Debug("CSI Plugin not ready", "error", err)
-
-				// Plugin is not yet returning healthy, because we want to optimise for
-				// quickly bringing a plugin online, we use a short timeout here.
-				// TODO(dani): Test with more plugins and adjust.
+				h.logger.Debug("CSI plugin not ready", "error", err)
+				// Use only a short delay here to optimize for quickly
+				// bringing up a plugin
 				t.Reset(5 * time.Second)
 				continue
 			}

 			// Mark the plugin as healthy in a task event
+			h.logger.Debug("CSI plugin is ready")
 			h.previousHealthState = pluginHealthy
 			event := structs.NewTaskEvent(structs.TaskPluginHealthy)
 			event.SetMessage(fmt.Sprintf("plugin: %s", h.task.CSIPluginConfig.ID))
@@ -232,15 +249,14 @@ WAITFORREADY:
 	}

 	// Step 2: Register the plugin with the catalog.
-	deregisterPluginFn, err := h.registerPlugin(socketPath)
+	deregisterPluginFn, err := h.registerPlugin(client, socketPath)
 	if err != nil {
-		h.logger.Error("CSI Plugin registration failed", "error", err)
-		event := structs.NewTaskEvent(structs.TaskPluginUnhealthy)
-		event.SetMessage(fmt.Sprintf("failed to register plugin: %s, reason: %v", h.task.CSIPluginConfig.ID, err))
-		h.eventEmitter.EmitEvent(event)
+		h.kill(ctx, fmt.Errorf("CSI plugin failed to register: %v", err))
 		return
 	}

-	// Step 3: Start the lightweight supervisor loop.
+	// Step 3: Start the lightweight supervisor loop. At this point, failures
+	// don't cause the task to restart
 	t.Reset(0)
 	for {
 		select {
@@ -249,9 +265,9 @@
 			deregisterPluginFn()
 			return
 		case <-t.C:
-			pluginHealthy, err := h.supervisorLoopOnce(ctx, socketPath)
+			pluginHealthy, err := h.supervisorLoopOnce(ctx, client)
 			if err != nil {
-				h.logger.Error("CSI Plugin fingerprinting failed", "error", err)
+				h.logger.Error("CSI plugin fingerprinting failed", "error", err)
 			}

 			// The plugin has transitioned to a healthy state. Emit an event.
@@ -265,7 +281,7 @@
 			if h.previousHealthState && !pluginHealthy {
 				event := structs.NewTaskEvent(structs.TaskPluginUnhealthy)
 				if err != nil {
-					event.SetMessage(fmt.Sprintf("error: %v", err))
+					event.SetMessage(fmt.Sprintf("Error: %v", err))
 				} else {
 					event.SetMessage("Unknown Reason")
 				}
@@ -281,16 +297,9 @@
 		}
 	}

-func (h *csiPluginSupervisorHook) registerPlugin(socketPath string) (func(), error) {
-
+func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPath string) (func(), error) {
 	// At this point we know the plugin is ready and we can fingerprint it
 	// to get its vendor name and version
-	client, err := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type))
-	if err != nil {
-		return nil, fmt.Errorf("failed to create csi client: %v", err)
-	}
-	defer client.Close()
-
 	info, err := client.PluginInfo()
 	if err != nil {
 		return nil, fmt.Errorf("failed to probe plugin: %v", err)
@@ -354,21 +363,13 @@ func (h *csiPluginSupervisorHook) registerPlugin(socketPath string) (func(), error) {
 	}, nil
 }

-func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, socketPath string) (bool, error) {
-	_, err := os.Stat(socketPath)
-	if err != nil {
-		return false, fmt.Errorf("failed to stat socket: %v", err)
-	}
+func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client csi.CSIPlugin) (bool, error) {
+	probeCtx, probeCancelFn := context.WithTimeout(ctx, 5*time.Second)
+	defer probeCancelFn()

-	client, err := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type))
+	healthy, err := client.PluginProbe(probeCtx)
 	if err != nil {
-		return false, fmt.Errorf("failed to create csi client: %v", err)
-	}
-	defer client.Close()
-
-	healthy, err := client.PluginProbe(ctx)
-	if err != nil {
-		return false, fmt.Errorf("failed to probe plugin: %v", err)
+		return false, err
 	}

 	return healthy, nil
@@ -387,6 +388,21 @@ func (h *csiPluginSupervisorHook) Stop(_ context.Context, req *interfaces.TaskSt
 	return nil
 }

+func (h *csiPluginSupervisorHook) kill(ctx context.Context, reason error) {
+	h.logger.Error("killing task because plugin failed", "error", reason)
+	event := structs.NewTaskEvent(structs.TaskPluginUnhealthy)
+	event.SetMessage(fmt.Sprintf("Error: %v", reason.Error()))
+	h.eventEmitter.EmitEvent(event)
+
+	if err := h.lifecycle.Kill(ctx,
+		structs.NewTaskEvent(structs.TaskKilling).
+			SetFailsTask().
+			SetDisplayMessage("CSI plugin did not become healthy before timeout"),
+	); err != nil {
+		h.logger.Error("failed to kill task", "kill_reason", reason, "error", err)
+	}
+}
+
 func ensureMountpointInserted(mounts []*drivers.MountConfig, mount *drivers.MountConfig) []*drivers.MountConfig {
 	for _, mnt := range mounts {
 		if mnt.IsEqual(mount) {
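Taken together, the hunks above turn ensureSupervisorLoop into a two-phase loop: a start-up phase with a hard deadline, then steady-state supervision where failures no longer kill the task. A self-contained sketch of that shape follows, under stated assumptions: `superviseSketch` and `probe` are hypothetical names standing in for the hook and supervisorLoopOnce, and the steady-state interval is illustrative since the hunks above don't show it.

```go
package lazyconn

import (
	"context"
	"time"
)

// Illustrative steady-state probe interval; the actual interval used
// by Nomad's loop is not shown in the hunks above.
const probeInterval = 30 * time.Second

// superviseSketch mirrors the two-phase shape of ensureSupervisorLoop:
// phase 1 probes under a 30-second startup deadline and gives up (the
// real code kills the task via h.kill); phase 2 keeps probing
// indefinitely, and failures are logged rather than fatal.
func superviseSketch(ctx context.Context, probe func(context.Context) (bool, error)) error {
	startCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	t := time.NewTimer(0)
	// Phase 1: the plugin must become healthy before the deadline.
	for healthy := false; !healthy; {
		select {
		case <-startCtx.Done():
			return startCtx.Err() // real code: h.kill(ctx, err)
		case <-t.C:
			healthy, _ = probe(startCtx)
			if !healthy {
				t.Reset(5 * time.Second) // short retry to bring plugins up fast
			}
		}
	}

	// Phase 2: steady-state supervision; errors no longer fail the task.
	t.Reset(0)
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-t.C:
			if _, err := probe(ctx); err != nil {
				_ = err // real code only logs the fingerprinting error
			}
			t.Reset(probeInterval)
		}
	}
}
```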
@@ -76,6 +76,7 @@ func (tr *TaskRunner) initHooks() {
 		clientStateDirPath: tr.clientConfig.StateDir,
 		events:             tr,
 		runner:             tr,
+		lifecycle:          tr,
 		capabilities:       tr.driverCapabilities,
 		logger:             hookLogger,
 	}))
@@ -390,11 +390,11 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
 	c.dynamicRegistry =
 		dynamicplugins.NewRegistry(c.stateDB, map[string]dynamicplugins.PluginDispenser{
 			dynamicplugins.PluginTypeCSIController: func(info *dynamicplugins.PluginInfo) (interface{}, error) {
-				return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "controller"))
+				return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "controller")), nil
 			},
 			dynamicplugins.PluginTypeCSINode: func(info *dynamicplugins.PluginInfo) (interface{}, error) {
-				return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "client"))
-			}, // TODO(tgross): refactor these dispenser constructors into csimanager to tidy it up
+				return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "client")), nil
+			},
 		})

 	// Setup the clients RPC server
@@ -73,12 +73,7 @@ func newInstanceManager(logger hclog.Logger, eventer TriggerNodeEvent, updater U
 }

 func (i *instanceManager) run() {
-	c, err := csi.NewClient(i.info.ConnectionInfo.SocketPath, i.logger)
-	if err != nil {
-		i.logger.Error("failed to setup instance manager client", "error", err)
-		close(i.shutdownCh)
-		return
-	}
+	c := csi.NewClient(i.info.ConnectionInfo.SocketPath, i.logger)
 	i.client = c
 	i.fp.client = c
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"math"
 	"net"
+	"os"
 	"time"

 	csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
@@ -88,6 +89,7 @@ type CSINodeClient interface {
 }

 type client struct {
+	addr             string
 	conn             *grpc.ClientConn
 	identityClient   csipbv1.IdentityClient
 	controllerClient CSIControllerClient
@@ -102,30 +104,59 @@ func (c *client) Close() error {
 	return nil
 }

-func NewClient(addr string, logger hclog.Logger) (CSIPlugin, error) {
-	if addr == "" {
-		return nil, fmt.Errorf("address is empty")
-	}
-
-	conn, err := newGrpcConn(addr, logger)
-	if err != nil {
-		return nil, err
-	}
-
+func NewClient(addr string, logger hclog.Logger) CSIPlugin {
 	return &client{
-		conn:             conn,
-		identityClient:   csipbv1.NewIdentityClient(conn),
-		controllerClient: csipbv1.NewControllerClient(conn),
-		nodeClient:       csipbv1.NewNodeClient(conn),
-		logger:           logger,
-	}, nil
+		addr:   addr,
+		logger: logger,
+	}
 }

+func (c *client) ensureConnected(ctx context.Context) error {
+	if c == nil {
+		return fmt.Errorf("client not initialized")
+	}
+	if c.conn != nil {
+		return nil
+	}
+	if c.addr == "" {
+		return fmt.Errorf("address is empty")
+	}
+	var conn *grpc.ClientConn
+	var err error
+	t := time.NewTimer(0)
+	for {
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("timeout while connecting to gRPC socket: %v", err)
+		case <-t.C:
+			_, err = os.Stat(c.addr)
+			if err != nil {
+				err = fmt.Errorf("failed to stat socket: %v", err)
+				t.Reset(5 * time.Second)
+				continue
+			}
+			conn, err = newGrpcConn(c.addr, c.logger)
+			if err != nil {
+				err = fmt.Errorf("failed to create gRPC connection: %v", err)
+				t.Reset(time.Second * 5)
+				continue
+			}
+			c.conn = conn
+			c.identityClient = csipbv1.NewIdentityClient(conn)
+			c.controllerClient = csipbv1.NewControllerClient(conn)
+			c.nodeClient = csipbv1.NewNodeClient(conn)
+			return nil
+		}
+	}
+}
+
 func newGrpcConn(addr string, logger hclog.Logger) (*grpc.ClientConn, error) {
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
+	// after DialContext returns w/ initial connection, closing this
+	// context is a no-op
+	connectCtx, cancel := context.WithTimeout(context.Background(), time.Second*1)
 	defer cancel()
 	conn, err := grpc.DialContext(
-		ctx,
+		connectCtx,
 		addr,
 		grpc.WithBlock(),
 		grpc.WithInsecure(),
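One consequence of the new ensureConnected guard above: a caller's context deadline now bounds the connection retries as well as the RPC itself. A small usage sketch against the new API (the `probeOnce` helper is hypothetical; `csi.NewClient` and `PluginProbe` are taken from this diff):

```go
package lazyconn

import (
	"context"
	"time"

	hclog "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad/plugins/csi"
)

// probeOnce probes a plugin socket with a single 30-second budget
// that covers both the lazy dial (including the 5s retry loop in
// ensureConnected) and the Probe RPC itself.
func probeOnce(socketPath string) (bool, error) {
	client := csi.NewClient(socketPath, hclog.Default().Named("csi_client"))
	defer client.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	return client.PluginProbe(ctx)
}
```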
@@ -146,10 +177,14 @@ func newGrpcConn(addr string, logger hclog.Logger) (*grpc.ClientConn, error) {
 // PluginInfo describes the type and version of a plugin as required by the nomad
 // base.BasePlugin interface.
 func (c *client) PluginInfo() (*base.PluginInfoResponse, error) {
-	// note: no grpc retries needed here, as this is called in
-	// fingerprinting and will get retried by the caller.
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 	defer cancel()
+	if err := c.ensureConnected(ctx); err != nil {
+		return nil, err
+	}
+
+	// note: no grpc retries needed here, as this is called in
+	// fingerprinting and will get retried by the caller.
 	name, version, err := c.PluginGetInfo(ctx)
 	if err != nil {
 		return nil, err
@@ -176,6 +211,10 @@ func (c *client) SetConfig(_ *base.Config) error {
 }

 func (c *client) PluginProbe(ctx context.Context) (bool, error) {
+	if err := c.ensureConnected(ctx); err != nil {
+		return false, err
+	}
+
 	// note: no grpc retries should be done here
 	req, err := c.identityClient.Probe(ctx, &csipbv1.ProbeRequest{})
 	if err != nil {
@@ -198,11 +237,8 @@
 }

 func (c *client) PluginGetInfo(ctx context.Context) (string, string, error) {
-	if c == nil {
-		return "", "", fmt.Errorf("Client not initialized")
-	}
-	if c.identityClient == nil {
-		return "", "", fmt.Errorf("Client not initialized")
+	if err := c.ensureConnected(ctx); err != nil {
+		return "", "", err
 	}

 	resp, err := c.identityClient.GetPluginInfo(ctx, &csipbv1.GetPluginInfoRequest{})
@@ -220,11 +256,8 @@
 }

 func (c *client) PluginGetCapabilities(ctx context.Context) (*PluginCapabilitySet, error) {
-	if c == nil {
-		return nil, fmt.Errorf("Client not initialized")
-	}
-	if c.identityClient == nil {
-		return nil, fmt.Errorf("Client not initialized")
+	if err := c.ensureConnected(ctx); err != nil {
+		return nil, err
 	}

 	// note: no grpc retries needed here, as this is called in
@@ -243,11 +276,8 @@ func (c *client) PluginGetCapabilities(ctx context.Context) (*PluginCapabilitySet, error) {
 //

 func (c *client) ControllerGetCapabilities(ctx context.Context) (*ControllerCapabilitySet, error) {
-	if c == nil {
-		return nil, fmt.Errorf("Client not initialized")
-	}
-	if c.controllerClient == nil {
-		return nil, fmt.Errorf("controllerClient not initialized")
+	if err := c.ensureConnected(ctx); err != nil {
+		return nil, err
 	}

 	// note: no grpc retries needed here, as this is called in
@@ -262,11 +292,8 @@ func (c *client) ControllerGetCapabilities(ctx context.Context) (*ControllerCapabilitySet, error) {
 }

 func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*ControllerPublishVolumeResponse, error) {
-	if c == nil {
-		return nil, fmt.Errorf("Client not initialized")
-	}
-	if c.controllerClient == nil {
-		return nil, fmt.Errorf("controllerClient not initialized")
+	if err := c.ensureConnected(ctx); err != nil {
+		return nil, err
 	}

 	err := req.Validate()
@@ -304,11 +331,8 @@ func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*ControllerPublishVolumeResponse, error) {
 }

 func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*ControllerUnpublishVolumeResponse, error) {
-	if c == nil {
-		return nil, fmt.Errorf("Client not initialized")
-	}
-	if c.controllerClient == nil {
-		return nil, fmt.Errorf("controllerClient not initialized")
+	if err := c.ensureConnected(ctx); err != nil {
+		return nil, err
 	}
 	err := req.Validate()
 	if err != nil {
@@ -337,13 +361,9 @@ func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*ControllerUnpublishVolumeResponse, error) {
 }

 func (c *client) ControllerValidateCapabilities(ctx context.Context, req *ControllerValidateVolumeRequest, opts ...grpc.CallOption) error {
-	if c == nil {
-		return fmt.Errorf("Client not initialized")
+	if err := c.ensureConnected(ctx); err != nil {
+		return err
 	}
-	if c.controllerClient == nil {
-		return fmt.Errorf("controllerClient not initialized")
-	}

 	if req.ExternalID == "" {
 		return fmt.Errorf("missing volume ID")
 	}
@@ -390,6 +410,10 @@ func (c *client) ControllerValidateCapabilities(ctx context.Context, req *ControllerValidateVolumeRequest, opts ...grpc.CallOption) error {
 }

 func (c *client) ControllerCreateVolume(ctx context.Context, req *ControllerCreateVolumeRequest, opts ...grpc.CallOption) (*ControllerCreateVolumeResponse, error) {
+	if err := c.ensureConnected(ctx); err != nil {
+		return nil, err
+	}
+
 	err := req.Validate()
 	if err != nil {
 		return nil, err
@@ -433,6 +457,10 @@ func (c *client) ControllerCreateVolume(ctx context.Context, req *ControllerCreateVolumeRequest, opts ...grpc.CallOption) (*ControllerCreateVolumeResponse, error) {
 }

 func (c *client) ControllerListVolumes(ctx context.Context, req *ControllerListVolumesRequest, opts ...grpc.CallOption) (*ControllerListVolumesResponse, error) {
+	if err := c.ensureConnected(ctx); err != nil {
+		return nil, err
+	}
+
 	err := req.Validate()
 	if err != nil {
 		return nil, err
@@ -455,6 +483,10 @@ func (c *client) ControllerListVolumes(ctx context.Context, req *ControllerListVolumesRequest, opts ...grpc.CallOption) (*ControllerListVolumesResponse, error) {
 }

 func (c *client) ControllerDeleteVolume(ctx context.Context, req *ControllerDeleteVolumeRequest, opts ...grpc.CallOption) error {
+	if err := c.ensureConnected(ctx); err != nil {
+		return err
+	}
+
 	err := req.Validate()
 	if err != nil {
 		return err
@@ -552,6 +584,10 @@ NEXT_CAP:
 }

 func (c *client) ControllerCreateSnapshot(ctx context.Context, req *ControllerCreateSnapshotRequest, opts ...grpc.CallOption) (*ControllerCreateSnapshotResponse, error) {
+	if err := c.ensureConnected(ctx); err != nil {
+		return nil, err
+	}
+
 	err := req.Validate()
 	if err != nil {
 		return nil, err
@@ -596,6 +632,10 @@ func (c *client) ControllerCreateSnapshot(ctx context.Context, req *ControllerCreateSnapshotRequest, opts ...grpc.CallOption) (*ControllerCreateSnapshotResponse, error) {
 }

 func (c *client) ControllerDeleteSnapshot(ctx context.Context, req *ControllerDeleteSnapshotRequest, opts ...grpc.CallOption) error {
+	if err := c.ensureConnected(ctx); err != nil {
+		return err
+	}
+
 	err := req.Validate()
 	if err != nil {
 		return err
@@ -626,6 +666,10 @@ func (c *client) ControllerDeleteSnapshot(ctx context.Context, req *ControllerDeleteSnapshotRequest, opts ...grpc.CallOption) error {
 }

 func (c *client) ControllerListSnapshots(ctx context.Context, req *ControllerListSnapshotsRequest, opts ...grpc.CallOption) (*ControllerListSnapshotsResponse, error) {
+	if err := c.ensureConnected(ctx); err != nil {
+		return nil, err
+	}
+
 	err := req.Validate()
 	if err != nil {
 		return nil, err
@@ -657,11 +701,8 @@ func (c *client) ControllerListSnapshots(ctx context.Context, req *ControllerListSnapshotsRequest, opts ...grpc.CallOption) (*ControllerListSnapshotsResponse, error) {
 //

 func (c *client) NodeGetCapabilities(ctx context.Context) (*NodeCapabilitySet, error) {
-	if c == nil {
-		return nil, fmt.Errorf("Client not initialized")
-	}
-	if c.nodeClient == nil {
-		return nil, fmt.Errorf("Client not initialized")
+	if err := c.ensureConnected(ctx); err != nil {
+		return nil, err
 	}

 	// note: no grpc retries needed here, as this is called in
@@ -675,11 +716,8 @@ func (c *client) NodeGetCapabilities(ctx context.Context) (*NodeCapabilitySet, error) {
 }

 func (c *client) NodeGetInfo(ctx context.Context) (*NodeGetInfoResponse, error) {
-	if c == nil {
-		return nil, fmt.Errorf("Client not initialized")
-	}
-	if c.nodeClient == nil {
-		return nil, fmt.Errorf("Client not initialized")
+	if err := c.ensureConnected(ctx); err != nil {
+		return nil, err
 	}

 	result := &NodeGetInfoResponse{}
@@ -706,11 +744,8 @@ func (c *client) NodeGetInfo(ctx context.Context) (*NodeGetInfoResponse, error) {
 }

 func (c *client) NodeStageVolume(ctx context.Context, req *NodeStageVolumeRequest, opts ...grpc.CallOption) error {
-	if c == nil {
-		return fmt.Errorf("Client not initialized")
-	}
-	if c.nodeClient == nil {
-		return fmt.Errorf("Client not initialized")
+	if err := c.ensureConnected(ctx); err != nil {
+		return err
 	}
 	err := req.Validate()
 	if err != nil {
@@ -741,11 +776,8 @@ func (c *client) NodeStageVolume(ctx context.Context, req *NodeStageVolumeRequest, opts ...grpc.CallOption) error {
 }

 func (c *client) NodeUnstageVolume(ctx context.Context, volumeID string, stagingTargetPath string, opts ...grpc.CallOption) error {
-	if c == nil {
-		return fmt.Errorf("Client not initialized")
-	}
-	if c.nodeClient == nil {
-		return fmt.Errorf("Client not initialized")
+	if err := c.ensureConnected(ctx); err != nil {
+		return err
 	}
 	// These errors should not be returned during production use but exist as aids
 	// during Nomad development
@@ -779,13 +811,9 @@ func (c *client) NodeUnstageVolume(ctx context.Context, volumeID string, stagingTargetPath string, opts ...grpc.CallOption) error {
 }

 func (c *client) NodePublishVolume(ctx context.Context, req *NodePublishVolumeRequest, opts ...grpc.CallOption) error {
-	if c == nil {
-		return fmt.Errorf("Client not initialized")
+	if err := c.ensureConnected(ctx); err != nil {
+		return err
 	}
-	if c.nodeClient == nil {
-		return fmt.Errorf("Client not initialized")
-	}

 	if err := req.Validate(); err != nil {
 		return fmt.Errorf("validation error: %v", err)
 	}
@@ -813,13 +841,9 @@ func (c *client) NodePublishVolume(ctx context.Context, req *NodePublishVolumeRequest, opts ...grpc.CallOption) error {
 }

 func (c *client) NodeUnpublishVolume(ctx context.Context, volumeID, targetPath string, opts ...grpc.CallOption) error {
-	if c == nil {
-		return fmt.Errorf("Client not initialized")
+	if err := c.ensureConnected(ctx); err != nil {
+		return err
 	}
-	if c.nodeClient == nil {
-		return fmt.Errorf("Client not initialized")
-	}

 	// These errors should not be returned during production use but exist as aids
 	// during Nomad development
 	if volumeID == "" {
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"path/filepath"
 	"testing"

 	csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
@@ -11,15 +12,26 @@ import (
 	"github.com/hashicorp/nomad/nomad/structs"
 	fake "github.com/hashicorp/nomad/plugins/csi/testing"
 	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )

-func newTestClient() (*fake.IdentityClient, *fake.ControllerClient, *fake.NodeClient, CSIPlugin) {
+func newTestClient(t *testing.T) (*fake.IdentityClient, *fake.ControllerClient, *fake.NodeClient, CSIPlugin) {
 	ic := fake.NewIdentityClient()
 	cc := fake.NewControllerClient()
 	nc := fake.NewNodeClient()

+	// we've set this as non-blocking so it won't connect to the
+	// socket unless a RPC is invoked
+	conn, err := grpc.DialContext(context.Background(),
+		filepath.Join(t.TempDir(), "csi.sock"), grpc.WithInsecure())
+	if err != nil {
+		t.Errorf("failed: %v", err)
+	}
+
 	client := &client{
+		conn:             conn,
 		identityClient:   ic,
 		controllerClient: cc,
 		nodeClient:       nc,
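The rewritten newTestClient relies on grpc.DialContext being non-blocking by default (no grpc.WithBlock), so the dial succeeds even though nothing listens at the socket path, as its comment notes. A hypothetical standalone test demonstrating that property:

```go
package lazyconn

import (
	"context"
	"path/filepath"
	"testing"

	"google.golang.org/grpc"
)

// TestNonBlockingDial shows why the test helper can dial a socket
// that nothing listens on: without grpc.WithBlock, DialContext only
// sets up the ClientConn state machine and returns immediately; no
// connection is attempted until an RPC is invoked.
func TestNonBlockingDial(t *testing.T) {
	conn, err := grpc.DialContext(context.Background(),
		"unix://"+filepath.Join(t.TempDir(), "nothing-here.sock"),
		grpc.WithInsecure())
	if err != nil {
		t.Fatalf("non-blocking dial should not fail: %v", err)
	}
	defer conn.Close()
}
```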
@@ -69,7 +81,7 @@ func TestClient_RPC_PluginProbe(t *testing.T) {

 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
-			ic, _, _, client := newTestClient()
+			ic, _, _, client := newTestClient(t)
 			defer client.Close()

 			ic.NextErr = tc.ResponseErr
@@ -121,7 +133,7 @@ func TestClient_RPC_PluginInfo(t *testing.T) {

 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
-			ic, _, _, client := newTestClient()
+			ic, _, _, client := newTestClient(t)
 			defer client.Close()

 			ic.NextErr = tc.ResponseErr
@@ -186,7 +198,7 @@ func TestClient_RPC_PluginGetCapabilities(t *testing.T) {

 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
-			ic, _, _, client := newTestClient()
+			ic, _, _, client := newTestClient(t)
 			defer client.Close()

 			ic.NextErr = tc.ResponseErr
@@ -284,7 +296,7 @@ func TestClient_RPC_ControllerGetCapabilities(t *testing.T) {

 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
-			_, cc, _, client := newTestClient()
+			_, cc, _, client := newTestClient(t)
 			defer client.Close()

 			cc.NextErr = tc.ResponseErr
@@ -342,7 +354,7 @@ func TestClient_RPC_NodeGetCapabilities(t *testing.T) {

 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
-			_, _, nc, client := newTestClient()
+			_, _, nc, client := newTestClient(t)
 			defer client.Close()

 			nc.NextErr = tc.ResponseErr
@@ -407,7 +419,7 @@ func TestClient_RPC_ControllerPublishVolume(t *testing.T) {

 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
-			_, cc, _, client := newTestClient()
+			_, cc, _, client := newTestClient(t)
 			defer client.Close()

 			cc.NextErr = tc.ResponseErr
@@ -453,7 +465,7 @@ func TestClient_RPC_ControllerUnpublishVolume(t *testing.T) {

 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
-			_, cc, _, client := newTestClient()
+			_, cc, _, client := newTestClient(t)
 			defer client.Close()

 			cc.NextErr = tc.ResponseErr
@@ -661,7 +673,7 @@ func TestClient_RPC_ControllerValidateVolume(t *testing.T) {

 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
-			_, cc, _, client := newTestClient()
+			_, cc, _, client := newTestClient(t)
 			defer client.Close()

 			requestedCaps := []*VolumeCapability{{
@@ -758,7 +770,7 @@ func TestClient_RPC_ControllerCreateVolume(t *testing.T) {
 	}
 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
-			_, cc, _, client := newTestClient()
+			_, cc, _, client := newTestClient(t)
 			defer client.Close()

 			req := &ControllerCreateVolumeRequest{
@@ -828,7 +840,7 @@ func TestClient_RPC_ControllerDeleteVolume(t *testing.T) {
 	}
 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
-			_, cc, _, client := newTestClient()
+			_, cc, _, client := newTestClient(t)
 			defer client.Close()

 			cc.NextErr = tc.ResponseErr
@@ -871,7 +883,7 @@ func TestClient_RPC_ControllerListVolume(t *testing.T) {

 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
-			_, cc, _, client := newTestClient()
+			_, cc, _, client := newTestClient(t)
 			defer client.Close()

 			cc.NextErr = tc.ResponseErr
@@ -979,7 +991,7 @@ func TestClient_RPC_ControllerCreateSnapshot(t *testing.T) {
 	}
 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
-			_, cc, _, client := newTestClient()
+			_, cc, _, client := newTestClient(t)
 			defer client.Close()

 			cc.NextErr = tc.ResponseErr
@@ -1025,7 +1037,7 @@ func TestClient_RPC_ControllerDeleteSnapshot(t *testing.T) {
 	}
 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
-			_, cc, _, client := newTestClient()
+			_, cc, _, client := newTestClient(t)
 			defer client.Close()

 			cc.NextErr = tc.ResponseErr
@@ -1068,7 +1080,7 @@ func TestClient_RPC_ControllerListSnapshots(t *testing.T) {

 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
-			_, cc, _, client := newTestClient()
+			_, cc, _, client := newTestClient(t)
 			defer client.Close()

 			cc.NextErr = tc.ResponseErr
@@ -1124,7 +1136,7 @@ func TestClient_RPC_NodeStageVolume(t *testing.T) {

 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
-			_, _, nc, client := newTestClient()
+			_, _, nc, client := newTestClient(t)
 			defer client.Close()

 			nc.NextErr = tc.ResponseErr
@@ -1165,7 +1177,7 @@ func TestClient_RPC_NodeUnstageVolume(t *testing.T) {

 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
-			_, _, nc, client := newTestClient()
+			_, _, nc, client := newTestClient(t)
 			defer client.Close()

 			nc.NextErr = tc.ResponseErr
@@ -1221,7 +1233,7 @@ func TestClient_RPC_NodePublishVolume(t *testing.T) {

 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
-			_, _, nc, client := newTestClient()
+			_, _, nc, client := newTestClient(t)
 			defer client.Close()

 			nc.NextErr = tc.ResponseErr
@@ -1274,7 +1286,7 @@ func TestClient_RPC_NodeUnpublishVolume(t *testing.T) {

 	for _, tc := range cases {
 		t.Run(tc.Name, func(t *testing.T) {
-			_, _, nc, client := newTestClient()
+			_, _, nc, client := newTestClient(t)
 			defer client.Close()

 			nc.NextErr = tc.ResponseErr