60874ebe25
The unpublish workflow requires that we know the mode (RW vs RO) if we want to unpublish the node. Update the hook and the Unpublish RPC so that we mark the claim for release in a new state but leave the mode alone. This fixes a bug where RO claims were failing node unpublish. The core job GC doesn't know the mode, but we don't need it for that workflow, so add a mode specifically for GC; the volumewatcher uses this as a sentinel to check whether claims (with their specific RW vs RO modes) need to be claimed.
211 lines
5.7 KiB
Go
211 lines
5.7 KiB
Go
package allocrunner
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
hclog "github.com/hashicorp/go-hclog"
|
|
multierror "github.com/hashicorp/go-multierror"
|
|
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/hashicorp/nomad/plugins/drivers"
|
|
)
|
|
|
|
// csiHook will wait for remote csi volumes to be attached to the host before
|
|
// continuing.
|
|
//
|
|
// It is a noop for allocs that do not depend on CSI Volumes.
|
|
type csiHook struct {
|
|
ar *allocRunner
|
|
alloc *structs.Allocation
|
|
logger hclog.Logger
|
|
csimanager csimanager.Manager
|
|
rpcClient RPCer
|
|
updater hookResourceSetter
|
|
volumeRequests map[string]*volumeAndRequest
|
|
}
|
|
|
|
func (c *csiHook) Name() string {
|
|
return "csi_hook"
|
|
}
|
|
|
|
func (c *csiHook) Prerun() error {
|
|
if !c.shouldRun() {
|
|
return nil
|
|
}
|
|
|
|
// We use this context only to attach hclog to the gRPC context. The
|
|
// lifetime is the lifetime of the gRPC stream, not specific RPC timeouts,
|
|
// but we manage the stream lifetime via Close in the pluginmanager.
|
|
ctx := context.Background()
|
|
|
|
volumes, err := c.claimVolumesFromAlloc()
|
|
if err != nil {
|
|
return fmt.Errorf("claim volumes: %v", err)
|
|
}
|
|
c.volumeRequests = volumes
|
|
|
|
mounts := make(map[string]*csimanager.MountInfo, len(volumes))
|
|
for alias, pair := range volumes {
|
|
mounter, err := c.csimanager.MounterForPlugin(ctx, pair.volume.PluginID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
usageOpts := &csimanager.UsageOptions{
|
|
ReadOnly: pair.request.ReadOnly,
|
|
AttachmentMode: string(pair.volume.AttachmentMode),
|
|
AccessMode: string(pair.volume.AccessMode),
|
|
MountOptions: pair.request.MountOptions,
|
|
}
|
|
|
|
mountInfo, err := mounter.MountVolume(ctx, pair.volume, c.alloc, usageOpts, pair.publishContext)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
mounts[alias] = mountInfo
|
|
}
|
|
|
|
res := c.updater.GetAllocHookResources()
|
|
res.CSIMounts = mounts
|
|
c.updater.SetAllocHookResources(res)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Postrun sends an RPC to the server to unpublish the volume. This may
|
|
// forward client RPCs to the node plugins or to the controller plugins,
|
|
// depending on whether other allocations on this node have claims on this
|
|
// volume.
|
|
func (c *csiHook) Postrun() error {
|
|
if !c.shouldRun() {
|
|
return nil
|
|
}
|
|
|
|
var mErr *multierror.Error
|
|
|
|
for _, pair := range c.volumeRequests {
|
|
|
|
mode := structs.CSIVolumeClaimRead
|
|
if !pair.request.ReadOnly {
|
|
mode = structs.CSIVolumeClaimWrite
|
|
}
|
|
|
|
req := &structs.CSIVolumeUnpublishRequest{
|
|
VolumeID: pair.request.Source,
|
|
Claim: &structs.CSIVolumeClaim{
|
|
AllocationID: c.alloc.ID,
|
|
NodeID: c.alloc.NodeID,
|
|
Mode: mode,
|
|
State: structs.CSIVolumeClaimStateUnpublishing,
|
|
},
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: c.alloc.Job.Region,
|
|
Namespace: c.alloc.Job.Namespace,
|
|
AuthToken: c.ar.clientConfig.Node.SecretID,
|
|
},
|
|
}
|
|
err := c.rpcClient.RPC("CSIVolume.Unpublish",
|
|
req, &structs.CSIVolumeUnpublishResponse{})
|
|
if err != nil {
|
|
mErr = multierror.Append(mErr, err)
|
|
}
|
|
}
|
|
return mErr.ErrorOrNil()
|
|
}
|
|
|
|
type volumeAndRequest struct {
|
|
volume *structs.CSIVolume
|
|
request *structs.VolumeRequest
|
|
|
|
// When volumeAndRequest was returned from a volume claim, this field will be
|
|
// populated for plugins that require it.
|
|
publishContext map[string]string
|
|
}
|
|
|
|
// claimVolumesFromAlloc is used by the pre-run hook to fetch all of the volume
|
|
// metadata and claim it for use by this alloc/node at the same time.
|
|
func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error) {
|
|
result := make(map[string]*volumeAndRequest)
|
|
tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup)
|
|
|
|
// Initially, populate the result map with all of the requests
|
|
for alias, volumeRequest := range tg.Volumes {
|
|
|
|
if volumeRequest.Type == structs.VolumeTypeCSI {
|
|
|
|
for _, task := range tg.Tasks {
|
|
caps, err := c.ar.GetTaskDriverCapabilities(task.Name)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not validate task driver capabilities: %v", err)
|
|
}
|
|
|
|
if caps.MountConfigs == drivers.MountConfigSupportNone {
|
|
return nil, fmt.Errorf(
|
|
"task driver %q for %q does not support CSI", task.Driver, task.Name)
|
|
}
|
|
}
|
|
|
|
result[alias] = &volumeAndRequest{request: volumeRequest}
|
|
}
|
|
}
|
|
|
|
// Iterate over the result map and upsert the volume field as each volume gets
|
|
// claimed by the server.
|
|
for alias, pair := range result {
|
|
claimType := structs.CSIVolumeClaimWrite
|
|
if pair.request.ReadOnly {
|
|
claimType = structs.CSIVolumeClaimRead
|
|
}
|
|
|
|
req := &structs.CSIVolumeClaimRequest{
|
|
VolumeID: pair.request.Source,
|
|
AllocationID: c.alloc.ID,
|
|
NodeID: c.alloc.NodeID,
|
|
Claim: claimType,
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: c.alloc.Job.Region,
|
|
Namespace: c.alloc.Job.Namespace,
|
|
AuthToken: c.ar.clientConfig.Node.SecretID,
|
|
},
|
|
}
|
|
|
|
var resp structs.CSIVolumeClaimResponse
|
|
if err := c.rpcClient.RPC("CSIVolume.Claim", req, &resp); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if resp.Volume == nil {
|
|
return nil, fmt.Errorf("Unexpected nil volume returned for ID: %v", pair.request.Source)
|
|
}
|
|
|
|
result[alias].volume = resp.Volume
|
|
result[alias].publishContext = resp.PublishContext
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func newCSIHook(ar *allocRunner, logger hclog.Logger, alloc *structs.Allocation, rpcClient RPCer, csi csimanager.Manager, updater hookResourceSetter) *csiHook {
|
|
return &csiHook{
|
|
ar: ar,
|
|
alloc: alloc,
|
|
logger: logger.Named("csi_hook"),
|
|
rpcClient: rpcClient,
|
|
csimanager: csi,
|
|
updater: updater,
|
|
}
|
|
}
|
|
|
|
func (h *csiHook) shouldRun() bool {
|
|
tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup)
|
|
for _, vol := range tg.Volumes {
|
|
if vol.Type == structs.VolumeTypeCSI {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|