2019-10-16 20:53:39 +00:00
package nomad
import (
2020-02-13 15:18:55 +00:00
"fmt"
2022-03-09 01:54:17 +00:00
"net/http"
2022-03-22 19:40:24 +00:00
"strings"
2019-10-16 20:53:39 +00:00
"time"
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/acl"
2020-02-13 15:18:55 +00:00
cstructs "github.com/hashicorp/nomad/client/structs"
2019-10-16 20:53:39 +00:00
"github.com/hashicorp/nomad/nomad/state"
2022-03-09 01:54:17 +00:00
"github.com/hashicorp/nomad/nomad/state/paginator"
2019-10-16 20:53:39 +00:00
"github.com/hashicorp/nomad/nomad/structs"
)
// CSIVolume wraps the structs.CSIVolume with request data and server context
type CSIVolume struct {
srv * Server
logger log . Logger
}
// QueryACLObj looks up the ACL token in the request and returns the acl.ACL object
// - fallback to node secret ids
2021-12-20 10:44:21 +00:00
func ( s * Server ) QueryACLObj ( args * structs . QueryOptions , allowNodeAccess bool ) ( * acl . ACL , error ) {
2019-10-16 20:53:39 +00:00
// Lookup the token
2021-12-20 10:44:21 +00:00
aclObj , err := s . ResolveToken ( args . AuthToken )
2019-10-16 20:53:39 +00:00
if err != nil {
// If ResolveToken had an unexpected error return that
2020-02-11 11:41:18 +00:00
if ! structs . IsErrTokenNotFound ( err ) {
return nil , err
}
// If we don't allow access to this endpoint from Nodes, then return token
// not found.
if ! allowNodeAccess {
return nil , structs . ErrTokenNotFound
}
2019-10-16 20:53:39 +00:00
ws := memdb . NewWatchSet ( )
2020-02-11 11:41:18 +00:00
// Attempt to lookup AuthToken as a Node.SecretID since nodes may call
// call this endpoint and don't have an ACL token.
2021-12-20 10:44:21 +00:00
node , stateErr := s . fsm . State ( ) . NodeBySecretID ( ws , args . AuthToken )
2019-10-16 20:53:39 +00:00
if stateErr != nil {
// Return the original ResolveToken error with this err
var merr multierror . Error
merr . Errors = append ( merr . Errors , err , stateErr )
return nil , merr . ErrorOrNil ( )
}
2020-02-11 11:41:18 +00:00
// We did not find a Node for this ID, so return Token Not Found.
2019-10-16 20:53:39 +00:00
if node == nil {
return nil , structs . ErrTokenNotFound
}
}
2020-02-11 11:41:18 +00:00
// Return either the users aclObj, or nil if ACLs are disabled.
2019-10-16 20:53:39 +00:00
return aclObj , nil
}
// WriteACLObj calls QueryACLObj for a WriteRequest
2021-12-20 10:44:21 +00:00
func ( s * Server ) WriteACLObj ( args * structs . WriteRequest , allowNodeAccess bool ) ( * acl . ACL , error ) {
2019-10-16 20:53:39 +00:00
opts := & structs . QueryOptions {
Region : args . RequestRegion ( ) ,
Namespace : args . RequestNamespace ( ) ,
AuthToken : args . AuthToken ,
}
2021-12-20 10:44:21 +00:00
return s . QueryACLObj ( opts , allowNodeAccess )
2019-10-16 20:53:39 +00:00
}
2020-01-28 15:28:34 +00:00
const (
csiVolumeTable = "csi_volumes"
csiPluginTable = "csi_plugins"
)
// replySetIndex sets the reply with the last index that modified the table
2021-12-20 10:44:21 +00:00
func ( s * Server ) replySetIndex ( table string , reply * structs . QueryMeta ) error {
fmsState := s . fsm . State ( )
2020-01-28 15:28:34 +00:00
2021-12-20 10:44:21 +00:00
index , err := fmsState . Index ( table )
2019-10-16 20:53:39 +00:00
if err != nil {
return err
}
reply . Index = index
// Set the query response
2021-12-20 10:44:21 +00:00
s . setQueryMeta ( reply )
2019-10-16 20:53:39 +00:00
return nil
}
// List replies with CSIVolumes, filtered by ACL access
func ( v * CSIVolume ) List ( args * structs . CSIVolumeListRequest , reply * structs . CSIVolumeListResponse ) error {
if done , err := v . srv . forward ( "CSIVolume.List" , args , args , reply ) ; done {
return err
}
2020-03-17 21:32:39 +00:00
allowVolume := acl . NamespaceValidator ( acl . NamespaceCapabilityCSIListVolume ,
acl . NamespaceCapabilityCSIReadVolume ,
acl . NamespaceCapabilityCSIMountVolume ,
acl . NamespaceCapabilityListJobs )
2020-02-11 11:41:18 +00:00
aclObj , err := v . srv . QueryACLObj ( & args . QueryOptions , false )
2019-10-16 20:53:39 +00:00
if err != nil {
return err
}
2020-03-17 21:32:39 +00:00
if ! allowVolume ( aclObj , args . RequestNamespace ( ) ) {
2020-03-17 15:35:34 +00:00
return structs . ErrPermissionDenied
}
2021-03-22 13:43:30 +00:00
defer metrics . MeasureSince ( [ ] string { "nomad" , "volume" , "list" } , time . Now ( ) )
2019-10-16 20:53:39 +00:00
ns := args . RequestNamespace ( )
opts := blockingOptions {
queryOpts : & args . QueryOptions ,
queryMeta : & reply . QueryMeta ,
run : func ( ws memdb . WatchSet , state * state . StateStore ) error {
2020-11-25 16:15:57 +00:00
snap , err := state . Snapshot ( )
if err != nil {
return err
}
2021-03-18 18:32:40 +00:00
2019-10-16 20:53:39 +00:00
// Query all volumes
var iter memdb . ResultIterator
2021-03-18 18:32:40 +00:00
prefix := args . Prefix
2020-03-11 16:47:14 +00:00
if args . NodeID != "" {
2021-03-18 18:32:40 +00:00
iter , err = snap . CSIVolumesByNodeID ( ws , prefix , args . NodeID )
2020-03-11 16:47:14 +00:00
} else if args . PluginID != "" {
2021-03-18 18:32:40 +00:00
iter , err = snap . CSIVolumesByPluginID ( ws , ns , prefix , args . PluginID )
2022-03-03 22:27:04 +00:00
} else if prefix != "" {
iter , err = snap . CSIVolumesByIDPrefix ( ws , ns , prefix )
} else if ns != structs . AllNamespacesSentinel {
2021-03-18 18:32:40 +00:00
iter , err = snap . CSIVolumesByNamespace ( ws , ns , prefix )
2022-03-03 22:27:04 +00:00
} else {
iter , err = snap . CSIVolumes ( ws )
2019-10-16 20:53:39 +00:00
}
if err != nil {
return err
}
2022-03-09 01:54:17 +00:00
tokenizer := paginator . NewStructsTokenizer (
iter ,
paginator . StructsTokenizerOptions {
WithNamespace : true ,
WithID : true ,
} ,
)
volFilter := paginator . GenericFilter {
Allow : func ( raw interface { } ) ( bool , error ) {
vol := raw . ( * structs . CSIVolume )
// Remove (possibly again) by PluginID to handle passing both
// NodeID and PluginID
if args . PluginID != "" && args . PluginID != vol . PluginID {
return false , nil
}
// Remove by Namespace, since CSIVolumesByNodeID hasn't used
// the Namespace yet
if ns != structs . AllNamespacesSentinel && vol . Namespace != ns {
return false , nil
}
return true , nil
} ,
}
filters := [ ] paginator . Filter { volFilter }
2019-10-16 20:53:39 +00:00
// Collect results, filter by ACL access
2020-03-24 01:21:40 +00:00
vs := [ ] * structs . CSIVolListStub { }
2019-10-16 20:53:39 +00:00
2022-03-09 01:54:17 +00:00
paginator , err := paginator . NewPaginator ( iter , tokenizer , filters , args . QueryOptions ,
func ( raw interface { } ) error {
vol := raw . ( * structs . CSIVolume )
vol , err := snap . CSIVolumeDenormalizePlugins ( ws , vol . Copy ( ) )
if err != nil {
return err
}
vs = append ( vs , vol . Stub ( ) )
return nil
} )
if err != nil {
return structs . NewErrRPCCodedf (
http . StatusBadRequest , "failed to create result paginator: %v" , err )
2019-10-16 20:53:39 +00:00
}
2022-03-09 01:54:17 +00:00
nextToken , err := paginator . Page ( )
if err != nil {
return structs . NewErrRPCCodedf (
http . StatusBadRequest , "failed to read result page: %v" , err )
}
reply . QueryMeta . NextToken = nextToken
2019-10-16 20:53:39 +00:00
reply . Volumes = vs
2020-01-28 15:28:34 +00:00
return v . srv . replySetIndex ( csiVolumeTable , & reply . QueryMeta )
2019-10-16 20:53:39 +00:00
} }
return v . srv . blockingRPC ( & opts )
}
// Get fetches detailed information about a specific volume
func ( v * CSIVolume ) Get ( args * structs . CSIVolumeGetRequest , reply * structs . CSIVolumeGetResponse ) error {
if done , err := v . srv . forward ( "CSIVolume.Get" , args , args , reply ) ; done {
return err
}
2020-03-17 21:32:39 +00:00
allowCSIAccess := acl . NamespaceValidator ( acl . NamespaceCapabilityCSIReadVolume ,
acl . NamespaceCapabilityCSIMountVolume ,
acl . NamespaceCapabilityReadJob )
2020-02-11 11:41:18 +00:00
aclObj , err := v . srv . QueryACLObj ( & args . QueryOptions , true )
2019-10-16 20:53:39 +00:00
if err != nil {
return err
}
2020-03-17 15:35:34 +00:00
ns := args . RequestNamespace ( )
if ! allowCSIAccess ( aclObj , ns ) {
2019-10-16 20:53:39 +00:00
return structs . ErrPermissionDenied
}
2021-03-22 13:43:30 +00:00
defer metrics . MeasureSince ( [ ] string { "nomad" , "volume" , "get" } , time . Now ( ) )
2019-10-16 20:53:39 +00:00
2020-05-20 14:22:24 +00:00
if args . ID == "" {
return fmt . Errorf ( "missing volume ID" )
}
2019-10-16 20:53:39 +00:00
opts := blockingOptions {
queryOpts : & args . QueryOptions ,
queryMeta : & reply . QueryMeta ,
run : func ( ws memdb . WatchSet , state * state . StateStore ) error {
2020-11-25 16:15:57 +00:00
snap , err := state . Snapshot ( )
if err != nil {
return err
}
vol , err := snap . CSIVolumeByID ( ws , ns , args . ID )
2019-10-16 20:53:39 +00:00
if err != nil {
return err
}
2020-01-28 15:28:34 +00:00
if vol != nil {
2020-11-25 16:15:57 +00:00
vol , err = snap . CSIVolumeDenormalize ( ws , vol )
2020-01-28 15:28:34 +00:00
}
if err != nil {
return err
2019-10-16 20:53:39 +00:00
}
reply . Volume = vol
2020-01-28 15:28:34 +00:00
return v . srv . replySetIndex ( csiVolumeTable , & reply . QueryMeta )
2019-10-16 20:53:39 +00:00
} }
return v . srv . blockingRPC ( & opts )
}
2020-04-10 20:47:21 +00:00
func ( v * CSIVolume ) pluginValidateVolume ( req * structs . CSIVolumeRegisterRequest , vol * structs . CSIVolume ) ( * structs . CSIPlugin , error ) {
state := v . srv . fsm . State ( )
2020-02-18 16:08:44 +00:00
2020-11-25 16:15:57 +00:00
plugin , err := state . CSIPluginByID ( nil , vol . PluginID )
2020-02-18 16:08:44 +00:00
if err != nil {
2020-03-09 13:57:59 +00:00
return nil , err
2020-02-18 16:08:44 +00:00
}
if plugin == nil {
2020-03-09 13:57:59 +00:00
return nil , fmt . Errorf ( "no CSI plugin named: %s could be found" , vol . PluginID )
2020-02-18 16:08:44 +00:00
}
2020-03-09 13:57:59 +00:00
vol . Provider = plugin . Provider
vol . ProviderVersion = plugin . Version
2022-03-07 16:06:59 +00:00
2020-03-09 13:57:59 +00:00
return plugin , nil
}
2020-04-10 20:47:21 +00:00
func ( v * CSIVolume ) controllerValidateVolume ( req * structs . CSIVolumeRegisterRequest , vol * structs . CSIVolume , plugin * structs . CSIPlugin ) error {
2020-03-09 13:57:59 +00:00
2020-02-18 16:08:44 +00:00
if ! plugin . ControllerRequired {
// The plugin does not require a controller, so for now we won't do any
// further validation of the volume.
return nil
}
2020-04-02 20:04:56 +00:00
method := "ClientCSI.ControllerValidateVolume"
2020-02-18 16:08:44 +00:00
cReq := & cstructs . ClientCSIControllerValidateVolumeRequest {
2021-06-03 19:47:55 +00:00
VolumeID : vol . RemoteID ( ) ,
VolumeCapabilities : vol . RequestedCapabilities ,
Secrets : vol . Secrets ,
Parameters : vol . Parameters ,
Context : vol . Context ,
2020-02-18 16:08:44 +00:00
}
2020-02-21 10:32:10 +00:00
cReq . PluginID = plugin . ID
2020-02-18 16:08:44 +00:00
cResp := & cstructs . ClientCSIControllerValidateVolumeResponse { }
2020-04-10 20:47:21 +00:00
return v . srv . RPC ( method , cReq , cResp )
2020-02-18 16:08:44 +00:00
}
2022-03-07 16:06:59 +00:00
// Register registers a new volume or updates an existing volume. Note
// that most user-defined CSIVolume fields are immutable once the
// volume has been created.
//
// If the user needs to change fields because they've misconfigured
// the registration of the external volume, we expect that claims
// won't work either, and the user can deregister the volume and try
// again with the right settings. This lets us be as strict with
// validation here as the CreateVolume CSI RPC is expected to be.
2019-10-16 20:53:39 +00:00
func ( v * CSIVolume ) Register ( args * structs . CSIVolumeRegisterRequest , reply * structs . CSIVolumeRegisterResponse ) error {
if done , err := v . srv . forward ( "CSIVolume.Register" , args , args , reply ) ; done {
return err
}
2020-03-17 21:32:39 +00:00
allowVolume := acl . NamespaceValidator ( acl . NamespaceCapabilityCSIWriteVolume )
2020-02-11 11:41:18 +00:00
aclObj , err := v . srv . WriteACLObj ( & args . WriteRequest , false )
2019-10-16 20:53:39 +00:00
if err != nil {
return err
}
2021-03-22 13:43:30 +00:00
defer metrics . MeasureSince ( [ ] string { "nomad" , "volume" , "register" } , time . Now ( ) )
2019-10-16 20:53:39 +00:00
2020-03-17 21:32:39 +00:00
if ! allowVolume ( aclObj , args . RequestNamespace ( ) ) || ! aclObj . AllowPluginRead ( ) {
2019-10-16 20:53:39 +00:00
return structs . ErrPermissionDenied
}
2021-03-22 13:43:30 +00:00
if len ( args . Volumes ) == 0 {
2020-05-20 14:22:24 +00:00
return fmt . Errorf ( "missing volume definition" )
}
2020-02-18 16:08:44 +00:00
// This is the only namespace we ACL checked, force all the volumes to use it.
// We also validate that the plugin exists for each plugin, and validate the
// capabilities when the plugin has a controller.
2020-02-03 20:40:28 +00:00
for _ , vol := range args . Volumes {
2022-03-07 16:06:59 +00:00
snap , err := v . srv . State ( ) . Snapshot ( )
if err != nil {
return err
}
if vol . Namespace == "" {
vol . Namespace = args . RequestNamespace ( )
}
2020-02-03 20:40:28 +00:00
if err = vol . Validate ( ) ; err != nil {
2019-10-16 20:53:39 +00:00
return err
}
2020-03-17 21:32:39 +00:00
2022-03-07 16:06:59 +00:00
ws := memdb . NewWatchSet ( )
existingVol , err := snap . CSIVolumeByID ( ws , vol . Namespace , vol . ID )
2020-03-09 13:57:59 +00:00
if err != nil {
return err
}
2022-03-01 15:15:46 +00:00
2022-03-07 16:06:59 +00:00
// CSIVolume has many user-defined fields which are immutable
// once set, and many fields that are controlled by Nomad and
// are not user-settable. We merge onto a copy of the existing
// volume to allow a user to submit a volume spec for `volume
// create` and reuse it for updates in `volume register`
// without having to manually remove the fields unused by
// register (and similar use cases with API consumers such as
// Terraform).
if existingVol != nil {
existingVol = existingVol . Copy ( )
err = existingVol . Merge ( vol )
if err != nil {
return err
}
* vol = * existingVol
} else if vol . Topologies == nil || len ( vol . Topologies ) == 0 {
// The topologies for the volume have already been set
// when it was created, so for newly register volumes
// we accept the user's description of that topology
2022-03-01 15:15:46 +00:00
if vol . RequestedTopologies != nil {
vol . Topologies = vol . RequestedTopologies . Required
}
}
2022-03-07 16:06:59 +00:00
plugin , err := v . pluginValidateVolume ( args , vol )
if err != nil {
return err
}
if err := v . controllerValidateVolume ( args , vol , plugin ) ; err != nil {
return err
}
2019-10-16 20:53:39 +00:00
}
2020-02-04 13:00:00 +00:00
resp , index , err := v . srv . raftApply ( structs . CSIVolumeRegisterRequestType , args )
2019-10-16 20:53:39 +00:00
if err != nil {
2020-02-03 20:40:28 +00:00
v . logger . Error ( "csi raft apply failed" , "error" , err , "method" , "register" )
2019-10-16 20:53:39 +00:00
return err
}
2020-02-04 13:00:00 +00:00
if respErr , ok := resp . ( error ) ; ok {
return respErr
}
2019-10-16 20:53:39 +00:00
2020-02-03 20:40:28 +00:00
reply . Index = index
v . srv . setQueryMeta ( & reply . QueryMeta )
return nil
2019-10-16 20:53:39 +00:00
}
// Deregister removes a set of volumes
func ( v * CSIVolume ) Deregister ( args * structs . CSIVolumeDeregisterRequest , reply * structs . CSIVolumeDeregisterResponse ) error {
if done , err := v . srv . forward ( "CSIVolume.Deregister" , args , args , reply ) ; done {
return err
}
2020-03-17 21:32:39 +00:00
allowVolume := acl . NamespaceValidator ( acl . NamespaceCapabilityCSIWriteVolume )
2020-02-11 11:41:18 +00:00
aclObj , err := v . srv . WriteACLObj ( & args . WriteRequest , false )
2019-10-16 20:53:39 +00:00
if err != nil {
return err
}
2021-03-22 13:43:30 +00:00
defer metrics . MeasureSince ( [ ] string { "nomad" , "volume" , "deregister" } , time . Now ( ) )
2019-10-16 20:53:39 +00:00
ns := args . RequestNamespace ( )
2020-03-17 21:32:39 +00:00
if ! allowVolume ( aclObj , ns ) {
2019-10-16 20:53:39 +00:00
return structs . ErrPermissionDenied
}
2020-05-20 14:22:24 +00:00
if len ( args . VolumeIDs ) == 0 {
return fmt . Errorf ( "missing volume IDs" )
}
2020-02-04 13:00:00 +00:00
resp , index , err := v . srv . raftApply ( structs . CSIVolumeDeregisterRequestType , args )
2019-10-16 20:53:39 +00:00
if err != nil {
2020-02-03 20:40:28 +00:00
v . logger . Error ( "csi raft apply failed" , "error" , err , "method" , "deregister" )
2019-10-16 20:53:39 +00:00
return err
}
2020-02-04 13:00:00 +00:00
if respErr , ok := resp . ( error ) ; ok {
return respErr
}
reply . Index = index
v . srv . setQueryMeta ( & reply . QueryMeta )
return nil
}
2020-03-16 19:59:42 +00:00
// Claim submits a change to a volume claim
2020-02-04 13:00:00 +00:00
func ( v * CSIVolume ) Claim ( args * structs . CSIVolumeClaimRequest , reply * structs . CSIVolumeClaimResponse ) error {
if done , err := v . srv . forward ( "CSIVolume.Claim" , args , args , reply ) ; done {
return err
}
2020-03-17 21:32:39 +00:00
allowVolume := acl . NamespaceValidator ( acl . NamespaceCapabilityCSIMountVolume )
2020-02-11 11:41:18 +00:00
aclObj , err := v . srv . WriteACLObj ( & args . WriteRequest , true )
2020-02-04 13:00:00 +00:00
if err != nil {
return err
}
2021-03-22 13:43:30 +00:00
defer metrics . MeasureSince ( [ ] string { "nomad" , "volume" , "claim" } , time . Now ( ) )
2020-02-04 13:00:00 +00:00
2020-03-17 21:32:39 +00:00
if ! allowVolume ( aclObj , args . RequestNamespace ( ) ) || ! aclObj . AllowPluginRead ( ) {
2020-02-04 13:00:00 +00:00
return structs . ErrPermissionDenied
}
2020-05-20 14:22:24 +00:00
if args . VolumeID == "" {
return fmt . Errorf ( "missing volume ID" )
}
2020-11-11 18:06:30 +00:00
isNewClaim := args . Claim != structs . CSIVolumeClaimGC &&
args . State == structs . CSIVolumeClaimStateTaken
2020-04-29 15:57:19 +00:00
// COMPAT(1.0): the NodeID field was added after 0.11.0 and so we
// need to ensure it's been populated during upgrades from 0.11.0
// to later patch versions. Remove this block in 1.0
2020-11-11 18:06:30 +00:00
if isNewClaim && args . NodeID == "" {
2020-04-29 15:57:19 +00:00
state := v . srv . fsm . State ( )
ws := memdb . NewWatchSet ( )
alloc , err := state . AllocByID ( ws , args . AllocationID )
if err != nil {
return err
}
if alloc == nil {
return fmt . Errorf ( "%s: %s" ,
structs . ErrUnknownAllocationPrefix , args . AllocationID )
}
args . NodeID = alloc . NodeID
}
2020-02-04 13:00:00 +00:00
resp , index , err := v . srv . raftApply ( structs . CSIVolumeClaimRequestType , args )
if err != nil {
v . logger . Error ( "csi raft apply failed" , "error" , err , "method" , "claim" )
return err
}
if respErr , ok := resp . ( error ) ; ok {
return respErr
}
2019-10-16 20:53:39 +00:00
2022-03-29 13:44:00 +00:00
if isNewClaim {
// if this is a new claim, add a Volume and PublishContext from the
// controller (if any) to the reply
err = v . controllerPublishVolume ( args , reply )
if err != nil {
return fmt . Errorf ( "controller publish: %v" , err )
}
}
2020-02-03 20:40:28 +00:00
reply . Index = index
v . srv . setQueryMeta ( & reply . QueryMeta )
return nil
2020-01-28 15:28:34 +00:00
}
2021-05-22 15:53:29 +00:00
func csiVolumeMountOptions ( c * structs . CSIMountOptions ) * cstructs . CSIVolumeMountOptions {
if c == nil {
return nil
}
return & cstructs . CSIVolumeMountOptions {
Filesystem : c . FSType ,
MountFlags : c . MountFlags ,
}
}
2020-04-10 20:47:21 +00:00
// controllerPublishVolume sends publish request to the CSI controller
// plugin associated with a volume, if any.
func ( v * CSIVolume ) controllerPublishVolume ( req * structs . CSIVolumeClaimRequest , resp * structs . CSIVolumeClaimResponse ) error {
2020-04-13 14:46:43 +00:00
plug , vol , err := v . volAndPluginLookup ( req . RequestNamespace ( ) , req . VolumeID )
2020-04-10 20:47:21 +00:00
if err != nil {
return err
}
// Set the Response volume from the lookup
resp . Volume = vol
// Validate the existence of the allocation, regardless of whether we need it
// now.
state := v . srv . fsm . State ( )
ws := memdb . NewWatchSet ( )
alloc , err := state . AllocByID ( ws , req . AllocationID )
if err != nil {
return err
}
if alloc == nil {
return fmt . Errorf ( "%s: %s" , structs . ErrUnknownAllocationPrefix , req . AllocationID )
}
2021-03-24 18:10:44 +00:00
// Some plugins support controllers for create/snapshot but not attach. So
// if there's no plugin or the plugin doesn't attach volumes, then we can
// skip the controller publish workflow and return nil.
if plug == nil || ! plug . HasControllerCapability ( structs . CSIControllerSupportsAttachDetach ) {
2020-04-10 20:47:21 +00:00
return nil
}
2020-04-23 15:06:23 +00:00
// get Nomad's ID for the client node (not the storage provider's ID)
2020-04-10 20:47:21 +00:00
targetNode , err := state . NodeByID ( ws , alloc . NodeID )
if err != nil {
return err
}
if targetNode == nil {
return fmt . Errorf ( "%s: %s" , structs . ErrUnknownNodePrefix , alloc . NodeID )
}
2020-04-23 15:06:23 +00:00
// get the the storage provider's ID for the client node (not
// Nomad's ID for the node)
2020-04-10 20:47:21 +00:00
targetCSIInfo , ok := targetNode . CSINodePlugins [ plug . ID ]
if ! ok {
2020-08-06 17:51:29 +00:00
return fmt . Errorf ( "failed to find storage provider info for client %q, node plugin %q is not running or has not fingerprinted on this client" , targetNode . ID , plug . ID )
2020-04-10 20:47:21 +00:00
}
2020-04-23 15:06:23 +00:00
externalNodeID := targetCSIInfo . NodeInfo . ID
2020-08-06 17:51:29 +00:00
req . ExternalNodeID = externalNodeID // update with the target info
2020-04-10 20:47:21 +00:00
method := "ClientCSI.ControllerAttachVolume"
cReq := & cstructs . ClientCSIControllerAttachVolumeRequest {
VolumeID : vol . RemoteID ( ) ,
2020-04-23 15:06:23 +00:00
ClientCSINodeID : externalNodeID ,
2021-04-02 18:36:13 +00:00
AttachmentMode : req . AttachmentMode ,
AccessMode : req . AccessMode ,
2021-05-22 15:53:29 +00:00
MountOptions : csiVolumeMountOptions ( vol . MountOptions ) ,
2020-04-10 20:47:21 +00:00
ReadOnly : req . Claim == structs . CSIVolumeClaimRead ,
2020-05-11 21:12:51 +00:00
Secrets : vol . Secrets ,
2020-05-15 12:16:01 +00:00
VolumeContext : vol . Context ,
2020-04-10 20:47:21 +00:00
}
cReq . PluginID = plug . ID
cResp := & cstructs . ClientCSIControllerAttachVolumeResponse { }
err = v . srv . RPC ( method , cReq , cResp )
if err != nil {
2022-03-29 13:44:00 +00:00
if strings . Contains ( err . Error ( ) , "FailedPrecondition" ) {
return fmt . Errorf ( "%v: %v" , structs . ErrCSIClientRPCRetryable , err )
}
return err
2020-04-10 20:47:21 +00:00
}
resp . PublishContext = cResp . PublishContext
return nil
}
2020-04-13 14:46:43 +00:00
func ( v * CSIVolume ) volAndPluginLookup ( namespace , volID string ) ( * structs . CSIPlugin , * structs . CSIVolume , error ) {
state := v . srv . fsm . State ( )
2020-11-25 16:15:57 +00:00
vol , err := state . CSIVolumeByID ( nil , namespace , volID )
2020-04-13 14:46:43 +00:00
if err != nil {
return nil , nil , err
}
if vol == nil {
return nil , nil , fmt . Errorf ( "volume not found: %s" , volID )
}
if ! vol . ControllerRequired {
return nil , vol , nil
}
// note: we do this same lookup in CSIVolumeByID but then throw
// away the pointer to the plugin rather than attaching it to
// the volume so we have to do it again here.
2020-11-25 16:15:57 +00:00
plug , err := state . CSIPluginByID ( nil , vol . PluginID )
2020-04-13 14:46:43 +00:00
if err != nil {
return nil , nil , err
}
if plug == nil {
return nil , nil , fmt . Errorf ( "plugin not found: %s" , vol . PluginID )
}
return plug , vol , nil
}
2020-03-17 21:32:39 +00:00
// allowCSIMount is called on Job register to check mount permission
func allowCSIMount ( aclObj * acl . ACL , namespace string ) bool {
return aclObj . AllowPluginRead ( ) &&
aclObj . AllowNsOp ( namespace , acl . NamespaceCapabilityCSIMountVolume )
}
2020-08-06 17:51:29 +00:00
// Unpublish synchronously sends the NodeUnpublish, NodeUnstage, and
// ControllerUnpublish RPCs to the client. It handles errors according to the
// current claim state.
func ( v * CSIVolume ) Unpublish ( args * structs . CSIVolumeUnpublishRequest , reply * structs . CSIVolumeUnpublishResponse ) error {
if done , err := v . srv . forward ( "CSIVolume.Unpublish" , args , args , reply ) ; done {
return err
}
2021-03-22 13:43:30 +00:00
defer metrics . MeasureSince ( [ ] string { "nomad" , "volume" , "unpublish" } , time . Now ( ) )
2020-08-06 17:51:29 +00:00
allowVolume := acl . NamespaceValidator ( acl . NamespaceCapabilityCSIMountVolume )
aclObj , err := v . srv . WriteACLObj ( & args . WriteRequest , true )
if err != nil {
return err
}
if ! allowVolume ( aclObj , args . RequestNamespace ( ) ) || ! aclObj . AllowPluginRead ( ) {
return structs . ErrPermissionDenied
}
if args . VolumeID == "" {
return fmt . Errorf ( "missing volume ID" )
}
if args . Claim == nil {
return fmt . Errorf ( "missing volume claim" )
}
ws := memdb . NewWatchSet ( )
state := v . srv . fsm . State ( )
vol , err := state . CSIVolumeByID ( ws , args . Namespace , args . VolumeID )
if err != nil {
return err
}
if vol == nil {
return fmt . Errorf ( "no such volume" )
}
claim := args . Claim
// previous checkpoints may have set the past claim state already.
// in practice we should never see CSIVolumeClaimStateControllerDetached
// but having an option for the state makes it easy to add a checkpoint
// in a backwards compatible way if we need one later
switch claim . State {
case structs . CSIVolumeClaimStateNodeDetached :
goto NODE_DETACHED
case structs . CSIVolumeClaimStateControllerDetached :
goto RELEASE_CLAIM
case structs . CSIVolumeClaimStateReadyToFree :
goto RELEASE_CLAIM
}
err = v . nodeUnpublishVolume ( vol , claim )
if err != nil {
return err
}
NODE_DETACHED :
2020-08-07 14:43:45 +00:00
err = v . controllerUnpublishVolume ( vol , claim )
2020-08-06 17:51:29 +00:00
if err != nil {
return err
}
RELEASE_CLAIM :
2022-04-04 14:46:45 +00:00
v . logger . Trace ( "releasing claim" , "vol" , vol . ID )
2020-08-06 17:51:29 +00:00
// advance a CSIVolumeClaimStateControllerDetached claim
claim . State = structs . CSIVolumeClaimStateReadyToFree
err = v . checkpointClaim ( vol , claim )
if err != nil {
return err
}
reply . Index = vol . ModifyIndex
v . srv . setQueryMeta ( & reply . QueryMeta )
return nil
}
func ( v * CSIVolume ) nodeUnpublishVolume ( vol * structs . CSIVolume , claim * structs . CSIVolumeClaim ) error {
2022-04-04 14:46:45 +00:00
v . logger . Trace ( "node unpublish" , "vol" , vol . ID )
2022-06-09 15:33:22 +00:00
store := v . srv . fsm . State ( )
// If the node has been GC'd or is down, we can't send it a node
// unpublish. We need to assume the node has unpublished at its
// end. If it hasn't, any controller unpublish will potentially
// hang or error and need to be retried.
if claim . NodeID != "" {
node , err := store . NodeByID ( memdb . NewWatchSet ( ) , claim . NodeID )
if err != nil {
return err
}
if node == nil || node . Status == structs . NodeStatusDown {
v . logger . Debug ( "skipping node unpublish for down or GC'd node" )
claim . State = structs . CSIVolumeClaimStateNodeDetached
return v . checkpointClaim ( vol , claim )
}
}
2020-08-11 14:18:54 +00:00
if claim . AllocationID != "" {
err := v . nodeUnpublishVolumeImpl ( vol , claim )
if err != nil {
return err
}
claim . State = structs . CSIVolumeClaimStateNodeDetached
return v . checkpointClaim ( vol , claim )
}
2022-01-27 15:39:08 +00:00
// The RPC sent from the 'nomad node detach' command or GC won't have an
2020-08-11 14:18:54 +00:00
// allocation ID set so we try to unpublish every terminal or invalid
2022-01-27 15:39:08 +00:00
// alloc on the node, all of which will be in PastClaims after denormalizing
2022-06-09 15:33:22 +00:00
vol , err := store . CSIVolumeDenormalize ( memdb . NewWatchSet ( ) , vol )
2020-08-11 14:18:54 +00:00
if err != nil {
return err
}
2022-01-27 15:39:08 +00:00
claimsToUnpublish := [ ] * structs . CSIVolumeClaim { }
for _ , pastClaim := range vol . PastClaims {
if claim . NodeID == pastClaim . NodeID {
claimsToUnpublish = append ( claimsToUnpublish , pastClaim )
2020-08-11 14:18:54 +00:00
}
}
2022-01-27 15:39:08 +00:00
2020-08-11 14:18:54 +00:00
var merr multierror . Error
2022-01-27 15:39:08 +00:00
for _ , pastClaim := range claimsToUnpublish {
err := v . nodeUnpublishVolumeImpl ( vol , pastClaim )
2020-08-11 14:18:54 +00:00
if err != nil {
merr . Errors = append ( merr . Errors , err )
}
}
err = merr . ErrorOrNil ( )
if err != nil {
return err
}
claim . State = structs . CSIVolumeClaimStateNodeDetached
return v . checkpointClaim ( vol , claim )
}
func ( v * CSIVolume ) nodeUnpublishVolumeImpl ( vol * structs . CSIVolume , claim * structs . CSIVolumeClaim ) error {
2022-03-29 13:44:00 +00:00
if claim . AccessMode == structs . CSIVolumeAccessModeUnknown {
// claim has already been released client-side
return nil
}
2020-08-06 17:51:29 +00:00
req := & cstructs . ClientCSINodeDetachVolumeRequest {
PluginID : vol . PluginID ,
VolumeID : vol . ID ,
ExternalID : vol . RemoteID ( ) ,
AllocID : claim . AllocationID ,
NodeID : claim . NodeID ,
2022-01-27 15:39:08 +00:00
AttachmentMode : claim . AttachmentMode ,
AccessMode : claim . AccessMode ,
2020-08-06 17:51:29 +00:00
ReadOnly : claim . Mode == structs . CSIVolumeClaimRead ,
}
err := v . srv . RPC ( "ClientCSI.NodeDetachVolume" ,
req , & cstructs . ClientCSINodeDetachVolumeResponse { } )
if err != nil {
// we should only get this error if the Nomad node disconnects and
// is garbage-collected, so at this point we don't have any reason
// to operate as though the volume is attached to it.
2022-03-22 19:40:24 +00:00
// note: errors.Is cannot be used because the RPC call breaks
// error wrapping.
if ! strings . Contains ( err . Error ( ) , structs . ErrUnknownNode . Error ( ) ) {
2020-08-06 17:51:29 +00:00
return fmt . Errorf ( "could not detach from node: %w" , err )
}
}
2020-08-11 14:18:54 +00:00
return nil
2020-08-06 17:51:29 +00:00
}
2020-08-07 14:43:45 +00:00
func ( v * CSIVolume ) controllerUnpublishVolume ( vol * structs . CSIVolume , claim * structs . CSIVolumeClaim ) error {
2022-04-04 14:46:45 +00:00
v . logger . Trace ( "controller unpublish" , "vol" , vol . ID )
2020-08-07 14:43:45 +00:00
if ! vol . ControllerRequired {
2020-08-06 17:51:29 +00:00
claim . State = structs . CSIVolumeClaimStateReadyToFree
return nil
}
2021-03-24 18:10:44 +00:00
state := v . srv . fsm . State ( )
ws := memdb . NewWatchSet ( )
plugin , err := state . CSIPluginByID ( ws , vol . PluginID )
if err != nil {
return fmt . Errorf ( "could not query plugin: %v" , err )
} else if plugin == nil {
return fmt . Errorf ( "no such plugin: %q" , vol . PluginID )
}
if ! plugin . HasControllerCapability ( structs . CSIControllerSupportsAttachDetach ) {
return nil
}
2020-08-07 14:43:45 +00:00
// we only send a controller detach if a Nomad client no longer has
// any claim to the volume, so we need to check the status of claimed
// allocations
2021-03-24 18:10:44 +00:00
vol , err = state . CSIVolumeDenormalize ( ws , vol )
2020-08-07 14:43:45 +00:00
if err != nil {
return err
}
for _ , alloc := range vol . ReadAllocs {
if alloc != nil && alloc . NodeID == claim . NodeID && ! alloc . TerminalStatus ( ) {
claim . State = structs . CSIVolumeClaimStateReadyToFree
return nil
}
}
for _ , alloc := range vol . WriteAllocs {
if alloc != nil && alloc . NodeID == claim . NodeID && ! alloc . TerminalStatus ( ) {
claim . State = structs . CSIVolumeClaimStateReadyToFree
return nil
}
}
2020-08-06 17:51:29 +00:00
// if the RPC is sent by a client node, it doesn't know the claim's
// external node ID.
if claim . ExternalNodeID == "" {
externalNodeID , err := v . lookupExternalNodeID ( vol , claim )
if err != nil {
return fmt . Errorf ( "missing external node ID: %v" , err )
}
claim . ExternalNodeID = externalNodeID
}
req := & cstructs . ClientCSIControllerDetachVolumeRequest {
VolumeID : vol . RemoteID ( ) ,
ClientCSINodeID : claim . ExternalNodeID ,
Secrets : vol . Secrets ,
}
req . PluginID = vol . PluginID
2020-08-07 14:43:45 +00:00
err = v . srv . RPC ( "ClientCSI.ControllerDetachVolume" , req ,
2020-08-06 17:51:29 +00:00
& cstructs . ClientCSIControllerDetachVolumeResponse { } )
if err != nil {
return fmt . Errorf ( "could not detach from controller: %v" , err )
}
claim . State = structs . CSIVolumeClaimStateReadyToFree
return v . checkpointClaim ( vol , claim )
}
// lookupExternalNodeID gets the CSI plugin's ID for a node. we look it up in
// the volume's claims first because it's possible the client has been stopped
// and GC'd by this point, so looking there is the last resort.
func ( v * CSIVolume ) lookupExternalNodeID ( vol * structs . CSIVolume , claim * structs . CSIVolumeClaim ) ( string , error ) {
for _ , rClaim := range vol . ReadClaims {
2022-03-29 13:44:00 +00:00
if rClaim . NodeID == claim . NodeID && rClaim . ExternalNodeID != "" {
2020-08-06 17:51:29 +00:00
return rClaim . ExternalNodeID , nil
}
}
for _ , wClaim := range vol . WriteClaims {
2022-03-29 13:44:00 +00:00
if wClaim . NodeID == claim . NodeID && wClaim . ExternalNodeID != "" {
2020-08-06 17:51:29 +00:00
return wClaim . ExternalNodeID , nil
}
}
for _ , pClaim := range vol . PastClaims {
2022-03-29 13:44:00 +00:00
if pClaim . NodeID == claim . NodeID && pClaim . ExternalNodeID != "" {
2020-08-06 17:51:29 +00:00
return pClaim . ExternalNodeID , nil
}
}
// fallback to looking up the node plugin
ws := memdb . NewWatchSet ( )
state := v . srv . fsm . State ( )
targetNode , err := state . NodeByID ( ws , claim . NodeID )
if err != nil {
return "" , err
}
if targetNode == nil {
return "" , fmt . Errorf ( "%s: %s" , structs . ErrUnknownNodePrefix , claim . NodeID )
}
// get the the storage provider's ID for the client node (not
// Nomad's ID for the node)
targetCSIInfo , ok := targetNode . CSINodePlugins [ vol . PluginID ]
2020-08-07 15:01:36 +00:00
if ! ok || targetCSIInfo . NodeInfo == nil {
2020-08-06 17:51:29 +00:00
return "" , fmt . Errorf ( "failed to find storage provider info for client %q, node plugin %q is not running or has not fingerprinted on this client" , targetNode . ID , vol . PluginID )
}
return targetCSIInfo . NodeInfo . ID , nil
}
func ( v * CSIVolume ) checkpointClaim ( vol * structs . CSIVolume , claim * structs . CSIVolumeClaim ) error {
v . logger . Trace ( "checkpointing claim" )
req := structs . CSIVolumeClaimRequest {
VolumeID : vol . ID ,
AllocationID : claim . AllocationID ,
NodeID : claim . NodeID ,
Claim : claim . Mode ,
State : claim . State ,
WriteRequest : structs . WriteRequest {
Namespace : vol . Namespace ,
} ,
}
resp , index , err := v . srv . raftApply ( structs . CSIVolumeClaimRequestType , req )
if err != nil {
v . logger . Error ( "csi raft apply failed" , "error" , err )
return err
}
if respErr , ok := resp . ( error ) ; ok {
return respErr
}
vol . ModifyIndex = index
return nil
}
2021-03-22 13:43:30 +00:00
func ( v * CSIVolume ) Create ( args * structs . CSIVolumeCreateRequest , reply * structs . CSIVolumeCreateResponse ) error {
if done , err := v . srv . forward ( "CSIVolume.Create" , args , args , reply ) ; done {
return err
}
defer metrics . MeasureSince ( [ ] string { "nomad" , "volume" , "create" } , time . Now ( ) )
allowVolume := acl . NamespaceValidator ( acl . NamespaceCapabilityCSIWriteVolume )
aclObj , err := v . srv . WriteACLObj ( & args . WriteRequest , false )
if err != nil {
return err
}
if ! allowVolume ( aclObj , args . RequestNamespace ( ) ) || ! aclObj . AllowPluginRead ( ) {
return structs . ErrPermissionDenied
}
if len ( args . Volumes ) == 0 {
return fmt . Errorf ( "missing volume definition" )
}
regArgs := & structs . CSIVolumeRegisterRequest { WriteRequest : args . WriteRequest }
type validated struct {
vol * structs . CSIVolume
plugin * structs . CSIPlugin
}
validatedVols := [ ] validated { }
// This is the only namespace we ACL checked, force all the volumes to use it.
// We also validate that the plugin exists for each plugin, and validate the
// capabilities when the plugin has a controller.
for _ , vol := range args . Volumes {
2022-03-29 18:46:39 +00:00
if vol . Namespace == "" {
vol . Namespace = args . RequestNamespace ( )
}
2021-03-22 13:43:30 +00:00
if err = vol . Validate ( ) ; err != nil {
return err
}
plugin , err := v . pluginValidateVolume ( regArgs , vol )
if err != nil {
return err
}
if ! plugin . ControllerRequired {
return fmt . Errorf ( "plugin has no controller" )
}
2021-03-24 18:10:44 +00:00
if ! plugin . HasControllerCapability ( structs . CSIControllerSupportsCreateDelete ) {
return fmt . Errorf ( "plugin does not support creating volumes" )
2021-03-22 13:43:30 +00:00
}
validatedVols = append ( validatedVols , validated { vol , plugin } )
}
// Attempt to create all the validated volumes and write only successfully
// created volumes to raft. And we'll report errors for any failed volumes
//
// NOTE: creating the volume in the external storage provider can't be
// made atomic with the registration, and creating the volume provides
// values we want to write on the CSIVolume in raft anyways. For now
// we'll block the RPC on the external storage provider so that we can
// easily return meaningful errors to the user, but in the future we
// should consider creating registering first and creating a "volume
// eval" that can do the plugin RPCs async.
var mErr multierror . Error
for _ , valid := range validatedVols {
err = v . createVolume ( valid . vol , valid . plugin )
if err != nil {
multierror . Append ( & mErr , err )
} else {
regArgs . Volumes = append ( regArgs . Volumes , valid . vol )
}
}
resp , index , err := v . srv . raftApply ( structs . CSIVolumeRegisterRequestType , regArgs )
if err != nil {
v . logger . Error ( "csi raft apply failed" , "error" , err , "method" , "register" )
return err
}
if respErr , ok := resp . ( error ) ; ok {
multierror . Append ( & mErr , respErr )
}
err = mErr . ErrorOrNil ( )
if err != nil {
return err
}
2021-03-23 13:45:14 +00:00
reply . Volumes = regArgs . Volumes
2021-03-22 13:43:30 +00:00
reply . Index = index
v . srv . setQueryMeta ( & reply . QueryMeta )
return nil
}
func ( v * CSIVolume ) createVolume ( vol * structs . CSIVolume , plugin * structs . CSIPlugin ) error {
method := "ClientCSI.ControllerCreateVolume"
cReq := & cstructs . ClientCSIControllerCreateVolumeRequest {
2022-03-01 15:15:46 +00:00
Name : vol . Name ,
VolumeCapabilities : vol . RequestedCapabilities ,
MountOptions : vol . MountOptions ,
Parameters : vol . Parameters ,
Secrets : vol . Secrets ,
CapacityMin : vol . RequestedCapacityMin ,
CapacityMax : vol . RequestedCapacityMax ,
SnapshotID : vol . SnapshotID ,
CloneID : vol . CloneID ,
RequestedTopologies : vol . RequestedTopologies ,
2021-03-22 13:43:30 +00:00
}
cReq . PluginID = plugin . ID
cResp := & cstructs . ClientCSIControllerCreateVolumeResponse { }
err := v . srv . RPC ( method , cReq , cResp )
if err != nil {
return err
}
vol . ExternalID = cResp . ExternalVolumeID
vol . Capacity = cResp . CapacityBytes
vol . Context = cResp . VolumeContext
2022-03-01 15:15:46 +00:00
vol . Topologies = cResp . Topologies
2021-03-22 13:43:30 +00:00
return nil
}
func ( v * CSIVolume ) Delete ( args * structs . CSIVolumeDeleteRequest , reply * structs . CSIVolumeDeleteResponse ) error {
if done , err := v . srv . forward ( "CSIVolume.Delete" , args , args , reply ) ; done {
return err
}
defer metrics . MeasureSince ( [ ] string { "nomad" , "volume" , "delete" } , time . Now ( ) )
allowVolume := acl . NamespaceValidator ( acl . NamespaceCapabilityCSIWriteVolume )
aclObj , err := v . srv . WriteACLObj ( & args . WriteRequest , false )
if err != nil {
return err
}
2021-04-01 15:16:52 +00:00
if ! allowVolume ( aclObj , args . RequestNamespace ( ) ) || ! aclObj . AllowPluginRead ( ) {
2021-03-22 13:43:30 +00:00
return structs . ErrPermissionDenied
}
if len ( args . VolumeIDs ) == 0 {
return fmt . Errorf ( "missing volume IDs" )
}
for _ , volID := range args . VolumeIDs {
plugin , vol , err := v . volAndPluginLookup ( args . Namespace , volID )
if err != nil {
if err == fmt . Errorf ( "volume not found: %s" , volID ) {
v . logger . Warn ( "volume %q to be deleted was already deregistered" )
continue
} else {
return err
}
}
// NOTE: deleting the volume in the external storage provider can't be
// made atomic with deregistration. We can't delete a volume that's
// not registered because we need to be able to lookup its plugin.
2022-04-05 12:59:11 +00:00
err = v . deleteVolume ( vol , plugin , args . Secrets )
2021-03-22 13:43:30 +00:00
if err != nil {
return err
}
}
deregArgs := & structs . CSIVolumeDeregisterRequest {
VolumeIDs : args . VolumeIDs ,
WriteRequest : args . WriteRequest ,
}
resp , index , err := v . srv . raftApply ( structs . CSIVolumeDeregisterRequestType , deregArgs )
if err != nil {
v . logger . Error ( "csi raft apply failed" , "error" , err , "method" , "deregister" )
return err
}
if respErr , ok := resp . ( error ) ; ok {
return respErr
}
reply . Index = index
v . srv . setQueryMeta ( & reply . QueryMeta )
return nil
}
2022-04-05 12:59:11 +00:00
func ( v * CSIVolume ) deleteVolume ( vol * structs . CSIVolume , plugin * structs . CSIPlugin , querySecrets structs . CSISecrets ) error {
// Combine volume and query secrets into one map.
// Query secrets override any secrets stored with the volume.
combinedSecrets := vol . Secrets
for k , v := range querySecrets {
combinedSecrets [ k ] = v
}
2021-03-22 13:43:30 +00:00
method := "ClientCSI.ControllerDeleteVolume"
cReq := & cstructs . ClientCSIControllerDeleteVolumeRequest {
ExternalVolumeID : vol . ExternalID ,
2022-04-05 12:59:11 +00:00
Secrets : combinedSecrets ,
2021-03-22 13:43:30 +00:00
}
cReq . PluginID = plugin . ID
cResp := & cstructs . ClientCSIControllerDeleteVolumeResponse { }
return v . srv . RPC ( method , cReq , cResp )
}
func ( v * CSIVolume ) ListExternal ( args * structs . CSIVolumeExternalListRequest , reply * structs . CSIVolumeExternalListResponse ) error {
if done , err := v . srv . forward ( "CSIVolume.ListExternal" , args , args , reply ) ; done {
return err
}
defer metrics . MeasureSince ( [ ] string { "nomad" , "volume" , "list_external" } , time . Now ( ) )
allowVolume := acl . NamespaceValidator ( acl . NamespaceCapabilityCSIListVolume ,
acl . NamespaceCapabilityCSIReadVolume ,
acl . NamespaceCapabilityCSIMountVolume ,
acl . NamespaceCapabilityListJobs )
aclObj , err := v . srv . QueryACLObj ( & args . QueryOptions , false )
if err != nil {
return err
}
// NOTE: this is the plugin's namespace, not the volume(s) because they
// might not even be registered
if ! allowVolume ( aclObj , args . RequestNamespace ( ) ) {
return structs . ErrPermissionDenied
}
snap , err := v . srv . fsm . State ( ) . Snapshot ( )
if err != nil {
return err
}
plugin , err := snap . CSIPluginByID ( nil , args . PluginID )
if err != nil {
return err
}
if plugin == nil {
return fmt . Errorf ( "no such plugin" )
}
2021-04-07 14:34:29 +00:00
if ! plugin . HasControllerCapability ( structs . CSIControllerSupportsListVolumes ) {
return fmt . Errorf ( "unimplemented for this plugin" )
}
2021-03-22 13:43:30 +00:00
method := "ClientCSI.ControllerListVolumes"
cReq := & cstructs . ClientCSIControllerListVolumesRequest {
2021-03-23 13:45:14 +00:00
MaxEntries : args . PerPage ,
StartingToken : args . NextToken ,
2021-03-22 13:43:30 +00:00
}
cReq . PluginID = plugin . ID
cResp := & cstructs . ClientCSIControllerListVolumesResponse { }
err = v . srv . RPC ( method , cReq , cResp )
if err != nil {
return err
}
2021-04-09 17:12:47 +00:00
if args . PerPage > 0 && args . PerPage < int32 ( len ( cResp . Entries ) ) {
// this should be done in the plugin already, but enforce it
2021-03-23 13:45:14 +00:00
reply . Volumes = cResp . Entries [ : args . PerPage ]
2021-03-22 13:43:30 +00:00
} else {
reply . Volumes = cResp . Entries
}
reply . NextToken = cResp . NextToken
return nil
}
2021-04-01 15:16:52 +00:00
func ( v * CSIVolume ) CreateSnapshot ( args * structs . CSISnapshotCreateRequest , reply * structs . CSISnapshotCreateResponse ) error {
if done , err := v . srv . forward ( "CSIVolume.CreateSnapshot" , args , args , reply ) ; done {
return err
}
defer metrics . MeasureSince ( [ ] string { "nomad" , "volume" , "create_snapshot" } , time . Now ( ) )
allowVolume := acl . NamespaceValidator ( acl . NamespaceCapabilityCSIWriteVolume )
aclObj , err := v . srv . WriteACLObj ( & args . WriteRequest , false )
if err != nil {
return err
}
if ! allowVolume ( aclObj , args . RequestNamespace ( ) ) || ! aclObj . AllowPluginRead ( ) {
return structs . ErrPermissionDenied
}
state , err := v . srv . fsm . State ( ) . Snapshot ( )
if err != nil {
return err
}
method := "ClientCSI.ControllerCreateSnapshot"
var mErr multierror . Error
for _ , snap := range args . Snapshots {
if snap == nil {
// we intentionally don't multierror here because we're in a weird state
return fmt . Errorf ( "snapshot cannot be nil" )
}
2022-03-07 14:06:50 +00:00
vol , err := state . CSIVolumeByID ( nil , args . RequestNamespace ( ) , snap . SourceVolumeID )
2021-04-01 15:16:52 +00:00
if err != nil {
2022-03-07 14:06:50 +00:00
multierror . Append ( & mErr , fmt . Errorf ( "error querying volume %q: %v" , snap . SourceVolumeID , err ) )
2021-04-01 15:16:52 +00:00
continue
}
2022-03-07 14:06:50 +00:00
if vol == nil {
multierror . Append ( & mErr , fmt . Errorf ( "no such volume %q" , snap . SourceVolumeID ) )
2021-04-01 15:16:52 +00:00
continue
}
2022-03-07 14:06:50 +00:00
pluginID := snap . PluginID
if pluginID == "" {
pluginID = vol . PluginID
2021-04-01 15:16:52 +00:00
}
2022-03-07 14:06:50 +00:00
plugin , err := state . CSIPluginByID ( nil , pluginID )
2021-04-01 15:16:52 +00:00
if err != nil {
2022-03-07 14:06:50 +00:00
multierror . Append ( & mErr ,
fmt . Errorf ( "error querying plugin %q: %v" , pluginID , err ) )
2021-04-01 15:16:52 +00:00
continue
}
2022-03-07 14:06:50 +00:00
if plugin == nil {
multierror . Append ( & mErr , fmt . Errorf ( "no such plugin %q" , pluginID ) )
continue
}
if ! plugin . HasControllerCapability ( structs . CSIControllerSupportsCreateDeleteSnapshot ) {
multierror . Append ( & mErr ,
fmt . Errorf ( "plugin %q does not support snapshot" , pluginID ) )
2021-04-01 15:16:52 +00:00
continue
}
2022-03-24 14:29:50 +00:00
secrets := vol . Secrets
for k , v := range snap . Secrets {
// merge request secrets onto volume secrets
secrets [ k ] = v
}
2021-04-01 15:16:52 +00:00
cReq := & cstructs . ClientCSIControllerCreateSnapshotRequest {
ExternalSourceVolumeID : vol . ExternalID ,
Name : snap . Name ,
2022-03-24 14:29:50 +00:00
Secrets : secrets ,
2021-04-01 15:16:52 +00:00
Parameters : snap . Parameters ,
}
2022-03-07 14:06:50 +00:00
cReq . PluginID = pluginID
2021-04-01 15:16:52 +00:00
cResp := & cstructs . ClientCSIControllerCreateSnapshotResponse { }
err = v . srv . RPC ( method , cReq , cResp )
if err != nil {
multierror . Append ( & mErr , fmt . Errorf ( "could not create snapshot: %v" , err ) )
continue
}
reply . Snapshots = append ( reply . Snapshots , & structs . CSISnapshot {
ID : cResp . ID ,
ExternalSourceVolumeID : cResp . ExternalSourceVolumeID ,
SizeBytes : cResp . SizeBytes ,
CreateTime : cResp . CreateTime ,
IsReady : cResp . IsReady ,
} )
}
return mErr . ErrorOrNil ( )
}
func ( v * CSIVolume ) DeleteSnapshot ( args * structs . CSISnapshotDeleteRequest , reply * structs . CSISnapshotDeleteResponse ) error {
if done , err := v . srv . forward ( "CSIVolume.DeleteSnapshot" , args , args , reply ) ; done {
return err
}
defer metrics . MeasureSince ( [ ] string { "nomad" , "volume" , "delete_snapshot" } , time . Now ( ) )
allowVolume := acl . NamespaceValidator ( acl . NamespaceCapabilityCSIWriteVolume )
aclObj , err := v . srv . WriteACLObj ( & args . WriteRequest , false )
if err != nil {
return err
}
// NOTE: this is the plugin's namespace, not the snapshot(s) because we
// don't track snapshots in the state store at all and their source
// volume(s) because they might not even be registered
if ! allowVolume ( aclObj , args . RequestNamespace ( ) ) || ! aclObj . AllowPluginRead ( ) {
return structs . ErrPermissionDenied
}
stateSnap , err := v . srv . fsm . State ( ) . Snapshot ( )
if err != nil {
return err
}
var mErr multierror . Error
for _ , snap := range args . Snapshots {
if snap == nil {
// we intentionally don't multierror here because we're in a weird state
return fmt . Errorf ( "snapshot cannot be nil" )
}
plugin , err := stateSnap . CSIPluginByID ( nil , snap . PluginID )
if err != nil {
multierror . Append ( & mErr ,
fmt . Errorf ( "could not query plugin %q: %v" , snap . PluginID , err ) )
continue
}
if plugin == nil {
multierror . Append ( & mErr , fmt . Errorf ( "no such plugin" ) )
continue
}
if ! plugin . HasControllerCapability ( structs . CSIControllerSupportsCreateDeleteSnapshot ) {
multierror . Append ( & mErr , fmt . Errorf ( "plugin does not support snapshot" ) )
continue
}
method := "ClientCSI.ControllerDeleteSnapshot"
cReq := & cstructs . ClientCSIControllerDeleteSnapshotRequest { ID : snap . ID }
cReq . PluginID = plugin . ID
cResp := & cstructs . ClientCSIControllerDeleteSnapshotResponse { }
err = v . srv . RPC ( method , cReq , cResp )
if err != nil {
multierror . Append ( & mErr , fmt . Errorf ( "could not delete %q: %v" , snap . ID , err ) )
}
}
return mErr . ErrorOrNil ( )
}
func ( v * CSIVolume ) ListSnapshots ( args * structs . CSISnapshotListRequest , reply * structs . CSISnapshotListResponse ) error {
if done , err := v . srv . forward ( "CSIVolume.ListSnapshots" , args , args , reply ) ; done {
return err
}
defer metrics . MeasureSince ( [ ] string { "nomad" , "volume" , "list_snapshots" } , time . Now ( ) )
allowVolume := acl . NamespaceValidator ( acl . NamespaceCapabilityCSIListVolume ,
acl . NamespaceCapabilityCSIReadVolume ,
acl . NamespaceCapabilityCSIMountVolume ,
acl . NamespaceCapabilityListJobs )
aclObj , err := v . srv . QueryACLObj ( & args . QueryOptions , false )
if err != nil {
return err
}
// NOTE: this is the plugin's namespace, not the volume(s) because they
// might not even be registered
if ! allowVolume ( aclObj , args . RequestNamespace ( ) ) {
return structs . ErrPermissionDenied
}
snap , err := v . srv . fsm . State ( ) . Snapshot ( )
if err != nil {
return err
}
plugin , err := snap . CSIPluginByID ( nil , args . PluginID )
if err != nil {
return err
}
if plugin == nil {
return fmt . Errorf ( "no such plugin" )
}
if ! plugin . HasControllerCapability ( structs . CSIControllerSupportsListSnapshots ) {
return fmt . Errorf ( "plugin does not support listing snapshots" )
}
method := "ClientCSI.ControllerListSnapshots"
cReq := & cstructs . ClientCSIControllerListSnapshotsRequest {
MaxEntries : args . PerPage ,
StartingToken : args . NextToken ,
2021-07-03 00:46:41 +00:00
Secrets : args . Secrets ,
2021-04-01 15:16:52 +00:00
}
cReq . PluginID = plugin . ID
cResp := & cstructs . ClientCSIControllerListSnapshotsResponse { }
err = v . srv . RPC ( method , cReq , cResp )
if err != nil {
return err
}
2021-04-09 17:12:47 +00:00
if args . PerPage > 0 && args . PerPage < int32 ( len ( cResp . Entries ) ) {
// this should be done in the plugin already, but enforce it
2021-04-01 15:16:52 +00:00
reply . Snapshots = cResp . Entries [ : args . PerPage ]
} else {
reply . Snapshots = cResp . Entries
}
reply . NextToken = cResp . NextToken
return nil
}
2020-01-28 15:28:34 +00:00
// CSIPlugin wraps the structs.CSIPlugin with request data and server context
type CSIPlugin struct {
srv * Server
logger log . Logger
}
// List replies with CSIPlugins, filtered by ACL access
func ( v * CSIPlugin ) List ( args * structs . CSIPluginListRequest , reply * structs . CSIPluginListResponse ) error {
if done , err := v . srv . forward ( "CSIPlugin.List" , args , args , reply ) ; done {
return err
}
2020-02-11 11:41:18 +00:00
aclObj , err := v . srv . QueryACLObj ( & args . QueryOptions , false )
2020-01-28 15:28:34 +00:00
if err != nil {
return err
}
2020-03-18 19:29:03 +00:00
if ! aclObj . AllowPluginList ( ) {
2020-01-28 15:28:34 +00:00
return structs . ErrPermissionDenied
}
2021-03-22 13:43:30 +00:00
defer metrics . MeasureSince ( [ ] string { "nomad" , "plugin" , "list" } , time . Now ( ) )
2020-01-28 15:28:34 +00:00
opts := blockingOptions {
queryOpts : & args . QueryOptions ,
queryMeta : & reply . QueryMeta ,
run : func ( ws memdb . WatchSet , state * state . StateStore ) error {
2022-03-04 21:44:09 +00:00
var iter memdb . ResultIterator
var err error
if args . Prefix != "" {
iter , err = state . CSIPluginsByIDPrefix ( ws , args . Prefix )
if err != nil {
return err
}
} else {
// Query all plugins
iter , err = state . CSIPlugins ( ws )
if err != nil {
return err
}
2020-01-28 15:28:34 +00:00
}
// Collect results
2020-03-24 01:21:40 +00:00
ps := [ ] * structs . CSIPluginListStub { }
2020-01-28 15:28:34 +00:00
for {
raw := iter . Next ( )
if raw == nil {
break
}
plug := raw . ( * structs . CSIPlugin )
ps = append ( ps , plug . Stub ( ) )
}
reply . Plugins = ps
return v . srv . replySetIndex ( csiPluginTable , & reply . QueryMeta )
} }
return v . srv . blockingRPC ( & opts )
}
// Get fetches detailed information about a specific plugin
func ( v * CSIPlugin ) Get ( args * structs . CSIPluginGetRequest , reply * structs . CSIPluginGetResponse ) error {
if done , err := v . srv . forward ( "CSIPlugin.Get" , args , args , reply ) ; done {
return err
}
2020-02-11 11:41:18 +00:00
aclObj , err := v . srv . QueryACLObj ( & args . QueryOptions , false )
2020-01-28 15:28:34 +00:00
if err != nil {
return err
}
2020-03-18 19:29:03 +00:00
if ! aclObj . AllowPluginRead ( ) {
2020-01-28 15:28:34 +00:00
return structs . ErrPermissionDenied
}
2020-03-18 19:29:03 +00:00
withAllocs := aclObj == nil ||
aclObj . AllowNsOp ( args . RequestNamespace ( ) , acl . NamespaceCapabilityReadJob )
2021-03-22 13:43:30 +00:00
defer metrics . MeasureSince ( [ ] string { "nomad" , "plugin" , "get" } , time . Now ( ) )
2020-01-28 15:28:34 +00:00
2020-05-20 14:22:24 +00:00
if args . ID == "" {
return fmt . Errorf ( "missing plugin ID" )
}
2020-01-28 15:28:34 +00:00
opts := blockingOptions {
queryOpts : & args . QueryOptions ,
queryMeta : & reply . QueryMeta ,
run : func ( ws memdb . WatchSet , state * state . StateStore ) error {
2020-11-25 16:15:57 +00:00
snap , err := state . Snapshot ( )
if err != nil {
return err
}
plug , err := snap . CSIPluginByID ( ws , args . ID )
2020-01-28 15:28:34 +00:00
if err != nil {
return err
}
2020-03-18 19:29:03 +00:00
if plug == nil {
return nil
2020-01-28 15:28:34 +00:00
}
2020-03-18 19:29:03 +00:00
if withAllocs {
2020-11-25 16:15:57 +00:00
plug , err = snap . CSIPluginDenormalize ( ws , plug . Copy ( ) )
2020-03-18 19:29:03 +00:00
if err != nil {
return err
}
// Filter the allocation stubs by our namespace. withAllocs
// means we're allowed
var as [ ] * structs . AllocListStub
for _ , a := range plug . Allocations {
if a . Namespace == args . RequestNamespace ( ) {
as = append ( as , a )
}
}
plug . Allocations = as
}
2020-01-28 15:28:34 +00:00
reply . Plugin = plug
return v . srv . replySetIndex ( csiPluginTable , & reply . QueryMeta )
} }
return v . srv . blockingRPC ( & opts )
2019-10-16 20:53:39 +00:00
}
2020-05-06 20:49:12 +00:00
// Delete deletes a plugin if it is unused
func ( v * CSIPlugin ) Delete ( args * structs . CSIPluginDeleteRequest , reply * structs . CSIPluginDeleteResponse ) error {
if done , err := v . srv . forward ( "CSIPlugin.Delete" , args , args , reply ) ; done {
return err
}
// Check that it is a management token.
if aclObj , err := v . srv . ResolveToken ( args . AuthToken ) ; err != nil {
return err
} else if aclObj != nil && ! aclObj . IsManagement ( ) {
return structs . ErrPermissionDenied
}
2021-03-22 13:43:30 +00:00
defer metrics . MeasureSince ( [ ] string { "nomad" , "plugin" , "delete" } , time . Now ( ) )
2020-05-06 20:49:12 +00:00
2020-05-20 14:22:24 +00:00
if args . ID == "" {
return fmt . Errorf ( "missing plugin ID" )
}
2020-05-06 20:49:12 +00:00
resp , index , err := v . srv . raftApply ( structs . CSIPluginDeleteRequestType , args )
if err != nil {
v . logger . Error ( "csi raft apply failed" , "error" , err , "method" , "delete" )
return err
}
if respErr , ok := resp . ( error ) ; ok {
return respErr
}
reply . Index = index
v . srv . setQueryMeta ( & reply . QueryMeta )
return nil
}