open-nomad/client/allocrunner/csi_hook.go
Danielle Lancashire da4f6b60a2 csi: Pass through usage options to the csimanager
The CSI Spec requires us to attach and stage volumes based on different
types of usage information when it may effect how they are bound. Here
we pass through some basic usage options in the CSI Hook (specifically
the volume aliases ReadOnly field), and the attachment/access mode from
the volume. We pass the attachment/access mode seperately from the
volume as it simplifies some handling and doesn't necessarily force
every attachment to use the same mode should more be supported (I.e if
we let each `volume "foo" {}` specify an override in the future).
2020-03-23 13:58:30 -04:00

169 lines
4.1 KiB
Go

package allocrunner
import (
"context"
"fmt"
hclog "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/nomad/structs"
)
// csiHook will wait for remote csi volumes to be attached to the host before
// continuing.
//
// It is a noop for allocs that do not depend on CSI Volumes.
type csiHook struct {
alloc *structs.Allocation
logger hclog.Logger
csimanager csimanager.Manager
rpcClient RPCer
updater hookResourceSetter
}
func (c *csiHook) Name() string {
return "csi_hook"
}
func (c *csiHook) Prerun() error {
if !c.shouldRun() {
return nil
}
ctx := context.TODO()
volumes, err := c.csiVolumesFromAlloc()
if err != nil {
return err
}
mounts := make(map[string]*csimanager.MountInfo, len(volumes))
for alias, pair := range volumes {
mounter, err := c.csimanager.MounterForVolume(ctx, pair.volume)
if err != nil {
return err
}
usageOpts := &csimanager.UsageOptions{
ReadOnly: pair.request.ReadOnly,
AttachmentMode: string(pair.volume.AttachmentMode),
AccessMode: string(pair.volume.AccessMode),
}
mountInfo, err := mounter.MountVolume(ctx, pair.volume, c.alloc, usageOpts)
if err != nil {
return err
}
mounts[alias] = mountInfo
}
res := c.updater.GetAllocHookResources()
res.CSIMounts = mounts
c.updater.SetAllocHookResources(res)
return nil
}
func (c *csiHook) Postrun() error {
if !c.shouldRun() {
return nil
}
ctx := context.TODO()
volumes, err := c.csiVolumesFromAlloc()
if err != nil {
return err
}
// For Postrun, we accumulate all unmount errors, rather than stopping on the
// first failure. This is because we want to make a best effort to free all
// storage, and in some cases there may be incorrect errors from volumes that
// never mounted correctly during prerun when an alloc is failed. It may also
// fail because a volume was externally deleted while in use by this alloc.
var result *multierror.Error
for _, pair := range volumes {
mounter, err := c.csimanager.MounterForVolume(ctx, pair.volume)
if err != nil {
result = multierror.Append(result, err)
continue
}
usageOpts := &csimanager.UsageOptions{
ReadOnly: pair.request.ReadOnly,
AttachmentMode: string(pair.volume.AttachmentMode),
AccessMode: string(pair.volume.AccessMode),
}
err = mounter.UnmountVolume(ctx, pair.volume, c.alloc, usageOpts)
if err != nil {
result = multierror.Append(result, err)
continue
}
}
return result.ErrorOrNil()
}
type volumeAndRequest struct {
volume *structs.CSIVolume
request *structs.VolumeRequest
}
// csiVolumesFromAlloc finds all the CSI Volume requests from the allocation's
// task group and then fetches them from the Nomad Server, before returning
// them in the form of map[RequestedAlias]*structs.CSIVolume.
//
// If any volume fails to validate then we return an error.
func (c *csiHook) csiVolumesFromAlloc() (map[string]*volumeAndRequest, error) {
vols := make(map[string]*volumeAndRequest)
tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup)
for alias, vol := range tg.Volumes {
if vol.Type == structs.VolumeTypeCSI {
vols[alias] = &volumeAndRequest{request: vol}
}
}
for alias, pair := range vols {
req := &structs.CSIVolumeGetRequest{
ID: pair.request.Source,
}
req.Region = c.alloc.Job.Region
var resp structs.CSIVolumeGetResponse
if err := c.rpcClient.RPC("CSIVolume.Get", req, &resp); err != nil {
return nil, err
}
if resp.Volume == nil {
return nil, fmt.Errorf("Unexpected nil volume returned for ID: %v", pair.request.Source)
}
vols[alias].volume = resp.Volume
}
return vols, nil
}
func newCSIHook(logger hclog.Logger, alloc *structs.Allocation, rpcClient RPCer, csi csimanager.Manager, updater hookResourceSetter) *csiHook {
return &csiHook{
alloc: alloc,
logger: logger.Named("csi_hook"),
rpcClient: rpcClient,
csimanager: csi,
updater: updater,
}
}
func (h *csiHook) shouldRun() bool {
tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup)
for _, vol := range tg.Volumes {
if vol.Type == structs.VolumeTypeCSI {
return true
}
}
return false
}