CSI: volume snapshot

This commit is contained in:
Tim Gross 2021-04-01 11:16:52 -04:00 committed by GitHub
parent c5bb1cf69c
commit 466b620fa4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 2488 additions and 17 deletions

View File

@ -114,6 +114,49 @@ func (v *CSIVolumes) Detach(volID, nodeID string, w *WriteOptions) error {
return err
}
// CreateSnapshot snapshots an external storage volume.
func (v *CSIVolumes) CreateSnapshot(snap *CSISnapshot, w *WriteOptions) ([]*CSISnapshot, *WriteMeta, error) {
req := &CSISnapshotCreateRequest{
Snapshots: []*CSISnapshot{snap},
}
resp := &CSISnapshotCreateResponse{}
meta, err := v.client.write(fmt.Sprintf("/v1/volumes/snapshot"), req, resp, w)
return resp.Snapshots, meta, err
}
// DeleteSnapshot deletes an external storage volume snapshot.
func (v *CSIVolumes) DeleteSnapshot(snap *CSISnapshot, w *WriteOptions) error {
req := &CSISnapshotDeleteRequest{
Snapshots: []*CSISnapshot{snap},
}
_, err := v.client.delete(fmt.Sprintf("/v1/volumes/snapshot"), req, w)
return err
}
// ListSnapshots lists external storage volume snapshots.
func (v *CSIVolumes) ListSnapshots(pluginID string, q *QueryOptions) (*CSISnapshotListResponse, *QueryMeta, error) {
var resp *CSISnapshotListResponse
qp := url.Values{}
if pluginID != "" {
qp.Set("plugin_id", pluginID)
}
if q.NextToken != "" {
qp.Set("next_token", q.NextToken)
}
if q.PerPage != 0 {
qp.Set("per_page", fmt.Sprint(q.PerPage))
}
qm, err := v.client.query("/v1/volumes/snapshots?"+qp.Encode(), &resp, q)
if err != nil {
return nil, nil, err
}
sort.Sort(CSISnapshotSort(resp.Snapshots))
return resp, qm, nil
}
// CSIVolumeAttachmentMode chooses the type of storage api that will be used to
// interact with the device. (Duplicated in nomad/structs/csi.go)
type CSIVolumeAttachmentMode string
@ -312,6 +355,68 @@ type CSIVolumeDeregisterRequest struct {
WriteRequest
}
// CSISnapshot is the storage provider's view of a volume snapshot
type CSISnapshot struct {
ID string // storage provider's ID
ExternalSourceVolumeID string // storage provider's ID for volume
SizeBytes int64 // value from storage provider
CreateTime int64 // value from storage provider
IsReady bool // value from storage provider
SourceVolumeID string // Nomad volume ID
PluginID string // CSI plugin ID
// These field are only used during snapshot creation and will not be
// populated when the snapshot is returned
Name string // suggested name of the snapshot, used for creation
Secrets CSISecrets // secrets needed to create snapshot
Parameters map[string]string // secrets needed to create snapshot
}
// CSISnapshotSort is a helper used for sorting snapshots by creation time.
type CSISnapshotSort []*CSISnapshot
func (v CSISnapshotSort) Len() int {
return len(v)
}
func (v CSISnapshotSort) Less(i, j int) bool {
return v[i].CreateTime > v[j].CreateTime
}
func (v CSISnapshotSort) Swap(i, j int) {
v[i], v[j] = v[j], v[i]
}
type CSISnapshotCreateRequest struct {
Snapshots []*CSISnapshot
WriteRequest
}
type CSISnapshotCreateResponse struct {
Snapshots []*CSISnapshot
QueryMeta
}
type CSISnapshotDeleteRequest struct {
Snapshots []*CSISnapshot
WriteRequest
}
// CSISnapshotListRequest is a request to a controller plugin to list all the
// snapshot known to the the storage provider. This request is paginated by
// the plugin and accepts the QueryOptions.PerPage and QueryOptions.NextToken
// fields
type CSISnapshotListRequest struct {
PluginID string
QueryOptions
}
type CSISnapshotListResponse struct {
Snapshots []*CSISnapshot
NextToken string
QueryMeta
}
// CSI Plugins are jobs with plugin specific data
type CSIPlugins struct {
client *Client

View File

@ -169,7 +169,7 @@ func (c *CSI) ControllerDetachVolume(req *structs.ClientCSIControllerDetachVolum
if errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {
// if the controller detach previously happened but the server failed to
// checkpoint, we'll get an error from the plugin but can safely ignore it.
c.c.logger.Debug("could not unpublish volume: %v", err)
c.c.logger.Debug("could not unpublish volume", "error", err)
return nil
}
if err != nil {
@ -245,7 +245,7 @@ func (c *CSI) ControllerDeleteVolume(req *structs.ClientCSIControllerDeleteVolum
if errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {
// if the volume was deleted out-of-band, we'll get an error from
// the plugin but can safely ignore it
c.c.logger.Debug("could not delete volume: %v", err)
c.c.logger.Debug("could not delete volume", "error", err)
return nil
}
if err != nil {
@ -311,6 +311,135 @@ func (c *CSI) ControllerListVolumes(req *structs.ClientCSIControllerListVolumesR
return nil
}
func (c *CSI) ControllerCreateSnapshot(req *structs.ClientCSIControllerCreateSnapshotRequest, resp *structs.ClientCSIControllerCreateSnapshotResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "create_snapshot"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
// the server's view of the plugin health is stale, so let it know it
// should retry with another controller instance
return fmt.Errorf("CSI.ControllerCreateSnapshot: %w: %v",
nstructs.ErrCSIClientRPCRetryable, err)
}
defer plugin.Close()
csiReq, err := req.ToCSIRequest()
if err != nil {
return fmt.Errorf("CSI.ControllerCreateSnapshot: %v", err)
}
ctx, cancelFn := c.requestContext()
defer cancelFn()
// CSI ControllerCreateSnapshot errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
cresp, err := plugin.ControllerCreateSnapshot(ctx, csiReq,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if err != nil {
return fmt.Errorf("CSI.ControllerCreateSnapshot: %v", err)
}
if cresp == nil || cresp.Snapshot == nil {
c.c.logger.Warn("plugin did not return error or snapshot; this is a bug in the plugin and should be reported to the plugin author")
return fmt.Errorf("CSI.ControllerCreateSnapshot: plugin did not return error or snapshot")
}
resp.ID = cresp.Snapshot.ID
resp.ExternalSourceVolumeID = cresp.Snapshot.SourceVolumeID
resp.SizeBytes = cresp.Snapshot.SizeBytes
resp.CreateTime = cresp.Snapshot.CreateTime
resp.IsReady = cresp.Snapshot.IsReady
return nil
}
func (c *CSI) ControllerDeleteSnapshot(req *structs.ClientCSIControllerDeleteSnapshotRequest, resp *structs.ClientCSIControllerDeleteSnapshotResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "delete_snapshot"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
// the server's view of the plugin health is stale, so let it know it
// should retry with another controller instance
return fmt.Errorf("CSI.ControllerDeleteSnapshot: %w: %v",
nstructs.ErrCSIClientRPCRetryable, err)
}
defer plugin.Close()
csiReq := req.ToCSIRequest()
ctx, cancelFn := c.requestContext()
defer cancelFn()
// CSI ControllerDeleteSnapshot errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
err = plugin.ControllerDeleteSnapshot(ctx, csiReq,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {
// if the snapshot was deleted out-of-band, we'll get an error from
// the plugin but can safely ignore it
c.c.logger.Debug("could not delete snapshot", "error", err)
return nil
}
if err != nil {
return fmt.Errorf("CSI.ControllerDeleteSnapshot: %v", err)
}
return err
}
func (c *CSI) ControllerListSnapshots(req *structs.ClientCSIControllerListSnapshotsRequest, resp *structs.ClientCSIControllerListSnapshotsResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "list_snapshots"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
// the server's view of the plugin health is stale, so let it know it
// should retry with another controller instance
return fmt.Errorf("CSI.ControllerListSnapshots: %w: %v",
nstructs.ErrCSIClientRPCRetryable, err)
}
defer plugin.Close()
csiReq := req.ToCSIRequest()
ctx, cancelFn := c.requestContext()
defer cancelFn()
// CSI ControllerListSnapshots errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
cresp, err := plugin.ControllerListSnapshots(ctx, csiReq,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if err != nil {
return fmt.Errorf("CSI.ControllerListSnapshots: %v", err)
}
resp.NextToken = cresp.NextToken
resp.Entries = []*nstructs.CSISnapshot{}
for _, entry := range cresp.Entries {
if entry.Snapshot == nil {
return fmt.Errorf("CSI.ControllerListSnapshot: plugin returned an invalid entry")
}
snap := &nstructs.CSISnapshot{
ID: entry.Snapshot.ID,
ExternalSourceVolumeID: entry.Snapshot.SourceVolumeID,
SizeBytes: entry.Snapshot.SizeBytes,
CreateTime: entry.Snapshot.CreateTime,
IsReady: entry.Snapshot.IsReady,
PluginID: req.PluginID,
}
resp.Entries = append(resp.Entries, snap)
if req.MaxEntries != 0 && int32(len(resp.Entries)) == req.MaxEntries {
break
}
}
return nil
}
// NodeDetachVolume is used to detach a volume from a CSI Cluster from
// the storage node provided in the request.
func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, resp *structs.ClientCSINodeDetachVolumeResponse) error {

View File

@ -627,6 +627,262 @@ func TestCSIController_ListVolumes(t *testing.T) {
})
}
}
func TestCSIController_CreateSnapshot(t *testing.T) {
t.Parallel()
cases := []struct {
Name string
ClientSetupFunc func(*fake.Client)
Request *structs.ClientCSIControllerCreateSnapshotRequest
ExpectedErr error
ExpectedResponse *structs.ClientCSIControllerCreateSnapshotResponse
}{
{
Name: "returns plugin not found errors",
Request: &structs.ClientCSIControllerCreateSnapshotRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: "some-garbage",
},
},
ExpectedErr: errors.New("CSI.ControllerCreateSnapshot: CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
},
{
Name: "returns transitive errors",
ClientSetupFunc: func(fc *fake.Client) {
fc.NextControllerCreateSnapshotErr = errors.New("internal plugin error")
},
Request: &structs.ClientCSIControllerCreateSnapshotRequest{
ExternalSourceVolumeID: "vol-1",
Name: "1234-4321-1234-4321",
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
},
ExpectedErr: errors.New("CSI.ControllerCreateSnapshot: internal plugin error"),
},
{
Name: "returns snapshot on success",
ClientSetupFunc: func(fc *fake.Client) {
fc.NextControllerCreateSnapshotResponse = &csi.ControllerCreateSnapshotResponse{
Snapshot: &csi.Snapshot{
ID: "snap-12345",
SourceVolumeID: "vol-1",
SizeBytes: 10000000,
IsReady: true,
},
}
},
Request: &structs.ClientCSIControllerCreateSnapshotRequest{
ExternalSourceVolumeID: "vol-1",
Name: "1234-4321-1234-4321",
Secrets: nstructs.CSISecrets{"password": "xyzzy"},
Parameters: map[string]string{"foo": "bar"},
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
},
ExpectedResponse: &structs.ClientCSIControllerCreateSnapshotResponse{
ID: "snap-12345",
ExternalSourceVolumeID: "vol-1",
SizeBytes: 10000000,
IsReady: true,
},
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
require := require.New(t)
client, cleanup := TestClient(t, nil)
defer cleanup()
fakeClient := &fake.Client{}
if tc.ClientSetupFunc != nil {
tc.ClientSetupFunc(fakeClient)
}
dispenserFunc := func(*dynamicplugins.PluginInfo) (interface{}, error) {
return fakeClient, nil
}
client.dynamicRegistry.StubDispenserForType(
dynamicplugins.PluginTypeCSIController, dispenserFunc)
err := client.dynamicRegistry.RegisterPlugin(fakePlugin)
require.Nil(err)
var resp structs.ClientCSIControllerCreateSnapshotResponse
err = client.ClientRPC("CSI.ControllerCreateSnapshot", tc.Request, &resp)
require.Equal(tc.ExpectedErr, err)
if tc.ExpectedResponse != nil {
require.Equal(tc.ExpectedResponse, &resp)
}
})
}
}
func TestCSIController_DeleteSnapshot(t *testing.T) {
t.Parallel()
cases := []struct {
Name string
ClientSetupFunc func(*fake.Client)
Request *structs.ClientCSIControllerDeleteSnapshotRequest
ExpectedErr error
ExpectedResponse *structs.ClientCSIControllerDeleteSnapshotResponse
}{
{
Name: "returns plugin not found errors",
Request: &structs.ClientCSIControllerDeleteSnapshotRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: "some-garbage",
},
},
ExpectedErr: errors.New("CSI.ControllerDeleteSnapshot: CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
},
{
Name: "returns transitive errors",
ClientSetupFunc: func(fc *fake.Client) {
fc.NextControllerDeleteSnapshotErr = errors.New("internal plugin error")
},
Request: &structs.ClientCSIControllerDeleteSnapshotRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
ID: "1234-4321-1234-4321",
},
ExpectedErr: errors.New("CSI.ControllerDeleteSnapshot: internal plugin error"),
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
require := require.New(t)
client, cleanup := TestClient(t, nil)
defer cleanup()
fakeClient := &fake.Client{}
if tc.ClientSetupFunc != nil {
tc.ClientSetupFunc(fakeClient)
}
dispenserFunc := func(*dynamicplugins.PluginInfo) (interface{}, error) {
return fakeClient, nil
}
client.dynamicRegistry.StubDispenserForType(
dynamicplugins.PluginTypeCSIController, dispenserFunc)
err := client.dynamicRegistry.RegisterPlugin(fakePlugin)
require.Nil(err)
var resp structs.ClientCSIControllerDeleteSnapshotResponse
err = client.ClientRPC("CSI.ControllerDeleteSnapshot", tc.Request, &resp)
require.Equal(tc.ExpectedErr, err)
if tc.ExpectedResponse != nil {
require.Equal(tc.ExpectedResponse, &resp)
}
})
}
}
func TestCSIController_ListSnapshots(t *testing.T) {
t.Parallel()
cases := []struct {
Name string
ClientSetupFunc func(*fake.Client)
Request *structs.ClientCSIControllerListSnapshotsRequest
ExpectedErr error
ExpectedResponse *structs.ClientCSIControllerListSnapshotsResponse
}{
{
Name: "returns plugin not found errors",
Request: &structs.ClientCSIControllerListSnapshotsRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: "some-garbage",
},
},
ExpectedErr: errors.New("CSI.ControllerListSnapshots: CSI client error (retryable): plugin some-garbage for type csi-controller not found"),
},
{
Name: "returns transitive errors",
ClientSetupFunc: func(fc *fake.Client) {
fc.NextControllerListSnapshotsErr = errors.New("internal plugin error")
},
Request: &structs.ClientCSIControllerListSnapshotsRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
},
ExpectedErr: errors.New("CSI.ControllerListSnapshots: internal plugin error"),
},
{
Name: "returns volumes",
ClientSetupFunc: func(fc *fake.Client) {
fc.NextControllerListSnapshotsResponse = &csi.ControllerListSnapshotsResponse{
Entries: []*csi.ListSnapshotsResponse_Entry{
{
Snapshot: &csi.Snapshot{
ID: "snap-1",
SourceVolumeID: "vol-1",
SizeBytes: 1000000,
IsReady: true,
},
},
},
NextToken: "2",
}
},
Request: &structs.ClientCSIControllerListSnapshotsRequest{
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
StartingToken: "1",
MaxEntries: 100,
},
ExpectedResponse: &structs.ClientCSIControllerListSnapshotsResponse{
Entries: []*nstructs.CSISnapshot{
{
ID: "snap-1",
ExternalSourceVolumeID: "vol-1",
SizeBytes: 1000000,
IsReady: true,
PluginID: fakePlugin.Name,
},
},
NextToken: "2",
},
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
require := require.New(t)
client, cleanup := TestClient(t, nil)
defer cleanup()
fakeClient := &fake.Client{}
if tc.ClientSetupFunc != nil {
tc.ClientSetupFunc(fakeClient)
}
dispenserFunc := func(*dynamicplugins.PluginInfo) (interface{}, error) {
return fakeClient, nil
}
client.dynamicRegistry.StubDispenserForType(
dynamicplugins.PluginTypeCSIController, dispenserFunc)
err := client.dynamicRegistry.RegisterPlugin(fakePlugin)
require.Nil(err)
var resp structs.ClientCSIControllerListSnapshotsResponse
err = client.ClientRPC("CSI.ControllerListSnapshots", tc.Request, &resp)
require.Equal(tc.ExpectedErr, err)
if tc.ExpectedResponse != nil {
require.Equal(tc.ExpectedResponse, &resp)
}
})
}
}
func TestCSINode_DetachVolume(t *testing.T) {
t.Parallel()

View File

@ -283,6 +283,78 @@ type ClientCSIControllerListVolumesResponse struct {
NextToken string
}
// ClientCSIControllerCreateSnapshotRequest the RPC made from the server to a
// Nomad client to tell a CSI controller plugin on that client to perform
// CreateSnapshot
type ClientCSIControllerCreateSnapshotRequest struct {
ExternalSourceVolumeID string
Name string
Secrets structs.CSISecrets
Parameters map[string]string
CSIControllerQuery
}
func (req *ClientCSIControllerCreateSnapshotRequest) ToCSIRequest() (*csi.ControllerCreateSnapshotRequest, error) {
return &csi.ControllerCreateSnapshotRequest{
VolumeID: req.ExternalSourceVolumeID,
Name: req.Name,
Secrets: req.Secrets,
Parameters: req.Parameters,
}, nil
}
type ClientCSIControllerCreateSnapshotResponse struct {
ID string
ExternalSourceVolumeID string
SizeBytes int64
CreateTime int64
IsReady bool
}
// ClientCSIControllerDeleteSnapshotRequest the RPC made from the server to a
// Nomad client to tell a CSI controller plugin on that client to perform
// DeleteSnapshot
type ClientCSIControllerDeleteSnapshotRequest struct {
ID string
Secrets structs.CSISecrets
CSIControllerQuery
}
func (req *ClientCSIControllerDeleteSnapshotRequest) ToCSIRequest() *csi.ControllerDeleteSnapshotRequest {
return &csi.ControllerDeleteSnapshotRequest{
SnapshotID: req.ID,
Secrets: req.Secrets,
}
}
type ClientCSIControllerDeleteSnapshotResponse struct{}
// ClientCSIControllerListSnapshotsRequest is the RPC made from the server to
// a Nomad client to tell a CSI controller plugin on that client to perform
// ListSnapshots
type ClientCSIControllerListSnapshotsRequest struct {
// these pagination fields match the pagination fields of the plugins and
// not Nomad's own fields, for clarity when mapping between the two RPCs
MaxEntries int32
StartingToken string
CSIControllerQuery
}
func (req *ClientCSIControllerListSnapshotsRequest) ToCSIRequest() *csi.ControllerListSnapshotsRequest {
return &csi.ControllerListSnapshotsRequest{
MaxEntries: req.MaxEntries,
StartingToken: req.StartingToken,
}
}
type ClientCSIControllerListSnapshotsResponse struct {
Entries []*structs.CSISnapshot
NextToken string
}
// ClientCSINodeDetachVolumeRequest is the RPC made from the server to
// a Nomad client to tell a CSI node plugin on that client to perform
// NodeUnpublish and NodeUnstage.

View File

@ -280,6 +280,71 @@ func (s *HTTPServer) csiVolumeDetach(id string, resp http.ResponseWriter, req *h
return nil, nil
}
func (s *HTTPServer) CSISnapshotsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
switch req.Method {
case http.MethodPut, http.MethodPost:
return s.csiSnapshotCreate(resp, req)
case http.MethodDelete:
return s.csiSnapshotDelete(resp, req)
case http.MethodGet:
return s.csiSnapshotList(resp, req)
}
return nil, CodedError(405, ErrInvalidMethod)
}
func (s *HTTPServer) csiSnapshotCreate(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.CSISnapshotCreateRequest{}
if err := decodeBody(req, &args); err != nil {
return err, CodedError(400, err.Error())
}
s.parseWriteRequest(req, &args.WriteRequest)
var out structs.CSISnapshotCreateResponse
if err := s.agent.RPC("CSIVolume.CreateSnapshot", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
return out.Snapshots, nil
}
func (s *HTTPServer) csiSnapshotDelete(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.CSISnapshotDeleteRequest{}
if err := decodeBody(req, &args); err != nil {
return err, CodedError(400, err.Error())
}
s.parseWriteRequest(req, &args.WriteRequest)
var out structs.CSISnapshotDeleteResponse
if err := s.agent.RPC("CSIVolume.DeleteSnapshot", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
return nil, nil
}
func (s *HTTPServer) csiSnapshotList(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.CSISnapshotListRequest{}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}
query := req.URL.Query()
args.PluginID = query.Get("plugin_id")
var out structs.CSISnapshotListResponse
if err := s.agent.RPC("CSIVolume.SnapshotList", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
return out.Snapshots, nil
}
// CSIPluginsRequest lists CSI plugins
func (s *HTTPServer) CSIPluginsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != http.MethodGet {

View File

@ -128,6 +128,29 @@ func TestHTTP_CSIEndpointCreateVolume(t *testing.T) {
})
}
func TestHTTP_CSIEndpointSnapshot(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {
server := s.Agent.Server()
cleanup := state.CreateTestCSIPlugin(server.State(), "foo")
defer cleanup()
args := &api.CSISnapshotCreateRequest{
Snapshots: []*api.CSISnapshot{{
Name: "snap-*",
PluginID: "foo",
SourceVolumeID: "bar",
}},
}
body := encodeReq(args)
req, err := http.NewRequest("PUT", "/v1/volumes/snapshot", body)
require.NoError(t, err)
resp := httptest.NewRecorder()
_, err = s.Server.CSISnapshotsRequest(resp, req)
require.Error(t, err, "no such volume: bar")
})
}
// TestHTTP_CSIEndpoint_Cast is a smoke test for converting from structs to
// API structs
func TestHTTP_CSIEndpoint_Cast(t *testing.T) {

View File

@ -265,6 +265,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/deployment/", s.wrap(s.DeploymentSpecificRequest))
s.mux.HandleFunc("/v1/volumes", s.wrap(s.CSIVolumesRequest))
s.mux.HandleFunc("/v1/volumes/snapshot", s.wrap(s.CSISnapshotsRequest))
s.mux.HandleFunc("/v1/volume/csi/", s.wrap(s.CSIVolumeSpecificRequest))
s.mux.HandleFunc("/v1/plugins", s.wrap(s.CSIPluginsRequest))
s.mux.HandleFunc("/v1/plugin/csi/", s.wrap(s.CSIPluginSpecificRequest))

View File

@ -831,6 +831,21 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"volume snapshot create": func() (cli.Command, error) {
return &VolumeSnapshotCreateCommand{
Meta: meta,
}, nil
},
"volume snapshot delete": func() (cli.Command, error) {
return &VolumeSnapshotDeleteCommand{
Meta: meta,
}, nil
},
"volume snapshot list": func() (cli.Command, error) {
return &VolumeSnapshotListCommand{
Meta: meta,
}, nil
},
}
deprecated := map[string]cli.CommandFactory{

View File

@ -0,0 +1,111 @@
package command
import (
"fmt"
"strings"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)
type VolumeSnapshotCreateCommand struct {
Meta
}
func (c *VolumeSnapshotCreateCommand) Help() string {
helpText := `
Usage: nomad volume snapshot create <volume id> [snapshot_name]
Create a snapshot of an external storage volume. This command requires a
volume ID or prefix. If there is an exact match based on the provided volume
ID or prefix, then the specific volume is snapshotted. Otherwise, a list of
matching volumes and information will be displayed. The volume must still be
registered with Nomad in order to be snapshotted.
If an optional snapshot name is provided, the argument will be passed to the
CSI plugin to be used as the ID of the resulting snapshot. Not all plugins
accept this name and it may be ignored.
When ACLs are enabled, this command requires a token with the
'csi-write-volume' capability for the volume's namespace.
General Options:
` + generalOptionsUsage(usageOptsDefault) + `
`
return strings.TrimSpace(helpText)
}
func (c *VolumeSnapshotCreateCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{})
}
func (c *VolumeSnapshotCreateCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := c.Meta.Client()
if err != nil {
return nil
}
resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Volumes, nil)
if err != nil {
return []string{}
}
matches := resp.Matches[contexts.Volumes]
return matches
})
}
func (c *VolumeSnapshotCreateCommand) Synopsis() string {
return "Snapshot a volume"
}
func (c *VolumeSnapshotCreateCommand) Name() string { return "volume snapshot create" }
func (c *VolumeSnapshotCreateCommand) Run(args []string) int {
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
var verbose bool
flags.BoolVar(&verbose, "verbose", false, "")
if err := flags.Parse(args); err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing arguments %s", err))
return 1
}
// Check that we at least one argument
args = flags.Args()
if l := len(args); l == 0 {
c.Ui.Error("This command takes at least one argument: <vol id> [snapshot name]")
c.Ui.Error(commandErrorText(c))
return 1
}
volID := args[0]
snapshotName := ""
if len(args) == 2 {
snapshotName = args[1]
}
// Get the HTTP client
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
snaps, _, err := client.CSIVolumes().CreateSnapshot(&api.CSISnapshot{
SourceVolumeID: volID,
Name: snapshotName,
}, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error snapshotting volume: %s", err))
return 1
}
c.Ui.Output(csiFormatSnapshots(snaps, verbose))
return 0
}

View File

@ -0,0 +1,114 @@
package command
import (
"fmt"
"strings"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
flaghelper "github.com/hashicorp/nomad/helper/flags"
"github.com/posener/complete"
)
type VolumeSnapshotDeleteCommand struct {
Meta
}
func (c *VolumeSnapshotDeleteCommand) Help() string {
helpText := `
Usage: nomad volume snapshot delete [options] <plugin id> <snapshot id>
Delete a snapshot from an external storage provider.
When ACLs are enabled, this command requires a token with the
'csi-write-volume' and 'plugin:read' capabilities.
General Options:
` + generalOptionsUsage(usageOptsDefault) + `
Snapshot Options:
-secret
Secrets to pass to the plugin to create the snapshot. Accepts multiple
flags in the form -secret key=value
`
return strings.TrimSpace(helpText)
}
func (c *VolumeSnapshotDeleteCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-secret": complete.PredictNothing,
})
}
func (c *VolumeSnapshotDeleteCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := c.Meta.Client()
if err != nil {
return nil
}
resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Plugins, nil)
if err != nil {
return []string{}
}
return resp.Matches[contexts.Plugins]
})
}
func (c *VolumeSnapshotDeleteCommand) Synopsis() string {
return "Delete a snapshot"
}
func (c *VolumeSnapshotDeleteCommand) Name() string { return "volume snapshot delete" }
func (c *VolumeSnapshotDeleteCommand) Run(args []string) int {
var secretsArgs flaghelper.StringFlag
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.Var(&secretsArgs, "secret", "secrets for snapshot, ex. -secret key=value")
if err := flags.Parse(args); err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing arguments %s", err))
return 1
}
// Check that we get exactly two arguments
args = flags.Args()
if l := len(args); l != 2 {
c.Ui.Error("This command takes two arguments: <plugin id> <snapshot id>")
c.Ui.Error(commandErrorText(c))
return 1
}
pluginID := args[0]
snapID := args[1]
// Get the HTTP client
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
secrets := api.CSISecrets{}
for _, kv := range secretsArgs {
s := strings.Split(kv, "=")
if len(s) == 2 {
secrets[s[0]] = s[1]
}
}
err = client.CSIVolumes().DeleteSnapshot(&api.CSISnapshot{
ID: snapID,
PluginID: pluginID,
Secrets: secrets,
}, nil)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error deleting volume: %s", err))
return 1
}
return 0
}

View File

@ -0,0 +1,180 @@
package command
import (
"fmt"
"sort"
"strings"
humanize "github.com/dustin/go-humanize"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/posener/complete"
)
type VolumeSnapshotListCommand struct {
Meta
}
func (c *VolumeSnapshotListCommand) Help() string {
helpText := `
Usage: nomad volume snapshot list [-plugin plugin_id]
Display a list of CSI volume snapshots along with their
source volume ID as known to the external storage provider.
When ACLs are enabled, this command requires a token with the
'csi-list-volumes' capability for the plugin's namespace.
General Options:
` + generalOptionsUsage(usageOptsDefault) + `
List Options:
-plugin: Display only snapshots managed by a particular plugin. By default
this command will query all plugins for their snapshots.
`
return strings.TrimSpace(helpText)
}
func (c *VolumeSnapshotListCommand) Synopsis() string {
return "Display a list of volume snapshots"
}
func (c *VolumeSnapshotListCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{})
}
func (c *VolumeSnapshotListCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictFunc(func(a complete.Args) []string {
client, err := c.Meta.Client()
if err != nil {
return nil
}
resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Plugins, nil)
if err != nil {
return []string{}
}
return resp.Matches[contexts.Plugins]
})
}
func (c *VolumeSnapshotListCommand) Name() string { return "volume snapshot list" }
func (c *VolumeSnapshotListCommand) Run(args []string) int {
var pluginID string
var verbose bool
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
flags.StringVar(&pluginID, "plugin", "", "")
flags.BoolVar(&verbose, "verbose", false, "")
if err := flags.Parse(args); err != nil {
c.Ui.Error(fmt.Sprintf("Error parsing arguments %s", err))
return 1
}
args = flags.Args()
if len(args) > 0 {
c.Ui.Error("This command takes no arguments")
c.Ui.Error(commandErrorText(c))
return 1
}
// Get the HTTP client
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
// filter by plugin if a plugin ID was passed
if pluginID != "" {
plugs, _, err := client.CSIPlugins().List(&api.QueryOptions{Prefix: pluginID})
if err != nil {
c.Ui.Error(fmt.Sprintf("Error querying CSI plugins: %s", err))
return 1
}
if len(plugs) > 1 {
out, err := c.csiFormatPlugins(plugs)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error formatting: %s", err))
return 1
}
c.Ui.Error(fmt.Sprintf("Prefix matched multiple plugins\n\n%s", out))
return 1
}
if len(plugs) == 0 {
c.Ui.Error(fmt.Sprintf("No plugins(s) with prefix or ID %q found", pluginID))
return 1
}
pluginID = plugs[0].ID
}
q := &api.QueryOptions{PerPage: 30} // TODO: tune page size
for {
resp, _, err := client.CSIVolumes().ListSnapshots(pluginID, q)
if err != nil {
c.Ui.Error(fmt.Sprintf(
"Error querying CSI external volumes for plugin %q: %s", pluginID, err))
return 1
}
if len(resp.Snapshots) > 0 {
c.Ui.Output(csiFormatSnapshots(resp.Snapshots, verbose))
}
q.NextToken = resp.NextToken
if q.NextToken == "" {
break
}
// we can't know the shape of arbitrarily-sized lists of snapshots,
// so break after each page
c.Ui.Output("...")
}
return 0
}
func csiFormatSnapshots(snapshots []*api.CSISnapshot, verbose bool) string {
rows := []string{"Snapshot ID|Volume ID|Size|Create Time|Ready?"}
length := 12
if verbose {
length = 30
}
for i, v := range snapshots {
rows[i+1] = fmt.Sprintf("%s|%s|%s|%s|%v",
limit(v.ID, length),
limit(v.ExternalSourceVolumeID, length),
humanize.IBytes(uint64(v.SizeBytes)),
formatUnixNanoTime(v.CreateTime),
v.IsReady,
)
}
return formatList(rows)
}
func (c *VolumeSnapshotListCommand) csiFormatPlugins(plugs []*api.CSIPluginListStub) (string, error) {
// TODO: this has a lot of overlap with 'nomad plugin status', so we
// should factor out some shared formatting helpers.
sort.Slice(plugs, func(i, j int) bool { return plugs[i].ID < plugs[j].ID })
length := 30
rows := make([]string, len(plugs)+1)
rows[0] = "ID|Provider|Controllers Healthy/Expected|Nodes Healthy/Expected"
for i, p := range plugs {
rows[i+1] = fmt.Sprintf("%s|%s|%d/%d|%d/%d",
limit(p.ID, length),
p.Provider,
p.ControllersHealthy,
p.ControllersExpected,
p.NodesHealthy,
p.NodesExpected,
)
}
return formatList(rows), nil
}

View File

@ -97,6 +97,45 @@ func (a *ClientCSI) ControllerListVolumes(args *cstructs.ClientCSIControllerList
return nil
}
func (a *ClientCSI) ControllerCreateSnapshot(args *cstructs.ClientCSIControllerCreateSnapshotRequest, reply *cstructs.ClientCSIControllerCreateSnapshotResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "create_snapshot"}, time.Now())
err := a.sendCSIControllerRPC(args.PluginID,
"CSI.ControllerCreateSnapshot",
"ClientCSI.ControllerCreateSnapshot",
args, reply)
if err != nil {
return fmt.Errorf("controller create snapshot: %v", err)
}
return nil
}
func (a *ClientCSI) ControllerDeleteSnapshot(args *cstructs.ClientCSIControllerDeleteSnapshotRequest, reply *cstructs.ClientCSIControllerDeleteSnapshotResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "delete_snapshot"}, time.Now())
err := a.sendCSIControllerRPC(args.PluginID,
"CSI.ControllerDeleteSnapshot",
"ClientCSI.ControllerDeleteSnapshot",
args, reply)
if err != nil {
return fmt.Errorf("controller delete snapshot: %v", err)
}
return nil
}
func (a *ClientCSI) ControllerListSnapshots(args *cstructs.ClientCSIControllerListSnapshotsRequest, reply *cstructs.ClientCSIControllerListSnapshotsResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "list_snapshots"}, time.Now())
err := a.sendCSIControllerRPC(args.PluginID,
"CSI.ControllerListSnapshots",
"ClientCSI.ControllerListSnapshots",
args, reply)
if err != nil {
return fmt.Errorf("controller list snapshots: %v", err)
}
return nil
}
func (a *ClientCSI) sendCSIControllerRPC(pluginID, method, fwdMethod string, args cstructs.CSIControllerRequest, reply interface{}) error {
clientIDs, err := a.clientIDsForController(pluginID)

View File

@ -24,23 +24,30 @@ import (
// responses that have no bodies have no "Next*Response" field and will always
// return an empty response body.
type MockClientCSI struct {
NextValidateError error
NextAttachError error
NextAttachResponse *cstructs.ClientCSIControllerAttachVolumeResponse
NextDetachError error
NextCreateError error
NextCreateResponse *cstructs.ClientCSIControllerCreateVolumeResponse
NextDeleteError error
NextListExternalError error
NextListExternalResponse *cstructs.ClientCSIControllerListVolumesResponse
NextNodeDetachError error
NextValidateError error
NextAttachError error
NextAttachResponse *cstructs.ClientCSIControllerAttachVolumeResponse
NextDetachError error
NextCreateError error
NextCreateResponse *cstructs.ClientCSIControllerCreateVolumeResponse
NextDeleteError error
NextListExternalError error
NextListExternalResponse *cstructs.ClientCSIControllerListVolumesResponse
NextCreateSnapshotError error
NextCreateSnapshotResponse *cstructs.ClientCSIControllerCreateSnapshotResponse
NextDeleteSnapshotError error
NextListExternalSnapshotsError error
NextListExternalSnapshotsResponse *cstructs.ClientCSIControllerListSnapshotsResponse
NextNodeDetachError error
}
func newMockClientCSI() *MockClientCSI {
return &MockClientCSI{
NextAttachResponse: &cstructs.ClientCSIControllerAttachVolumeResponse{},
NextCreateResponse: &cstructs.ClientCSIControllerCreateVolumeResponse{},
NextListExternalResponse: &cstructs.ClientCSIControllerListVolumesResponse{},
NextAttachResponse: &cstructs.ClientCSIControllerAttachVolumeResponse{},
NextCreateResponse: &cstructs.ClientCSIControllerCreateVolumeResponse{},
NextListExternalResponse: &cstructs.ClientCSIControllerListVolumesResponse{},
NextCreateSnapshotResponse: &cstructs.ClientCSIControllerCreateSnapshotResponse{},
NextListExternalSnapshotsResponse: &cstructs.ClientCSIControllerListSnapshotsResponse{},
}
}
@ -71,6 +78,20 @@ func (c *MockClientCSI) ControllerListVolumes(req *cstructs.ClientCSIControllerL
return c.NextListExternalError
}
func (c *MockClientCSI) ControllerCreateSnapshot(req *cstructs.ClientCSIControllerCreateSnapshotRequest, resp *cstructs.ClientCSIControllerCreateSnapshotResponse) error {
*resp = *c.NextCreateSnapshotResponse
return c.NextCreateSnapshotError
}
func (c *MockClientCSI) ControllerDeleteSnapshot(req *cstructs.ClientCSIControllerDeleteSnapshotRequest, resp *cstructs.ClientCSIControllerDeleteSnapshotResponse) error {
return c.NextDeleteSnapshotError
}
func (c *MockClientCSI) ControllerListSnapshots(req *cstructs.ClientCSIControllerListSnapshotsRequest, resp *cstructs.ClientCSIControllerListSnapshotsResponse) error {
*resp = *c.NextListExternalSnapshotsResponse
return c.NextListExternalSnapshotsError
}
func (c *MockClientCSI) NodeDetachVolume(req *cstructs.ClientCSINodeDetachVolumeRequest, resp *cstructs.ClientCSINodeDetachVolumeResponse) error {
return c.NextNodeDetachError
}
@ -271,6 +292,104 @@ func TestClientCSIController_ListVolumes_Forwarded(t *testing.T) {
require.Contains(err.Error(), "no plugins registered for type")
}
func TestClientCSIController_CreateSnapshot_Local(t *testing.T) {
t.Parallel()
require := require.New(t)
codec, cleanup := setupLocal(t)
defer cleanup()
req := &cstructs.ClientCSIControllerCreateSnapshotRequest{
CSIControllerQuery: cstructs.CSIControllerQuery{PluginID: "minnie"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientCSI.ControllerCreateSnapshot", req, &resp)
require.Error(err)
require.Contains(err.Error(), "no plugins registered for type")
}
func TestClientCSIController_CreateSnapshot_Forwarded(t *testing.T) {
t.Parallel()
require := require.New(t)
codec, cleanup := setupForward(t)
defer cleanup()
req := &cstructs.ClientCSIControllerCreateSnapshotRequest{
CSIControllerQuery: cstructs.CSIControllerQuery{PluginID: "minnie"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientCSI.ControllerCreateSnapshot", req, &resp)
require.Error(err)
require.Contains(err.Error(), "no plugins registered for type")
}
func TestClientCSIController_DeleteSnapshot_Local(t *testing.T) {
t.Parallel()
require := require.New(t)
codec, cleanup := setupLocal(t)
defer cleanup()
req := &cstructs.ClientCSIControllerDeleteSnapshotRequest{
ID: "test",
CSIControllerQuery: cstructs.CSIControllerQuery{PluginID: "minnie"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientCSI.ControllerDeleteSnapshot", req, &resp)
require.Error(err)
require.Contains(err.Error(), "no plugins registered for type")
}
func TestClientCSIController_DeleteSnapshot_Forwarded(t *testing.T) {
t.Parallel()
require := require.New(t)
codec, cleanup := setupForward(t)
defer cleanup()
req := &cstructs.ClientCSIControllerDeleteSnapshotRequest{
ID: "test",
CSIControllerQuery: cstructs.CSIControllerQuery{PluginID: "minnie"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientCSI.ControllerDeleteSnapshot", req, &resp)
require.Error(err)
require.Contains(err.Error(), "no plugins registered for type")
}
func TestClientCSIController_ListSnapshots_Local(t *testing.T) {
t.Parallel()
require := require.New(t)
codec, cleanup := setupLocal(t)
defer cleanup()
req := &cstructs.ClientCSIControllerListSnapshotsRequest{
CSIControllerQuery: cstructs.CSIControllerQuery{PluginID: "minnie"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientCSI.ControllerListSnapshots", req, &resp)
require.Error(err)
require.Contains(err.Error(), "no plugins registered for type")
}
func TestClientCSIController_ListSnapshots_Forwarded(t *testing.T) {
t.Parallel()
require := require.New(t)
codec, cleanup := setupForward(t)
defer cleanup()
req := &cstructs.ClientCSIControllerListSnapshotsRequest{
CSIControllerQuery: cstructs.CSIControllerQuery{PluginID: "minnie"},
}
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientCSI.ControllerListSnapshots", req, &resp)
require.Error(err)
require.Contains(err.Error(), "no plugins registered for type")
}
func TestClientCSI_NodeForControllerPlugin(t *testing.T) {
t.Parallel()
srv, shutdown := TestServer(t, func(c *Config) {})
@ -404,6 +523,9 @@ func setupLocal(t *testing.T) (rpc.ClientCodec, func()) {
mockCSI.NextCreateError = fmt.Errorf("no plugins registered for type")
mockCSI.NextDeleteError = fmt.Errorf("no plugins registered for type")
mockCSI.NextListExternalError = fmt.Errorf("no plugins registered for type")
mockCSI.NextCreateSnapshotError = fmt.Errorf("no plugins registered for type")
mockCSI.NextDeleteSnapshotError = fmt.Errorf("no plugins registered for type")
mockCSI.NextListExternalSnapshotsError = fmt.Errorf("no plugins registered for type")
c1, cleanupC1 := client.TestClientWithRPCs(t,
func(c *config.Config) {

View File

@ -935,8 +935,7 @@ func (v *CSIVolume) Delete(args *structs.CSIVolumeDeleteRequest, reply *structs.
return err
}
ns := args.RequestNamespace()
if !allowVolume(aclObj, ns) {
if !allowVolume(aclObj, args.RequestNamespace()) || !aclObj.AllowPluginRead() {
return structs.ErrPermissionDenied
}
@ -1052,6 +1051,205 @@ func (v *CSIVolume) ListExternal(args *structs.CSIVolumeExternalListRequest, rep
return nil
}
func (v *CSIVolume) CreateSnapshot(args *structs.CSISnapshotCreateRequest, reply *structs.CSISnapshotCreateResponse) error {
if done, err := v.srv.forward("CSIVolume.CreateSnapshot", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "volume", "create_snapshot"}, time.Now())
allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIWriteVolume)
aclObj, err := v.srv.WriteACLObj(&args.WriteRequest, false)
if err != nil {
return err
}
if !allowVolume(aclObj, args.RequestNamespace()) || !aclObj.AllowPluginRead() {
return structs.ErrPermissionDenied
}
state, err := v.srv.fsm.State().Snapshot()
if err != nil {
return err
}
method := "ClientCSI.ControllerCreateSnapshot"
var mErr multierror.Error
for _, snap := range args.Snapshots {
if snap == nil {
// we intentionally don't multierror here because we're in a weird state
return fmt.Errorf("snapshot cannot be nil")
}
plugin, err := state.CSIPluginByID(nil, snap.PluginID)
if err != nil {
multierror.Append(&mErr,
fmt.Errorf("error querying plugin %q: %v", snap.PluginID, err))
continue
}
if plugin == nil {
multierror.Append(&mErr, fmt.Errorf("no such plugin %q", snap.PluginID))
continue
}
if !plugin.HasControllerCapability(structs.CSIControllerSupportsCreateDeleteSnapshot) {
multierror.Append(&mErr,
fmt.Errorf("plugin %q does not support snapshot", snap.PluginID))
continue
}
vol, err := state.CSIVolumeByID(nil, args.RequestNamespace(), snap.SourceVolumeID)
if err != nil {
multierror.Append(&mErr, fmt.Errorf("error querying volume %q: %v", snap.SourceVolumeID, err))
continue
}
if vol == nil {
multierror.Append(&mErr, fmt.Errorf("no such volume %q", snap.SourceVolumeID))
continue
}
cReq := &cstructs.ClientCSIControllerCreateSnapshotRequest{
ExternalSourceVolumeID: vol.ExternalID,
Name: snap.Name,
Secrets: snap.Secrets,
Parameters: snap.Parameters,
}
cReq.PluginID = plugin.ID
cResp := &cstructs.ClientCSIControllerCreateSnapshotResponse{}
err = v.srv.RPC(method, cReq, cResp)
if err != nil {
multierror.Append(&mErr, fmt.Errorf("could not create snapshot: %v", err))
continue
}
reply.Snapshots = append(reply.Snapshots, &structs.CSISnapshot{
ID: cResp.ID,
ExternalSourceVolumeID: cResp.ExternalSourceVolumeID,
SizeBytes: cResp.SizeBytes,
CreateTime: cResp.CreateTime,
IsReady: cResp.IsReady,
})
}
return mErr.ErrorOrNil()
}
func (v *CSIVolume) DeleteSnapshot(args *structs.CSISnapshotDeleteRequest, reply *structs.CSISnapshotDeleteResponse) error {
if done, err := v.srv.forward("CSIVolume.DeleteSnapshot", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "volume", "delete_snapshot"}, time.Now())
allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIWriteVolume)
aclObj, err := v.srv.WriteACLObj(&args.WriteRequest, false)
if err != nil {
return err
}
// NOTE: this is the plugin's namespace, not the snapshot(s) because we
// don't track snapshots in the state store at all and their source
// volume(s) because they might not even be registered
if !allowVolume(aclObj, args.RequestNamespace()) || !aclObj.AllowPluginRead() {
return structs.ErrPermissionDenied
}
stateSnap, err := v.srv.fsm.State().Snapshot()
if err != nil {
return err
}
var mErr multierror.Error
for _, snap := range args.Snapshots {
if snap == nil {
// we intentionally don't multierror here because we're in a weird state
return fmt.Errorf("snapshot cannot be nil")
}
plugin, err := stateSnap.CSIPluginByID(nil, snap.PluginID)
if err != nil {
multierror.Append(&mErr,
fmt.Errorf("could not query plugin %q: %v", snap.PluginID, err))
continue
}
if plugin == nil {
multierror.Append(&mErr, fmt.Errorf("no such plugin"))
continue
}
if !plugin.HasControllerCapability(structs.CSIControllerSupportsCreateDeleteSnapshot) {
multierror.Append(&mErr, fmt.Errorf("plugin does not support snapshot"))
continue
}
method := "ClientCSI.ControllerDeleteSnapshot"
cReq := &cstructs.ClientCSIControllerDeleteSnapshotRequest{ID: snap.ID}
cReq.PluginID = plugin.ID
cResp := &cstructs.ClientCSIControllerDeleteSnapshotResponse{}
err = v.srv.RPC(method, cReq, cResp)
if err != nil {
multierror.Append(&mErr, fmt.Errorf("could not delete %q: %v", snap.ID, err))
}
}
return mErr.ErrorOrNil()
}
func (v *CSIVolume) ListSnapshots(args *structs.CSISnapshotListRequest, reply *structs.CSISnapshotListResponse) error {
if done, err := v.srv.forward("CSIVolume.ListSnapshots", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "volume", "list_snapshots"}, time.Now())
allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIListVolume,
acl.NamespaceCapabilityCSIReadVolume,
acl.NamespaceCapabilityCSIMountVolume,
acl.NamespaceCapabilityListJobs)
aclObj, err := v.srv.QueryACLObj(&args.QueryOptions, false)
if err != nil {
return err
}
// NOTE: this is the plugin's namespace, not the volume(s) because they
// might not even be registered
if !allowVolume(aclObj, args.RequestNamespace()) {
return structs.ErrPermissionDenied
}
snap, err := v.srv.fsm.State().Snapshot()
if err != nil {
return err
}
plugin, err := snap.CSIPluginByID(nil, args.PluginID)
if err != nil {
return err
}
if plugin == nil {
return fmt.Errorf("no such plugin")
}
if !plugin.HasControllerCapability(structs.CSIControllerSupportsListSnapshots) {
return fmt.Errorf("plugin does not support listing snapshots")
}
method := "ClientCSI.ControllerListSnapshots"
cReq := &cstructs.ClientCSIControllerListSnapshotsRequest{
MaxEntries: args.PerPage,
StartingToken: args.NextToken,
}
cReq.PluginID = plugin.ID
cResp := &cstructs.ClientCSIControllerListSnapshotsResponse{}
err = v.srv.RPC(method, cReq, cResp)
if err != nil {
return err
}
if args.PerPage > 0 {
reply.Snapshots = cResp.Entries[:args.PerPage]
} else {
reply.Snapshots = cResp.Entries
}
reply.NextToken = cResp.NextToken
return nil
}
// CSIPlugin wraps the structs.CSIPlugin with request data and server context
type CSIPlugin struct {
srv *Server

View File

@ -998,6 +998,279 @@ func TestCSIVolumeEndpoint_ListExternal(t *testing.T) {
require.Equal(t, "page2", resp.NextToken)
}
func TestCSIVolumeEndpoint_CreateSnapshot(t *testing.T) {
t.Parallel()
var err error
srv, shutdown := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer shutdown()
testutil.WaitForLeader(t, srv.RPC)
fake := newMockClientCSI()
fake.NextCreateSnapshotError = nil
fake.NextCreateSnapshotResponse = &cstructs.ClientCSIControllerCreateSnapshotResponse{
ID: "snap-12345",
ExternalSourceVolumeID: "vol-12345",
SizeBytes: 42,
IsReady: true,
}
client, cleanup := client.TestClientWithRPCs(t,
func(c *cconfig.Config) {
c.Servers = []string{srv.config.RPCAddr.String()}
},
map[string]interface{}{"CSI": fake},
)
defer cleanup()
node := client.Node()
req0 := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp0 structs.NodeUpdateResponse
err = client.RPC("Node.Register", req0, &resp0)
require.NoError(t, err)
testutil.WaitForResult(func() (bool, error) {
nodes := srv.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
t.Fatalf("should have a client")
})
ns := structs.DefaultNamespace
state := srv.fsm.State()
codec := rpcClient(t, srv)
index := uint64(1000)
node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsCreateDeleteSnapshot: true,
},
RequiresControllerPlugin: true,
},
}
index++
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, index, node))
// Create the volume
vols := []*structs.CSIVolume{{
ID: "test-volume0",
Namespace: ns,
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
PluginID: "minnie",
ExternalID: "vol-12345",
}}
index++
require.NoError(t, state.CSIVolumeRegister(index, vols))
// Create the snapshot request
req1 := &structs.CSISnapshotCreateRequest{
Snapshots: []*structs.CSISnapshot{{
Name: "snap",
SourceVolumeID: "test-volume0",
Secrets: structs.CSISecrets{"mysecret": "secretvalue"},
Parameters: map[string]string{"myparam": "paramvalue"},
PluginID: "minnie",
}},
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: ns,
},
}
resp1 := &structs.CSISnapshotCreateResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.CreateSnapshot", req1, resp1)
require.NoError(t, err)
snap := resp1.Snapshots[0]
require.Equal(t, "vol-12345", snap.ExternalSourceVolumeID) // set by the args
require.Equal(t, "snap-12345", snap.ID) // set by the plugin
require.Equal(t, "csi.CSISecrets(map[])", snap.Secrets.String()) // should not be set
require.Len(t, snap.Parameters, 0) // should not be set
}
func TestCSIVolumeEndpoint_DeleteSnapshot(t *testing.T) {
t.Parallel()
var err error
srv, shutdown := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer shutdown()
testutil.WaitForLeader(t, srv.RPC)
fake := newMockClientCSI()
fake.NextDeleteSnapshotError = nil
client, cleanup := client.TestClientWithRPCs(t,
func(c *cconfig.Config) {
c.Servers = []string{srv.config.RPCAddr.String()}
},
map[string]interface{}{"CSI": fake},
)
defer cleanup()
node := client.Node()
req0 := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp0 structs.NodeUpdateResponse
err = client.RPC("Node.Register", req0, &resp0)
require.NoError(t, err)
testutil.WaitForResult(func() (bool, error) {
nodes := srv.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
t.Fatalf("should have a client")
})
ns := structs.DefaultNamespace
state := srv.fsm.State()
codec := rpcClient(t, srv)
index := uint64(1000)
node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsCreateDeleteSnapshot: true,
},
RequiresControllerPlugin: true,
},
}
index++
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, index, node))
// Delete the snapshot request
req1 := &structs.CSISnapshotDeleteRequest{
Snapshots: []*structs.CSISnapshot{
{
ID: "snap-12345",
PluginID: "minnie",
},
{
ID: "snap-34567",
PluginID: "minnie",
},
},
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: ns,
},
}
resp1 := &structs.CSISnapshotDeleteResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.DeleteSnapshot", req1, resp1)
require.NoError(t, err)
}
func TestCSIVolumeEndpoint_ListSnapshots(t *testing.T) {
t.Parallel()
var err error
srv, shutdown := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer shutdown()
testutil.WaitForLeader(t, srv.RPC)
fake := newMockClientCSI()
fake.NextListExternalSnapshotsResponse = &cstructs.ClientCSIControllerListSnapshotsResponse{
Entries: []*structs.CSISnapshot{
{
ID: "snap-12345",
ExternalSourceVolumeID: "vol-12345",
SizeBytes: 70000,
IsReady: true,
},
{
ID: "snap-abcde",
ExternalSourceVolumeID: "vol-abcde",
SizeBytes: 70000,
IsReady: false,
},
{
ExternalSourceVolumeID: "you should not see me",
},
},
NextToken: "page2",
}
client, cleanup := client.TestClientWithRPCs(t,
func(c *cconfig.Config) {
c.Servers = []string{srv.config.RPCAddr.String()}
},
map[string]interface{}{"CSI": fake},
)
defer cleanup()
node := client.Node()
req0 := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp0 structs.NodeUpdateResponse
err = client.RPC("Node.Register", req0, &resp0)
require.NoError(t, err)
testutil.WaitForResult(func() (bool, error) {
nodes := srv.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
t.Fatalf("should have a client")
})
state := srv.fsm.State()
codec := rpcClient(t, srv)
index := uint64(1000)
node.CSIControllerPlugins = map[string]*structs.CSIInfo{
"minnie": {
PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsListSnapshots: true,
},
RequiresControllerPlugin: true,
},
}
index++
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, index, node))
// List snapshots
req := &structs.CSISnapshotListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
PerPage: 2,
NextToken: "page1",
},
}
resp := &structs.CSISnapshotListResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.ListSnapshots", req, resp)
require.NoError(t, err)
require.Len(t, resp.Snapshots, 2)
require.Equal(t, "vol-12345", resp.Snapshots[0].ExternalSourceVolumeID)
require.Equal(t, "vol-abcde", resp.Snapshots[1].ExternalSourceVolumeID)
require.True(t, resp.Snapshots[0].IsReady)
require.Equal(t, "page2", resp.NextToken)
}
func TestCSIPluginEndpoint_RegisterViaFingerprint(t *testing.T) {
t.Parallel()
srv, shutdown := TestServer(t, func(c *Config) {

View File

@ -774,6 +774,60 @@ type CSIVolumeUnpublishResponse struct {
QueryMeta
}
// CSISnapshot is the storage provider's view of a volume snapshot
type CSISnapshot struct {
// These fields map to those returned by the storage provider plugin
ID string // storage provider's ID
ExternalSourceVolumeID string // storage provider's ID for volume
SizeBytes int64
CreateTime int64
IsReady bool
// These fields are controlled by Nomad
SourceVolumeID string
PluginID string
// These field are only used during snapshot creation and will not be
// populated when the snapshot is returned
Name string
Secrets CSISecrets
Parameters map[string]string
}
type CSISnapshotCreateRequest struct {
Snapshots []*CSISnapshot
WriteRequest
}
type CSISnapshotCreateResponse struct {
Snapshots []*CSISnapshot
QueryMeta
}
type CSISnapshotDeleteRequest struct {
Snapshots []*CSISnapshot
WriteRequest
}
type CSISnapshotDeleteResponse struct {
QueryMeta
}
// CSISnapshotListRequest is a request to a controller plugin to list all the
// snapshot known to the the storage provider. This request is paginated by
// the plugin and accepts the QueryOptions.PerPage and QueryOptions.NextToken
// fields
type CSISnapshotListRequest struct {
PluginID string
QueryOptions
}
type CSISnapshotListResponse struct {
Snapshots []*CSISnapshot
NextToken string
QueryMeta
}
// CSIPlugin collects fingerprint info context for the plugin for clients
type CSIPlugin struct {
ID string

View File

@ -71,6 +71,9 @@ type CSIControllerClient interface {
CreateVolume(ctx context.Context, in *csipbv1.CreateVolumeRequest, opts ...grpc.CallOption) (*csipbv1.CreateVolumeResponse, error)
ListVolumes(ctx context.Context, in *csipbv1.ListVolumesRequest, opts ...grpc.CallOption) (*csipbv1.ListVolumesResponse, error)
DeleteVolume(ctx context.Context, in *csipbv1.DeleteVolumeRequest, opts ...grpc.CallOption) (*csipbv1.DeleteVolumeResponse, error)
CreateSnapshot(ctx context.Context, in *csipbv1.CreateSnapshotRequest, opts ...grpc.CallOption) (*csipbv1.CreateSnapshotResponse, error)
DeleteSnapshot(ctx context.Context, in *csipbv1.DeleteSnapshotRequest, opts ...grpc.CallOption) (*csipbv1.DeleteSnapshotResponse, error)
ListSnapshots(ctx context.Context, in *csipbv1.ListSnapshotsRequest, opts ...grpc.CallOption) (*csipbv1.ListSnapshotsResponse, error)
}
// CSINodeClient defines the minimal CSI Node Plugin interface used
@ -548,6 +551,107 @@ NEXT_CAP:
return err.ErrorOrNil()
}
func (c *client) ControllerCreateSnapshot(ctx context.Context, req *ControllerCreateSnapshotRequest, opts ...grpc.CallOption) (*ControllerCreateSnapshotResponse, error) {
err := req.Validate()
if err != nil {
return nil, err
}
creq := req.ToCSIRepresentation()
resp, err := c.controllerClient.CreateSnapshot(ctx, creq, opts...)
// these standard gRPC error codes are overloaded with CSI-specific
// meanings, so translate them into user-understandable terms
// https://github.com/container-storage-interface/spec/blob/master/spec.md#createsnapshot-errors
if err != nil {
code := status.Code(err)
switch code {
case codes.AlreadyExists:
return nil, fmt.Errorf(
"snapshot %q already exists but is incompatible with volume ID %q: %v",
req.Name, req.VolumeID, err)
case codes.Aborted:
return nil, fmt.Errorf(
"snapshot %q is already pending: %v",
req.Name, err)
case codes.ResourceExhausted:
return nil, fmt.Errorf(
"storage provider does not have enough space for this snapshot: %v", err)
case codes.Internal:
return nil, fmt.Errorf(
"controller plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
}
return nil, err
}
snap := resp.GetSnapshot()
return &ControllerCreateSnapshotResponse{
Snapshot: &Snapshot{
ID: snap.GetSnapshotId(),
SourceVolumeID: snap.GetSourceVolumeId(),
SizeBytes: snap.GetSizeBytes(),
CreateTime: snap.GetCreationTime().GetSeconds(),
IsReady: snap.GetReadyToUse(),
},
}, nil
}
func (c *client) ControllerDeleteSnapshot(ctx context.Context, req *ControllerDeleteSnapshotRequest, opts ...grpc.CallOption) error {
err := req.Validate()
if err != nil {
return err
}
creq := req.ToCSIRepresentation()
_, err = c.controllerClient.DeleteSnapshot(ctx, creq, opts...)
// these standard gRPC error codes are overloaded with CSI-specific
// meanings, so translate them into user-understandable terms
// https://github.com/container-storage-interface/spec/blob/master/spec.md#deletesnapshot-errors
if err != nil {
code := status.Code(err)
switch code {
case codes.FailedPrecondition:
return fmt.Errorf(
"snapshot %q could not be deleted because it is in use: %v",
req.SnapshotID, err)
case codes.Aborted:
return fmt.Errorf("snapshot %q has a pending operation: %v", req.SnapshotID, err)
case codes.Internal:
return fmt.Errorf(
"controller plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
}
return err
}
return nil
}
func (c *client) ControllerListSnapshots(ctx context.Context, req *ControllerListSnapshotsRequest, opts ...grpc.CallOption) (*ControllerListSnapshotsResponse, error) {
err := req.Validate()
if err != nil {
return nil, err
}
creq := req.ToCSIRepresentation()
resp, err := c.controllerClient.ListSnapshots(ctx, creq, opts...)
// these standard gRPC error codes are overloaded with CSI-specific
// meanings, so translate them into user-understandable terms
// https://github.com/container-storage-interface/spec/blob/master/spec.md#listsnapshot-errors
if err != nil {
code := status.Code(err)
switch code {
case codes.Aborted:
return nil, fmt.Errorf(
"invalid starting token %q: %v", req.StartingToken, err)
case codes.Internal:
return nil, fmt.Errorf(
"controller plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
}
return nil, err
}
return NewListSnapshotsResponse(resp), nil
}
//
// Node Endpoints
//

View File

@ -943,6 +943,173 @@ func TestClient_RPC_ControllerListVolume(t *testing.T) {
}
}
func TestClient_RPC_ControllerCreateSnapshot(t *testing.T) {
cases := []struct {
Name string
Request *ControllerCreateSnapshotRequest
Response *csipbv1.CreateSnapshotResponse
ResponseErr error
ExpectedErr error
}{
{
Name: "handles underlying grpc errors",
Request: &ControllerCreateSnapshotRequest{
VolumeID: "vol-12345",
Name: "snap-12345",
},
ResponseErr: status.Errorf(codes.Internal, "some grpc error"),
ExpectedErr: fmt.Errorf("controller plugin returned an internal error, check the plugin allocation logs for more information: rpc error: code = Internal desc = some grpc error"),
},
{
Name: "handles error missing volume ID",
Request: &ControllerCreateSnapshotRequest{},
ExpectedErr: errors.New("missing VolumeID"),
},
{
Name: "handles success",
Request: &ControllerCreateSnapshotRequest{
VolumeID: "vol-12345",
Name: "snap-12345",
},
Response: &csipbv1.CreateSnapshotResponse{
Snapshot: &csipbv1.Snapshot{
SizeBytes: 100000,
SnapshotId: "snap-12345",
SourceVolumeId: "vol-12345",
ReadyToUse: true,
},
},
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
_, cc, _, client := newTestClient()
defer client.Close()
cc.NextErr = tc.ResponseErr
cc.NextCreateSnapshotResponse = tc.Response
// note: there's nothing interesting to assert about the response
// here other than that we don't throw a NPE during transformation
// from protobuf to our struct
_, err := client.ControllerCreateSnapshot(context.TODO(), tc.Request)
if tc.ExpectedErr != nil {
require.EqualError(t, err, tc.ExpectedErr.Error())
} else {
require.NoError(t, err, tc.Name)
}
})
}
}
func TestClient_RPC_ControllerDeleteSnapshot(t *testing.T) {
cases := []struct {
Name string
Request *ControllerDeleteSnapshotRequest
ResponseErr error
ExpectedErr error
}{
{
Name: "handles underlying grpc errors",
Request: &ControllerDeleteSnapshotRequest{SnapshotID: "vol-12345"},
ResponseErr: status.Errorf(codes.Internal, "some grpc error"),
ExpectedErr: fmt.Errorf("controller plugin returned an internal error, check the plugin allocation logs for more information: rpc error: code = Internal desc = some grpc error"),
},
{
Name: "handles error missing volume ID",
Request: &ControllerDeleteSnapshotRequest{},
ExpectedErr: errors.New("missing SnapshotID"),
},
{
Name: "handles success",
Request: &ControllerDeleteSnapshotRequest{SnapshotID: "vol-12345"},
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
_, cc, _, client := newTestClient()
defer client.Close()
cc.NextErr = tc.ResponseErr
err := client.ControllerDeleteSnapshot(context.TODO(), tc.Request)
if tc.ExpectedErr != nil {
require.EqualError(t, err, tc.ExpectedErr.Error())
return
}
require.NoError(t, err, tc.Name)
})
}
}
func TestClient_RPC_ControllerListSnapshots(t *testing.T) {
cases := []struct {
Name string
Request *ControllerListSnapshotsRequest
ResponseErr error
ExpectedErr error
}{
{
Name: "handles underlying grpc errors",
Request: &ControllerListSnapshotsRequest{},
ResponseErr: status.Errorf(codes.Internal, "some grpc error"),
ExpectedErr: fmt.Errorf("controller plugin returned an internal error, check the plugin allocation logs for more information: rpc error: code = Internal desc = some grpc error"),
},
{
Name: "handles error invalid max entries",
Request: &ControllerListSnapshotsRequest{MaxEntries: -1},
ExpectedErr: errors.New("MaxEntries cannot be negative"),
},
{
Name: "handles success",
Request: &ControllerListSnapshotsRequest{},
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
_, cc, _, client := newTestClient()
defer client.Close()
cc.NextErr = tc.ResponseErr
if tc.ResponseErr != nil {
// note: there's nothing interesting to assert here other than
// that we don't throw a NPE during transformation from
// protobuf to our struct
cc.NextListSnapshotsResponse = &csipbv1.ListSnapshotsResponse{
Entries: []*csipbv1.ListSnapshotsResponse_Entry{
{
Snapshot: &csipbv1.Snapshot{
SizeBytes: 1000000,
SnapshotId: "snap-12345",
SourceVolumeId: "vol-12345",
ReadyToUse: true,
},
},
},
NextToken: "abcdef",
}
}
resp, err := client.ControllerListSnapshots(context.TODO(), tc.Request)
if tc.ExpectedErr != nil {
require.EqualError(t, err, tc.ExpectedErr.Error())
return
}
require.NoError(t, err, tc.Name)
require.NotNil(t, resp)
})
}
}
func TestClient_RPC_NodeStageVolume(t *testing.T) {
cases := []struct {
Name string

View File

@ -64,6 +64,17 @@ type Client struct {
NextControllerValidateVolumeErr error
ControllerValidateVolumeCallCount int64
NextControllerCreateSnapshotResponse *csi.ControllerCreateSnapshotResponse
NextControllerCreateSnapshotErr error
ControllerCreateSnapshotCallCount int64
NextControllerDeleteSnapshotErr error
ControllerDeleteSnapshotCallCount int64
NextControllerListSnapshotsResponse *csi.ControllerListSnapshotsResponse
NextControllerListSnapshotsErr error
ControllerListSnapshotsCallCount int64
NextNodeGetCapabilitiesResponse *csi.NodeCapabilitySet
NextNodeGetCapabilitiesErr error
NodeGetCapabilitiesCallCount int64
@ -200,6 +211,27 @@ func (c *Client) ControllerListVolumes(ctx context.Context, req *csi.ControllerL
return c.NextControllerListVolumesResponse, c.NextControllerListVolumesErr
}
func (c *Client) ControllerCreateSnapshot(ctx context.Context, req *csi.ControllerCreateSnapshotRequest, opts ...grpc.CallOption) (*csi.ControllerCreateSnapshotResponse, error) {
c.Mu.Lock()
defer c.Mu.Unlock()
c.ControllerCreateSnapshotCallCount++
return c.NextControllerCreateSnapshotResponse, c.NextControllerCreateSnapshotErr
}
func (c *Client) ControllerDeleteSnapshot(ctx context.Context, req *csi.ControllerDeleteSnapshotRequest, opts ...grpc.CallOption) error {
c.Mu.Lock()
defer c.Mu.Unlock()
c.ControllerDeleteSnapshotCallCount++
return c.NextControllerDeleteSnapshotErr
}
func (c *Client) ControllerListSnapshots(ctx context.Context, req *csi.ControllerListSnapshotsRequest, opts ...grpc.CallOption) (*csi.ControllerListSnapshotsResponse, error) {
c.Mu.Lock()
defer c.Mu.Unlock()
c.ControllerListSnapshotsCallCount++
return c.NextControllerListSnapshotsResponse, c.NextControllerListSnapshotsErr
}
func (c *Client) NodeGetCapabilities(ctx context.Context) (*csi.NodeCapabilitySet, error) {
c.Mu.Lock()
defer c.Mu.Unlock()

View File

@ -57,6 +57,18 @@ type CSIPlugin interface {
// external storage provider
ControllerListVolumes(ctx context.Context, req *ControllerListVolumesRequest, opts ...grpc.CallOption) (*ControllerListVolumesResponse, error)
// ControllerCreateSnapshot is used to create a volume snapshot in the
// external storage provider
ControllerCreateSnapshot(ctx context.Context, req *ControllerCreateSnapshotRequest, opts ...grpc.CallOption) (*ControllerCreateSnapshotResponse, error)
// ControllerDeleteSnapshot is used to delete a volume snapshot from the
// external storage provider
ControllerDeleteSnapshot(ctx context.Context, req *ControllerDeleteSnapshotRequest, opts ...grpc.CallOption) error
// ControllerListSnapshots is used to list all volume snapshots available
// in the external storage provider
ControllerListSnapshots(ctx context.Context, req *ControllerListSnapshotsRequest, opts ...grpc.CallOption) (*ControllerListSnapshotsResponse, error)
// NodeGetCapabilities is used to return the available capabilities from the
// Node Service.
NodeGetCapabilities(ctx context.Context) (*NodeCapabilitySet, error)
@ -681,6 +693,116 @@ type VolumeCondition struct {
Message string
}
type ControllerCreateSnapshotRequest struct {
VolumeID string
Name string
Secrets structs.CSISecrets
Parameters map[string]string
}
func (r *ControllerCreateSnapshotRequest) ToCSIRepresentation() *csipbv1.CreateSnapshotRequest {
return &csipbv1.CreateSnapshotRequest{
SourceVolumeId: r.VolumeID,
Name: r.Name,
Secrets: r.Secrets,
Parameters: r.Parameters,
}
}
func (r *ControllerCreateSnapshotRequest) Validate() error {
if r.VolumeID == "" {
return errors.New("missing VolumeID")
}
if r.Name == "" {
return errors.New("missing Name")
}
return nil
}
type ControllerCreateSnapshotResponse struct {
Snapshot *Snapshot
}
type Snapshot struct {
ID string
SourceVolumeID string
SizeBytes int64
CreateTime int64
IsReady bool
}
type ControllerDeleteSnapshotRequest struct {
SnapshotID string
Secrets structs.CSISecrets
}
func (r *ControllerDeleteSnapshotRequest) ToCSIRepresentation() *csipbv1.DeleteSnapshotRequest {
return &csipbv1.DeleteSnapshotRequest{
SnapshotId: r.SnapshotID,
Secrets: r.Secrets,
}
}
func (r *ControllerDeleteSnapshotRequest) Validate() error {
if r.SnapshotID == "" {
return errors.New("missing SnapshotID")
}
return nil
}
type ControllerListSnapshotsRequest struct {
MaxEntries int32
StartingToken string
}
func (r *ControllerListSnapshotsRequest) ToCSIRepresentation() *csipbv1.ListSnapshotsRequest {
return &csipbv1.ListSnapshotsRequest{
MaxEntries: r.MaxEntries,
StartingToken: r.StartingToken,
}
}
func (r *ControllerListSnapshotsRequest) Validate() error {
if r.MaxEntries < 0 {
return errors.New("MaxEntries cannot be negative")
}
return nil
}
func NewListSnapshotsResponse(resp *csipbv1.ListSnapshotsResponse) *ControllerListSnapshotsResponse {
if resp == nil {
return &ControllerListSnapshotsResponse{}
}
entries := []*ListSnapshotsResponse_Entry{}
if resp.Entries != nil {
for _, entry := range resp.Entries {
snap := entry.GetSnapshot()
entries = append(entries, &ListSnapshotsResponse_Entry{
Snapshot: &Snapshot{
SizeBytes: snap.GetSizeBytes(),
ID: snap.GetSnapshotId(),
SourceVolumeID: snap.GetSourceVolumeId(),
CreateTime: snap.GetCreationTime().GetSeconds(),
IsReady: snap.GetReadyToUse(),
},
})
}
}
return &ControllerListSnapshotsResponse{
Entries: entries,
NextToken: resp.NextToken,
}
}
type ControllerListSnapshotsResponse struct {
Entries []*ListSnapshotsResponse_Entry
NextToken string
}
type ListSnapshotsResponse_Entry struct {
Snapshot *Snapshot
}
type NodeCapabilitySet struct {
HasStageUnstageVolume bool
}

View File

@ -53,6 +53,9 @@ type ControllerClient struct {
NextCreateVolumeResponse *csipbv1.CreateVolumeResponse
NextDeleteVolumeResponse *csipbv1.DeleteVolumeResponse
NextListVolumesResponse *csipbv1.ListVolumesResponse
NextCreateSnapshotResponse *csipbv1.CreateSnapshotResponse
NextDeleteSnapshotResponse *csipbv1.DeleteSnapshotResponse
NextListSnapshotsResponse *csipbv1.ListSnapshotsResponse
}
// NewControllerClient returns a new ControllerClient
@ -69,6 +72,9 @@ func (f *ControllerClient) Reset() {
f.NextCreateVolumeResponse = nil
f.NextDeleteVolumeResponse = nil
f.NextListVolumesResponse = nil
f.NextCreateSnapshotResponse = nil
f.NextDeleteSnapshotResponse = nil
f.NextListSnapshotsResponse = nil
}
func (c *ControllerClient) ControllerGetCapabilities(ctx context.Context, in *csipbv1.ControllerGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipbv1.ControllerGetCapabilitiesResponse, error) {
@ -110,6 +116,18 @@ func (c *ControllerClient) ListVolumes(ctx context.Context, in *csipbv1.ListVolu
return c.NextListVolumesResponse, c.NextErr
}
func (c *ControllerClient) CreateSnapshot(ctx context.Context, in *csipbv1.CreateSnapshotRequest, opts ...grpc.CallOption) (*csipbv1.CreateSnapshotResponse, error) {
return c.NextCreateSnapshotResponse, c.NextErr
}
func (c *ControllerClient) DeleteSnapshot(ctx context.Context, in *csipbv1.DeleteSnapshotRequest, opts ...grpc.CallOption) (*csipbv1.DeleteSnapshotResponse, error) {
return c.NextDeleteSnapshotResponse, c.NextErr
}
func (c *ControllerClient) ListSnapshots(ctx context.Context, in *csipbv1.ListSnapshotsRequest, opts ...grpc.CallOption) (*csipbv1.ListSnapshotsResponse, error) {
return c.NextListSnapshotsResponse, c.NextErr
}
// NodeClient is a CSI Node client used for testing
type NodeClient struct {
NextErr error

View File

@ -114,6 +114,49 @@ func (v *CSIVolumes) Detach(volID, nodeID string, w *WriteOptions) error {
return err
}
// CreateSnapshot snapshots an external storage volume.
func (v *CSIVolumes) CreateSnapshot(snap *CSISnapshot, w *WriteOptions) ([]*CSISnapshot, *WriteMeta, error) {
req := &CSISnapshotCreateRequest{
Snapshots: []*CSISnapshot{snap},
}
resp := &CSISnapshotCreateResponse{}
meta, err := v.client.write(fmt.Sprintf("/v1/volumes/snapshot"), req, resp, w)
return resp.Snapshots, meta, err
}
// DeleteSnapshot deletes an external storage volume snapshot.
func (v *CSIVolumes) DeleteSnapshot(snap *CSISnapshot, w *WriteOptions) error {
req := &CSISnapshotDeleteRequest{
Snapshots: []*CSISnapshot{snap},
}
_, err := v.client.delete(fmt.Sprintf("/v1/volumes/snapshot"), req, w)
return err
}
// ListSnapshots lists external storage volume snapshots.
func (v *CSIVolumes) ListSnapshots(pluginID string, q *QueryOptions) (*CSISnapshotListResponse, *QueryMeta, error) {
var resp *CSISnapshotListResponse
qp := url.Values{}
if pluginID != "" {
qp.Set("plugin_id", pluginID)
}
if q.NextToken != "" {
qp.Set("next_token", q.NextToken)
}
if q.PerPage != 0 {
qp.Set("per_page", fmt.Sprint(q.PerPage))
}
qm, err := v.client.query("/v1/volumes/snapshots?"+qp.Encode(), &resp, q)
if err != nil {
return nil, nil, err
}
sort.Sort(CSISnapshotSort(resp.Snapshots))
return resp, qm, nil
}
// CSIVolumeAttachmentMode chooses the type of storage api that will be used to
// interact with the device. (Duplicated in nomad/structs/csi.go)
type CSIVolumeAttachmentMode string
@ -312,6 +355,68 @@ type CSIVolumeDeregisterRequest struct {
WriteRequest
}
// CSISnapshot is the storage provider's view of a volume snapshot
type CSISnapshot struct {
ID string // storage provider's ID
ExternalSourceVolumeID string // storage provider's ID for volume
SizeBytes int64 // value from storage provider
CreateTime int64 // value from storage provider
IsReady bool // value from storage provider
SourceVolumeID string // Nomad volume ID
PluginID string // CSI plugin ID
// These field are only used during snapshot creation and will not be
// populated when the snapshot is returned
Name string // suggested name of the snapshot, used for creation
Secrets CSISecrets // secrets needed to create snapshot
Parameters map[string]string // secrets needed to create snapshot
}
// CSISnapshotSort is a helper used for sorting snapshots by creation time.
type CSISnapshotSort []*CSISnapshot
func (v CSISnapshotSort) Len() int {
return len(v)
}
func (v CSISnapshotSort) Less(i, j int) bool {
return v[i].CreateTime > v[j].CreateTime
}
func (v CSISnapshotSort) Swap(i, j int) {
v[i], v[j] = v[j], v[i]
}
type CSISnapshotCreateRequest struct {
Snapshots []*CSISnapshot
WriteRequest
}
type CSISnapshotCreateResponse struct {
Snapshots []*CSISnapshot
QueryMeta
}
type CSISnapshotDeleteRequest struct {
Snapshots []*CSISnapshot
WriteRequest
}
// CSISnapshotListRequest is a request to a controller plugin to list all the
// snapshot known to the the storage provider. This request is paginated by
// the plugin and accepts the QueryOptions.PerPage and QueryOptions.NextToken
// fields
type CSISnapshotListRequest struct {
PluginID string
QueryOptions
}
type CSISnapshotListResponse struct {
Snapshots []*CSISnapshot
NextToken string
QueryMeta
}
// CSI Plugins are jobs with plugin specific data
type CSIPlugins struct {
client *Client

View File

@ -0,0 +1,56 @@
---
layout: docs
page_title: 'Commands: volume snapshot create'
description: |
Create external volume snapshots.
---
# Command: volume snapshot create
The `volume snapshot create` command creates a snapshot of an existing
[Container Storage Interface (CSI)][csi] volume. Only CSI plugins that
implement the [Controller][csi_plugins_internals] interface support this
command.
## Usage
```plaintext
nomad volume snapshot create [volume] [snapshot_name]
```
The `volume snapshot create` command requires a volume ID or prefix. If there
is an exact match based on the provided volume ID or prefix, then the specific
volume is snapshotted. Otherwise, a list of matching volumes and information
will be displayed. The volume must still be [registered] with Nomad in order
to be snapshotted.
If an optional snapshot name is provided, the argument will be passed to the
CSI plugin to be used as the ID of the resulting snapshot. Not all plugins
accept this name and it may be ignored.
When ACLs are enabled, this command requires a token with the
`csi-write-volume` capability for the volume's namespace.
## General Options
@include 'general_options.mdx'
## Examples
Snapshot a volume:
```shell-session
$ nomad volume snapshot create ebs_prod_db1
Completed snapshot of volume ebs_prod_db1 with snapshot ID snap-12345.
```
Snapshot a volume with a suggested snapshot ID:
```shell-session
$ nomad volume snapshot create ebs_prod_db1 snap-12345
Completed snapshot of volume ebs_prod_db1 with snapshot ID snap-12345.
```
[csi]: https://github.com/container-storage-interface/spec
[csi_plugin]: /docs/job-specification/csi_plugin
[registered]: /docs/commands/volume/register

View File

@ -0,0 +1,43 @@
---
layout: docs
page_title: 'Commands: volume snapshot delete'
description: |
Delete external volume snapshots.
---
# Command: volume snapshot delete
The `volume snapshot delete` command deletes a snapshot of an existing
[Container Storage Interface (CSI)][csi] volume. Only CSI plugins that
implement the [Controller][csi_plugins_internals] interface support this
command.
## Usage
```plaintext
nomad volume snapshot delete [plugin_id] [snapshot_id]
```
The `volume snapshot delete` command requires both the plugin ID and the
snapshot ID. The volume that was the source of the snapshot does not still
need to be [registered] with Nomad in order to be deleted.
When ACLs are enabled, this command requires a token with the `csi-write-
volume` and `plugin:read` capabilities.
## General Options
@include 'general_options.mdx'
## Examples
Delete a volume snapshot:
```shell-session
$ nomad volume snapshot delete aws-ebs0 snap-12345
Deleted snapshot snap-12345.
```
[csi]: https://github.com/container-storage-interface/spec
[csi_plugin]: /docs/job-specification/csi_plugin
[registered]: /docs/commands/volume/register

View File

@ -0,0 +1,55 @@
---
layout: docs
page_title: 'Commands: volume snapshot list'
description: |
List external volume snapshots.
---
# Command: volume snapshot list
The `volume snapshot list` command lists volume snapshots known to to a
[Container Storage Interface (CSI)][csi] storage provider. Only CSI plugins
that implement the [Controller][csi_plugins_internals] interface support this
command.
## Usage
```plaintext
nomad volume snapshot list [-plugin plugin_id]
```
The `volume snapshot list` command returns a list of snapshots along with their
source volume ID as known to the external storage provider. This is not the
same as the Nomad volume ID, as the source volume may not be [registered] with
Nomad.
## General Options
@include 'general_options.mdx'
## Status Options
- `-plugin`: Display only snapshots managed by a particular [CSI
plugin][csi_plugin]. By default the `snapshot list` command will query all
plugins for their snapshots. This flag accepts a plugin ID or prefix. If
there is an exact match based on the provided plugin, then that specific
plugin will be queried. Otherwise, a list of matching plugins will be
displayed.
When ACLs are enabled, this command requires a token with the
`csi-list-volumes` capability for the plugin's namespace.
## Examples
List volume snapshots for a plugin:
```shell-session
$ nomad volume snapshot list -plugin aws-ebs0
Snapshot ID External ID Size Creation Time Ready?
snap-12345 vol-abcdef 50GiB 2021-01-03T12:15:02Z true
snap-67890 vol-fedcba 50GiB 2021-01-04T15:45:00Z true
```
[csi]: https://github.com/container-storage-interface/spec
[csi_plugin]: /docs/job-specification/csi_plugin
[registered]: /docs/commands/volume/register

View File

@ -747,6 +747,18 @@
"title": "register",
"path": "commands/volume/register"
},
{
"title": "snapshot create",
"path": "commands/volume/snapshot-create"
},
{
"title": "snapshot delete",
"path": "commands/volume/snapshot-delete"
},
{
"title": "snapshot list",
"path": "commands/volume/snapshot-list"
},
{
"title": "status",
"path": "commands/volume/status"