From 27bb2da5eeb4e5f019a19b06af40e8e728834b89 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 15 Feb 2022 16:57:29 -0500 Subject: [PATCH] CSI: make gRPC client creation more robust (#12057) Nomad communicates with CSI plugin tasks via gRPC. The plugin supervisor hook uses this to ping the plugin for health checks which it emits as task events. After the first successful health check the plugin supervisor registers the plugin in the client's dynamic plugin registry, which in turn creates a CSI plugin manager instance that has its own gRPC client for fingerprinting the plugin and sending mount requests. If the plugin manager instance fails to connect to the plugin on its first attempt, it exits. The plugin supervisor hook is unaware that connection failed so long as its own pings continue to work. A transient failure during plugin startup may mislead the plugin supervisor hook into thinking the plugin is up (so there's no need to restart the allocation) but no fingerprinter is started. * Refactors the gRPC client to connect on first use. This provides the plugin manager instance the ability to retry the gRPC client connection until success. * Add a 30s timeout to the plugin supervisor so that we don't poll forever waiting for a plugin that will never come back up. Minor improvements: * The plugin supervisor hook creates a new gRPC client for every probe and then throws it away. Instead, reuse the client as we do for the plugin manager. * The gRPC client constructor has a 1 second timeout. Clarify that this timeout applies to the connection and not the rest of the client lifetime. --- .changelog/12057.txt | 7 + .../taskrunner/plugin_supervisor_hook.go | 90 +++++---- .../taskrunner/task_runner_hooks.go | 1 + client/client.go | 6 +- client/pluginmanager/csimanager/instance.go | 7 +- plugins/csi/client.go | 190 ++++++++++-------- plugins/csi/client_test.go | 50 +++-- 7 files changed, 203 insertions(+), 148 deletions(-) create mode 100644 .changelog/12057.txt diff --git a/.changelog/12057.txt b/.changelog/12057.txt new file mode 100644 index 000000000..8e74a8fe0 --- /dev/null +++ b/.changelog/12057.txt @@ -0,0 +1,7 @@ +```release-note:bug +csi: Fixed a bug where the plugin instance manager would not retry the initial gRPC connection to plugins +``` + +```release-note:bug +csi: Fixed a bug where the plugin supervisor would not restart the task if it failed to connect to the plugin +``` diff --git a/client/allocrunner/taskrunner/plugin_supervisor_hook.go b/client/allocrunner/taskrunner/plugin_supervisor_hook.go index 9adb8d53b..679fb2f73 100644 --- a/client/allocrunner/taskrunner/plugin_supervisor_hook.go +++ b/client/allocrunner/taskrunner/plugin_supervisor_hook.go @@ -38,6 +38,7 @@ type csiPluginSupervisorHook struct { // eventEmitter is used to emit events to the task eventEmitter ti.EventEmitter + lifecycle ti.TaskLifecycle shutdownCtx context.Context shutdownCancelFn context.CancelFunc @@ -54,6 +55,7 @@ type csiPluginSupervisorHookConfig struct { clientStateDirPath string events ti.EventEmitter runner *TaskRunner + lifecycle ti.TaskLifecycle capabilities *drivers.Capabilities logger hclog.Logger } @@ -90,6 +92,7 @@ func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPlugi hook := &csiPluginSupervisorHook{ alloc: config.runner.Alloc(), runner: config.runner, + lifecycle: config.lifecycle, logger: config.logger, task: task, mountPoint: pluginRoot, @@ -201,27 +204,41 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) { }() socketPath := filepath.Join(h.mountPoint, structs.CSISocketName) + + client := csi.NewClient(socketPath, h.logger.Named("csi_client").With( + "plugin.name", h.task.CSIPluginConfig.ID, + "plugin.type", h.task.CSIPluginConfig.Type)) + defer client.Close() + t := time.NewTimer(0) + // We're in Poststart at this point, so if we can't connect within + // this deadline, assume it's broken so we can restart the task + startCtx, startCancelFn := context.WithTimeout(ctx, 30*time.Second) + defer startCancelFn() + + var err error + var pluginHealthy bool + // Step 1: Wait for the plugin to initially become available. WAITFORREADY: for { select { - case <-ctx.Done(): + case <-startCtx.Done(): + h.kill(ctx, fmt.Errorf("CSI plugin failed probe: %v", err)) return case <-t.C: - pluginHealthy, err := h.supervisorLoopOnce(ctx, socketPath) + pluginHealthy, err = h.supervisorLoopOnce(startCtx, client) if err != nil || !pluginHealthy { - h.logger.Debug("CSI Plugin not ready", "error", err) - - // Plugin is not yet returning healthy, because we want to optimise for - // quickly bringing a plugin online, we use a short timeout here. - // TODO(dani): Test with more plugins and adjust. + h.logger.Debug("CSI plugin not ready", "error", err) + // Use only a short delay here to optimize for quickly + // bringing up a plugin t.Reset(5 * time.Second) continue } // Mark the plugin as healthy in a task event + h.logger.Debug("CSI plugin is ready") h.previousHealthState = pluginHealthy event := structs.NewTaskEvent(structs.TaskPluginHealthy) event.SetMessage(fmt.Sprintf("plugin: %s", h.task.CSIPluginConfig.ID)) @@ -232,15 +249,14 @@ WAITFORREADY: } // Step 2: Register the plugin with the catalog. - deregisterPluginFn, err := h.registerPlugin(socketPath) + deregisterPluginFn, err := h.registerPlugin(client, socketPath) if err != nil { - h.logger.Error("CSI Plugin registration failed", "error", err) - event := structs.NewTaskEvent(structs.TaskPluginUnhealthy) - event.SetMessage(fmt.Sprintf("failed to register plugin: %s, reason: %v", h.task.CSIPluginConfig.ID, err)) - h.eventEmitter.EmitEvent(event) + h.kill(ctx, fmt.Errorf("CSI plugin failed to register: %v", err)) + return } - // Step 3: Start the lightweight supervisor loop. + // Step 3: Start the lightweight supervisor loop. At this point, failures + // don't cause the task to restart t.Reset(0) for { select { @@ -249,9 +265,9 @@ WAITFORREADY: deregisterPluginFn() return case <-t.C: - pluginHealthy, err := h.supervisorLoopOnce(ctx, socketPath) + pluginHealthy, err := h.supervisorLoopOnce(ctx, client) if err != nil { - h.logger.Error("CSI Plugin fingerprinting failed", "error", err) + h.logger.Error("CSI plugin fingerprinting failed", "error", err) } // The plugin has transitioned to a healthy state. Emit an event. @@ -265,7 +281,7 @@ WAITFORREADY: if h.previousHealthState && !pluginHealthy { event := structs.NewTaskEvent(structs.TaskPluginUnhealthy) if err != nil { - event.SetMessage(fmt.Sprintf("error: %v", err)) + event.SetMessage(fmt.Sprintf("Error: %v", err)) } else { event.SetMessage("Unknown Reason") } @@ -281,16 +297,9 @@ WAITFORREADY: } } -func (h *csiPluginSupervisorHook) registerPlugin(socketPath string) (func(), error) { - +func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPath string) (func(), error) { // At this point we know the plugin is ready and we can fingerprint it // to get its vendor name and version - client, err := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type)) - if err != nil { - return nil, fmt.Errorf("failed to create csi client: %v", err) - } - defer client.Close() - info, err := client.PluginInfo() if err != nil { return nil, fmt.Errorf("failed to probe plugin: %v", err) @@ -354,21 +363,13 @@ func (h *csiPluginSupervisorHook) registerPlugin(socketPath string) (func(), err }, nil } -func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, socketPath string) (bool, error) { - _, err := os.Stat(socketPath) - if err != nil { - return false, fmt.Errorf("failed to stat socket: %v", err) - } +func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client csi.CSIPlugin) (bool, error) { + probeCtx, probeCancelFn := context.WithTimeout(ctx, 5*time.Second) + defer probeCancelFn() - client, err := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type)) + healthy, err := client.PluginProbe(probeCtx) if err != nil { - return false, fmt.Errorf("failed to create csi client: %v", err) - } - defer client.Close() - - healthy, err := client.PluginProbe(ctx) - if err != nil { - return false, fmt.Errorf("failed to probe plugin: %v", err) + return false, err } return healthy, nil @@ -387,6 +388,21 @@ func (h *csiPluginSupervisorHook) Stop(_ context.Context, req *interfaces.TaskSt return nil } +func (h *csiPluginSupervisorHook) kill(ctx context.Context, reason error) { + h.logger.Error("killing task because plugin failed", "error", reason) + event := structs.NewTaskEvent(structs.TaskPluginUnhealthy) + event.SetMessage(fmt.Sprintf("Error: %v", reason.Error())) + h.eventEmitter.EmitEvent(event) + + if err := h.lifecycle.Kill(ctx, + structs.NewTaskEvent(structs.TaskKilling). + SetFailsTask(). + SetDisplayMessage("CSI plugin did not become healthy before timeout"), + ); err != nil { + h.logger.Error("failed to kill task", "kill_reason", reason, "error", err) + } +} + func ensureMountpointInserted(mounts []*drivers.MountConfig, mount *drivers.MountConfig) []*drivers.MountConfig { for _, mnt := range mounts { if mnt.IsEqual(mount) { diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index b3f44a8b9..62ff26c4b 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -76,6 +76,7 @@ func (tr *TaskRunner) initHooks() { clientStateDirPath: tr.clientConfig.StateDir, events: tr, runner: tr, + lifecycle: tr, capabilities: tr.driverCapabilities, logger: hookLogger, })) diff --git a/client/client.go b/client/client.go index 34428464a..dd21b66c0 100644 --- a/client/client.go +++ b/client/client.go @@ -390,11 +390,11 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie c.dynamicRegistry = dynamicplugins.NewRegistry(c.stateDB, map[string]dynamicplugins.PluginDispenser{ dynamicplugins.PluginTypeCSIController: func(info *dynamicplugins.PluginInfo) (interface{}, error) { - return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "controller")) + return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "controller")), nil }, dynamicplugins.PluginTypeCSINode: func(info *dynamicplugins.PluginInfo) (interface{}, error) { - return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "client")) - }, // TODO(tgross): refactor these dispenser constructors into csimanager to tidy it up + return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "client")), nil + }, }) // Setup the clients RPC server diff --git a/client/pluginmanager/csimanager/instance.go b/client/pluginmanager/csimanager/instance.go index 062b73972..3839113dc 100644 --- a/client/pluginmanager/csimanager/instance.go +++ b/client/pluginmanager/csimanager/instance.go @@ -73,12 +73,7 @@ func newInstanceManager(logger hclog.Logger, eventer TriggerNodeEvent, updater U } func (i *instanceManager) run() { - c, err := csi.NewClient(i.info.ConnectionInfo.SocketPath, i.logger) - if err != nil { - i.logger.Error("failed to setup instance manager client", "error", err) - close(i.shutdownCh) - return - } + c := csi.NewClient(i.info.ConnectionInfo.SocketPath, i.logger) i.client = c i.fp.client = c diff --git a/plugins/csi/client.go b/plugins/csi/client.go index 89237dbbf..6d34d8f55 100644 --- a/plugins/csi/client.go +++ b/plugins/csi/client.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "net" + "os" "time" csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" @@ -88,6 +89,7 @@ type CSINodeClient interface { } type client struct { + addr string conn *grpc.ClientConn identityClient csipbv1.IdentityClient controllerClient CSIControllerClient @@ -102,30 +104,59 @@ func (c *client) Close() error { return nil } -func NewClient(addr string, logger hclog.Logger) (CSIPlugin, error) { - if addr == "" { - return nil, fmt.Errorf("address is empty") - } - - conn, err := newGrpcConn(addr, logger) - if err != nil { - return nil, err - } - +func NewClient(addr string, logger hclog.Logger) CSIPlugin { return &client{ - conn: conn, - identityClient: csipbv1.NewIdentityClient(conn), - controllerClient: csipbv1.NewControllerClient(conn), - nodeClient: csipbv1.NewNodeClient(conn), - logger: logger, - }, nil + addr: addr, + logger: logger, + } +} + +func (c *client) ensureConnected(ctx context.Context) error { + if c == nil { + return fmt.Errorf("client not initialized") + } + if c.conn != nil { + return nil + } + if c.addr == "" { + return fmt.Errorf("address is empty") + } + var conn *grpc.ClientConn + var err error + t := time.NewTimer(0) + for { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout while connecting to gRPC socket: %v", err) + case <-t.C: + _, err = os.Stat(c.addr) + if err != nil { + err = fmt.Errorf("failed to stat socket: %v", err) + t.Reset(5 * time.Second) + continue + } + conn, err = newGrpcConn(c.addr, c.logger) + if err != nil { + err = fmt.Errorf("failed to create gRPC connection: %v", err) + t.Reset(time.Second * 5) + continue + } + c.conn = conn + c.identityClient = csipbv1.NewIdentityClient(conn) + c.controllerClient = csipbv1.NewControllerClient(conn) + c.nodeClient = csipbv1.NewNodeClient(conn) + return nil + } + } } func newGrpcConn(addr string, logger hclog.Logger) (*grpc.ClientConn, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) + // after DialContext returns w/ initial connection, closing this + // context is a no-op + connectCtx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() conn, err := grpc.DialContext( - ctx, + connectCtx, addr, grpc.WithBlock(), grpc.WithInsecure(), @@ -146,10 +177,14 @@ func newGrpcConn(addr string, logger hclog.Logger) (*grpc.ClientConn, error) { // PluginInfo describes the type and version of a plugin as required by the nomad // base.BasePlugin interface. func (c *client) PluginInfo() (*base.PluginInfoResponse, error) { - // note: no grpc retries needed here, as this is called in - // fingerprinting and will get retried by the caller. ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() + if err := c.ensureConnected(ctx); err != nil { + return nil, err + } + + // note: no grpc retries needed here, as this is called in + // fingerprinting and will get retried by the caller. name, version, err := c.PluginGetInfo(ctx) if err != nil { return nil, err @@ -176,6 +211,10 @@ func (c *client) SetConfig(_ *base.Config) error { } func (c *client) PluginProbe(ctx context.Context) (bool, error) { + if err := c.ensureConnected(ctx); err != nil { + return false, err + } + // note: no grpc retries should be done here req, err := c.identityClient.Probe(ctx, &csipbv1.ProbeRequest{}) if err != nil { @@ -198,11 +237,8 @@ func (c *client) PluginProbe(ctx context.Context) (bool, error) { } func (c *client) PluginGetInfo(ctx context.Context) (string, string, error) { - if c == nil { - return "", "", fmt.Errorf("Client not initialized") - } - if c.identityClient == nil { - return "", "", fmt.Errorf("Client not initialized") + if err := c.ensureConnected(ctx); err != nil { + return "", "", err } resp, err := c.identityClient.GetPluginInfo(ctx, &csipbv1.GetPluginInfoRequest{}) @@ -220,11 +256,8 @@ func (c *client) PluginGetInfo(ctx context.Context) (string, string, error) { } func (c *client) PluginGetCapabilities(ctx context.Context) (*PluginCapabilitySet, error) { - if c == nil { - return nil, fmt.Errorf("Client not initialized") - } - if c.identityClient == nil { - return nil, fmt.Errorf("Client not initialized") + if err := c.ensureConnected(ctx); err != nil { + return nil, err } // note: no grpc retries needed here, as this is called in @@ -243,11 +276,8 @@ func (c *client) PluginGetCapabilities(ctx context.Context) (*PluginCapabilitySe // func (c *client) ControllerGetCapabilities(ctx context.Context) (*ControllerCapabilitySet, error) { - if c == nil { - return nil, fmt.Errorf("Client not initialized") - } - if c.controllerClient == nil { - return nil, fmt.Errorf("controllerClient not initialized") + if err := c.ensureConnected(ctx); err != nil { + return nil, err } // note: no grpc retries needed here, as this is called in @@ -262,11 +292,8 @@ func (c *client) ControllerGetCapabilities(ctx context.Context) (*ControllerCapa } func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*ControllerPublishVolumeResponse, error) { - if c == nil { - return nil, fmt.Errorf("Client not initialized") - } - if c.controllerClient == nil { - return nil, fmt.Errorf("controllerClient not initialized") + if err := c.ensureConnected(ctx); err != nil { + return nil, err } err := req.Validate() @@ -304,11 +331,8 @@ func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPub } func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*ControllerUnpublishVolumeResponse, error) { - if c == nil { - return nil, fmt.Errorf("Client not initialized") - } - if c.controllerClient == nil { - return nil, fmt.Errorf("controllerClient not initialized") + if err := c.ensureConnected(ctx); err != nil { + return nil, err } err := req.Validate() if err != nil { @@ -337,13 +361,9 @@ func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerU } func (c *client) ControllerValidateCapabilities(ctx context.Context, req *ControllerValidateVolumeRequest, opts ...grpc.CallOption) error { - if c == nil { - return fmt.Errorf("Client not initialized") + if err := c.ensureConnected(ctx); err != nil { + return err } - if c.controllerClient == nil { - return fmt.Errorf("controllerClient not initialized") - } - if req.ExternalID == "" { return fmt.Errorf("missing volume ID") } @@ -390,6 +410,10 @@ func (c *client) ControllerValidateCapabilities(ctx context.Context, req *Contro } func (c *client) ControllerCreateVolume(ctx context.Context, req *ControllerCreateVolumeRequest, opts ...grpc.CallOption) (*ControllerCreateVolumeResponse, error) { + if err := c.ensureConnected(ctx); err != nil { + return nil, err + } + err := req.Validate() if err != nil { return nil, err @@ -433,6 +457,10 @@ func (c *client) ControllerCreateVolume(ctx context.Context, req *ControllerCrea } func (c *client) ControllerListVolumes(ctx context.Context, req *ControllerListVolumesRequest, opts ...grpc.CallOption) (*ControllerListVolumesResponse, error) { + if err := c.ensureConnected(ctx); err != nil { + return nil, err + } + err := req.Validate() if err != nil { return nil, err @@ -455,6 +483,10 @@ func (c *client) ControllerListVolumes(ctx context.Context, req *ControllerListV } func (c *client) ControllerDeleteVolume(ctx context.Context, req *ControllerDeleteVolumeRequest, opts ...grpc.CallOption) error { + if err := c.ensureConnected(ctx); err != nil { + return err + } + err := req.Validate() if err != nil { return err @@ -552,6 +584,10 @@ NEXT_CAP: } func (c *client) ControllerCreateSnapshot(ctx context.Context, req *ControllerCreateSnapshotRequest, opts ...grpc.CallOption) (*ControllerCreateSnapshotResponse, error) { + if err := c.ensureConnected(ctx); err != nil { + return nil, err + } + err := req.Validate() if err != nil { return nil, err @@ -596,6 +632,10 @@ func (c *client) ControllerCreateSnapshot(ctx context.Context, req *ControllerCr } func (c *client) ControllerDeleteSnapshot(ctx context.Context, req *ControllerDeleteSnapshotRequest, opts ...grpc.CallOption) error { + if err := c.ensureConnected(ctx); err != nil { + return err + } + err := req.Validate() if err != nil { return err @@ -626,6 +666,10 @@ func (c *client) ControllerDeleteSnapshot(ctx context.Context, req *ControllerDe } func (c *client) ControllerListSnapshots(ctx context.Context, req *ControllerListSnapshotsRequest, opts ...grpc.CallOption) (*ControllerListSnapshotsResponse, error) { + if err := c.ensureConnected(ctx); err != nil { + return nil, err + } + err := req.Validate() if err != nil { return nil, err @@ -657,11 +701,8 @@ func (c *client) ControllerListSnapshots(ctx context.Context, req *ControllerLis // func (c *client) NodeGetCapabilities(ctx context.Context) (*NodeCapabilitySet, error) { - if c == nil { - return nil, fmt.Errorf("Client not initialized") - } - if c.nodeClient == nil { - return nil, fmt.Errorf("Client not initialized") + if err := c.ensureConnected(ctx); err != nil { + return nil, err } // note: no grpc retries needed here, as this is called in @@ -675,11 +716,8 @@ func (c *client) NodeGetCapabilities(ctx context.Context) (*NodeCapabilitySet, e } func (c *client) NodeGetInfo(ctx context.Context) (*NodeGetInfoResponse, error) { - if c == nil { - return nil, fmt.Errorf("Client not initialized") - } - if c.nodeClient == nil { - return nil, fmt.Errorf("Client not initialized") + if err := c.ensureConnected(ctx); err != nil { + return nil, err } result := &NodeGetInfoResponse{} @@ -706,11 +744,8 @@ func (c *client) NodeGetInfo(ctx context.Context) (*NodeGetInfoResponse, error) } func (c *client) NodeStageVolume(ctx context.Context, req *NodeStageVolumeRequest, opts ...grpc.CallOption) error { - if c == nil { - return fmt.Errorf("Client not initialized") - } - if c.nodeClient == nil { - return fmt.Errorf("Client not initialized") + if err := c.ensureConnected(ctx); err != nil { + return err } err := req.Validate() if err != nil { @@ -741,11 +776,8 @@ func (c *client) NodeStageVolume(ctx context.Context, req *NodeStageVolumeReques } func (c *client) NodeUnstageVolume(ctx context.Context, volumeID string, stagingTargetPath string, opts ...grpc.CallOption) error { - if c == nil { - return fmt.Errorf("Client not initialized") - } - if c.nodeClient == nil { - return fmt.Errorf("Client not initialized") + if err := c.ensureConnected(ctx); err != nil { + return err } // These errors should not be returned during production use but exist as aids // during Nomad development @@ -779,13 +811,9 @@ func (c *client) NodeUnstageVolume(ctx context.Context, volumeID string, staging } func (c *client) NodePublishVolume(ctx context.Context, req *NodePublishVolumeRequest, opts ...grpc.CallOption) error { - if c == nil { - return fmt.Errorf("Client not initialized") + if err := c.ensureConnected(ctx); err != nil { + return err } - if c.nodeClient == nil { - return fmt.Errorf("Client not initialized") - } - if err := req.Validate(); err != nil { return fmt.Errorf("validation error: %v", err) } @@ -813,13 +841,9 @@ func (c *client) NodePublishVolume(ctx context.Context, req *NodePublishVolumeRe } func (c *client) NodeUnpublishVolume(ctx context.Context, volumeID, targetPath string, opts ...grpc.CallOption) error { - if c == nil { - return fmt.Errorf("Client not initialized") + if err := c.ensureConnected(ctx); err != nil { + return err } - if c.nodeClient == nil { - return fmt.Errorf("Client not initialized") - } - // These errors should not be returned during production use but exist as aids // during Nomad development if volumeID == "" { diff --git a/plugins/csi/client_test.go b/plugins/csi/client_test.go index 40da479b7..1c951b2ba 100644 --- a/plugins/csi/client_test.go +++ b/plugins/csi/client_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "path/filepath" "testing" csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" @@ -11,15 +12,26 @@ import ( "github.com/hashicorp/nomad/nomad/structs" fake "github.com/hashicorp/nomad/plugins/csi/testing" "github.com/stretchr/testify/require" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) -func newTestClient() (*fake.IdentityClient, *fake.ControllerClient, *fake.NodeClient, CSIPlugin) { +func newTestClient(t *testing.T) (*fake.IdentityClient, *fake.ControllerClient, *fake.NodeClient, CSIPlugin) { ic := fake.NewIdentityClient() cc := fake.NewControllerClient() nc := fake.NewNodeClient() + + // we've set this as non-blocking so it won't connect to the + // socket unless a RPC is invoked + conn, err := grpc.DialContext(context.Background(), + filepath.Join(t.TempDir(), "csi.sock"), grpc.WithInsecure()) + if err != nil { + t.Errorf("failed: %v", err) + } + client := &client{ + conn: conn, identityClient: ic, controllerClient: cc, nodeClient: nc, @@ -69,7 +81,7 @@ func TestClient_RPC_PluginProbe(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - ic, _, _, client := newTestClient() + ic, _, _, client := newTestClient(t) defer client.Close() ic.NextErr = tc.ResponseErr @@ -121,7 +133,7 @@ func TestClient_RPC_PluginInfo(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - ic, _, _, client := newTestClient() + ic, _, _, client := newTestClient(t) defer client.Close() ic.NextErr = tc.ResponseErr @@ -186,7 +198,7 @@ func TestClient_RPC_PluginGetCapabilities(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - ic, _, _, client := newTestClient() + ic, _, _, client := newTestClient(t) defer client.Close() ic.NextErr = tc.ResponseErr @@ -284,7 +296,7 @@ func TestClient_RPC_ControllerGetCapabilities(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() cc.NextErr = tc.ResponseErr @@ -342,7 +354,7 @@ func TestClient_RPC_NodeGetCapabilities(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, _, nc, client := newTestClient() + _, _, nc, client := newTestClient(t) defer client.Close() nc.NextErr = tc.ResponseErr @@ -407,7 +419,7 @@ func TestClient_RPC_ControllerPublishVolume(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() cc.NextErr = tc.ResponseErr @@ -453,7 +465,7 @@ func TestClient_RPC_ControllerUnpublishVolume(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() cc.NextErr = tc.ResponseErr @@ -661,7 +673,7 @@ func TestClient_RPC_ControllerValidateVolume(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() requestedCaps := []*VolumeCapability{{ @@ -758,7 +770,7 @@ func TestClient_RPC_ControllerCreateVolume(t *testing.T) { } for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() req := &ControllerCreateVolumeRequest{ @@ -828,7 +840,7 @@ func TestClient_RPC_ControllerDeleteVolume(t *testing.T) { } for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() cc.NextErr = tc.ResponseErr @@ -871,7 +883,7 @@ func TestClient_RPC_ControllerListVolume(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() cc.NextErr = tc.ResponseErr @@ -979,7 +991,7 @@ func TestClient_RPC_ControllerCreateSnapshot(t *testing.T) { } for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() cc.NextErr = tc.ResponseErr @@ -1025,7 +1037,7 @@ func TestClient_RPC_ControllerDeleteSnapshot(t *testing.T) { } for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() cc.NextErr = tc.ResponseErr @@ -1068,7 +1080,7 @@ func TestClient_RPC_ControllerListSnapshots(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, cc, _, client := newTestClient() + _, cc, _, client := newTestClient(t) defer client.Close() cc.NextErr = tc.ResponseErr @@ -1124,7 +1136,7 @@ func TestClient_RPC_NodeStageVolume(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, _, nc, client := newTestClient() + _, _, nc, client := newTestClient(t) defer client.Close() nc.NextErr = tc.ResponseErr @@ -1165,7 +1177,7 @@ func TestClient_RPC_NodeUnstageVolume(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, _, nc, client := newTestClient() + _, _, nc, client := newTestClient(t) defer client.Close() nc.NextErr = tc.ResponseErr @@ -1221,7 +1233,7 @@ func TestClient_RPC_NodePublishVolume(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, _, nc, client := newTestClient() + _, _, nc, client := newTestClient(t) defer client.Close() nc.NextErr = tc.ResponseErr @@ -1274,7 +1286,7 @@ func TestClient_RPC_NodeUnpublishVolume(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - _, _, nc, client := newTestClient() + _, _, nc, client := newTestClient(t) defer client.Close() nc.NextErr = tc.ResponseErr