open-nomad/nomad/csi_endpoint.go
Tim Gross 2082cf738a
csi: support for VolumeContext and VolumeParameters (#7957)
The MVP for CSI in the 0.11.0 release of Nomad did not include support
for opaque volume parameters or volume context. This changeset adds
support for both.

This also moves args for ControllerValidateCapabilities into a struct.
The CSI plugin `ControllerValidateCapabilities` struct that we turn
into a CSI RPC is accumulating arguments, so moving it into a request
struct will reduce the churn of this internal API, make the plugin
code more readable, and make this method consistent with the other
plugin methods in that package.
2020-05-15 08:16:01 -04:00

633 lines
18 KiB
Go

package nomad
import (
"fmt"
"time"
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/acl"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
)
// CSIVolume wraps the structs.CSIVolume with request data and server context
type CSIVolume struct {
srv *Server
logger log.Logger
}
// QueryACLObj looks up the ACL token in the request and returns the acl.ACL object
// - fallback to node secret ids
func (srv *Server) QueryACLObj(args *structs.QueryOptions, allowNodeAccess bool) (*acl.ACL, error) {
// Lookup the token
aclObj, err := srv.ResolveToken(args.AuthToken)
if err != nil {
// If ResolveToken had an unexpected error return that
if !structs.IsErrTokenNotFound(err) {
return nil, err
}
// If we don't allow access to this endpoint from Nodes, then return token
// not found.
if !allowNodeAccess {
return nil, structs.ErrTokenNotFound
}
ws := memdb.NewWatchSet()
// Attempt to lookup AuthToken as a Node.SecretID since nodes may call
// call this endpoint and don't have an ACL token.
node, stateErr := srv.fsm.State().NodeBySecretID(ws, args.AuthToken)
if stateErr != nil {
// Return the original ResolveToken error with this err
var merr multierror.Error
merr.Errors = append(merr.Errors, err, stateErr)
return nil, merr.ErrorOrNil()
}
// We did not find a Node for this ID, so return Token Not Found.
if node == nil {
return nil, structs.ErrTokenNotFound
}
}
// Return either the users aclObj, or nil if ACLs are disabled.
return aclObj, nil
}
// WriteACLObj calls QueryACLObj for a WriteRequest
func (srv *Server) WriteACLObj(args *structs.WriteRequest, allowNodeAccess bool) (*acl.ACL, error) {
opts := &structs.QueryOptions{
Region: args.RequestRegion(),
Namespace: args.RequestNamespace(),
AuthToken: args.AuthToken,
}
return srv.QueryACLObj(opts, allowNodeAccess)
}
const (
csiVolumeTable = "csi_volumes"
csiPluginTable = "csi_plugins"
)
// replySetIndex sets the reply with the last index that modified the table
func (srv *Server) replySetIndex(table string, reply *structs.QueryMeta) error {
s := srv.fsm.State()
index, err := s.Index(table)
if err != nil {
return err
}
reply.Index = index
// Set the query response
srv.setQueryMeta(reply)
return nil
}
// List replies with CSIVolumes, filtered by ACL access
func (v *CSIVolume) List(args *structs.CSIVolumeListRequest, reply *structs.CSIVolumeListResponse) error {
if done, err := v.srv.forward("CSIVolume.List", args, args, reply); done {
return err
}
allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIListVolume,
acl.NamespaceCapabilityCSIReadVolume,
acl.NamespaceCapabilityCSIMountVolume,
acl.NamespaceCapabilityListJobs)
aclObj, err := v.srv.QueryACLObj(&args.QueryOptions, false)
if err != nil {
return err
}
if !allowVolume(aclObj, args.RequestNamespace()) {
return structs.ErrPermissionDenied
}
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "volume", "list"}, metricsStart)
ns := args.RequestNamespace()
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Query all volumes
var err error
var iter memdb.ResultIterator
if args.NodeID != "" {
iter, err = state.CSIVolumesByNodeID(ws, args.NodeID)
} else if args.PluginID != "" {
iter, err = state.CSIVolumesByPluginID(ws, ns, args.PluginID)
} else {
iter, err = state.CSIVolumesByNamespace(ws, ns)
}
if err != nil {
return err
}
// Collect results, filter by ACL access
vs := []*structs.CSIVolListStub{}
for {
raw := iter.Next()
if raw == nil {
break
}
vol := raw.(*structs.CSIVolume)
vol, err := state.CSIVolumeDenormalizePlugins(ws, vol.Copy())
if err != nil {
return err
}
// Remove (possibly again) by PluginID to handle passing both NodeID and PluginID
if args.PluginID != "" && args.PluginID != vol.PluginID {
continue
}
// Remove by Namespace, since CSIVolumesByNodeID hasn't used the Namespace yet
if vol.Namespace != ns {
continue
}
vs = append(vs, vol.Stub())
}
reply.Volumes = vs
return v.srv.replySetIndex(csiVolumeTable, &reply.QueryMeta)
}}
return v.srv.blockingRPC(&opts)
}
// Get fetches detailed information about a specific volume
func (v *CSIVolume) Get(args *structs.CSIVolumeGetRequest, reply *structs.CSIVolumeGetResponse) error {
if done, err := v.srv.forward("CSIVolume.Get", args, args, reply); done {
return err
}
allowCSIAccess := acl.NamespaceValidator(acl.NamespaceCapabilityCSIReadVolume,
acl.NamespaceCapabilityCSIMountVolume,
acl.NamespaceCapabilityReadJob)
aclObj, err := v.srv.QueryACLObj(&args.QueryOptions, true)
if err != nil {
return err
}
ns := args.RequestNamespace()
if !allowCSIAccess(aclObj, ns) {
return structs.ErrPermissionDenied
}
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "volume", "get"}, metricsStart)
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
vol, err := state.CSIVolumeByID(ws, ns, args.ID)
if err != nil {
return err
}
if vol != nil {
vol, err = state.CSIVolumeDenormalize(ws, vol)
}
if err != nil {
return err
}
reply.Volume = vol
return v.srv.replySetIndex(csiVolumeTable, &reply.QueryMeta)
}}
return v.srv.blockingRPC(&opts)
}
func (v *CSIVolume) pluginValidateVolume(req *structs.CSIVolumeRegisterRequest, vol *structs.CSIVolume) (*structs.CSIPlugin, error) {
state := v.srv.fsm.State()
ws := memdb.NewWatchSet()
plugin, err := state.CSIPluginByID(ws, vol.PluginID)
if err != nil {
return nil, err
}
if plugin == nil {
return nil, fmt.Errorf("no CSI plugin named: %s could be found", vol.PluginID)
}
vol.Provider = plugin.Provider
vol.ProviderVersion = plugin.Version
return plugin, nil
}
func (v *CSIVolume) controllerValidateVolume(req *structs.CSIVolumeRegisterRequest, vol *structs.CSIVolume, plugin *structs.CSIPlugin) error {
if !plugin.ControllerRequired {
// The plugin does not require a controller, so for now we won't do any
// further validation of the volume.
return nil
}
method := "ClientCSI.ControllerValidateVolume"
cReq := &cstructs.ClientCSIControllerValidateVolumeRequest{
VolumeID: vol.RemoteID(),
AttachmentMode: vol.AttachmentMode,
AccessMode: vol.AccessMode,
Secrets: vol.Secrets,
Parameters: vol.Parameters,
Context: vol.Context,
}
cReq.PluginID = plugin.ID
cResp := &cstructs.ClientCSIControllerValidateVolumeResponse{}
return v.srv.RPC(method, cReq, cResp)
}
// Register registers a new volume
func (v *CSIVolume) Register(args *structs.CSIVolumeRegisterRequest, reply *structs.CSIVolumeRegisterResponse) error {
if done, err := v.srv.forward("CSIVolume.Register", args, args, reply); done {
return err
}
allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIWriteVolume)
aclObj, err := v.srv.WriteACLObj(&args.WriteRequest, false)
if err != nil {
return err
}
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "volume", "register"}, metricsStart)
if !allowVolume(aclObj, args.RequestNamespace()) || !aclObj.AllowPluginRead() {
return structs.ErrPermissionDenied
}
// This is the only namespace we ACL checked, force all the volumes to use it.
// We also validate that the plugin exists for each plugin, and validate the
// capabilities when the plugin has a controller.
for _, vol := range args.Volumes {
vol.Namespace = args.RequestNamespace()
if err = vol.Validate(); err != nil {
return err
}
plugin, err := v.pluginValidateVolume(args, vol)
if err != nil {
return err
}
if err := v.controllerValidateVolume(args, vol, plugin); err != nil {
return err
}
}
resp, index, err := v.srv.raftApply(structs.CSIVolumeRegisterRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "register")
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
return nil
}
// Deregister removes a set of volumes
func (v *CSIVolume) Deregister(args *structs.CSIVolumeDeregisterRequest, reply *structs.CSIVolumeDeregisterResponse) error {
if done, err := v.srv.forward("CSIVolume.Deregister", args, args, reply); done {
return err
}
allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIWriteVolume)
aclObj, err := v.srv.WriteACLObj(&args.WriteRequest, false)
if err != nil {
return err
}
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "volume", "deregister"}, metricsStart)
ns := args.RequestNamespace()
if !allowVolume(aclObj, ns) {
return structs.ErrPermissionDenied
}
resp, index, err := v.srv.raftApply(structs.CSIVolumeDeregisterRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "deregister")
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
return nil
}
// Claim submits a change to a volume claim
func (v *CSIVolume) Claim(args *structs.CSIVolumeClaimRequest, reply *structs.CSIVolumeClaimResponse) error {
if done, err := v.srv.forward("CSIVolume.Claim", args, args, reply); done {
return err
}
allowVolume := acl.NamespaceValidator(acl.NamespaceCapabilityCSIMountVolume)
aclObj, err := v.srv.WriteACLObj(&args.WriteRequest, true)
if err != nil {
return err
}
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "volume", "claim"}, metricsStart)
if !allowVolume(aclObj, args.RequestNamespace()) || !aclObj.AllowPluginRead() {
return structs.ErrPermissionDenied
}
// COMPAT(1.0): the NodeID field was added after 0.11.0 and so we
// need to ensure it's been populated during upgrades from 0.11.0
// to later patch versions. Remove this block in 1.0
if args.Claim != structs.CSIVolumeClaimRelease && args.NodeID == "" {
state := v.srv.fsm.State()
ws := memdb.NewWatchSet()
alloc, err := state.AllocByID(ws, args.AllocationID)
if err != nil {
return err
}
if alloc == nil {
return fmt.Errorf("%s: %s",
structs.ErrUnknownAllocationPrefix, args.AllocationID)
}
args.NodeID = alloc.NodeID
}
if args.Claim != structs.CSIVolumeClaimRelease {
// if this is a new claim, add a Volume and PublishContext from the
// controller (if any) to the reply
err = v.controllerPublishVolume(args, reply)
if err != nil {
return fmt.Errorf("controller publish: %v", err)
}
}
resp, index, err := v.srv.raftApply(structs.CSIVolumeClaimRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "claim")
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
return nil
}
// controllerPublishVolume sends publish request to the CSI controller
// plugin associated with a volume, if any.
func (v *CSIVolume) controllerPublishVolume(req *structs.CSIVolumeClaimRequest, resp *structs.CSIVolumeClaimResponse) error {
plug, vol, err := v.volAndPluginLookup(req.RequestNamespace(), req.VolumeID)
if err != nil {
return err
}
// Set the Response volume from the lookup
resp.Volume = vol
// Validate the existence of the allocation, regardless of whether we need it
// now.
state := v.srv.fsm.State()
ws := memdb.NewWatchSet()
alloc, err := state.AllocByID(ws, req.AllocationID)
if err != nil {
return err
}
if alloc == nil {
return fmt.Errorf("%s: %s", structs.ErrUnknownAllocationPrefix, req.AllocationID)
}
// if no plugin was returned then controller validation is not required.
// Here we can return nil.
if plug == nil {
return nil
}
// get Nomad's ID for the client node (not the storage provider's ID)
targetNode, err := state.NodeByID(ws, alloc.NodeID)
if err != nil {
return err
}
if targetNode == nil {
return fmt.Errorf("%s: %s", structs.ErrUnknownNodePrefix, alloc.NodeID)
}
// get the the storage provider's ID for the client node (not
// Nomad's ID for the node)
targetCSIInfo, ok := targetNode.CSINodePlugins[plug.ID]
if !ok {
return fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID)
}
externalNodeID := targetCSIInfo.NodeInfo.ID
method := "ClientCSI.ControllerAttachVolume"
cReq := &cstructs.ClientCSIControllerAttachVolumeRequest{
VolumeID: vol.RemoteID(),
ClientCSINodeID: externalNodeID,
AttachmentMode: vol.AttachmentMode,
AccessMode: vol.AccessMode,
ReadOnly: req.Claim == structs.CSIVolumeClaimRead,
Secrets: vol.Secrets,
VolumeContext: vol.Context,
}
cReq.PluginID = plug.ID
cResp := &cstructs.ClientCSIControllerAttachVolumeResponse{}
err = v.srv.RPC(method, cReq, cResp)
if err != nil {
return fmt.Errorf("attach volume: %v", err)
}
resp.PublishContext = cResp.PublishContext
return nil
}
func (v *CSIVolume) volAndPluginLookup(namespace, volID string) (*structs.CSIPlugin, *structs.CSIVolume, error) {
state := v.srv.fsm.State()
ws := memdb.NewWatchSet()
vol, err := state.CSIVolumeByID(ws, namespace, volID)
if err != nil {
return nil, nil, err
}
if vol == nil {
return nil, nil, fmt.Errorf("volume not found: %s", volID)
}
if !vol.ControllerRequired {
return nil, vol, nil
}
// note: we do this same lookup in CSIVolumeByID but then throw
// away the pointer to the plugin rather than attaching it to
// the volume so we have to do it again here.
plug, err := state.CSIPluginByID(ws, vol.PluginID)
if err != nil {
return nil, nil, err
}
if plug == nil {
return nil, nil, fmt.Errorf("plugin not found: %s", vol.PluginID)
}
return plug, vol, nil
}
// allowCSIMount is called on Job register to check mount permission
func allowCSIMount(aclObj *acl.ACL, namespace string) bool {
return aclObj.AllowPluginRead() &&
aclObj.AllowNsOp(namespace, acl.NamespaceCapabilityCSIMountVolume)
}
// CSIPlugin wraps the structs.CSIPlugin with request data and server context
type CSIPlugin struct {
srv *Server
logger log.Logger
}
// List replies with CSIPlugins, filtered by ACL access
func (v *CSIPlugin) List(args *structs.CSIPluginListRequest, reply *structs.CSIPluginListResponse) error {
if done, err := v.srv.forward("CSIPlugin.List", args, args, reply); done {
return err
}
aclObj, err := v.srv.QueryACLObj(&args.QueryOptions, false)
if err != nil {
return err
}
if !aclObj.AllowPluginList() {
return structs.ErrPermissionDenied
}
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "plugin", "list"}, metricsStart)
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
// Query all plugins
iter, err := state.CSIPlugins(ws)
if err != nil {
return err
}
// Collect results
ps := []*structs.CSIPluginListStub{}
for {
raw := iter.Next()
if raw == nil {
break
}
plug := raw.(*structs.CSIPlugin)
ps = append(ps, plug.Stub())
}
reply.Plugins = ps
return v.srv.replySetIndex(csiPluginTable, &reply.QueryMeta)
}}
return v.srv.blockingRPC(&opts)
}
// Get fetches detailed information about a specific plugin
func (v *CSIPlugin) Get(args *structs.CSIPluginGetRequest, reply *structs.CSIPluginGetResponse) error {
if done, err := v.srv.forward("CSIPlugin.Get", args, args, reply); done {
return err
}
aclObj, err := v.srv.QueryACLObj(&args.QueryOptions, false)
if err != nil {
return err
}
if !aclObj.AllowPluginRead() {
return structs.ErrPermissionDenied
}
withAllocs := aclObj == nil ||
aclObj.AllowNsOp(args.RequestNamespace(), acl.NamespaceCapabilityReadJob)
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "plugin", "get"}, metricsStart)
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
plug, err := state.CSIPluginByID(ws, args.ID)
if err != nil {
return err
}
if plug == nil {
return nil
}
if withAllocs {
plug, err = state.CSIPluginDenormalize(ws, plug.Copy())
if err != nil {
return err
}
// Filter the allocation stubs by our namespace. withAllocs
// means we're allowed
var as []*structs.AllocListStub
for _, a := range plug.Allocations {
if a.Namespace == args.RequestNamespace() {
as = append(as, a)
}
}
plug.Allocations = as
}
reply.Plugin = plug
return v.srv.replySetIndex(csiPluginTable, &reply.QueryMeta)
}}
return v.srv.blockingRPC(&opts)
}
// Delete deletes a plugin if it is unused
func (v *CSIPlugin) Delete(args *structs.CSIPluginDeleteRequest, reply *structs.CSIPluginDeleteResponse) error {
if done, err := v.srv.forward("CSIPlugin.Delete", args, args, reply); done {
return err
}
// Check that it is a management token.
if aclObj, err := v.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.IsManagement() {
return structs.ErrPermissionDenied
}
metricsStart := time.Now()
defer metrics.MeasureSince([]string{"nomad", "plugin", "delete"}, metricsStart)
resp, index, err := v.srv.raftApply(structs.CSIPluginDeleteRequestType, args)
if err != nil {
v.logger.Error("csi raft apply failed", "error", err, "method", "delete")
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
reply.Index = index
v.srv.setQueryMeta(&reply.QueryMeta)
return nil
}