backport of commit c6dbba7cde911bb08f1f8da445a44a0125cd2047 (#18505)

Co-authored-by: Daniel Bennett <dbennett@hashicorp.com>
This commit is contained in:
hc-github-team-nomad-core 2023-09-14 14:38:05 -05:00 committed by GitHub
parent 5edf9f7c8f
commit 46b4847885
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1055 additions and 89 deletions

3
.changelog/18359.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
csi: add ability to expand the size of volumes for plugins that support it
```

View File

@ -11,6 +11,7 @@ import (
metrics "github.com/armon/go-metrics"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/structs"
@ -232,6 +233,47 @@ func (c *CSI) ControllerCreateVolume(req *structs.ClientCSIControllerCreateVolum
return nil
}
func (c *CSI) ControllerExpandVolume(req *structs.ClientCSIControllerExpandVolumeRequest, resp *structs.ClientCSIControllerExpandVolumeResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "expand_volume"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
// the server's view of the plugin health is stale, so let it know it
// should retry with another controller instance
return fmt.Errorf("CSI.ControllerExpandVolume could not find plugin: %w: %v",
nstructs.ErrCSIClientRPCRetryable, err)
}
defer plugin.Close()
csiReq := req.ToCSIRequest()
ctx, cancelFn := c.requestContext()
defer cancelFn()
// CSI ControllerExpandVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
cresp, err := plugin.ControllerExpandVolume(ctx, csiReq,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {
// if the volume was deleted out-of-band, we'll get an error from
// the plugin but can safely ignore it
c.c.logger.Debug("could not expand volume", "error", err)
return nil
}
if err != nil {
return fmt.Errorf("CSI.ControllerExpandVolume: %v", err)
}
if cresp == nil {
c.c.logger.Warn("plugin did not return error or response; this is a bug in the plugin and should be reported to the plugin author")
return fmt.Errorf("CSI.ControllerExpandVolume: plugin did not return error or response")
}
resp.CapacityBytes = cresp.CapacityBytes
resp.NodeExpansionRequired = cresp.NodeExpansionRequired
return nil
}
func (c *CSI) ControllerDeleteVolume(req *structs.ClientCSIControllerDeleteVolumeRequest, resp *structs.ClientCSIControllerDeleteVolumeResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "delete_volume"}, time.Now())

View File

@ -5,15 +5,19 @@ package client
import (
"errors"
"fmt"
"testing"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/client/structs"
nstructs "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/csi"
"github.com/hashicorp/nomad/plugins/csi/fake"
"github.com/stretchr/testify/require"
)
var fakePlugin = &dynamicplugins.PluginInfo{
@ -463,6 +467,95 @@ func TestCSIController_CreateVolume(t *testing.T) {
}
}
func TestCSIController_ExpandVolume(t *testing.T) {
cases := []struct {
Name string
ModRequest func(request *structs.ClientCSIControllerExpandVolumeRequest)
NextResp *csi.ControllerExpandVolumeResponse
NextErr error
ExpectErr string
}{
{
Name: "success",
NextResp: &csi.ControllerExpandVolumeResponse{
CapacityBytes: 99,
NodeExpansionRequired: true,
},
},
{
Name: "plugin not found",
ModRequest: func(r *structs.ClientCSIControllerExpandVolumeRequest) {
r.CSIControllerQuery.PluginID = "nonexistent"
},
ExpectErr: "CSI.ControllerExpandVolume could not find plugin: CSI client error (retryable): plugin nonexistent for type csi-controller not found",
},
{
Name: "ignorable error",
NextResp: &csi.ControllerExpandVolumeResponse{},
NextErr: fmt.Errorf("you can ignore me (%w)", nstructs.ErrCSIClientRPCIgnorable),
ExpectErr: "", // explicitly empty here for clarity.
},
{
Name: "controller error",
NextErr: errors.New("sad plugin"),
ExpectErr: "CSI.ControllerExpandVolume: sad plugin",
},
{
Name: "nil response from plugin",
NextResp: nil, // again explicit for clarity.
ExpectErr: "CSI.ControllerExpandVolume: plugin did not return error or response",
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
client, cleanup := TestClient(t, nil)
t.Cleanup(func() { test.NoError(t, cleanup()) })
fakeClient := &fake.Client{
NextControllerExpandVolumeResponse: tc.NextResp,
NextControllerExpandVolumeErr: tc.NextErr,
}
dispenserFunc := func(*dynamicplugins.PluginInfo) (interface{}, error) {
return fakeClient, nil
}
client.dynamicRegistry.StubDispenserForType(
dynamicplugins.PluginTypeCSIController, dispenserFunc)
err := client.dynamicRegistry.RegisterPlugin(fakePlugin)
must.NoError(t, err)
req := &structs.ClientCSIControllerExpandVolumeRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
ExternalVolumeID: "some-volume-id",
CapacityRange: &csi.CapacityRange{
RequiredBytes: 99,
},
Secrets: map[string]string{"super": "secret"},
}
if tc.ModRequest != nil {
tc.ModRequest(req)
}
var resp structs.ClientCSIControllerExpandVolumeResponse
err = client.ClientRPC("CSI.ControllerExpandVolume", req, &resp)
if tc.ExpectErr != "" {
must.EqError(t, err, tc.ExpectErr)
return
}
must.NoError(t, err)
must.Eq(t, tc.NextResp.CapacityBytes, resp.CapacityBytes)
must.Eq(t, tc.NextResp.NodeExpansionRequired, resp.NodeExpansionRequired)
})
}
}
func TestCSIController_DeleteVolume(t *testing.T) {
ci.Parallel(t)

View File

@ -287,6 +287,36 @@ type ClientCSIControllerCreateVolumeResponse struct {
Topologies []*structs.CSITopology
}
// ClientCSIControllerExpandVolumeRequest is the RPC made from the server to a
// Nomad client to tell a CSI controller plugin on that client to perform
// ControllerExpandVolume
type ClientCSIControllerExpandVolumeRequest struct {
ExternalVolumeID string
CapacityRange *csi.CapacityRange
Secrets structs.CSISecrets
VolumeCapability *csi.VolumeCapability
CSIControllerQuery
}
func (req *ClientCSIControllerExpandVolumeRequest) ToCSIRequest() *csi.ControllerExpandVolumeRequest {
csiReq := &csi.ControllerExpandVolumeRequest{
ExternalVolumeID: req.ExternalVolumeID,
Capability: req.VolumeCapability,
Secrets: req.Secrets,
}
if req.CapacityRange != nil {
csiReq.RequiredBytes = req.CapacityRange.RequiredBytes
csiReq.LimitBytes = req.CapacityRange.LimitBytes
}
return csiReq
}
type ClientCSIControllerExpandVolumeResponse struct {
CapacityBytes int64
NodeExpansionRequired bool
}
// ClientCSIControllerDeleteVolumeRequest the RPC made from the server to a
// Nomad client to tell a CSI controller plugin on that client to perform
// DeleteVolume

View File

@ -13,6 +13,7 @@ import (
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -85,6 +86,20 @@ func (a *ClientCSI) ControllerCreateVolume(args *cstructs.ClientCSIControllerCre
return nil
}
func (a *ClientCSI) ControllerExpandVolume(args *cstructs.ClientCSIControllerExpandVolumeRequest, reply *cstructs.ClientCSIControllerExpandVolumeResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "expand_volume"}, time.Now())
err := a.sendCSIControllerRPC(args.PluginID,
"CSI.ControllerExpandVolume",
"ClientCSI.ControllerExpandVolume",
structs.RateMetricWrite,
args, reply)
if err != nil {
return fmt.Errorf("controller expand volume: %v", err)
}
return nil
}
func (a *ClientCSI) ControllerDeleteVolume(args *cstructs.ClientCSIControllerDeleteVolumeRequest, reply *cstructs.ClientCSIControllerDeleteVolumeResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "delete_volume"}, time.Now())

View File

@ -28,30 +28,33 @@ import (
// responses that have no bodies have no "Next*Response" field and will always
// return an empty response body.
type MockClientCSI struct {
NextValidateError error
NextAttachError error
NextAttachResponse *cstructs.ClientCSIControllerAttachVolumeResponse
NextDetachError error
NextCreateError error
NextCreateResponse *cstructs.ClientCSIControllerCreateVolumeResponse
NextDeleteError error
NextListExternalError error
NextListExternalResponse *cstructs.ClientCSIControllerListVolumesResponse
NextCreateSnapshotError error
NextCreateSnapshotResponse *cstructs.ClientCSIControllerCreateSnapshotResponse
NextDeleteSnapshotError error
NextListExternalSnapshotsError error
NextListExternalSnapshotsResponse *cstructs.ClientCSIControllerListSnapshotsResponse
NextNodeDetachError error
NextValidateError error
NextAttachError error
NextAttachResponse *cstructs.ClientCSIControllerAttachVolumeResponse
NextDetachError error
NextCreateError error
NextCreateResponse *cstructs.ClientCSIControllerCreateVolumeResponse
NextDeleteError error
NextListExternalError error
NextListExternalResponse *cstructs.ClientCSIControllerListVolumesResponse
NextCreateSnapshotError error
NextCreateSnapshotResponse *cstructs.ClientCSIControllerCreateSnapshotResponse
NextDeleteSnapshotError error
NextListExternalSnapshotsError error
NextListExternalSnapshotsResponse *cstructs.ClientCSIControllerListSnapshotsResponse
NextControllerExpandVolumeError error
NextControllerExpandVolumeResponse *cstructs.ClientCSIControllerExpandVolumeResponse
NextNodeDetachError error
}
func newMockClientCSI() *MockClientCSI {
return &MockClientCSI{
NextAttachResponse: &cstructs.ClientCSIControllerAttachVolumeResponse{},
NextCreateResponse: &cstructs.ClientCSIControllerCreateVolumeResponse{},
NextListExternalResponse: &cstructs.ClientCSIControllerListVolumesResponse{},
NextCreateSnapshotResponse: &cstructs.ClientCSIControllerCreateSnapshotResponse{},
NextListExternalSnapshotsResponse: &cstructs.ClientCSIControllerListSnapshotsResponse{},
NextAttachResponse: &cstructs.ClientCSIControllerAttachVolumeResponse{},
NextCreateResponse: &cstructs.ClientCSIControllerCreateVolumeResponse{},
NextListExternalResponse: &cstructs.ClientCSIControllerListVolumesResponse{},
NextCreateSnapshotResponse: &cstructs.ClientCSIControllerCreateSnapshotResponse{},
NextListExternalSnapshotsResponse: &cstructs.ClientCSIControllerListSnapshotsResponse{},
NextControllerExpandVolumeResponse: &cstructs.ClientCSIControllerExpandVolumeResponse{},
}
}
@ -96,6 +99,11 @@ func (c *MockClientCSI) ControllerListSnapshots(req *cstructs.ClientCSIControlle
return c.NextListExternalSnapshotsError
}
func (c *MockClientCSI) ControllerExpandVolume(req *cstructs.ClientCSIControllerExpandVolumeRequest, resp *cstructs.ClientCSIControllerExpandVolumeResponse) error {
*resp = *c.NextControllerExpandVolumeResponse
return c.NextControllerExpandVolumeError
}
func (c *MockClientCSI) NodeDetachVolume(req *cstructs.ClientCSINodeDetachVolumeRequest, resp *cstructs.ClientCSINodeDetachVolumeResponse) error {
return c.NextNodeDetachError
}

View File

@ -5,12 +5,14 @@ package nomad
import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"time"
"github.com/armon/go-metrics"
"github.com/dustin/go-humanize"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
@ -20,6 +22,7 @@ import (
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/state/paginator"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/csi"
)
// CSIVolume wraps the structs.CSIVolume with request data and server context
@ -228,7 +231,7 @@ func (v *CSIVolume) Get(args *structs.CSIVolumeGetRequest, reply *structs.CSIVol
return v.srv.blockingRPC(&opts)
}
func (v *CSIVolume) pluginValidateVolume(req *structs.CSIVolumeRegisterRequest, vol *structs.CSIVolume) (*structs.CSIPlugin, error) {
func (v *CSIVolume) pluginValidateVolume(vol *structs.CSIVolume) (*structs.CSIPlugin, error) {
state := v.srv.fsm.State()
plugin, err := state.CSIPluginByID(nil, vol.PluginID)
@ -239,6 +242,10 @@ func (v *CSIVolume) pluginValidateVolume(req *structs.CSIVolumeRegisterRequest,
return nil, fmt.Errorf("no CSI plugin named: %s could be found", vol.PluginID)
}
if plugin.ControllerRequired && plugin.ControllersHealthy < 1 {
return nil, fmt.Errorf("no healthy controllers for CSI plugin: %s", vol.PluginID)
}
vol.Provider = plugin.Provider
vol.ProviderVersion = plugin.Version
@ -267,9 +274,11 @@ func (v *CSIVolume) controllerValidateVolume(req *structs.CSIVolumeRegisterReque
return v.srv.RPC(method, cReq, cResp)
}
// Register registers a new volume or updates an existing volume. Note
// that most user-defined CSIVolume fields are immutable once the
// volume has been created.
// Register registers a new volume or updates an existing volume.
//
// Note that most user-defined CSIVolume fields are immutable once
// the volume has been created, but exceptions include min and max
// requested capacity values.
//
// If the user needs to change fields because they've misconfigured
// the registration of the external volume, we expect that claims
@ -325,6 +334,11 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru
return err
}
plugin, err := v.pluginValidateVolume(vol)
if err != nil {
return err
}
// CSIVolume has many user-defined fields which are immutable
// once set, and many fields that are controlled by Nomad and
// are not user-settable. We merge onto a copy of the existing
@ -335,11 +349,14 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru
// Terraform).
if existingVol != nil {
existingVol = existingVol.Copy()
err = existingVol.Merge(vol)
if err != nil {
return err
// reconcile mutable fields
if err = v.reconcileVolume(plugin, existingVol, vol); err != nil {
return fmt.Errorf("unable to update volume: %s", err)
}
*vol = *existingVol
} else if vol.Topologies == nil || len(vol.Topologies) == 0 {
// The topologies for the volume have already been set
// when it was created, so for newly register volumes
@ -349,10 +366,6 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru
}
}
plugin, err := v.pluginValidateVolume(args, vol)
if err != nil {
return err
}
if err := v.controllerValidateVolume(args, vol, plugin); err != nil {
return err
}
@ -369,6 +382,24 @@ func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *stru
return nil
}
// reconcileVolume updates a volume with many of the contents of another.
// It may or may not do extra work to actually expand a volume outside of Nomad,
// depending on whether requested capacity values have changed.
func (v *CSIVolume) reconcileVolume(plugin *structs.CSIPlugin, vol *structs.CSIVolume, update *structs.CSIVolume) error {
// Merge does some validation, before we attempt any potential CSI RPCs,
// and mutates `vol` with (most of) the values of `update`,
// notably excluding capacity values, which are covered below.
err := vol.Merge(update)
if err != nil {
return err
}
// expandVolume will mutate `vol` with new capacity-related values, if needed.
return v.expandVolume(vol, plugin, &csi.CapacityRange{
RequiredBytes: update.RequestedCapacityMin,
LimitBytes: update.RequestedCapacityMax,
})
}
// Deregister removes a set of volumes
func (v *CSIVolume) Deregister(args *structs.CSIVolumeDeregisterRequest, reply *structs.CSIVolumeDeregisterResponse) error {
@ -1023,6 +1054,8 @@ func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs.
type validated struct {
vol *structs.CSIVolume
plugin *structs.CSIPlugin
// if the volume already exists, we'll update it instead of creating.
current *structs.CSIVolume
}
validatedVols := []validated{}
@ -1036,7 +1069,7 @@ func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs.
if err = vol.Validate(); err != nil {
return err
}
plugin, err := v.pluginValidateVolume(regArgs, vol)
plugin, err := v.pluginValidateVolume(vol)
if err != nil {
return err
}
@ -1047,7 +1080,19 @@ func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs.
return fmt.Errorf("plugin does not support creating volumes")
}
validatedVols = append(validatedVols, validated{vol, plugin})
// if the volume already exists, we'll update it instead
snap, err := v.srv.State().Snapshot()
if err != nil {
return err
}
// current will be nil if it does not exist.
current, err := snap.CSIVolumeByID(nil, vol.Namespace, vol.ID)
if err != nil {
return err
}
validatedVols = append(validatedVols,
validated{vol, plugin, current})
}
// Attempt to create all the validated volumes and write only successfully
@ -1062,20 +1107,37 @@ func (v *CSIVolume) Create(args *structs.CSIVolumeCreateRequest, reply *structs.
// eval" that can do the plugin RPCs async.
var mErr multierror.Error
var index uint64
for _, valid := range validatedVols {
err = v.createVolume(valid.vol, valid.plugin)
if err != nil {
multierror.Append(&mErr, err)
if valid.current != nil {
// reconcile mutable fields
cp := valid.current.Copy()
err = v.reconcileVolume(valid.plugin, cp, valid.vol)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
} else {
// we merged valid.vol into cp, so update state with the copy
regArgs.Volumes = append(regArgs.Volumes, cp)
}
} else {
regArgs.Volumes = append(regArgs.Volumes, valid.vol)
err = v.createVolume(valid.vol, valid.plugin)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
} else {
regArgs.Volumes = append(regArgs.Volumes, valid.vol)
}
}
}
_, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, regArgs)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "register")
multierror.Append(&mErr, err)
// If we created or updated volumes, apply them to raft.
if len(regArgs.Volumes) > 0 {
_, index, err = v.srv.raftApply(structs.CSIVolumeRegisterRequestType, regArgs)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "register")
mErr.Errors = append(mErr.Errors, err)
}
}
err = mErr.ErrorOrNil()
@ -1118,6 +1180,104 @@ func (v *CSIVolume) createVolume(vol *structs.CSIVolume, plugin *structs.CSIPlug
return nil
}
// expandVolume validates the requested capacity values and issues
// ControllerExpandVolume (and NodeExpandVolume, if needed) to the CSI plugin,
// via Nomad client RPC.
//
// Note that capacity can only be increased; reduction in size is not possible,
// and if the volume is already at the desired capacity, no action is taken.
// vol Capacity-related values are mutated if successful, so callers should
// pass in a copy, then commit changes to raft.
func (v *CSIVolume) expandVolume(vol *structs.CSIVolume, plugin *structs.CSIPlugin, capacity *csi.CapacityRange) error {
if vol == nil || plugin == nil || capacity == nil {
return errors.New("unexpected nil value")
}
newMax := capacity.LimitBytes
newMin := capacity.RequiredBytes
logger := v.logger.Named("expandVolume").With(
"vol", vol.ID,
"requested_min", humanize.Bytes(uint64(newMin)),
"requested_max", humanize.Bytes(uint64(newMax)),
)
// If requested capacity values are unset, skip everything.
if newMax == 0 && newMin == 0 {
logger.Debug("min and max values are zero")
return nil
}
// New values same as current, so nothing to do.
if vol.RequestedCapacityMax == newMax &&
vol.RequestedCapacityMin == newMin {
logger.Debug("requested capacity unchanged")
return nil
}
// If max is specified, it cannot be less than min or current capacity.
if newMax > 0 {
if newMax < newMin {
return fmt.Errorf("max requested capacity (%s) less than or equal to min (%s)",
humanize.Bytes(uint64(newMax)),
humanize.Bytes(uint64(newMin)))
}
if newMax < vol.Capacity {
return fmt.Errorf("max requested capacity (%s) less than or equal to current (%s)",
humanize.Bytes(uint64(newMax)),
humanize.Bytes(uint64(vol.Capacity)))
}
}
// Values are validated, so go ahead and update vol to commit to state,
// even if the external volume does not need expanding.
vol.RequestedCapacityMin = newMin
vol.RequestedCapacityMax = newMax
// Only expand if new min is greater than current capacity.
if newMin <= vol.Capacity {
return nil
}
if !plugin.HasControllerCapability(structs.CSIControllerSupportsExpand) {
return errors.New("expand is not implemented by this controller plugin")
}
capability, err := csi.VolumeCapabilityFromStructs(vol.AttachmentMode, vol.AccessMode, vol.MountOptions)
if err != nil {
logger.Debug("unable to get capability from volume", "error", err)
// We'll optimistically send a nil capability, as an "unknown"
// attachment mode (likely not attached) is acceptable per the spec.
}
method := "ClientCSI.ControllerExpandVolume"
cReq := &cstructs.ClientCSIControllerExpandVolumeRequest{
ExternalVolumeID: vol.ExternalID,
Secrets: vol.Secrets,
CapacityRange: capacity,
VolumeCapability: capability,
}
cReq.PluginID = plugin.ID
cResp := &cstructs.ClientCSIControllerExpandVolumeResponse{}
logger.Info("starting volume expansion")
// This is the real work. The client RPC sends a gRPC to the controller plugin,
// then that controller may reach out to cloud APIs, etc.
err = v.serializedControllerRPC(plugin.ID, func() error {
return v.srv.RPC(method, cReq, cResp)
})
if err != nil {
return fmt.Errorf("unable to expand volume: %w", err)
}
vol.Capacity = cResp.CapacityBytes
logger.Info("controller done expanding volume")
if cResp.NodeExpansionRequired {
v.logger.Warn("TODO: also do node volume expansion if needed") // TODO
}
return nil
}
func (v *CSIVolume) Delete(args *structs.CSIVolumeDeleteRequest, reply *structs.CSIVolumeDeleteResponse) error {
authErr := v.srv.Authenticate(v.ctx, args)

View File

@ -26,6 +26,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/csi"
"github.com/hashicorp/nomad/testutil"
)
@ -126,6 +127,83 @@ func TestCSIVolumeEndpoint_Get_ACL(t *testing.T) {
require.Equal(t, vols[0].ID, resp.Volume.ID)
}
func TestCSIVolume_pluginValidateVolume(t *testing.T) {
// bare minimum server for this method
store := state.TestStateStore(t)
srv := &Server{
fsm: &nomadFSM{state: store},
}
// has our method under test
csiVolume := &CSIVolume{srv: srv}
// volume for which we will request a valid plugin
vol := &structs.CSIVolume{PluginID: "neat-plugin"}
// plugin not found
got, err := csiVolume.pluginValidateVolume(vol)
must.Nil(t, got, must.Sprint("nonexistent plugin should be nil"))
must.ErrorContains(t, err, "no CSI plugin named")
// we'll upsert this plugin after optionally modifying it
basePlug := &structs.CSIPlugin{
ID: vol.PluginID,
// these should be set on the volume after success
Provider: "neat-provider",
Version: "v0",
// explicit zero values, because these modify behavior we care about
ControllerRequired: false,
ControllersHealthy: 0,
}
cases := []struct {
name string
updatePlugin func(*structs.CSIPlugin)
expectErr string
}{
{
name: "controller not required",
},
{
name: "controller unhealthy",
updatePlugin: func(p *structs.CSIPlugin) {
p.ControllerRequired = true
},
expectErr: "no healthy controllers",
},
{
name: "controller healthy",
updatePlugin: func(p *structs.CSIPlugin) {
p.ControllerRequired = true
p.ControllersHealthy = 1
},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
vol := vol.Copy()
plug := basePlug.Copy()
if tc.updatePlugin != nil {
tc.updatePlugin(plug)
}
must.NoError(t, store.UpsertCSIPlugin(1000, plug))
got, err := csiVolume.pluginValidateVolume(vol)
if tc.expectErr == "" {
must.NoError(t, err)
must.NotNil(t, got, must.Sprint("plugin should not be nil"))
must.Eq(t, vol.Provider, plug.Provider)
must.Eq(t, vol.ProviderVersion, plug.Version)
} else {
must.Error(t, err, must.Sprint("expect error:", tc.expectErr))
must.ErrorContains(t, err, tc.expectErr)
must.Nil(t, got, must.Sprint("plugin should be nil"))
}
})
}
}
func TestCSIVolumeEndpoint_Register(t *testing.T) {
ci.Parallel(t)
srv, shutdown := TestServer(t, func(c *Config) {
@ -1701,6 +1779,143 @@ func TestCSIVolumeEndpoint_ListSnapshots(t *testing.T) {
require.Equal(t, "page2", resp.NextToken)
}
func TestCSIVolume_expandVolume(t *testing.T) {
ci.Parallel(t)
srv, cleanupSrv := TestServer(t, nil)
t.Cleanup(cleanupSrv)
testutil.WaitForLeader(t, srv.RPC)
t.Log("server started 👍")
_, fake, _, fakeVolID := testClientWithCSI(t, srv)
endpoint := NewCSIVolumeEndpoint(srv, nil)
plug, vol, err := endpoint.volAndPluginLookup(structs.DefaultNamespace, fakeVolID)
must.NoError(t, err)
// ensure nil checks
expectErr := "unexpected nil value"
err = endpoint.expandVolume(nil, plug, &csi.CapacityRange{})
must.EqError(t, err, expectErr)
err = endpoint.expandVolume(vol, nil, &csi.CapacityRange{})
must.EqError(t, err, expectErr)
err = endpoint.expandVolume(vol, plug, nil)
must.EqError(t, err, expectErr)
// these tests must be run in order, as they mutate vol along the way
cases := []struct {
Name string
NewMin int64
NewMax int64
ExpectMin int64
ExpectMax int64
ControllerResp int64 // new capacity for the mock controller response
ExpectCapacity int64 // expected resulting capacity on the volume
ExpectErr string
}{
{
// successful expansion from initial vol with no capacity values.
Name: "success",
NewMin: 1000,
NewMax: 2000,
ExpectMin: 1000,
ExpectMax: 2000,
ControllerResp: 1000,
ExpectCapacity: 1000,
},
{
// with min/max both zero, no action should be taken,
// so expect no change to desired or actual capacity on the volume.
Name: "zero",
NewMin: 0,
NewMax: 0,
ExpectMin: 1000,
ExpectMax: 2000,
ControllerResp: 999999, // this should not come into play
ExpectCapacity: 1000,
},
{
// increasing min is what actually triggers an expand to occur.
Name: "increase min",
NewMin: 1500,
NewMax: 2000,
ExpectMin: 1500,
ExpectMax: 2000,
ControllerResp: 1500,
ExpectCapacity: 1500,
},
{
// min going down is okay, but no expand should occur.
Name: "reduce min",
NewMin: 500,
NewMax: 2000,
ExpectMin: 500,
ExpectMax: 2000,
ControllerResp: 999999,
ExpectCapacity: 1500,
},
{
// max going up is okay, but no expand should occur.
Name: "increase max",
NewMin: 500,
NewMax: 5000,
ExpectMin: 500,
ExpectMax: 5000,
ControllerResp: 999999,
ExpectCapacity: 1500,
},
{
// max lower than min is logically impossible.
Name: "max below min",
NewMin: 3,
NewMax: 2,
ExpectErr: "max requested capacity (2 B) less than or equal to min (3 B)",
},
{
// volume size cannot be reduced.
Name: "max below current",
NewMax: 2,
ExpectErr: "max requested capacity (2 B) less than or equal to current (1.5 kB)",
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
fake.NextControllerExpandVolumeResponse = &cstructs.ClientCSIControllerExpandVolumeResponse{
CapacityBytes: tc.ControllerResp,
NodeExpansionRequired: true,
}
err = endpoint.expandVolume(vol, plug, &csi.CapacityRange{
RequiredBytes: tc.NewMin,
LimitBytes: tc.NewMax,
})
if tc.ExpectErr != "" {
must.EqError(t, err, tc.ExpectErr)
return
}
must.NoError(t, err)
test.Eq(t, tc.ExpectCapacity, vol.Capacity,
test.Sprint("unexpected capacity"))
test.Eq(t, tc.ExpectMin, vol.RequestedCapacityMin,
test.Sprint("unexpected min"))
test.Eq(t, tc.ExpectMax, vol.RequestedCapacityMax,
test.Sprint("unexpected max"))
})
}
}
func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) {
ci.Parallel(t)
srv, shutdown := TestServer(t, func(c *Config) {
@ -2043,3 +2258,78 @@ func TestCSI_SerializedControllerRPC(t *testing.T) {
must.GreaterEq(t, 50*time.Millisecond, totals["plugin2"])
must.Less(t, 100*time.Millisecond, totals["plugin2"])
}
// testClientWithCSI sets up a client with a fake CSI plugin.
// Much of the plugin/volume configuration is only to pass validation;
// callers should modify MockClientCSI's Next* fields.
func testClientWithCSI(t *testing.T, srv *Server) (c *client.Client, m *MockClientCSI, plugID, volID string) {
t.Helper()
m = newMockClientCSI()
plugID = "fake-plugin"
volID = "fake-volume"
c, cleanup := client.TestClientWithRPCs(t,
func(c *cconfig.Config) {
c.Servers = []string{srv.config.RPCAddr.String()}
c.Node.CSIControllerPlugins = map[string]*structs.CSIInfo{
plugID: {
PluginID: plugID,
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
// Supports.* everything, but Next* values must be set on the mock.
SupportsAttachDetach: true,
SupportsClone: true,
SupportsCondition: true,
SupportsCreateDelete: true,
SupportsCreateDeleteSnapshot: true,
SupportsExpand: true,
SupportsGet: true,
SupportsGetCapacity: true,
SupportsListSnapshots: true,
SupportsListVolumes: true,
SupportsListVolumesAttachedNodes: true,
SupportsReadOnlyAttach: true,
},
RequiresControllerPlugin: true,
},
}
c.Node.CSINodePlugins = map[string]*structs.CSIInfo{
plugID: {
PluginID: plugID,
Healthy: true,
NodeInfo: &structs.CSINodeInfo{
ID: c.Node.GetID(),
SupportsCondition: true,
SupportsExpand: true,
SupportsStats: true,
},
},
}
},
map[string]interface{}{"CSI": m}, // MockClientCSI
)
t.Cleanup(func() { test.NoError(t, cleanup()) })
testutil.WaitForClient(t, srv.RPC, c.NodeID(), c.Region())
t.Log("client started with fake CSI plugin 👍")
// Register a minimum-viable fake volume
req := &structs.CSIVolumeRegisterRequest{
Volumes: []*structs.CSIVolume{{
PluginID: plugID,
ID: volID,
Namespace: structs.DefaultNamespace,
RequestedCapabilities: []*structs.CSIVolumeCapability{
{
AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
},
},
}},
WriteRequest: structs.WriteRequest{Region: srv.Region()},
}
must.NoError(t, srv.RPC("CSIVolume.Register", req, &structs.CSIVolumeRegisterResponse{}))
t.Logf("CSI volume %s registered 👍", volID)
return c, m, plugID, volID
}

View File

@ -2421,21 +2421,21 @@ func (s *StateStore) UpsertCSIVolume(index uint64, volumes []*structs.CSIVolume)
}
if obj != nil {
// Allow some properties of a volume to be updated in place, but
// prevent accidentally overwriting important properties, or
// overwriting a volume in use
// prevent accidentally overwriting important properties.
old := obj.(*structs.CSIVolume)
if old.ExternalID != v.ExternalID ||
old.PluginID != v.PluginID ||
old.Provider != v.Provider {
return fmt.Errorf("volume identity cannot be updated: %s", v.ID)
}
s.CSIVolumeDenormalize(nil, old.Copy())
if old.InUse() {
return fmt.Errorf("volume cannot be updated while in use")
}
v.CreateIndex = old.CreateIndex
// Update fields that are safe to change while volume is being used.
if err := old.UpdateSafeFields(v); err != nil {
return fmt.Errorf("unable to update in-use volume: %w", err)
}
v = old
v.ModifyIndex = index
} else {
v.CreateIndex = index
v.ModifyIndex = index

View File

@ -3869,11 +3869,7 @@ func TestStateStore_CSIVolume(t *testing.T) {
vs = slurp(iter)
require.True(t, vs[0].ReadSchedulable())
// registration is an error when the volume is in use
index++
err = state.UpsertCSIVolume(index, []*structs.CSIVolume{v0})
require.Error(t, err, "volume re-registered while in use")
// as is deregistration
// deregistration is an error when the volume is in use
index++
err = state.CSIVolumeDeregister(index, ns, []string{vol0}, false)
require.Error(t, err, "volume deregistered while in use")

View File

@ -10,6 +10,7 @@ import (
"time"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
@ -782,20 +783,6 @@ func (v *CSIVolume) Merge(other *CSIVolume) error {
"volume snapshot ID cannot be updated"))
}
// must be compatible with capacity range
// TODO: when ExpandVolume is implemented we'll need to update
// this logic https://github.com/hashicorp/nomad/issues/10324
if v.Capacity != 0 {
if other.RequestedCapacityMax < v.Capacity ||
other.RequestedCapacityMin > v.Capacity {
errs = multierror.Append(errs, errors.New(
"volume requested capacity update was not compatible with existing capacity"))
} else {
v.RequestedCapacityMin = other.RequestedCapacityMin
v.RequestedCapacityMax = other.RequestedCapacityMax
}
}
// must be compatible with volume_capabilities
if v.AccessMode != CSIVolumeAccessModeUnknown ||
v.AttachmentMode != CSIVolumeAttachmentModeUnknown {
@ -846,6 +833,20 @@ func (v *CSIVolume) Merge(other *CSIVolume) error {
return errs.ErrorOrNil()
}
// UpdateSafeFields updates fields that may be mutated while the volume is in use.
func (v *CSIVolume) UpdateSafeFields(other *CSIVolume) error {
if v == nil || other == nil {
return errors.New("unexpected nil volume (this is a bug)")
}
// Expand operation can sometimes happen while in-use.
v.Capacity = other.Capacity
v.RequestedCapacityMin = other.RequestedCapacityMin
v.RequestedCapacityMax = other.RequestedCapacityMax
return nil
}
// Request and response wrappers
type CSIVolumeRegisterRequest struct {
Volumes []*CSIVolume
@ -886,6 +887,19 @@ type CSIVolumeDeleteResponse struct {
QueryMeta
}
type CSIVolumeExpandRequest struct {
VolumeID string
RequestedCapacityMin int64
RequestedCapacityMax int64
Secrets CSISecrets
WriteRequest
}
type CSIVolumeExpandResponse struct {
CapacityBytes int64
QueryMeta
}
type CSIVolumeClaimMode int
const (

View File

@ -595,17 +595,6 @@ func TestCSIVolume_Merge(t *testing.T) {
expected string
expectFn func(t *testing.T, v *CSIVolume)
}{
{
name: "invalid capacity update",
v: &CSIVolume{Capacity: 100},
update: &CSIVolume{
RequestedCapacityMax: 300, RequestedCapacityMin: 200},
expected: "volume requested capacity update was not compatible with existing capacity",
expectFn: func(t *testing.T, v *CSIVolume) {
require.NotEqual(t, 300, v.RequestedCapacityMax)
require.NotEqual(t, 200, v.RequestedCapacityMin)
},
},
{
name: "invalid capability update",
v: &CSIVolume{

View File

@ -14,14 +14,15 @@ import (
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper/grpc-middleware/logging"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"golang.org/x/exp/maps"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/nomad/helper/grpc-middleware/logging"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
)
// PluginTypeCSI implements the CSI plugin interface
@ -75,6 +76,7 @@ type CSIControllerClient interface {
CreateVolume(ctx context.Context, in *csipbv1.CreateVolumeRequest, opts ...grpc.CallOption) (*csipbv1.CreateVolumeResponse, error)
ListVolumes(ctx context.Context, in *csipbv1.ListVolumesRequest, opts ...grpc.CallOption) (*csipbv1.ListVolumesResponse, error)
DeleteVolume(ctx context.Context, in *csipbv1.DeleteVolumeRequest, opts ...grpc.CallOption) (*csipbv1.DeleteVolumeResponse, error)
ControllerExpandVolume(ctx context.Context, in *csipbv1.ControllerExpandVolumeRequest, opts ...grpc.CallOption) (*csipbv1.ControllerExpandVolumeResponse, error)
CreateSnapshot(ctx context.Context, in *csipbv1.CreateSnapshotRequest, opts ...grpc.CallOption) (*csipbv1.CreateSnapshotResponse, error)
DeleteSnapshot(ctx context.Context, in *csipbv1.DeleteSnapshotRequest, opts ...grpc.CallOption) (*csipbv1.DeleteSnapshotResponse, error)
ListSnapshots(ctx context.Context, in *csipbv1.ListSnapshotsRequest, opts ...grpc.CallOption) (*csipbv1.ListSnapshotsResponse, error)
@ -89,6 +91,7 @@ type CSINodeClient interface {
NodeUnstageVolume(ctx context.Context, in *csipbv1.NodeUnstageVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnstageVolumeResponse, error)
NodePublishVolume(ctx context.Context, in *csipbv1.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodePublishVolumeResponse, error)
NodeUnpublishVolume(ctx context.Context, in *csipbv1.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnpublishVolumeResponse, error)
NodeExpandVolume(ctx context.Context, in *csipbv1.NodeExpandVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeExpandVolumeResponse, error)
}
type client struct {
@ -510,6 +513,44 @@ func (c *client) ControllerDeleteVolume(ctx context.Context, req *ControllerDele
return err
}
func (c *client) ControllerExpandVolume(ctx context.Context, req *ControllerExpandVolumeRequest, opts ...grpc.CallOption) (*ControllerExpandVolumeResponse, error) {
if err := req.Validate(); err != nil {
return nil, err
}
if err := c.ensureConnected(ctx); err != nil {
return nil, err
}
exReq := req.ToCSIRepresentation()
resp, err := c.controllerClient.ControllerExpandVolume(ctx, exReq, opts...)
if err != nil {
code := status.Code(err)
switch code {
case codes.InvalidArgument:
return nil, fmt.Errorf(
"requested capabilities not compatible with volume %q: %v",
req.ExternalVolumeID, err)
case codes.NotFound:
err = fmt.Errorf("volume %q could not be found: %v", req.ExternalVolumeID, err)
case codes.FailedPrecondition:
err = fmt.Errorf("volume %q cannot be expanded online: %v", req.ExternalVolumeID, err)
case codes.OutOfRange:
return nil, fmt.Errorf(
"unsupported capacity_range for volume %q: %v", req.ExternalVolumeID, err)
case codes.Internal:
err = fmt.Errorf("controller plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
default:
err = fmt.Errorf("controller plugin returned an error: %v", err)
}
return nil, err
}
return &ControllerExpandVolumeResponse{
CapacityBytes: resp.GetCapacityBytes(),
NodeExpansionRequired: resp.GetNodeExpansionRequired(),
}, nil
}
// compareCapabilities returns an error if the 'got' capabilities aren't found
// within the 'expected' capability.
//
@ -883,3 +924,7 @@ func (c *client) NodeUnpublishVolume(ctx context.Context, volumeID, targetPath s
return err
}
func (c *client) NodeExpandVolume(ctx context.Context, req *NodeExpandVolumeRequest, opts ...grpc.CallOption) (*NodeExpandVolumeResponse, error) {
return nil, nil
}

View File

@ -13,14 +13,16 @@ import (
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/nomad/structs"
fake "github.com/hashicorp/nomad/plugins/csi/testing"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/nomad/structs"
fake "github.com/hashicorp/nomad/plugins/csi/testing"
)
func newTestClient(t *testing.T) (*fake.IdentityClient, *fake.ControllerClient, *fake.NodeClient, CSIPlugin) {
@ -42,6 +44,9 @@ func newTestClient(t *testing.T) (*fake.IdentityClient, *fake.ControllerClient,
controllerClient: cc,
nodeClient: nc,
}
t.Cleanup(func() {
_ = client.Close()
})
return ic, cc, nc, client
}
@ -1170,6 +1175,155 @@ func TestClient_RPC_ControllerListSnapshots(t *testing.T) {
}
}
func TestClient_RPC_ControllerExpandVolume(t *testing.T) {
cases := []struct {
Name string
Request *ControllerExpandVolumeRequest
ExpectCall *csipbv1.ControllerExpandVolumeRequest
ResponseErr error
ExpectedErr error
}{
{
Name: "success",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1",
RequiredBytes: 1,
LimitBytes: 2,
Capability: &VolumeCapability{
AccessMode: VolumeAccessModeMultiNodeSingleWriter,
},
Secrets: map[string]string{"super": "secret"},
},
ExpectCall: &csipbv1.ControllerExpandVolumeRequest{
VolumeId: "vol-1",
CapacityRange: &csipbv1.CapacityRange{
RequiredBytes: 1,
LimitBytes: 2,
},
VolumeCapability: &csipbv1.VolumeCapability{
AccessMode: &csipbv1.VolumeCapability_AccessMode{
Mode: csipbv1.VolumeCapability_AccessMode_Mode(csipbv1.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER),
},
AccessType: &csipbv1.VolumeCapability_Block{Block: &csipbv1.VolumeCapability_BlockVolume{}},
},
Secrets: map[string]string{"super": "secret"},
},
},
{
Name: "validate only min set",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1",
RequiredBytes: 4,
},
ExpectCall: &csipbv1.ControllerExpandVolumeRequest{
VolumeId: "vol-1",
CapacityRange: &csipbv1.CapacityRange{
RequiredBytes: 4,
},
},
},
{
Name: "validate missing volume ID",
Request: &ControllerExpandVolumeRequest{},
ExpectedErr: errors.New("missing ExternalVolumeID"),
},
{
Name: "validate missing max/min size",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1",
},
ExpectedErr: errors.New("one of LimitBytes or RequiredBytes must be set"),
},
{
Name: "validate min greater than max",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1",
RequiredBytes: 4,
LimitBytes: 2,
},
ExpectedErr: errors.New("LimitBytes cannot be less than RequiredBytes"),
},
{
Name: "grpc error InvalidArgument",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1", LimitBytes: 1000},
ResponseErr: status.Errorf(codes.InvalidArgument, "sad args"),
ExpectedErr: errors.New("requested capabilities not compatible with volume \"vol-1\": rpc error: code = InvalidArgument desc = sad args"),
},
{
Name: "grpc error NotFound",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1", LimitBytes: 1000},
ResponseErr: status.Errorf(codes.NotFound, "does not exist"),
ExpectedErr: errors.New("volume \"vol-1\" could not be found: rpc error: code = NotFound desc = does not exist"),
},
{
Name: "grpc error FailedPrecondition",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1", LimitBytes: 1000},
ResponseErr: status.Errorf(codes.FailedPrecondition, "unsupported"),
ExpectedErr: errors.New("volume \"vol-1\" cannot be expanded online: rpc error: code = FailedPrecondition desc = unsupported"),
},
{
Name: "grpc error OutOfRange",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1", LimitBytes: 1000},
ResponseErr: status.Errorf(codes.OutOfRange, "too small"),
ExpectedErr: errors.New("unsupported capacity_range for volume \"vol-1\": rpc error: code = OutOfRange desc = too small"),
},
{
Name: "grpc error Internal",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1", LimitBytes: 1000},
ResponseErr: status.Errorf(codes.Internal, "some grpc error"),
ExpectedErr: errors.New("controller plugin returned an internal error, check the plugin allocation logs for more information: rpc error: code = Internal desc = some grpc error"),
},
{
Name: "grpc error default case",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1", LimitBytes: 1000},
ResponseErr: status.Errorf(codes.DataLoss, "misc unspecified error"),
ExpectedErr: errors.New("controller plugin returned an error: rpc error: code = DataLoss desc = misc unspecified error"),
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
_, cc, _, client := newTestClient(t)
cc.NextErr = tc.ResponseErr
// the fake client should take ~no time, but set a timeout just in case
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
resp, err := client.ControllerExpandVolume(ctx, tc.Request)
if tc.ExpectedErr != nil {
must.EqError(t, err, tc.ExpectedErr.Error())
return
}
must.NoError(t, err)
must.NotNil(t, resp)
must.Eq(t, tc.ExpectCall, cc.LastExpandVolumeRequest)
})
}
t.Run("connection error", func(t *testing.T) {
c := &client{} // induce c.ensureConnected() error
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
resp, err := c.ControllerExpandVolume(ctx, &ControllerExpandVolumeRequest{
ExternalVolumeID: "valid-id",
RequiredBytes: 1,
})
must.Nil(t, resp)
must.EqError(t, err, "address is empty")
})
}
func TestClient_RPC_NodeStageVolume(t *testing.T) {
ci.Parallel(t)

View File

@ -11,10 +11,11 @@ import (
"fmt"
"sync"
"google.golang.org/grpc"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/csi"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"google.golang.org/grpc"
)
var _ csi.CSIPlugin = &Client{}
@ -78,6 +79,10 @@ type Client struct {
NextControllerListSnapshotsErr error
ControllerListSnapshotsCallCount int64
NextControllerExpandVolumeResponse *csi.ControllerExpandVolumeResponse
NextControllerExpandVolumeErr error
ControllerExpandVolumeCallCount int64
NextNodeGetCapabilitiesResponse *csi.NodeCapabilitySet
NextNodeGetCapabilitiesErr error
NodeGetCapabilitiesCallCount int64
@ -98,6 +103,10 @@ type Client struct {
NextNodeUnpublishVolumeErr error
NodeUnpublishVolumeCallCount int64
NextNodeExpandVolumeResponse *csi.NodeExpandVolumeResponse
NextNodeExpandVolumeErr error
NodeExpandVolumeCallCount int64
}
// PluginInfo describes the type and version of a plugin.
@ -235,6 +244,13 @@ func (c *Client) ControllerListSnapshots(ctx context.Context, req *csi.Controlle
return c.NextControllerListSnapshotsResponse, c.NextControllerListSnapshotsErr
}
func (c *Client) ControllerExpandVolume(ctx context.Context, in *csi.ControllerExpandVolumeRequest, opts ...grpc.CallOption) (*csi.ControllerExpandVolumeResponse, error) {
c.Mu.Lock()
defer c.Mu.Unlock()
c.ControllerExpandVolumeCallCount++
return c.NextControllerExpandVolumeResponse, c.NextControllerExpandVolumeErr
}
func (c *Client) NodeGetCapabilities(ctx context.Context) (*csi.NodeCapabilitySet, error) {
c.Mu.Lock()
defer c.Mu.Unlock()
@ -300,6 +316,14 @@ func (c *Client) NodeUnpublishVolume(ctx context.Context, volumeID, targetPath s
return c.NextNodeUnpublishVolumeErr
}
func (c *Client) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest, opts ...grpc.CallOption) (*csi.NodeExpandVolumeResponse, error) {
c.Mu.Lock()
defer c.Mu.Unlock()
c.NodeExpandVolumeCallCount++
return c.NextNodeExpandVolumeResponse, c.NextNodeExpandVolumeErr
}
// Close the client and ensure any connections are cleaned up.
func (c *Client) Close() error {
@ -325,6 +349,9 @@ func (c *Client) Close() error {
c.NextControllerUnpublishVolumeResponse = nil
c.NextControllerUnpublishVolumeErr = fmt.Errorf("closed client")
c.NextControllerExpandVolumeResponse = nil
c.NextControllerExpandVolumeErr = fmt.Errorf("closed client")
c.NextControllerValidateVolumeErr = fmt.Errorf("closed client")
c.NextNodeGetCapabilitiesResponse = nil
@ -341,5 +368,8 @@ func (c *Client) Close() error {
c.NextNodeUnpublishVolumeErr = fmt.Errorf("closed client")
c.NextNodeExpandVolumeResponse = nil
c.NextNodeExpandVolumeErr = fmt.Errorf("closed client")
return nil
}

View File

@ -9,9 +9,10 @@ import (
"fmt"
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
"google.golang.org/grpc"
)
// CSIPlugin implements a lightweight abstraction layer around a CSI Plugin.
@ -60,6 +61,9 @@ type CSIPlugin interface {
// external storage provider
ControllerListVolumes(ctx context.Context, req *ControllerListVolumesRequest, opts ...grpc.CallOption) (*ControllerListVolumesResponse, error)
// ControllerExpandVolume is used to expand a volume's size
ControllerExpandVolume(ctx context.Context, req *ControllerExpandVolumeRequest, opts ...grpc.CallOption) (*ControllerExpandVolumeResponse, error)
// ControllerCreateSnapshot is used to create a volume snapshot in the
// external storage provider
ControllerCreateSnapshot(ctx context.Context, req *ControllerCreateSnapshotRequest, opts ...grpc.CallOption) (*ControllerCreateSnapshotResponse, error)
@ -101,6 +105,11 @@ type CSIPlugin interface {
// for the given volume.
NodeUnpublishVolume(ctx context.Context, volumeID, targetPath string, opts ...grpc.CallOption) error
// NodeExpandVolume is used to expand a volume. This MUST be called after
// any ControllerExpandVolume is called, but only if that RPC indicates
// that node expansion is required
NodeExpandVolume(ctx context.Context, req *NodeExpandVolumeRequest, opts ...grpc.CallOption) (*NodeExpandVolumeResponse, error)
// Shutdown the client and ensure any connections are cleaned up.
Close() error
}
@ -492,7 +501,8 @@ func (r *ControllerCreateVolumeRequest) Validate() error {
return errors.New(
"one of LimitBytes or RequiredBytes must be set if CapacityRange is set")
}
if r.CapacityRange.LimitBytes < r.CapacityRange.RequiredBytes {
if r.CapacityRange.LimitBytes > 0 &&
r.CapacityRange.LimitBytes < r.CapacityRange.RequiredBytes {
return errors.New("LimitBytes cannot be less than RequiredBytes")
}
}
@ -625,6 +635,49 @@ func (r *ControllerDeleteVolumeRequest) Validate() error {
return nil
}
type ControllerExpandVolumeRequest struct {
ExternalVolumeID string
RequiredBytes int64
LimitBytes int64
Capability *VolumeCapability
Secrets structs.CSISecrets
}
func (r *ControllerExpandVolumeRequest) Validate() error {
if r.ExternalVolumeID == "" {
return errors.New("missing ExternalVolumeID")
}
if r.LimitBytes == 0 && r.RequiredBytes == 0 {
return errors.New("one of LimitBytes or RequiredBytes must be set")
}
// per the spec: "A value of 0 is equal to an unspecified field value."
// so in this case, only error if both are set.
if r.LimitBytes > 0 && (r.LimitBytes < r.RequiredBytes) {
return errors.New("LimitBytes cannot be less than RequiredBytes")
}
return nil
}
func (r *ControllerExpandVolumeRequest) ToCSIRepresentation() *csipbv1.ControllerExpandVolumeRequest {
if r == nil {
return nil
}
return &csipbv1.ControllerExpandVolumeRequest{
VolumeId: r.ExternalVolumeID,
CapacityRange: &csipbv1.CapacityRange{
RequiredBytes: r.RequiredBytes,
LimitBytes: r.LimitBytes,
},
Secrets: r.Secrets,
VolumeCapability: r.Capability.ToCSIRepresentation(),
}
}
type ControllerExpandVolumeResponse struct {
CapacityBytes int64
NodeExpansionRequired bool
}
type ControllerListVolumesRequest struct {
MaxEntries int32
StartingToken string
@ -976,3 +1029,32 @@ func (c *CapacityRange) ToCSIRepresentation() *csipbv1.CapacityRange {
LimitBytes: c.LimitBytes,
}
}
type NodeExpandVolumeRequest struct {
ExternalVolumeID string
RequiredBytes int64
LimitBytes int64
TargetPath string
StagingPath string
Capability *VolumeCapability
}
func (r *NodeExpandVolumeRequest) ToCSIRepresentation() *csipbv1.NodeExpandVolumeRequest {
if r == nil {
return nil
}
return &csipbv1.NodeExpandVolumeRequest{
VolumeId: r.ExternalVolumeID,
VolumePath: r.TargetPath,
CapacityRange: &csipbv1.CapacityRange{
RequiredBytes: r.RequiredBytes,
LimitBytes: r.LimitBytes,
},
StagingTargetPath: r.StagingPath,
VolumeCapability: r.Capability.ToCSIRepresentation(),
}
}
type NodeExpandVolumeResponse struct {
CapacityBytes int64
}

View File

@ -54,6 +54,8 @@ type ControllerClient struct {
NextUnpublishVolumeResponse *csipbv1.ControllerUnpublishVolumeResponse
NextValidateVolumeCapabilitiesResponse *csipbv1.ValidateVolumeCapabilitiesResponse
NextCreateVolumeResponse *csipbv1.CreateVolumeResponse
NextExpandVolumeResponse *csipbv1.ControllerExpandVolumeResponse
LastExpandVolumeRequest *csipbv1.ControllerExpandVolumeRequest
NextDeleteVolumeResponse *csipbv1.DeleteVolumeResponse
NextListVolumesResponse *csipbv1.ListVolumesResponse
NextCreateSnapshotResponse *csipbv1.CreateSnapshotResponse
@ -73,6 +75,8 @@ func (c *ControllerClient) Reset() {
c.NextUnpublishVolumeResponse = nil
c.NextValidateVolumeCapabilitiesResponse = nil
c.NextCreateVolumeResponse = nil
c.NextExpandVolumeResponse = nil
c.LastExpandVolumeRequest = nil
c.NextDeleteVolumeResponse = nil
c.NextListVolumesResponse = nil
c.NextCreateSnapshotResponse = nil
@ -111,6 +115,11 @@ func (c *ControllerClient) CreateVolume(ctx context.Context, in *csipbv1.CreateV
return c.NextCreateVolumeResponse, c.NextErr
}
func (c *ControllerClient) ControllerExpandVolume(ctx context.Context, in *csipbv1.ControllerExpandVolumeRequest, opts ...grpc.CallOption) (*csipbv1.ControllerExpandVolumeResponse, error) {
c.LastExpandVolumeRequest = in
return c.NextExpandVolumeResponse, c.NextErr
}
func (c *ControllerClient) DeleteVolume(ctx context.Context, in *csipbv1.DeleteVolumeRequest, opts ...grpc.CallOption) (*csipbv1.DeleteVolumeResponse, error) {
return c.NextDeleteVolumeResponse, c.NextErr
}
@ -140,6 +149,7 @@ type NodeClient struct {
NextUnstageVolumeResponse *csipbv1.NodeUnstageVolumeResponse
NextPublishVolumeResponse *csipbv1.NodePublishVolumeResponse
NextUnpublishVolumeResponse *csipbv1.NodeUnpublishVolumeResponse
NextExpandVolumeResponse *csipbv1.NodeExpandVolumeResponse
}
// NewNodeClient returns a new stub NodeClient
@ -155,6 +165,7 @@ func (c *NodeClient) Reset() {
c.NextUnstageVolumeResponse = nil
c.NextPublishVolumeResponse = nil
c.NextUnpublishVolumeResponse = nil
c.NextExpandVolumeResponse = nil
}
func (c *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipbv1.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipbv1.NodeGetCapabilitiesResponse, error) {
@ -180,3 +191,7 @@ func (c *NodeClient) NodePublishVolume(ctx context.Context, in *csipbv1.NodePubl
func (c *NodeClient) NodeUnpublishVolume(ctx context.Context, in *csipbv1.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnpublishVolumeResponse, error) {
return c.NextUnpublishVolumeResponse, c.NextErr
}
func (c *NodeClient) NodeExpandVolume(ctx context.Context, in *csipbv1.NodeExpandVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeExpandVolumeResponse, error) {
return c.NextExpandVolumeResponse, c.NextErr
}